Skip to content

Git Integration

Checking Status

import fabias

ws = fabias.workspace("GENESIS")
git = ws.git

status = git.status()
print(f"Synced: {status.is_synced}")
print(f"Pending changes: {len(status.changes)}")

Pulling from Git

ws = fabias.workspace("GENESIS")
git = ws.git

# Simple pull
operation = git.pull()
operation.wait()

# Pull with conflict resolution
operation = git.pull(
    conflict_resolution="PreferRemote",  # or "PreferWorkspace"
    allow_override=True
)
operation.wait()

Pushing to Git

ws = fabias.workspace("GENESIS")
git = ws.git

# Commit and push all changes
operation = git.commitAndPush(comment="Updated ETL pipeline")
operation.wait()

# Commit specific items from status
status = git.status()
workspace_items = status.workspace_changes  # Items changed in workspace
operation = git.commitAndPush(
    comment="Add new datasets",
    items=workspace_items
)
operation.wait()

Git Status Details

status = git.status()

# Check sync state
print(status.is_synced)
print(status.head)              # Workspace head commit hash
print(status.remoteCommitHash)  # Remote commit hash
print(status.syncStatus)        # Overall sync status

# List changes
for item in status.changes:
    print(f"{item.name}: workspace={item.change.workspace}, remote={item.change.remote}")

# Get workspace-only or remote-only changes
workspace_changes = status.workspace_changes
remote_changes = status.remote_changes

Connecting to Git

Connect a workspace to a Git repository:

import fabias

ws = fabias.workspace("GENESIS")
git = ws.git

# Connect to GitHub (public repo)
git.connect(
    repository_url="https://github.com/myorg/myrepo",
    branch="main",
    directory="/fabric-workspace"  # Optional subdirectory
)

# Connect to Azure DevOps (private repo with connection)
conn = fabias.connection("GIT-FLOW-SA")
git.connect(
    repository_url="https://dev.azure.com/myorg/myproject/_git/myrepo",
    branch="main",
    connection=conn
)

Initialization

After connecting, initialize the Git sync:

from fabias import InitializationStrategy

# Initialize with preference for remote content
operation = git.initialize(InitializationStrategy.PREFER_REMOTE)

# Handle immediate vs long-running response
if not operation.longRunning:
    print(f"Required action: {operation.data.get('requiredAction')}")
    print(f"Remote commit: {operation.data.get('remoteCommitHash')}")
else:
    operation.wait()
    print("Initialization complete!")

Disconnecting

git.disconnect()