Skip to content

Data Factory API Reference

Access Azure Data Factory through the integrations namespace:

from fabias.integrations import adf

adf.client(subscription_id="...", resource_group="...", factory="...", auth=my_auth)
pipeline = adf.pipeline("ETL_Pipeline")
job = pipeline.run()
job.wait()

Module Functions

fabias.integrations.adf.client

Azure Data Factory REST API client.

Classes

DataFactoryClient

Bases: BaseClient

HTTP client for Azure Data Factory REST API.

Configured for a specific factory within a subscription/resource group.

Source code in src/fabias/integrations/adf/client.py
class DataFactoryClient(BaseClient):
    """
    HTTP client for Azure Data Factory REST API.

    Configured for a specific factory within a subscription/resource group.
    Endpoints used by this class (e.g. ``/pipelines``) are relative to the
    factory resource URI stored in ``_base_uri``.
    """

    MANAGEMENT_SCOPE = "https://management.azure.com/.default"
    MANAGEMENT_API_URL = "https://management.azure.com"
    API_VERSION = "2018-06-01"

    def __init__(
        self,
        subscription_id: str,
        resource_group: str,
        factory: str,
        auth: Optional[AuthProvider] = None,
        tenant_id: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
    ):
        """
        Initialize the Data Factory client.

        Credentials are resolved in this order: an explicit ``auth`` provider
        wins; otherwise a ServicePrincipalAuth is built from ``tenant_id``,
        ``client_id`` and ``client_secret`` (all three required).

        Args:
            subscription_id: Azure subscription ID
            resource_group: Resource group name
            factory: Data Factory name
            auth: Optional pre-configured AuthProvider
            tenant_id: Azure AD tenant ID
            client_id: Application client ID
            client_secret: Client secret

        Raises:
            FabiasError: If neither ``auth`` nor a complete set of
                service-principal credentials is supplied
        """
        # Determine authentication method; an explicit provider takes precedence.
        if auth is not None:
            auth_provider = auth
        elif tenant_id and client_id and client_secret:
            auth_provider = ServicePrincipalAuth(
                tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
            )
        else:
            raise FabiasError(
                "Azure Data Factory requires explicit credentials. "
                "Provide either 'auth' or all of 'tenant_id', 'client_id', and 'client_secret'."
            )

        super().__init__(auth_provider)

        self._subscription_id = subscription_id
        self._resource_group = resource_group
        self._factory = factory
        self._scope = self.MANAGEMENT_SCOPE

        # Base URI points directly to the factory
        self._base_uri = (
            f"{self.MANAGEMENT_API_URL}"
            f"/subscriptions/{subscription_id}"
            f"/resourceGroups/{resource_group}"
            f"/providers/Microsoft.DataFactory"
            f"/factories/{factory}"
        )

    @property
    def factory(self) -> str:
        """Get the configured factory name."""
        return self._factory

    @property
    def resourceGroup(self) -> str:
        """Get the configured resource group."""
        return self._resource_group

    @property
    def subscriptionId(self) -> str:
        """Get the configured subscription ID."""
        return self._subscription_id

    def pipeline(self, name: str) -> Pipeline:
        """
        Get a pipeline by name.

        No API call is made here; the pipeline definition is fetched lazily
        on first property access.

        Args:
            name: Pipeline name

        Returns:
            Pipeline: Pipeline object for execution
        """
        return Pipeline(self, name)

    def pipelines(self) -> List[Pipeline]:
        """
        List all pipelines in this factory.

        Returns Pipeline objects pre-populated with metadata from the list call,
        so no additional API calls are needed to access their properties.

        The ADF list endpoint is paged: a response may include a ``nextLink``
        to the next page. All pages are followed so large factories are listed
        completely.

        Returns:
            List[Pipeline]: List of Pipeline objects with cached metadata

        Examples:
            >>> for pipeline in adf.pipelines():
            ...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
        """
        results: List[Pipeline] = []
        endpoint: Optional[str] = f"/pipelines?api-version={self.API_VERSION}"

        while endpoint:
            payload = self.get(endpoint).json()

            # Create Pipeline objects pre-populated with data
            results.extend(
                Pipeline(self, data["name"], data=data) for data in payload.get("value", [])
            )

            next_link = payload.get("nextLink")
            endpoint = None
            if next_link and next_link.lower().startswith(self._base_uri.lower()):
                # nextLink is an absolute URL, while self.get() is called with
                # paths relative to the factory URI (see the first request), so
                # strip that prefix before requesting the next page.
                endpoint = next_link[len(self._base_uri):]
            # NOTE(review): a nextLink outside the factory URI is unexpected;
            # paging stops there rather than guessing how BaseClient treats
            # absolute URLs.

        return results

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - cleanup if needed."""
        # Add any cleanup logic here if needed in the future
        pass
Attributes
factory property

Get the configured factory name.

resourceGroup property

Get the configured resource group.

subscriptionId property

Get the configured subscription ID.

Functions
__enter__()

Context manager entry.

Source code in src/fabias/integrations/adf/client.py
def __enter__(self):
    """Hand back this same client so ``with ... as c`` binds it."""
    client = self
    return client
__exit__(exc_type, exc_val, exc_tb)

Context manager exit - cleanup if needed.

Source code in src/fabias/integrations/adf/client.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the ``with`` block; nothing to release yet, exceptions propagate."""
    # Returning None (falsy) means any in-flight exception is not suppressed.
    return None
__init__(subscription_id, resource_group, factory, auth=None, tenant_id=None, client_id=None, client_secret=None)

Initialize the Data Factory client.

Parameters:

Name Type Description Default
subscription_id str

Azure subscription ID

required
resource_group str

Resource group name

required
factory str

Data Factory name

required
auth Optional[AuthProvider]

Optional pre-configured AuthProvider

None
tenant_id Optional[str]

Azure AD tenant ID

None
client_id Optional[str]

Application client ID

None
client_secret Optional[str]

Client secret

None
Source code in src/fabias/integrations/adf/client.py
def __init__(
    self,
    subscription_id: str,
    resource_group: str,
    factory: str,
    auth: Optional[AuthProvider] = None,
    tenant_id: Optional[str] = None,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
):
    """
    Initialize the Data Factory client.

    Credentials are resolved in this order: an explicit ``auth`` provider
    wins; otherwise a ServicePrincipalAuth is built from ``tenant_id``,
    ``client_id`` and ``client_secret`` (all three required).

    Args:
        subscription_id: Azure subscription ID
        resource_group: Resource group name
        factory: Data Factory name
        auth: Optional pre-configured AuthProvider
        tenant_id: Azure AD tenant ID
        client_id: Application client ID
        client_secret: Client secret

    Raises:
        FabiasError: If neither ``auth`` nor a complete set of
            service-principal credentials is supplied
    """
    # Determine authentication method; an explicit provider takes precedence.
    if auth is not None:
        auth_provider = auth
    elif tenant_id and client_id and client_secret:
        auth_provider = ServicePrincipalAuth(
            tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
        )
    else:
        raise FabiasError(
            "Azure Data Factory requires explicit credentials. "
            "Provide either 'auth' or all of 'tenant_id', 'client_id', and 'client_secret'."
        )

    super().__init__(auth_provider)

    self._subscription_id = subscription_id
    self._resource_group = resource_group
    self._factory = factory
    self._scope = self.MANAGEMENT_SCOPE

    # Base URI points directly to the factory
    self._base_uri = (
        f"{self.MANAGEMENT_API_URL}"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.DataFactory"
        f"/factories/{factory}"
    )
pipeline(name)

Get a pipeline by name.

Parameters:

Name Type Description Default
name str

Pipeline name

required

Returns:

Name Type Description
Pipeline Pipeline

Pipeline object for execution

Source code in src/fabias/integrations/adf/client.py
def pipeline(self, name: str) -> Pipeline:
    """
    Return a lazily-loaded Pipeline handle for ``name``.

    No API call is made here; the pipeline definition is fetched the first
    time one of its properties is accessed.

    Args:
        name: Pipeline name

    Returns:
        Pipeline: Pipeline object for execution
    """
    handle = Pipeline(self, name)
    return handle
pipelines()

List all pipelines in this factory.

Returns Pipeline objects pre-populated with metadata from the list call, so no additional API calls are needed to access their properties.

Returns:

Type Description
List[Pipeline]

List[Pipeline]: List of Pipeline objects with cached metadata

Examples:

>>> for pipeline in adf.pipelines():
...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
Source code in src/fabias/integrations/adf/client.py
def pipelines(self) -> List[Pipeline]:
    """
    List all pipelines in this factory.

    Returns Pipeline objects pre-populated with metadata from the list call,
    so no additional API calls are needed to access their properties.

    The ADF list endpoint is paged: a response may include a ``nextLink``
    to the next page. All pages are followed so large factories are listed
    completely.

    Returns:
        List[Pipeline]: List of Pipeline objects with cached metadata

    Examples:
        >>> for pipeline in adf.pipelines():
        ...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
    """
    results: List[Pipeline] = []
    endpoint: Optional[str] = f"/pipelines?api-version={self.API_VERSION}"

    while endpoint:
        payload = self.get(endpoint).json()

        # Create Pipeline objects pre-populated with data
        results.extend(
            Pipeline(self, data["name"], data=data) for data in payload.get("value", [])
        )

        next_link = payload.get("nextLink")
        endpoint = None
        if next_link and next_link.lower().startswith(self._base_uri.lower()):
            # nextLink is an absolute URL, while self.get() is called with
            # paths relative to the factory URI (see the first request), so
            # strip that prefix before requesting the next page.
            endpoint = next_link[len(self._base_uri):]
        # NOTE(review): a nextLink outside the factory URI is unexpected;
        # paging stops there rather than guessing how BaseClient treats
        # absolute URLs.

    return results

fabias.integrations.adf.pipeline

Azure Data Factory Pipeline model.

Classes

Pipeline

Represents an Azure Data Factory pipeline.

Supports lazy loading when fetched by name, or pre-population when retrieved from pipelines() list. Properties are cached after first access.

Examples:

>>> from fabias.integrations import adf
>>> adf.client(...)
>>>
>>> # Lazy loading - fetches on first property access
>>> pipeline = adf.pipeline("Daily_ETL")
>>> print(pipeline.activities)  # Triggers fetch
>>>
>>> # Pre-populated from list
>>> for pipeline in adf.pipelines():
...     print(pipeline.name, pipeline.lastPublished)  # No fetch needed
>>>
>>> # Force refresh cached data
>>> pipeline.refresh()
Source code in src/fabias/integrations/adf/pipeline.py
class Pipeline:
    """
    Represents an Azure Data Factory pipeline.

    Supports lazy loading when fetched by name, or pre-population when
    retrieved from pipelines() list. Properties are cached after first access.

    Examples:
        >>> from fabias.integrations import adf
        >>> adf.client(...)
        >>>
        >>> # Lazy loading - fetches on first property access
        >>> pipeline = adf.pipeline("Daily_ETL")
        >>> print(pipeline.activities)  # Triggers fetch
        >>>
        >>> # Pre-populated from list
        >>> for pipeline in adf.pipelines():
        ...     print(pipeline.name, pipeline.lastPublished)  # No fetch needed
        >>>
        >>> # Force refresh cached data
        >>> pipeline.refresh()
    """

    def __init__(
        self, client: "DataFactoryClient", name: str, data: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize pipeline.

        Args:
            client: Configured DataFactoryClient
            name: Pipeline name
            data: Optional pre-fetched pipeline data (from pipelines() list)
        """
        self._client = client
        self.name = name
        self._data: Optional[Dict[str, Any]] = data
        # Data supplied up front (e.g. by the list call) counts as cached;
        # otherwise the definition is loaded on first property access.
        self._cached = data is not None

    def _fetch(self) -> None:
        """
        Fetch pipeline data from the API if not already cached.

        This is called automatically when accessing properties.
        Use refresh() to force a new fetch.
        """
        if not self._cached:
            endpoint = f"/pipelines/{self.name}?api-version={self._client.API_VERSION}"
            response = self._client.get(endpoint)
            self._data = response.json()
            self._cached = True

    def refresh(self) -> None:
        """
        Force refresh of cached pipeline data from the API.

        Examples:
            >>> pipeline = adf.pipeline("Daily_ETL")
            >>> pipeline.refresh()  # Get latest data
        """
        self._cached = False
        self._fetch()

    @property
    def id(self) -> str:
        """Azure resource ID for this pipeline."""
        self._fetch()
        return self._data.get("id", "") if self._data else ""

    @property
    def type(self) -> str:
        """Azure resource type."""
        self._fetch()
        return self._data.get("type", "") if self._data else ""

    @property
    def etag(self) -> str:
        """Entity tag for versioning."""
        self._fetch()
        return self._data.get("etag", "") if self._data else ""

    @property
    def properties(self) -> Dict[str, Any]:
        """Full properties dictionary including activities, parameters, etc."""
        self._fetch()
        return self._data.get("properties", {}) if self._data else {}

    @property
    def activities(self) -> List[Dict[str, Any]]:
        """List of pipeline activities."""
        return cast(List[Dict[str, Any]], self.properties.get("activities", []))

    @property
    def parameters(self) -> Dict[str, Any]:
        """Pipeline parameters definition."""
        return cast(Dict[str, Any], self.properties.get("parameters", {}))

    @property
    def annotations(self) -> List[str]:
        """Pipeline annotations."""
        return cast(List[str], self.properties.get("annotations", []))

    @property
    def lastPublished(self) -> Optional[str]:
        """Last publish timestamp (ISO format)."""
        return self.properties.get("lastPublishTime")

    def run(self, parameters: Optional[Dict[str, Any]] = None) -> AzResponse:
        """
        Execute the pipeline.

        Starts a new pipeline run via the ADF ``createRun`` endpoint and
        returns the validated response.

        Args:
            parameters: Optional pipeline parameters as key-value pairs; passed
                as the request body (an empty dict when omitted)

        Returns:
            AzResponse: The createRun response; its ``data`` is guaranteed to
            contain a non-empty ``runId``.

        Raises:
            FabiasError: If the request fails, returns a status other than
                200/202, or the response carries no ``runId``

        Examples:
            >>> job = pipeline.run()
            >>> job.wait()

            >>> job = pipeline.run(parameters={"sourceDate": "2025-01-01"})
            >>> job.wait(callback=lambda j: print(f"Status: {j.status}"))
        """
        # Use the client's API version so every ADF call stays on the same version.
        endpoint = f"/pipelines/{self.name}/createRun?api-version={self._client.API_VERSION}"
        body = parameters or {}

        try:
            response = self._client.post(endpoint, data=body)

            # Validate expected status codes for createRun
            if response.status_code not in (200, 202):
                raise FabiasError(
                    f"Unexpected status code {response.status_code} when creating pipeline run. "
                    f"Expected 200 or 202."
                )

            # Validate that we got a runId (ADF should always return this)
            if not response.data or not response.data.get("runId"):
                raise FabiasError(f"No runId returned when creating pipeline run for '{self.name}'")

            return response

        except FabiasError:
            # Re-raise our own errors
            raise
        except Exception as e:
            # Chain the original error so its traceback is preserved.
            raise FabiasError(f"Failed to run pipeline '{self.name}': {e}") from e

    def __repr__(self) -> str:
        # NOTE: these properties call _fetch(), so repr() of a lazily-loaded
        # pipeline issues an API request the first time it is called.
        return (
            f"Pipeline(name={self.name!r}, type={self.type!r}, "
            f"etag={self.etag!r}, parameters={list(self.parameters.keys())}, "
            f"annotations={len(self.annotations)})"
        )
Attributes
activities property

List of pipeline activities.

annotations property

Pipeline annotations.

etag property

Entity tag for versioning.

id property

Azure resource ID for this pipeline.

lastPublished property

Last publish timestamp (ISO format).

parameters property

Pipeline parameters definition.

properties property

Full properties dictionary including activities, parameters, etc.

type property

Azure resource type.

Functions
__init__(client, name, data=None)

Initialize pipeline.

Parameters:

Name Type Description Default
client DataFactoryClient

Configured DataFactoryClient

required
name str

Pipeline name

required
data Optional[Dict[str, Any]]

Optional pre-fetched pipeline data (from pipelines() list)

None
Source code in src/fabias/integrations/adf/pipeline.py
def __init__(
    self, client: "DataFactoryClient", name: str, data: Optional[Dict[str, Any]] = None
):
    """
    Bind a pipeline handle to its client, optionally seeding its metadata.

    Args:
        client: Configured DataFactoryClient
        name: Pipeline name
        data: Optional pre-fetched pipeline data (from pipelines() list)
    """
    # Seeded data means nothing needs to be fetched on first property access.
    has_data = data is not None
    self._client = client
    self.name = name
    self._data: Optional[Dict[str, Any]] = data
    self._cached = has_data
refresh()

Force refresh of cached pipeline data from the API.

Examples:

>>> pipeline = adf.pipeline("Daily_ETL")
>>> pipeline.refresh()  # Get latest data
Source code in src/fabias/integrations/adf/pipeline.py
def refresh(self) -> None:
    """
    Discard any cached pipeline data and reload it from the API.

    Examples:
        >>> pipeline = adf.pipeline("Daily_ETL")
        >>> pipeline.refresh()  # Get latest data
    """
    # Mark the cache stale first so that _fetch() performs a real request.
    self._cached = False
    self._fetch()
run(parameters=None)

Execute the pipeline.

Starts a new pipeline run and returns the createRun response (an AzResponse whose data contains the runId).

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Optional pipeline parameters as key-value pairs

None

Returns:

Name Type Description
Job AzResponse

The createRun response; its data is guaranteed to contain a non-empty runId.

Raises:

Type Description
FabiasError

If pipeline execution fails to start

Examples:

>>> job = pipeline.run()
>>> job.wait()
>>> job = pipeline.run(parameters={"sourceDate": "2025-01-01"})
>>> job.wait(callback=lambda j: print(f"Status: {j.status}"))
Source code in src/fabias/integrations/adf/pipeline.py
def run(self, parameters: Optional[Dict[str, Any]] = None) -> AzResponse:
    """
    Execute the pipeline.

    Starts a new pipeline run via the ADF ``createRun`` endpoint and
    returns the validated response.

    Args:
        parameters: Optional pipeline parameters as key-value pairs; passed
            as the request body (an empty dict when omitted)

    Returns:
        AzResponse: The createRun response; its ``data`` is guaranteed to
        contain a non-empty ``runId``.

    Raises:
        FabiasError: If the request fails, returns a status other than
            200/202, or the response carries no ``runId``

    Examples:
        >>> job = pipeline.run()
        >>> job.wait()

        >>> job = pipeline.run(parameters={"sourceDate": "2025-01-01"})
        >>> job.wait(callback=lambda j: print(f"Status: {j.status}"))
    """
    # Use the client's API version so every ADF call stays on the same version.
    endpoint = f"/pipelines/{self.name}/createRun?api-version={self._client.API_VERSION}"
    body = parameters or {}

    try:
        response = self._client.post(endpoint, data=body)

        # Validate expected status codes for createRun
        if response.status_code not in (200, 202):
            raise FabiasError(
                f"Unexpected status code {response.status_code} when creating pipeline run. "
                f"Expected 200 or 202."
            )

        # Validate that we got a runId (ADF should always return this)
        if not response.data or not response.data.get("runId"):
            raise FabiasError(f"No runId returned when creating pipeline run for '{self.name}'")

        return response

    except FabiasError:
        # Re-raise our own errors
        raise
    except Exception as e:
        # Chain the original error so its traceback is preserved.
        raise FabiasError(f"Failed to run pipeline '{self.name}': {e}") from e

fabias.integrations.adf.pipelines()

List all pipelines in the factory.

Returns Pipeline objects pre-populated with metadata, so no additional API calls are needed to access their properties.

Returns:

Type Description
List[Pipeline]

List[Pipeline]: List of Pipeline objects

Examples:

>>> for pipeline in adf.pipelines():
...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
Source code in src/fabias/integrations/adf/__init__.py
def pipelines() -> List[Pipeline]:
    """
    List every pipeline in the configured factory.

    Delegates to ``pipelines()`` on the client returned by ``_get_client()``
    (presumably the one set up via ``adf.client(...)``). The returned
    Pipeline objects are pre-populated with metadata, so reading their
    properties needs no further API calls.

    Returns:
        List[Pipeline]: List of Pipeline objects

    Examples:
        >>> for pipeline in adf.pipelines():
        ...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
    """
    client = _get_client()
    return client.pipelines()

Classes

fabias.integrations.adf.Pipeline

Represents an Azure Data Factory pipeline.

Supports lazy loading when fetched by name, or pre-population when retrieved from pipelines() list. Properties are cached after first access.

Examples:

>>> from fabias.integrations import adf
>>> adf.client(...)
>>>
>>> # Lazy loading - fetches on first property access
>>> pipeline = adf.pipeline("Daily_ETL")
>>> print(pipeline.activities)  # Triggers fetch
>>>
>>> # Pre-populated from list
>>> for pipeline in adf.pipelines():
...     print(pipeline.name, pipeline.lastPublished)  # No fetch needed
>>>
>>> # Force refresh cached data
>>> pipeline.refresh()
Source code in src/fabias/integrations/adf/pipeline.py
class Pipeline:
    """
    Represents an Azure Data Factory pipeline.

    Supports lazy loading when fetched by name, or pre-population when
    retrieved from pipelines() list. Properties are cached after first access.

    Examples:
        >>> from fabias.integrations import adf
        >>> adf.client(...)
        >>>
        >>> # Lazy loading - fetches on first property access
        >>> pipeline = adf.pipeline("Daily_ETL")
        >>> print(pipeline.activities)  # Triggers fetch
        >>>
        >>> # Pre-populated from list
        >>> for pipeline in adf.pipelines():
        ...     print(pipeline.name, pipeline.lastPublished)  # No fetch needed
        >>>
        >>> # Force refresh cached data
        >>> pipeline.refresh()
    """

    def __init__(
        self, client: "DataFactoryClient", name: str, data: Optional[Dict[str, Any]] = None
    ):
        """
        Initialize pipeline.

        Args:
            client: Configured DataFactoryClient
            name: Pipeline name
            data: Optional pre-fetched pipeline data (from pipelines() list)
        """
        self._client = client
        self.name = name
        self._data: Optional[Dict[str, Any]] = data
        # Data supplied up front (e.g. by the list call) counts as cached;
        # otherwise the definition is loaded on first property access.
        self._cached = data is not None

    def _fetch(self) -> None:
        """
        Fetch pipeline data from the API if not already cached.

        This is called automatically when accessing properties.
        Use refresh() to force a new fetch.
        """
        if not self._cached:
            endpoint = f"/pipelines/{self.name}?api-version={self._client.API_VERSION}"
            response = self._client.get(endpoint)
            self._data = response.json()
            self._cached = True

    def refresh(self) -> None:
        """
        Force refresh of cached pipeline data from the API.

        Examples:
            >>> pipeline = adf.pipeline("Daily_ETL")
            >>> pipeline.refresh()  # Get latest data
        """
        self._cached = False
        self._fetch()

    @property
    def id(self) -> str:
        """Azure resource ID for this pipeline."""
        self._fetch()
        return self._data.get("id", "") if self._data else ""

    @property
    def type(self) -> str:
        """Azure resource type."""
        self._fetch()
        return self._data.get("type", "") if self._data else ""

    @property
    def etag(self) -> str:
        """Entity tag for versioning."""
        self._fetch()
        return self._data.get("etag", "") if self._data else ""

    @property
    def properties(self) -> Dict[str, Any]:
        """Full properties dictionary including activities, parameters, etc."""
        self._fetch()
        return self._data.get("properties", {}) if self._data else {}

    @property
    def activities(self) -> List[Dict[str, Any]]:
        """List of pipeline activities."""
        return cast(List[Dict[str, Any]], self.properties.get("activities", []))

    @property
    def parameters(self) -> Dict[str, Any]:
        """Pipeline parameters definition."""
        return cast(Dict[str, Any], self.properties.get("parameters", {}))

    @property
    def annotations(self) -> List[str]:
        """Pipeline annotations."""
        return cast(List[str], self.properties.get("annotations", []))

    @property
    def lastPublished(self) -> Optional[str]:
        """Last publish timestamp (ISO format)."""
        return self.properties.get("lastPublishTime")

    def run(self, parameters: Optional[Dict[str, Any]] = None) -> AzResponse:
        """
        Execute the pipeline.

        Starts a new pipeline run via the ADF ``createRun`` endpoint and
        returns the validated response.

        Args:
            parameters: Optional pipeline parameters as key-value pairs; passed
                as the request body (an empty dict when omitted)

        Returns:
            AzResponse: The createRun response; its ``data`` is guaranteed to
            contain a non-empty ``runId``.

        Raises:
            FabiasError: If the request fails, returns a status other than
                200/202, or the response carries no ``runId``

        Examples:
            >>> job = pipeline.run()
            >>> job.wait()

            >>> job = pipeline.run(parameters={"sourceDate": "2025-01-01"})
            >>> job.wait(callback=lambda j: print(f"Status: {j.status}"))
        """
        # Use the client's API version so every ADF call stays on the same version.
        endpoint = f"/pipelines/{self.name}/createRun?api-version={self._client.API_VERSION}"
        body = parameters or {}

        try:
            response = self._client.post(endpoint, data=body)

            # Validate expected status codes for createRun
            if response.status_code not in (200, 202):
                raise FabiasError(
                    f"Unexpected status code {response.status_code} when creating pipeline run. "
                    f"Expected 200 or 202."
                )

            # Validate that we got a runId (ADF should always return this)
            if not response.data or not response.data.get("runId"):
                raise FabiasError(f"No runId returned when creating pipeline run for '{self.name}'")

            return response

        except FabiasError:
            # Re-raise our own errors
            raise
        except Exception as e:
            # Chain the original error so its traceback is preserved.
            raise FabiasError(f"Failed to run pipeline '{self.name}': {e}") from e

    def __repr__(self) -> str:
        # NOTE: these properties call _fetch(), so repr() of a lazily-loaded
        # pipeline issues an API request the first time it is called.
        return (
            f"Pipeline(name={self.name!r}, type={self.type!r}, "
            f"etag={self.etag!r}, parameters={list(self.parameters.keys())}, "
            f"annotations={len(self.annotations)})"
        )

Attributes

name = name instance-attribute

id property

Azure resource ID for this pipeline.

type property

Azure resource type.

properties property

Full properties dictionary including activities, parameters, etc.

activities property

List of pipeline activities.

parameters property

Pipeline parameters definition.

lastPublished property

Last publish timestamp (ISO format).

Functions

run(parameters=None)

Execute the pipeline.

Starts a new pipeline run and returns the createRun response (an AzResponse whose data contains the runId).

Parameters:

Name Type Description Default
parameters Optional[Dict[str, Any]]

Optional pipeline parameters as key-value pairs

None

Returns:

Name Type Description
Job AzResponse

The createRun response; its data is guaranteed to contain a non-empty runId.

Raises:

Type Description
FabiasError

If pipeline execution fails to start

Examples:

>>> job = pipeline.run()
>>> job.wait()
>>> job = pipeline.run(parameters={"sourceDate": "2025-01-01"})
>>> job.wait(callback=lambda j: print(f"Status: {j.status}"))
Source code in src/fabias/integrations/adf/pipeline.py
def run(self, parameters: Optional[Dict[str, Any]] = None) -> AzResponse:
    """
    Execute the pipeline.

    Starts a new pipeline run via the ADF ``createRun`` endpoint and
    returns the validated response.

    Args:
        parameters: Optional pipeline parameters as key-value pairs; passed
            as the request body (an empty dict when omitted)

    Returns:
        AzResponse: The createRun response; its ``data`` is guaranteed to
        contain a non-empty ``runId``.

    Raises:
        FabiasError: If the request fails, returns a status other than
            200/202, or the response carries no ``runId``

    Examples:
        >>> job = pipeline.run()
        >>> job.wait()

        >>> job = pipeline.run(parameters={"sourceDate": "2025-01-01"})
        >>> job.wait(callback=lambda j: print(f"Status: {j.status}"))
    """
    # Use the client's API version so every ADF call stays on the same version.
    endpoint = f"/pipelines/{self.name}/createRun?api-version={self._client.API_VERSION}"
    body = parameters or {}

    try:
        response = self._client.post(endpoint, data=body)

        # Validate expected status codes for createRun
        if response.status_code not in (200, 202):
            raise FabiasError(
                f"Unexpected status code {response.status_code} when creating pipeline run. "
                f"Expected 200 or 202."
            )

        # Validate that we got a runId (ADF should always return this)
        if not response.data or not response.data.get("runId"):
            raise FabiasError(f"No runId returned when creating pipeline run for '{self.name}'")

        return response

    except FabiasError:
        # Re-raise our own errors
        raise
    except Exception as e:
        # Chain the original error so its traceback is preserved.
        raise FabiasError(f"Failed to run pipeline '{self.name}': {e}") from e

refresh()

Force refresh of cached pipeline data from the API.

Examples:

>>> pipeline = adf.pipeline("Daily_ETL")
>>> pipeline.refresh()  # Get latest data
Source code in src/fabias/integrations/adf/pipeline.py
def refresh(self) -> None:
    """
    Discard any cached pipeline data and reload it from the API.

    Examples:
        >>> pipeline = adf.pipeline("Daily_ETL")
        >>> pipeline.refresh()  # Get latest data
    """
    # Mark the cache stale first so that _fetch() performs a real request.
    self._cached = False
    self._fetch()

fabias.integrations.adf.DataFactoryClient

Bases: BaseClient

HTTP client for Azure Data Factory REST API.

Configured for a specific factory within a subscription/resource group.

Source code in src/fabias/integrations/adf/client.py
class DataFactoryClient(BaseClient):
    """
    HTTP client for Azure Data Factory REST API.

    Configured for a specific factory within a subscription/resource group.
    Endpoints used by this class (e.g. ``/pipelines``) are relative to the
    factory resource URI stored in ``_base_uri``.
    """

    MANAGEMENT_SCOPE = "https://management.azure.com/.default"
    MANAGEMENT_API_URL = "https://management.azure.com"
    API_VERSION = "2018-06-01"

    def __init__(
        self,
        subscription_id: str,
        resource_group: str,
        factory: str,
        auth: Optional[AuthProvider] = None,
        tenant_id: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
    ):
        """
        Initialize the Data Factory client.

        Credentials are resolved in this order: an explicit ``auth`` provider
        wins; otherwise a ServicePrincipalAuth is built from ``tenant_id``,
        ``client_id`` and ``client_secret`` (all three required).

        Args:
            subscription_id: Azure subscription ID
            resource_group: Resource group name
            factory: Data Factory name
            auth: Optional pre-configured AuthProvider
            tenant_id: Azure AD tenant ID
            client_id: Application client ID
            client_secret: Client secret

        Raises:
            FabiasError: If neither ``auth`` nor a complete set of
                service-principal credentials is supplied
        """
        # Determine authentication method; an explicit provider takes precedence.
        if auth is not None:
            auth_provider = auth
        elif tenant_id and client_id and client_secret:
            auth_provider = ServicePrincipalAuth(
                tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
            )
        else:
            raise FabiasError(
                "Azure Data Factory requires explicit credentials. "
                "Provide either 'auth' or all of 'tenant_id', 'client_id', and 'client_secret'."
            )

        super().__init__(auth_provider)

        self._subscription_id = subscription_id
        self._resource_group = resource_group
        self._factory = factory
        self._scope = self.MANAGEMENT_SCOPE

        # Base URI points directly to the factory
        self._base_uri = (
            f"{self.MANAGEMENT_API_URL}"
            f"/subscriptions/{subscription_id}"
            f"/resourceGroups/{resource_group}"
            f"/providers/Microsoft.DataFactory"
            f"/factories/{factory}"
        )

    @property
    def factory(self) -> str:
        """Get the configured factory name."""
        return self._factory

    @property
    def resourceGroup(self) -> str:
        """Get the configured resource group."""
        return self._resource_group

    @property
    def subscriptionId(self) -> str:
        """Get the configured subscription ID."""
        return self._subscription_id

    def pipeline(self, name: str) -> Pipeline:
        """
        Get a pipeline by name.

        No API call is made here; the pipeline definition is fetched lazily
        on first property access.

        Args:
            name: Pipeline name

        Returns:
            Pipeline: Pipeline object for execution
        """
        return Pipeline(self, name)

    def pipelines(self) -> List[Pipeline]:
        """
        List all pipelines in this factory.

        Returns Pipeline objects pre-populated with metadata from the list call,
        so no additional API calls are needed to access their properties.

        The ADF list endpoint is paged: a response may include a ``nextLink``
        to the next page. All pages are followed so large factories are listed
        completely.

        Returns:
            List[Pipeline]: List of Pipeline objects with cached metadata

        Examples:
            >>> for pipeline in adf.pipelines():
            ...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
        """
        results: List[Pipeline] = []
        endpoint: Optional[str] = f"/pipelines?api-version={self.API_VERSION}"

        while endpoint:
            payload = self.get(endpoint).json()

            # Create Pipeline objects pre-populated with data
            results.extend(
                Pipeline(self, data["name"], data=data) for data in payload.get("value", [])
            )

            next_link = payload.get("nextLink")
            endpoint = None
            if next_link and next_link.lower().startswith(self._base_uri.lower()):
                # nextLink is an absolute URL, while self.get() is called with
                # paths relative to the factory URI (see the first request), so
                # strip that prefix before requesting the next page.
                endpoint = next_link[len(self._base_uri):]
            # NOTE(review): a nextLink outside the factory URI is unexpected;
            # paging stops there rather than guessing how BaseClient treats
            # absolute URLs.

        return results

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - cleanup if needed."""
        # Add any cleanup logic here if needed in the future
        pass

Attributes

factory property

Get the configured factory name.

resourceGroup property

Get the configured resource group.

subscriptionId property

Get the configured subscription ID.

Functions

__enter__()

Context manager entry.

Source code in src/fabias/integrations/adf/client.py
def __enter__(self):
    """Start a ``with`` block, yielding this same client instance."""
    client = self
    return client

__exit__(exc_type, exc_val, exc_tb)

Context manager exit - cleanup if needed.

Source code in src/fabias/integrations/adf/client.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Finish a ``with`` block.

    Nothing is released here at present. The return value is ``None``
    (falsy), so an exception raised inside the block is propagated
    rather than suppressed.
    """
    # NOTE(review): if BaseClient holds an HTTP session, closing it here
    # may be worthwhile — confirm against BaseClient.
    return None

__init__(subscription_id, resource_group, factory, auth=None, tenant_id=None, client_id=None, client_secret=None)

Initialize the Data Factory client.

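Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| subscription_id | str | Azure subscription ID | required |
| resource_group | str | Resource group name | required |
| factory | str | Data Factory name | required |
| auth | Optional[AuthProvider] | Optional pre-configured AuthProvider; used in preference to the service principal credentials below | None |
| tenant_id | Optional[str] | Azure AD tenant ID (service principal authentication) | None |
| client_id | Optional[str] | Application client ID (service principal authentication) | None |
| client_secret | Optional[str] | Client secret (service principal authentication) | None |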
Source code in src/fabias/integrations/adf/client.py
def __init__(
    self,
    subscription_id: str,
    resource_group: str,
    factory: str,
    auth: Optional[AuthProvider] = None,
    tenant_id: Optional[str] = None,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
):
    """
    Initialize the Data Factory client.

    Authentication is resolved in this order. An explicit ``auth`` provider
    wins. Otherwise a ``ServicePrincipalAuth`` is built from ``tenant_id``,
    ``client_id`` and ``client_secret``, and all three must be non-empty.

    Args:
        subscription_id: Azure subscription ID
        resource_group: Resource group name
        factory: Data Factory name
        auth: Optional pre-configured AuthProvider
        tenant_id: Azure AD tenant ID
        client_id: Application client ID
        client_secret: Client secret

    Raises:
        FabiasError: If neither ``auth`` nor the complete set of service
            principal credentials is supplied.
    """
    # Determine authentication method. Test for presence, not truthiness, so
    # a provider object that defines __bool__/__len__ is never silently ignored.
    if auth is not None:
        auth_provider = auth
    elif tenant_id and client_id and client_secret:
        auth_provider = ServicePrincipalAuth(
            tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
        )
    else:
        raise FabiasError(
            "Azure Data Factory requires explicit credentials. "
            "Provide either 'auth' or all of 'tenant_id', 'client_id', and 'client_secret'."
        )

    super().__init__(auth_provider)

    self._subscription_id = subscription_id
    self._resource_group = resource_group
    self._factory = factory
    # Token scope for Azure Resource Manager; presumably read by BaseClient
    # when acquiring tokens — confirm against BaseClient.
    self._scope = self.MANAGEMENT_SCOPE

    # Base URI points directly to the factory; request paths such as
    # "/pipelines?..." are relative to it.
    self._base_uri = (
        f"{self.MANAGEMENT_API_URL}"
        f"/subscriptions/{subscription_id}"
        f"/resourceGroups/{resource_group}"
        f"/providers/Microsoft.DataFactory"
        f"/factories/{factory}"
    )

pipeline(name)

Get a pipeline by name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| name | str | Pipeline name | required |

Returns:

| Type | Description |
|------|-------------|
| Pipeline | Pipeline object for execution |

Source code in src/fabias/integrations/adf/client.py
def pipeline(self, name: str) -> Pipeline:
    """
    Return a ``Pipeline`` handle bound to this client for ``name``.

    This method only constructs the handle; it does not check here that
    a pipeline called ``name`` exists in the factory.

    Args:
        name: Pipeline name within this factory.

    Returns:
        Pipeline: Pipeline object for execution.
    """
    handle = Pipeline(self, name)
    return handle

pipelines()

List all pipelines in this factory.

Returns Pipeline objects pre-populated with metadata from the list call, so no additional API calls are needed to access their properties.

Returns:

| Type | Description |
|------|-------------|
| List[Pipeline] | List of Pipeline objects with cached metadata |

Examples:

>>> for pipeline in adf.pipelines():
...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
Source code in src/fabias/integrations/adf/client.py
def pipelines(self) -> List[Pipeline]:
    """
    List all pipelines in this factory.

    Follows the ``nextLink`` continuation of the list response, so
    factories whose pipelines span several result pages are listed
    completely instead of being truncated to the first page.

    Returns Pipeline objects pre-populated with metadata from the list call,
    so no additional API calls are needed to access their properties.

    Returns:
        List of Pipeline objects with cached metadata.

    Examples:
        >>> for pipeline in adf.pipelines():
        ...     print(f"{pipeline.name}: {len(pipeline.activities)} activities")
    """
    path = f"/pipelines?api-version={self.API_VERSION}"
    pipeline_data_list = []
    while path:
        payload = self.get(path).json()
        pipeline_data_list.extend(payload.get("value", []))
        next_link = payload.get("nextLink")
        if next_link:
            # nextLink is an absolute URL, while self.get() is otherwise
            # called with factory-relative paths; strip the factory base URI
            # when it matches. The comparison is case-insensitive because
            # ARM may normalise the casing of path segments.
            # NOTE(review): a non-matching link is passed through unchanged —
            # confirm BaseClient.get handles absolute URLs in that case.
            prefix = next_link[: len(self._base_uri)]
            if prefix.lower() == self._base_uri.lower():
                next_link = next_link[len(self._base_uri):]
        path = next_link

    # Create Pipeline objects pre-populated with data
    return [Pipeline(self, data["name"], data=data) for data in pipeline_data_list]