A code-first approach for MS Fabric data pipelines and ETL.
FabricFlow
FabricFlow is a code-first Python SDK for building, managing, and automating Microsoft Fabric data pipelines, workspaces, and core items. It provides a high-level, object-oriented interface for interacting with the Microsoft Fabric REST API, enabling you to create, execute, and monitor data pipelines programmatically.
Features
- Pipeline Templates: Easily create data pipelines from reusable templates (e.g., SQL Server to Lakehouse).
- Pipeline Execution: Trigger, monitor, and extract results from pipeline runs.
- Copy & Lookup Activities: Build and execute copy and lookup activities with source/sink abstractions.
- Modular Architecture: Activities, sources, and sinks are organized in separate modules.
- Workspace & Item Management: CRUD operations for workspaces and core items.
- Connection & Capacity Utilities: Resolve and manage connections and capacities.
- Logging Utilities: Simple logging setup for consistent diagnostics.
- Service Principal Authentication: Authenticate securely with Microsoft Fabric REST API using Azure Service Principal credentials.
Installation
pip install fabricflow
Sample Usage
Below is a sample workflow that demonstrates how to use FabricFlow to automate workspace creation, pipeline deployment, and data copy operations in Microsoft Fabric.
1. Import Required Libraries
from sempy.fabric import FabricRestClient
from fabricflow import create_workspace, create_data_pipeline
from fabricflow.pipeline.activities import Copy, Lookup
from fabricflow.pipeline.sources import SQLServerSource, GoogleBigQuerySource, PostgreSQLSource, FileSystemSource
from fabricflow.pipeline.sinks import LakehouseTableSink, ParquetFileSink, LakehouseFilesSink
from fabricflow.pipeline.sinks.types import FileCopyBehavior
from fabricflow.pipeline.templates import (
    DataPipelineTemplates,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH,
    COPY_FILES_TO_LAKEHOUSE,
    LOOKUP_SQL_SERVER,
    LOOKUP_SQL_SERVER_FOR_EACH
)
2. Initialize Fabric Client
fabric_client = FabricRestClient()
Note: If you are using ServicePrincipalTokenProvider, make sure your Service Principal has access to the workspace and connections you are using.
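As a minimal sketch, Service Principal values can be collected from environment variables and checked before any authentication is attempted. The variable names (AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET) and the helper load_sp_credentials are assumptions made for illustration; they are not part of FabricFlow, and how the values are handed to ServicePrincipalTokenProvider is left to its own documentation.

```python
import os

# Hypothetical variable names; adjust to whatever your deployment uses.
REQUIRED_VARS = ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")


def load_sp_credentials(env=None):
    """Return the three Service Principal values, failing early if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "tenant_id": env["AZURE_TENANT_ID"],
        "client_id": env["AZURE_CLIENT_ID"],
        "client_secret": env["AZURE_CLIENT_SECRET"],
    }
```

Failing early keeps a missing secret from surfacing later as a less obvious authorization error.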
3. Define Workspace and Capacity
capacity_name = "FabricFlow"
workspace_name = "FabricFlow"
4. Create Workspace (Optional)
You can create a new workspace, or use an existing one by specifying its name.
create_workspace(fabric_client, workspace_name, capacity_name)
5. Deploy Data Pipeline Templates
You can deploy every built-in template, as in the loop below, or create individual data pipelines by passing specific templates from the list.
New Feature: You can now use custom pipeline templates by passing a dict (JSON payload), enum, or file path to create_data_pipeline.
for template in DataPipelineTemplates:
    create_data_pipeline(
        fabric_client,
        template,
        workspace_name
    )
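When deploying several templates, it can help to keep going after a failure and report what went wrong at the end. The sketch below assumes nothing about FabricFlow beyond the call shown above: deploy_templates is a hypothetical helper, and the deployment call is passed in as a function so the loop can be tried without a Fabric connection.

```python
def deploy_templates(templates, deploy):
    """Call deploy(template) for each template and return (succeeded, failed)."""
    succeeded, failed = [], []
    for template in templates:
        try:
            deploy(template)
            succeeded.append(template)
        except Exception as exc:
            # Record the failure and continue so one bad template does not stop the rest.
            failed.append((template, str(exc)))
    return succeeded, failed
```

With FabricFlow this might be called as deploy_templates(DataPipelineTemplates, lambda t: create_data_pipeline(fabric_client, t, workspace_name)).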
6. Define Source and Sink Details
SOURCE_CONNECTION_ID = "your-source-connection-id"
SOURCE_DATABASE_NAME = "AdventureWorks2022"
SINK_WORKSPACE_ID = "your-sink-workspace-id"
SINK_LAKEHOUSE_ID = "your-sink-lakehouse-id"
ITEMS_TO_LOAD = [
    {
        "source_schema_name": "Sales",
        "source_table_name": "SalesOrderHeader",
        "source_query": "SELECT * FROM [Sales].[SalesOrderHeader]",
        "sink_table_name": "SalesOrderHeader",
        "sink_schema_name": "dbo",
        "sink_table_action": "Overwrite",
        "load_type": "Incremental",
        "primary_key_columns": ["SalesOrderID"],
        "skip": True,
        "load_from_timestamp": None,
        "load_to_timestamp": None,
    },
    # Add more items as needed...
]
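Writing many items by hand invites typos, so a small builder and a key check can be useful. This is a sketch only: make_item, validate_items, and the REQUIRED_KEYS set are hypothetical, and which keys a given template actually requires is an assumption based on the fields used in the examples in this README.

```python
# Assumed minimum; a given pipeline template may need more or fewer keys.
REQUIRED_KEYS = {"source_query", "sink_table_name", "sink_schema_name", "sink_table_action"}


def make_item(schema, table, sink_schema="dbo", sink_table_action="Overwrite"):
    """Build one load item whose query selects the whole source table."""
    return {
        "source_schema_name": schema,
        "source_table_name": table,
        "source_query": f"SELECT * FROM [{schema}].[{table}]",
        "sink_table_name": table,
        "sink_schema_name": sink_schema,
        "sink_table_action": sink_table_action,
    }


def validate_items(items):
    """Raise ValueError naming the first item that lacks a required key."""
    for index, item in enumerate(items):
        missing = REQUIRED_KEYS - item.keys()
        if missing:
            raise ValueError(f"Item {index} is missing: {sorted(missing)}")
    return items
```

For example, validate_items([make_item("Sales", "SalesOrderHeader")]) returns the list unchanged, while an item without source_query raises before any pipeline run starts.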
7. Copy Data
You can copy data using either a single item per pipeline run (Option 1) or multiple items per pipeline run (Option 2). Choose the option that best fits your requirements.
Note: The examples below use the new Copy class. You can also use CopyManager for backward compatibility, but Copy is recommended for new code.
Option 1: Single Item Per Pipeline Run
copy = Copy(
    fabric_client,
    workspace_name,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE
)
source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query=ITEMS_TO_LOAD[0]["source_query"],
)
sink = LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE_ID,
    sink_lakehouse=SINK_LAKEHOUSE_ID,
    sink_table_name=ITEMS_TO_LOAD[0]["sink_table_name"],
    sink_schema_name=ITEMS_TO_LOAD[0]["sink_schema_name"],
    sink_table_action=ITEMS_TO_LOAD[0]["sink_table_action"],
)
result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)
Option 2: Multiple Items Per Pipeline Run
copy = Copy(
    fabric_client,
    workspace_name,
    COPY_SQL_SERVER_TO_LAKEHOUSE_TABLE_FOR_EACH
)
source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
)
sink = LakehouseTableSink(
    sink_workspace=SINK_WORKSPACE_ID,
    sink_lakehouse=SINK_LAKEHOUSE_ID,
)
result = (
    copy
    .source(source)
    .sink(sink)
    .items(ITEMS_TO_LOAD)
    .execute()
)
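If the item list grows large, one option is to split it into smaller batches and run the multi-item pipeline once per batch. The helper below is a plain-Python sketch; batched is a hypothetical name, the batch size is arbitrary, and this README does not state whether FabricFlow or Fabric limits the number of items per run.

```python
def batched(items, size):
    """Split a list into consecutive chunks of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[start:start + size] for start in range(0, len(items), size)]
```

Each batch could then be passed to .items(batch) in place of the full ITEMS_TO_LOAD list.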
8. Lookup Data (New Feature)
FabricFlow now supports lookup operations for data validation and enrichment:
# Single lookup operation
lookup = Lookup(
    fabric_client,
    workspace_name,
    LOOKUP_SQL_SERVER
)
source = SQLServerSource(
    source_connection_id=SOURCE_CONNECTION_ID,
    source_database_name=SOURCE_DATABASE_NAME,
    source_query="SELECT COUNT(*) as record_count FROM [Sales].[SalesOrderHeader]",
)
result = (
    lookup
    .source(source)
    .execute()
)
# Multiple lookup operations
lookup_items = [
    {
        "source_query": "SELECT COUNT(*) as order_count FROM [Sales].[SalesOrderHeader]",
        "first_row_only": True,
    },
    {
        "source_query": "SELECT MAX(OrderDate) as latest_order FROM [Sales].[SalesOrderHeader]",
        "first_row_only": True,
    }
]
lookup = Lookup(
    fabric_client,
    workspace_name,
    LOOKUP_SQL_SERVER_FOR_EACH
)
result = (
    lookup
    .source(source)
    .items(lookup_items)
    .execute()
)
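For validation checks across many tables, the lookup items can be generated rather than typed out. The sketch below builds one row-count lookup per table using the same two keys shown above (source_query and first_row_only); count_lookup_items is a hypothetical helper, not a FabricFlow function.

```python
def count_lookup_items(tables):
    """Build one row-count lookup item per (schema, table) pair."""
    return [
        {
            "source_query": f"SELECT COUNT(*) AS record_count FROM [{schema}].[{table}]",
            "first_row_only": True,
        }
        for schema, table in tables
    ]
```

The result has the same shape as lookup_items, so it could be passed to .items(...) in the same way.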
9. File System to Lakehouse Copy (New Feature)
FabricFlow now supports copying files from file servers directly to the Lakehouse Files area:
copy = Copy(
    fabric_client,
    workspace_name,
    COPY_FILES_TO_LAKEHOUSE
)
# Define file system source with pattern matching and filtering
source = FileSystemSource(
    source_connection_id="your-file-server-connection-id",
    source_folder_pattern="incoming/data/*",  # Wildcard folder pattern
    source_file_pattern="*.csv",  # File pattern
    source_modified_after="2025-01-01T00:00:00Z",  # Optional date filter
    recursive_search=True,  # Recursive directory search
    delete_source_after_copy=False,  # Keep source files
    max_concurrent_connections=10  # Connection limit
)
# Define lakehouse files sink
sink = LakehouseFilesSink(
    sink_lakehouse="data-lakehouse",
    sink_workspace="analytics-workspace",
    sink_directory="processed/files",  # Target directory in lakehouse
    copy_behavior=FileCopyBehavior.PRESERVE_HIERARCHY,  # Maintain folder structure
    enable_staging=False,  # Direct copy without staging
    parallel_copies=4,  # Parallel operations
    max_concurrent_connections=10  # Connection limit
)
result = (
    copy
    .source(source)
    .sink(sink)
    .execute()
)
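The source_modified_after value in the example is a UTC timestamp string ending in Z. If the cutoff is computed at run time, a small formatter avoids hand-written strings. This is a sketch: to_utc_z is a hypothetical helper, and the assumption that the parameter expects exactly this format is inferred from the example value above rather than from FabricFlow's documentation.

```python
from datetime import datetime, timezone


def to_utc_z(moment):
    """Format a timezone-aware datetime as, for example, 2025-01-01T00:00:00Z."""
    if moment.tzinfo is None:
        # A naive datetime has no defined offset, so converting it would be a guess.
        raise ValueError("Expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
```

For instance, to_utc_z(datetime(2025, 1, 1, tzinfo=timezone.utc)) gives the string used in the example.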
API Overview
Below are the main classes and functions available in FabricFlow:
Core Pipeline Components
- DataPipelineExecutor – Execute data pipelines and monitor their status.
- DataPipelineError – Exception class for pipeline errors.
- PipelineStatus – Enum for pipeline run statuses.
- DataPipelineTemplates – Enum for pipeline templates.
- get_template – Retrieve a pipeline template definition.
- get_base64_str – Utility for base64 encoding of template files.
- create_data_pipeline – Create a new data pipeline from a template.
Pipeline Activities
- Copy – Build and execute copy activities (replaces CopyManager).
- Lookup – Build and execute lookup activities for data validation.
Sources and Sinks
- SQLServerSource – Define SQL Server as a data source.
- GoogleBigQuerySource – Define Google BigQuery as a data source.
- PostgreSQLSource – Define PostgreSQL as a data source.
- FileSystemSource – Define a file server as a data source for file-based operations.
- BaseSource – Base class for all data sources.
- LakehouseTableSink – Define a Lakehouse table as a data sink.
- ParquetFileSink – Define a Parquet file as a data sink.
- LakehouseFilesSink – Define the Lakehouse Files area as a data sink for file operations.
- BaseSink – Base class for all data sinks.
- SinkType / SourceType – Enums for sink and source types.
- FileCopyBehavior – Enum for file copy behavior options.
Workspace and Item Management
- FabricCoreItemsManager – Manage core Fabric items via APIs.
- FabricWorkspacesManager – Manage Fabric workspaces via APIs.
- get_workspace_id – Get a workspace ID or return the current one.
- create_workspace – Create a new workspace and assign it to a capacity.
- FabricItemType – Enum for Fabric item types.
Utilities
- setup_logging – Configure logging for diagnostics.
- resolve_connection_id – Resolve a connection by name or ID.
- resolve_capacity_id – Resolve a capacity by name or ID.
- ServicePrincipalTokenProvider – Handles Azure Service Principal authentication.
Activities, Sources, and Sinks
FabricFlow provides a modular architecture with separate packages for activities, sources, sinks, and templates:
- Activities: Copy, Lookup - Build and execute pipeline activities
- Sources: SQLServerSource, GoogleBigQuerySource, PostgreSQLSource, FileSystemSource, BaseSource, SourceType - Define data sources
- Sinks: LakehouseTableSink, ParquetFileSink, LakehouseFilesSink, BaseSink, SinkType, FileCopyBehavior - Define data destinations
- Templates: Pre-built pipeline definitions for common patterns
Backward Compatibility
- CopyManager → Copy: The CopyManager class has been renamed to Copy for consistency. Existing code using CopyManager will continue to work (backward-compatible alias), but new code should use Copy.
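One common way to keep an old class name working after a rename is a module-level alias. The sketch below uses a stand-in class to show the idea; it is not FabricFlow's code, and whether FabricFlow implements its alias this way is an assumption.

```python
class Copy:
    """Stand-in class for illustration only; not FabricFlow's Copy."""

    def __init__(self, name):
        self.name = name


# The old name is bound to the same class object, so existing imports keep working.
CopyManager = Copy

legacy = CopyManager("orders")
print(isinstance(legacy, Copy))  # True
print(CopyManager is Copy)       # True
```

Because both names refer to one class, objects created through either name behave identically.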
Development
Read the Contributing file.
License
Author
Parth Lad
Acknowledgements