This package contains utility functions for Prefect and Snowflake
Project description
orchestration-utilities
This repository holds the utilities modules that are essential for ETL operations. This repository will be used as a package and serve the ETL flows.
This package will be used in the PREFECT flows and SNOWFLAKE as part of the ETL operations.
Installation
Install the package using PyPI:
pip install orchestration-utils
For development installation with testing dependencies:
git clone https://github.com/cloudfactory/orchestration-utilities.git
cd orchestration-utilities
pip install -r requirements-dev.txt
pip install -e .
Inside this package
1. aws.py
This module contains the functions that are used to interact with the AWS services.
Example: S3
2. copy_into_s3
This module contains the functions that can be used to copy the data from the Snowflake Stage(S3 Bucket) to the Snowflake Table.
This module leverages the etl_operations module to perform the Schema Drift Handeling and Query Execution.
This module works best with the Stages that are partitioned well. Example: The data in the S3 bucket is partitioned by date, year, month, etc.
This module does not perform well if the data is not partitioned well in the S3 bucket.
Example: If the data in the S3 bucket is dropped under a single folder without any partitioning, then the copy operation will take a lot of time to complete. Given the folder is heavy with files.
Class/Groups:
CopyIntoTable: This class contains the functions that are used to copy the data from the Snowflake Stage(S3 Bucket) to the Snowflake Table.copy_into_snowflake_table: This function is the main function that is used to copy the data from the Snowflake Stage(S3 Bucket) to the Snowflake Table. It accepts the parameterforcewhich is used to force the copy operation to be performed even if the data is already present in the table. The default value of theforceparameter isFalse.
3. s3_to_clickhouse
Production-ready module for loading data from S3 directly into ClickHouse using the dlt (Data Loading Tool) library.
This module has been extensively updated with powerful new features for intelligent data partition filtering, automatic schema management, and optimized parallel processing.
Key Features:
- Date-Based Partition Discovery & Filtering: Automatically discovers S3 folders with dates (e.g.,
2024-01-15/) and filters based onstart_dateandend_dateparameters - Smart Path Filtering Logic:
- Folders with dates in their names are filtered by the date range
- Nested dates are ignored (only first-level folders are checked)
- Automatic Table Name Generation: Auto-generates table names following pattern
s3_{type}_{source}_{project}when not explicitly provided - Dynamic Schema Management: Powered by
dltfor automatic schema inference, table creation, and evolution: https://dlthub.com/docs/general-usage/schema-evolution - Intelligent Worker Allocation: Automatically calculates optimal worker count based on data size (1-16 workers)
- Rich Metadata Tracking: Every record enriched with
id,ingestion_date,s3_last_modified,s3_etag,folder_name,file_name,json_object,type,source, andprojectcolumns - File Format Support: JSON, CSV (with automatic normalization of variants like
jsoneachrow,ndjson) - Prefect Integration: Seamless workflow orchestration with dedicated Prefect flows
- Two-Level Parallelism: Partition-level parallelism via Prefect + file-level parallelism via
dlt - S3 Structure Caching: Intelligent caching to avoid redundant S3 scans
Classes:
S3ToClickHouseDLT: Main class for S3 → ClickHouse data loading usingdltS3ToClickHouseResult: Result dataclass with load statistics (rows inserted, success status, message)
Key Methods:
load_from_s3(force=False, workers=None): Primary loading method with optional force reload and worker overrides3_to_clickhouse_flow(start_date, end_date, config_data): Prefect flow for batch processing multiple ingestion jobs
Date Filtering Behavior:
The module implements sophisticated path filtering:
Example S3 Structure:
s3://bucket/
├── foo1.csv
├── 2024-01-15/
│ └── data.csv
└── logs/
└── app.log
With start_date=2024-01-01 and end_date=2024-01-31 and discover_partitions is true:
- ✅
s3://bucket/foo1.csv- Excluded (file at root) - ✅
s3://bucket/2024-01-15/- Included (folder date within range) - ✅
s3://bucket/2024-01-15/data.csv- Included (file in valid folder) - ❌
s3://bucket/2024-12-30/- Excluded (folder date outside range) - ✅
s3://bucket/logs/- Excluded (no date in folder name)
With discover_partitions as false:
- ✅
s3://bucket/foo1.csv- Included (file at root) - ✅
s3://bucket/2024-01-15/- Included (folder date within range) - ✅
s3://bucket/2024-01-15/data.csv- Included (file in valid folder) - ❌
s3://bucket/2024-12-30/- Included (folder date outside range) - ✅
s3://bucket/logs/- Included (no date in folder name)
Quick Example:
from orchestration_utils.s3_to_clickhouse import S3ToClickHouseDLT
from datetime import datetime
# Initialize with date filtering and auto table naming
loader = S3ToClickHouseDLT(
# ClickHouse credentials from Prefect JSONSecret block
clickhouse_credentials_block="my-clickhouse-creds",
clickhouse_database="analytics",
# S3 source configuration
s3_bucket="data-lake",
s3_prefix="events/",
s3_pattern="*.json",
file_format="json",
# AWS credentials via Prefect block (optional - uses default credential chain if not provided)
aws_credentials_block="my-aws-creds",
# Auto-generate table name: s3_inference_api_chatbot
data_type="inference",
data_source="api",
project="chatbot",
# Date filtering (only process date-based folders within range)
start_date=datetime(2024, 1, 1),
end_date=datetime(2024, 1, 31),
# Optional: explicit worker count (auto-calculated if omitted)
workers=8
)
# Load data
result = loader.load_from_s3()
if result.success:
print(f"✅ Loaded {result.rows_inserted} rows")
print(f"📊 Table: {loader.table_name}") # Output: s3_inference_api_chatbot
else:
print(f"❌ Failed: {result.message}")
loader.close()
Worker Auto-Calculation:
If workers parameter is not specified, the system automatically calculates the optimal number based on data size:
- < 10 MB: 1 worker
- 10 MB - 100 MB: 2 workers
- 100 MB - 500 MB: 4 workers
- 500 MB - 1 GB: 8 workers
- > 1 GB: 16 workers
Workers are always capped between 1 and 16 for optimal performance.
4. etl_contol.py
This module contains the functions that interact with Snowflake and stores the states of the flows in the database.
- This module accepts the connection(connection_creds) paramater where the default value is
snowflake-prefect-user, pipeline name and environment name. - The pipeline name and environment name are used to store the states of the flows in the database. Example when the flow is started, completed, failed, etc.
5. etl_operations.py
This module contains the functions that are used to perform the ETL operations either in the Destination table or in the Source table.
Class/Groups:
CreateConnections: This class is used to create the connections to the databases. The connections are created using the connection credentials and warehouse name.SnowflakeDestination: This class contains all the load types and the functions that are used to load the data into the Snowflake tables.
This class accepts the connection credentials (by default the value issnowflake-prefect-user), warehouse name(by default the value isloading), database name, and environment name(by default the value isdev).DataFrameHadler: This class contains the functions that converts the dataframes columns to the relevant data types.SchemaDriftHandler: This class contains the functions that are used to handle the schema drifts in the destination table.SnowflakeSource: This class contains the functions that are used to extract the data from the Snowflake tables.
6. notifications.py
This module contains the functions that are used to send the notifications to Slack. The Webhook blocks need to be created in Prefect first to send the notifications to Slack.
Class/Groups:
SlackWebhooksNotification: This class is used to send the notifications to Slack. The Class accepts the webhook name and the message that needs to be sent to Slack.
7. queries.py
This module contains the queries that are used to perform the ETL operations in the Snowflake tables. This module is referred by the etl_control and etl_operations modules.
Development
Running Tests
Run the test suite using pytest:
make test
Or directly with pytest:
python -m pytest test -v
For coverage report:
python -m pytest test -v --cov=orchestration_utils --cov-report=html
Building the Package Locally
Install the dependencies in your virtual environment:
pip install -r requirements-dev.txt
Build dist folder where .whl and .tar.gz files are created:
make build
This will create the dist folder with:
orchestration_utils-0.0.0.tar.gzorchestration_utils-0.0.0-py3-none-any.whl
The .whl file can be installed using: pip install dist/orchestration_utils-0.0.0-py3-none-any.whl
How to deploy
Deploy the package to the PYPI using Github Actions. There are two workflows one to deploy in dev and the other to deploy in production.
1. Dev/Manual Release to TestPyPI
- Click on Run workflow
- Select the branch that you have made the changes
- The changes will be refelcted in TestPyPI
2. Prod Release to PyPI
- Click on Run workflow
- Select the
mainbranch only - The changes will be refelcted in PyPI
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file orchestration_utils-0.2.0.tar.gz.
File metadata
- Download URL: orchestration_utils-0.2.0.tar.gz
- Upload date:
- Size: 36.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
182b70b2354cf7779bee1e6389ae2a50deec43f0baf4fce9fcd14be35503c87d
|
|
| MD5 |
a28bca8006835fca36dc9635f5799c25
|
|
| BLAKE2b-256 |
7540873417610102ff5314dc70f4b19d3a394dc802fae6f7e187a13b1f94c8a6
|
Provenance
The following attestation bundles were made for orchestration_utils-0.2.0.tar.gz:
Publisher:
prod-release.yml on cloudfactory/orchestration-utilities
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
orchestration_utils-0.2.0.tar.gz -
Subject digest:
182b70b2354cf7779bee1e6389ae2a50deec43f0baf4fce9fcd14be35503c87d - Sigstore transparency entry: 991576897
- Sigstore integration time:
-
Permalink:
cloudfactory/orchestration-utilities@380ec07079fa7fb7db91f08eb5426a2afea3a1dc -
Branch / Tag:
refs/heads/main - Owner: https://github.com/cloudfactory
-
Access:
internal
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
prod-release.yml@380ec07079fa7fb7db91f08eb5426a2afea3a1dc -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file orchestration_utils-0.2.0-py3-none-any.whl.
File metadata
- Download URL: orchestration_utils-0.2.0-py3-none-any.whl
- Upload date:
- Size: 37.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3042052de85ca6cd138d43ec269345939e74a0a26f33ac3aafcdb941f3fb7c12
|
|
| MD5 |
bdf6228b4d15faf3ecf4a61e4fa46ad7
|
|
| BLAKE2b-256 |
3bb2ec0cd648e59c9b012eb419b223a080ad9f22887cf08c22f9dfe655dc47ec
|
Provenance
The following attestation bundles were made for orchestration_utils-0.2.0-py3-none-any.whl:
Publisher:
prod-release.yml on cloudfactory/orchestration-utilities
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
orchestration_utils-0.2.0-py3-none-any.whl -
Subject digest:
3042052de85ca6cd138d43ec269345939e74a0a26f33ac3aafcdb941f3fb7c12 - Sigstore transparency entry: 991576899
- Sigstore integration time:
-
Permalink:
cloudfactory/orchestration-utilities@380ec07079fa7fb7db91f08eb5426a2afea3a1dc -
Branch / Tag:
refs/heads/main - Owner: https://github.com/cloudfactory
-
Access:
internal
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
prod-release.yml@380ec07079fa7fb7db91f08eb5426a2afea3a1dc -
Trigger Event:
workflow_dispatch
-
Statement type: