# Git Integration
## Checking Status
import fabias
# Get the workspace object for "GENESIS" and its Git interface.
ws = fabias.workspace("GENESIS")
git = ws.git
# Fetch the current Git status, then print the sync flag and the number of
# entries in the status' change list.
status = git.status()
print(f"Synced: {status.is_synced}")
print(f"Pending changes: {len(status.changes)}")
## Pulling from Git
# Get the workspace object for "GENESIS" and its Git interface.
ws = fabias.workspace("GENESIS")
git = ws.git

# Simple pull
operation = git.pull()
operation.wait()  # wait on the returned operation before continuing

# Pull with conflict resolution
operation = git.pull(
    conflict_resolution="PreferRemote",  # or "PreferWorkspace"
    allow_override=True,
)
operation.wait()
## Pushing to Git
# Get the workspace object for "GENESIS" and its Git interface.
ws = fabias.workspace("GENESIS")
git = ws.git

# Commit and push all changes
operation = git.commitAndPush(comment="Updated ETL pipeline")
operation.wait()

# Commit specific items from status
status = git.status()
workspace_items = status.workspace_changes  # Items changed in workspace
operation = git.commitAndPush(
    comment="Add new datasets",
    items=workspace_items,
)
operation.wait()
## Git Status Details
# `git` is the workspace Git interface (`ws.git`) obtained in the earlier examples.
status = git.status()

# Check sync state
print(status.is_synced)
print(status.head)  # Workspace head commit hash
print(status.remoteCommitHash)  # Remote commit hash
print(status.syncStatus)  # Overall sync status

# List changes
for item in status.changes:
    print(f"{item.name}: workspace={item.change.workspace}, remote={item.change.remote}")

# Get workspace-only or remote-only changes
workspace_changes = status.workspace_changes
remote_changes = status.remote_changes
## Connecting to Git
Connect a workspace to a Git repository:
import fabias
# Get the workspace object for "GENESIS" and its Git interface.
ws = fabias.workspace("GENESIS")
git = ws.git

# Connect to GitHub (public repo)
git.connect(
    repository_url="https://github.com/myorg/myrepo",
    branch="main",
    directory="/fabric-workspace",  # Optional subdirectory
)

# Connect to Azure DevOps (private repo with connection)
conn = fabias.connection("GIT-FLOW-SA")
git.connect(
    repository_url="https://dev.azure.com/myorg/myproject/_git/myrepo",
    branch="main",
    connection=conn,
)
## Initialization
After connecting, initialize the Git sync:
from fabias import InitializationStrategy
# Initialize with preference for remote content
# (`git` is the workspace Git interface connected in the previous example.)
operation = git.initialize(InitializationStrategy.PREFER_REMOTE)

# Handle immediate vs long-running response
if not operation.longRunning:
    # Immediate response: read the result fields straight from operation.data.
    print(f"Required action: {operation.data.get('requiredAction')}")
    print(f"Remote commit: {operation.data.get('remoteCommitHash')}")
else:
    # Long-running response: wait on the operation before reporting completion.
    operation.wait()
    print("Initialization complete!")