
A code-first approach for MS Fabric data pipelines and ETL.


FabricFlow



FabricFlow is a code-first Python SDK for building, managing, and automating Microsoft Fabric data pipelines, workspaces, and core items. It provides a high-level, object-oriented interface for interacting with the Microsoft Fabric REST API, enabling you to create, execute, and monitor data pipelines programmatically.


Features

  • Pipeline Templates: Easily create data pipelines from reusable templates (e.g., SQL Server to Lakehouse).
  • Pipeline Execution: Trigger, monitor, and extract results from pipeline runs.
  • Copy & Lookup Activities: Build and execute copy and lookup activities with source/sink abstractions.
  • Modular Architecture: Activities, sources, and sinks live in separate modules, so each can be imported and extended independently.
  • Workspace & Item Management: CRUD operations for workspaces and core items.
  • Connection & Capacity Utilities: Resolve and manage connections and capacities.
  • Logging Utilities: Simple logging setup for consistent diagnostics.
  • Service Principal Authentication: Authenticate securely with the Microsoft Fabric REST API using Azure Service Principal credentials.

Installation

pip install fabricflow

Sample Usage

Below is a sample workflow that demonstrates how to use FabricFlow to automate workspace creation, pipeline deployment, and data copy operations in Microsoft Fabric.

1. Import Required Libraries

from sempy.fabric import FabricRestClient
from fabricflow import create_workspace, create_data_pipeline
from fabricflow.pipeline.activities import Copy, Lookup
from fabricflow.pipeline.sources import SQLServerSource, GoogleBigQuerySource, PostgreSQLSource, FileSystemSource
from fabricflow.pipeline.sinks import LakehouseTableSink, ParquetFileSink, LakehouseFilesSink
from fabricflow.pipeline.sinks.types import FileCopyBehavior
from fabricflow.pipeline.templates import (
    DataPipelineTemplates,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH,
    COPY_FILES_TO_LAKEHOUSE,
    LOOKUP_SQL_SERVER,
    LOOKUP_SQL_SERVER_FOR_EACH
)

2. Initialize Fabric Client

fabric_client = FabricRestClient()

Note: If you authenticate with ServicePrincipalTokenProvider instead, make sure the Service Principal has access to the workspace and to every connection you use.
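Service Principal credentials are usually kept out of source code. The sketch below reads them from environment variables and fails early if one is missing. The variable names and the load_service_principal_settings helper are assumptions made for illustration; this README does not show the constructor arguments of ServicePrincipalTokenProvider, so the sketch stops at collecting the values.

```python
import os
from typing import Dict, Mapping, Optional

# Assumed environment variable names; use whatever your deployment defines.
REQUIRED_VARS = ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")


def load_service_principal_settings(
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Collect Service Principal settings, raising if any value is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "tenant_id": env["AZURE_TENANT_ID"],
        "client_id": env["AZURE_CLIENT_ID"],
        "client_secret": env["AZURE_CLIENT_SECRET"],
    }
```

Passing a plain dict as env makes the helper easy to exercise without touching the real environment.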

3. Define Workspace and Capacity

capacity_name = "FabricFlow"
workspace_name = "FabricFlow"

4. Create Workspace (Optional)

You can create a new workspace, or use an existing one by specifying its name.

create_workspace(fabric_client, workspace_name, capacity_name)

5. Deploy Data Pipeline Templates

The loop below deploys every built-in template in DataPipelineTemplates to the workspace. To deploy only specific templates, pass them to create_data_pipeline individually.

New Feature: You can now use custom pipeline templates by passing a dict (JSON payload), an enum member, or a file path to create_data_pipeline.

for template in DataPipelineTemplates:
    create_data_pipeline(
        fabric_client,
        template,
        workspace_name
    )

6. Define Source and Sink Details

SOURCE_CONNECTION_ID = "your-source-connection-id"
SOURCE_DATABASE_NAME = "AdventureWorks2022"

SINK_WORKSPACE_ID = "your-sink-workspace-id"
SINK_LAKEHOUSE_ID = "your-sink-lakehouse-id"

ITEMS_TO_LOAD = [
    {
        "source_schema_name": "Sales",
        "source_table_name": "SalesOrderHeader",
        "source_query": "SELECT * FROM [Sales].[SalesOrderHeader]",
        "sink_table_name": "SalesOrderHeader",
        "sink_schema_name": "dbo",
        "sink_table_action": "Overwrite",
        "load_type": "Incremental",
        "primary_key_columns": ["SalesOrderID"],
        "skip": True,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    },
    # Add more items as needed...
]
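Before handing a list like ITEMS_TO_LOAD to a pipeline, it can help to normalize it in plain Python. The sketch below fills in a missing source_query from the schema and table names and drops skipped items. The prepare_items and quote_identifier helpers are hypothetical, and treating "skip": True as "leave this item out of the run" is an assumption about what that flag means, not documented FabricFlow behavior.

```python
from typing import Any, Dict, List


def quote_identifier(name: str) -> str:
    """Wrap a SQL Server identifier in brackets, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def prepare_items(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return copies of the items to run, each with a source_query filled in.

    Assumption: an item whose "skip" flag is true is left out of the run.
    """
    prepared = []
    for item in items:
        if item.get("skip"):
            continue
        item = dict(item)  # shallow copy so the caller's list is untouched
        if not item.get("source_query"):
            schema = quote_identifier(item["source_schema_name"])
            table = quote_identifier(item["source_table_name"])
            item["source_query"] = f"SELECT * FROM {schema}.{table}"
        prepared.append(item)
    return prepared
```

Under that assumption, the sample entry above (which sets "skip": True) would be filtered out, so set the flag to False for any table you want loaded.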

7. Copy Data

You can copy data using either a single item per pipeline run (Option 1) or multiple items per pipeline run (Option 2). Choose the option that best fits your requirements.

Note: The examples below use the new Copy class. CopyManager still works for backward compatibility, but Copy is recommended for new code.

Option 1: Single Item Per Pipeline Run

copy = Copy(
    fabric_client,
    workspace_name,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE
)

source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query=ITEMS_TO_LOAD[0]["source_query"],
)

sink = LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE_ID,
    sink_lakehouse=SINK_LAKEHOUSE_ID,
    sink_table_name=ITEMS_TO_LOAD[0]["sink_table_name"],
    sink_schema_name=ITEMS_TO_LOAD[0]["sink_schema_name"],
    sink_table_action=ITEMS_TO_LOAD[0]["sink_table_action"],
)

result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)
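With Option 1, loading several tables means one pipeline run per item. A minimal sketch of that loop follows; it takes the per-item work as a callable so one failing item does not stop the rest. Both run_items_individually and run_one are hypothetical names: in practice run_one would wrap the Copy, source, sink, and execute() steps shown above for a single entry of ITEMS_TO_LOAD.

```python
from typing import Any, Callable, Dict, List, Tuple

Item = Dict[str, Any]


def run_items_individually(
    items: List[Item],
    run_one: Callable[[Item], Any],
) -> Tuple[List[Any], List[Tuple[Item, Exception]]]:
    """Call run_one for every item, collecting results and failures separately."""
    results: List[Any] = []
    failures: List[Tuple[Item, Exception]] = []
    for item in items:
        try:
            results.append(run_one(item))
        except Exception as exc:  # keep going so one bad item does not stop the rest
            failures.append((item, exc))
    return results, failures
```

Returning failures alongside results lets the caller decide whether to retry, log, or abort after the loop finishes.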

Option 2: Multiple Items Per Pipeline Run

copy = Copy(
    fabric_client,
    workspace_name,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH
)

source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
)

sink = LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE_ID,
    sink_lakehouse=SINK_LAKEHOUSE_ID,
)

result = (
    copy
    .source(source)
    .sink(sink)
    .items(ITEMS_TO_LOAD)
    .execute()
)
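When the item list is long, you may prefer several smaller multi-item runs over one very large one. The chunked helper below is a hypothetical, plain-Python sketch of that batching; whether batching is needed, and what batch size suits your capacity, are assumptions this README does not address.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most `size` items, preserving order."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

Each batch could then be passed to .items(...) in its own pipeline run.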

8. Lookup Data (New Feature)

FabricFlow now supports lookup operations for data validation and enrichment:

# Single lookup operation
lookup = Lookup(
    fabric_client,
    workspace_name,
    LOOKUP_SQL_SERVER
)

source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query="SELECT COUNT(*) as record_count FROM [Sales].[SalesOrderHeader]",
)

result = (
    lookup
    .source(source)
    .execute()
)

# Multiple lookup operations
lookup_items = [
    {
        "source_query": "SELECT COUNT(*) as order_count FROM [Sales].[SalesOrderHeader]",
        "first_row_only": True,
    },
    {
        "source_query": "SELECT MAX(OrderDate) as latest_order FROM [Sales].[SalesOrderHeader]",
        "first_row_only": True,
    }
]

lookup = Lookup(
    fabric_client,
    workspace_name,
    LOOKUP_SQL_SERVER_FOR_EACH
)

result = (
    lookup
    .source(source)
    .items(lookup_items)
    .execute()
)
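Lookup item lists like the one above can be generated rather than written by hand. The sketch below builds one row-count lookup per table, using only the source_query and first_row_only keys that appear in this README. The build_row_count_lookups helper is hypothetical.

```python
from typing import Any, Dict, Iterable, List, Tuple


def build_row_count_lookups(
    tables: Iterable[Tuple[str, str]],
) -> List[Dict[str, Any]]:
    """Build one COUNT(*) lookup item per (schema, table) pair."""
    return [
        {
            "source_query": (
                f"SELECT COUNT(*) as record_count FROM [{schema}].[{table}]"
            ),
            "first_row_only": True,
        }
        for schema, table in tables
    ]
```

The returned list has the same shape as lookup_items, so it could be passed to .items(...) in the same way.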

9. File System to Lakehouse Copy (New Feature)

FabricFlow now supports copying files from file servers directly to the Lakehouse Files area:

copy = Copy(
    fabric_client,
    workspace_name,
    COPY_FILES_TO_LAKEHOUSE
)

# Define file system source with pattern matching and filtering
source = FileSystemSource(
    source_connection_id="your-file-server-connection-id",
    source_folder_pattern="incoming/data/*",  # Wildcard folder pattern
    source_file_pattern="*.csv",              # File pattern
    source_modified_after="2025-01-01T00:00:00Z",  # Optional date filter
    recursive_search=True,                    # Recursive directory search
    delete_source_after_copy=False,          # Keep source files
    max_concurrent_connections=10             # Connection limit
)

# Define lakehouse files sink
sink = LakehouseFilesSink(
    sink_lakehouse="data-lakehouse",
    sink_workspace="analytics-workspace",
    sink_directory="processed/files",         # Target directory in lakehouse
    copy_behavior=FileCopyBehavior.PRESERVE_HIERARCHY,  # Maintain folder structure
    enable_staging=False,                     # Direct copy without staging
    parallel_copies=4,                       # Parallel operations
    max_concurrent_connections=10            # Connection limit
)

result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)
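The source_modified_after value above is an ISO 8601 UTC timestamp. For a rolling window such as "files changed in the last 7 days", the string can be computed instead of hard-coded. The modified_after_cutoff helper below is a hypothetical sketch using only the standard library; it assumes the filter expects the same format as the literal shown above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def modified_after_cutoff(days_back: int, now: Optional[datetime] = None) -> str:
    """Return a UTC timestamp `days_back` days before `now`, formatted like
    2025-01-01T00:00:00Z. `now` should be timezone-aware; defaults to the current time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    else:
        now = now.astimezone(timezone.utc)
    return (now - timedelta(days=days_back)).strftime("%Y-%m-%dT%H:%M:%SZ")
```

The result can be passed as the source_modified_after argument in place of the fixed date.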

API Overview

Below are the main classes and functions available in FabricFlow:

Core Pipeline Components

  • DataPipelineExecutor – Execute data pipelines and monitor their status.
  • DataPipelineError – Exception class for pipeline errors.
  • PipelineStatus – Enum for pipeline run statuses.
  • DataPipelineTemplates – Enum for pipeline templates.
  • get_template – Retrieve a pipeline template definition.
  • get_base64_str – Utility for base64 encoding of template files.
  • create_data_pipeline – Create a new data pipeline from a template.
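To make the base64 step in the list above concrete, here is a standard-library sketch of encoding a template dict and decoding it again. It is not the implementation of get_base64_str, whose signature this README does not show; encode_template, decode_template, and the placeholder template shape are all assumptions for illustration.

```python
import base64
import json
from typing import Any, Dict


def encode_template(template: Dict[str, Any]) -> str:
    """Serialize a template dict to JSON and return it as a base64 ASCII string."""
    raw = json.dumps(template).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def decode_template(encoded: str) -> Dict[str, Any]:
    """Reverse encode_template: base64 string back to a dict."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


# Placeholder shape only; a real pipeline definition has more fields.
example = {"properties": {"activities": []}}
assert decode_template(encode_template(example)) == example
```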

Pipeline Activities

  • Copy – Build and execute copy activities (replaces CopyManager).
  • Lookup – Build and execute lookup activities for data validation.

Sources and Sinks

  • SQLServerSource – Define SQL Server as a data source.
  • GoogleBigQuerySource – Define Google BigQuery as a data source.
  • PostgreSQLSource – Define PostgreSQL as a data source.
  • FileSystemSource – Define a file server as a data source for file-based operations.
  • BaseSource – Base class for all data sources.
  • LakehouseTableSink – Define a Lakehouse table as a data sink.
  • ParquetFileSink – Define a Parquet file as a data sink.
  • LakehouseFilesSink – Define the Lakehouse Files area as a data sink for file operations.
  • BaseSink – Base class for all data sinks.
  • SinkType / SourceType – Enums for sink and source types.
  • FileCopyBehavior – Enum for file copy behavior options.

Workspace and Item Management

  • FabricCoreItemsManager – Manage core Fabric items via APIs.
  • FabricWorkspacesManager – Manage Fabric workspaces via APIs.
  • get_workspace_id – Get a workspace ID or return the current one.
  • create_workspace – Create a new workspace and assign it to a capacity.
  • FabricItemType – Enum for Fabric item types.

Utilities

  • setup_logging – Configure logging for diagnostics.
  • resolve_connection_id – Resolve a connection by name or ID.
  • resolve_capacity_id – Resolve a capacity by name or ID.
  • ServicePrincipalTokenProvider – Handles Azure Service Principal authentication.
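For readers unfamiliar with what a logging setup helper typically does, the sketch below configures a named logger with Python's standard logging module. It is not setup_logging from FabricFlow, whose parameters this README does not list; configure_logging and the logger name are hypothetical.

```python
import logging


def configure_logging(
    name: str = "fabricflow_example", level: int = logging.INFO
) -> logging.Logger:
    """Attach one stream handler with a timestamped format to a named logger."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:  # avoid duplicate handlers when called more than once
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

Calling it once at the start of a script gives every later log message the same format.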

Activities, Sources, and Sinks

FabricFlow provides a modular architecture with separate packages for activities, sources, sinks, and templates:

  • Activities: Copy, Lookup – Build and execute pipeline activities.
  • Sources: SQLServerSource, GoogleBigQuerySource, PostgreSQLSource, FileSystemSource, BaseSource, SourceType – Define data sources.
  • Sinks: LakehouseTableSink, ParquetFileSink, LakehouseFilesSink, BaseSink, SinkType, FileCopyBehavior – Define data destinations.
  • Templates: Pre-built pipeline definitions for common patterns.

Backward Compatibility

  • CopyManager → Copy: The CopyManager class has been renamed to Copy for consistency. Existing code that uses CopyManager continues to work through a backward-compatible alias, but new code should use Copy.
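A backward-compatible alias of this kind is usually just a second name bound to the same class. The sketch below illustrates the technique with a stand-in class; it is not FabricFlow's source, and the constructor shown is a placeholder.

```python
class Copy:
    """Stand-in class for illustration only; not FabricFlow's Copy."""

    def __init__(self, template: str) -> None:
        self.template = template


# The old name points at the same class object, so existing code keeps working.
CopyManager = Copy

legacy = CopyManager("some-template")
assert isinstance(legacy, Copy)
assert CopyManager is Copy
```

Because both names refer to one class, isinstance checks and behavior are identical whichever name the caller imports.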

Development

Read the Contributing file.

License

MIT License


Author

Parth Lad

LinkedIn | Website

Acknowledgements
