# mkpipe

mkpipe is a Spark-based, modular ETL framework. It provides a Python-first API for building data pipelines with pluggable extractors, loaders, and inline transformations.
## Key Features

- **PySpark engine** — parallel, partitioned JDBC reads/writes for high-throughput data movement
- **Modular plugins** — `pip install mkpipe-extractor-postgres` to add a source, `pip install mkpipe-loader-postgres` to add a destination
- **Single YAML config** — define connections, pipelines, and tables in one file
- **Tag-based execution** — assign tags to tables and run only what you need across all pipelines
- **Orchestrator-agnostic** — use with Dagster, Airflow, cron, or any Python scheduler
- **Inline transformations** — reference a Python function (`df → df`) directly in YAML
- **Dependency injection** — pass your own SparkSession (Glue, EMR, Dataproc) or let mkpipe create one
- **Incremental & full replication** — append-only incremental with `mkpipe_id` for idempotent deduplication
## Quick Start

```bash
pip install mkpipe mkpipe-extractor-postgres mkpipe-loader-postgres
```

Create `mkpipe_project.yaml`:
```yaml
version: 2
default_environment: prod

prod:
  settings:
    timezone: UTC
    backend:
      variant: sqlite

  connections:
    source_pg:
      variant: postgresql
      host: localhost
      port: 5432
      database: source_db
      user: ${PG_USER}
      password: ${PG_PASSWORD}
      schema: public
    target_pg:
      variant: postgresql
      host: localhost
      port: 5432
      database: dwh_db
      user: ${PG_USER}
      password: ${PG_PASSWORD}
      schema: staging

  pipelines:
    - name: my_pipeline
      source: source_pg
      destination: target_pg
      tables:
        - name: public.users
          target_name: stg_users
          tags: [api, user-domain]
          replication_method: incremental
          iterate_column: updated_at
          iterate_column_type: datetime
          dedup_columns: [id, updated_at]
        - name: public.orders
          target_name: stg_orders
          tags: [api, order-domain]
          replication_method: full
```
Run it:

```bash
# Run all tables
mkpipe run

# Run only tables tagged "api"
mkpipe run --tags api
```
## Python API

```python
import mkpipe

# Run all pipelines
mkpipe.run(config="mkpipe_project.yaml")

# Run a specific pipeline
mkpipe.run(config="mkpipe_project.yaml", pipeline="my_pipeline")

# Run a specific table
mkpipe.run(config="mkpipe_project.yaml", table="stg_users")

# Run by tags — runs matching tables across ALL pipelines
mkpipe.run(config="mkpipe_project.yaml", tags=["api"])
mkpipe.run(config="mkpipe_project.yaml", tags=["api", "order-domain"])

# Combine filters: pipeline + tags
mkpipe.run(config="mkpipe_project.yaml", pipeline="my_pipeline", tags=["api"])

# Pass a custom SparkSession (e.g. AWS Glue, EMR, Dataproc)
mkpipe.run(config="mkpipe_project.yaml", spark=my_spark_session)

# Extract only — returns ExtractResult with a Spark DataFrame
result = mkpipe.extract(config="mkpipe_project.yaml", table="stg_users")
df = result.df

# Load only — pass your own DataFrame
mkpipe.load(config="mkpipe_project.yaml", table="stg_users", df=my_df)
```
## CLI Reference

```bash
mkpipe run [OPTIONS]
```

| Option | Short | Description |
|---|---|---|
| `--config` | `-c` | Path to config file. Default: `mkpipe_project.yaml` in current dir |
| `--pipeline` | `-p` | Run only the named pipeline |
| `--table` | `-t` | Run only the named table (source name or target name) |
| `--tags` | | Comma-separated tags to filter tables, e.g. `--tags api,ingestion` |
Examples:

```bash
# Run everything
mkpipe run

# Specific config file
mkpipe run --config /path/to/config.yaml

# Single pipeline
mkpipe run -p my_pipeline

# Single table
mkpipe run -t stg_users

# By tags (OR logic: any matching tag)
mkpipe run --tags api
mkpipe run --tags api,ingestion

# Combine: pipeline + tags
mkpipe run -p my_pipeline --tags api
```
## Tags

Tags let you group tables by business domain, team, priority, or any other criteria. When you pass tags, mkpipe runs all matching tables across all pipelines (OR logic).

```yaml
pipelines:
  - name: pg_to_pg
    source: source_pg
    destination: target_pg
    tables:
      - name: public.users
        target_name: stg_users
        tags: [api, user-domain, critical]
      - name: public.sessions
        target_name: stg_sessions
        tags: [api, user-domain]
  - name: mysql_to_pg
    source: source_mysql
    destination: target_pg
    tables:
      - name: orders
        target_name: stg_orders
        tags: [api, order-domain, critical]
```

```python
# Runs stg_users + stg_sessions + stg_orders (all have "api")
mkpipe.run(config="config.yaml", tags=["api"])

# Runs stg_users + stg_orders (both have "critical")
mkpipe.run(config="config.yaml", tags=["critical"])

# Runs stg_users + stg_sessions (both have "user-domain")
mkpipe.run(config="config.yaml", tags=["user-domain"])

# OR logic: runs anything tagged "critical" OR "order-domain"
mkpipe.run(config="config.yaml", tags=["critical", "order-domain"])
```
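The OR semantics can be sketched in plain Python. This is an illustration of the matching rule only, not mkpipe's internal code:

```python
def match_tables(pipelines, tags):
    """Return target names of tables carrying at least one requested tag (OR logic)."""
    wanted = set(tags)
    return [
        t["target_name"]
        for p in pipelines
        for t in p["tables"]
        if wanted & set(t.get("tags", []))
    ]

# Same layout as the YAML example above
pipelines = [
    {"name": "pg_to_pg", "tables": [
        {"target_name": "stg_users", "tags": ["api", "user-domain", "critical"]},
        {"target_name": "stg_sessions", "tags": ["api", "user-domain"]},
    ]},
    {"name": "mysql_to_pg", "tables": [
        {"target_name": "stg_orders", "tags": ["api", "order-domain", "critical"]},
    ]},
]

print(match_tables(pipelines, ["api"]))       # ['stg_users', 'stg_sessions', 'stg_orders']
print(match_tables(pipelines, ["critical"]))  # ['stg_users', 'stg_orders']
```

Note that matching crosses pipeline boundaries: `stg_orders` is selected even though it lives in a different pipeline from `stg_users`.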
## YAML Configuration Reference

### Top-level Structure

```yaml
version: 2
default_environment: prod   # which environment block to use

prod:                       # environment name
  settings: ...
  connections: ...
  pipelines: ...

staging:                    # you can define multiple environments
  settings: ...
  connections: ...
  pipelines: ...
```
### Settings

```yaml
settings:
  timezone: UTC                 # Spark session timezone (default: UTC)
  spark:
    master: "local[*]"          # Spark master URL (default: local[*])
    driver_memory: "4g"         # default: auto-detected from system
    executor_memory: "4g"       # default: auto-detected from system
    extra_config:               # any additional Spark config
      spark.sql.shuffle.partitions: "200"
      spark.dynamicAllocation.enabled: "true"
  backend:
    variant: sqlite             # sqlite (default), postgresql, duckdb, clickhouse
    host: localhost
    port: 5432
    database: mkpipe_db
    user: mkpipe
    password: ${BACKEND_PASSWORD}
```
### Connections

```yaml
connections:
  my_postgres:
    variant: postgresql
    host: ${PG_HOST}
    port: 5432
    database: mydb
    user: ${PG_USER}
    password: ${PG_PASSWORD}
    schema: public

  my_mongodb:
    variant: mongodb
    mongo_uri: ${MONGO_URI}
    database: mydb

  my_s3:
    variant: file
    extra:
      storage: s3
      format: parquet
      path: s3a://my-bucket/data
      aws_access_key: ${AWS_ACCESS_KEY}
      aws_secret_key: ${AWS_SECRET_KEY}
      region: eu-west-1
```

Environment variables are referenced with `${VAR_NAME}` syntax and resolved at load time.
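As an illustration of how `${VAR_NAME}` resolution typically works (this is a sketch, not mkpipe's actual implementation), a minimal resolver substitutes placeholders from the process environment and fails fast on missing variables:

```python
import os
import re

def resolve_env(value):
    """Replace every ${VAR_NAME} placeholder with the environment value."""
    def substitute(match):
        name = match.group(1)
        if name not in os.environ:
            raise KeyError(f"environment variable {name} is not set")
        return os.environ[name]
    return re.sub(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}", substitute, value)

os.environ["PG_USER"] = "analytics"
print(resolve_env("user: ${PG_USER}"))  # user: analytics
```

Failing fast on an unset variable is safer than silently passing the literal `${...}` string through to a database driver.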
### Connection Parameters

| Parameter | Description |
|---|---|
| `variant` | **Required.** Plugin type: `postgresql`, `mysql`, `mongodb`, `file`, etc. |
| `host` | Database host |
| `port` | Database port |
| `database` | Database name |
| `user` | Username |
| `password` | Password |
| `schema` | Schema name |
| `warehouse` | Warehouse (Snowflake) |
| `private_key_file` | Path to private key file (RSA auth) |
| `private_key_file_pwd` | Private key passphrase |
| `mongo_uri` | Full MongoDB connection URI |
| `bucket_name` | S3/GCS bucket name |
| `s3_prefix` | S3 key prefix |
| `aws_access_key` | AWS access key |
| `aws_secret_key` | AWS secret key |
| `region` | Cloud region |
| `credentials_file` | Path to credentials file (GCS service account) |
| `api_key` | API key |
| `oauth_token` | OAuth token |
| `client_id` | OAuth client ID |
| `client_secret` | OAuth client secret |
| `extra` | Dict of additional options (storage, format, path, etc.) |
### Pipelines & Tables

```yaml
pipelines:
  - name: my_pipeline            # unique pipeline name
    source: source_pg            # connection name for extraction
    destination: target_pg       # connection name for loading
    pass_on_error: false         # if true, continue on table failure
    tables:
      - name: public.users
        target_name: stg_users
        tags: [api, user-domain]
        replication_method: incremental
        iterate_column: updated_at
        iterate_column_type: datetime
        partitions_column: id
        partitions_count: 10
        fetchsize: 100000
        batchsize: 10000
        write_partitions: 4
        dedup_columns: [id, updated_at]
        custom_query: "(SELECT id, name, updated_at FROM users {query_filter}) q"
        transform: transforms/clean_users.py::transform
        pass_on_error: false
```
### Table Parameters

| Parameter | Default | Description |
|---|---|---|
| `name` | required | Source table/collection name |
| `target_name` | required | Destination table name |
| `tags` | `[]` | List of tags for filtering (`--tags api,ingestion`) |
| `replication_method` | `full` | `full` or `incremental` |
| `iterate_column` | `None` | Column for incremental tracking (required if incremental) |
| `iterate_column_type` | `None` | `datetime` or `int` |
| `partitions_column` | `iterate_column` | Column for Spark JDBC partitioning |
| `partitions_count` | `10` | Number of JDBC read partitions |
| `fetchsize` | `100000` | JDBC fetch size (rows per network round trip) |
| `batchsize` | `10000` | JDBC write batch size |
| `write_partitions` | `None` | Number of write partitions (coalesce before writing) |
| `dedup_columns` | `None` | Columns for `mkpipe_id` hash generation (xxhash64) |
| `custom_query` | `None` | Custom SQL query with `{query_filter}` placeholder |
| `custom_query_file` | `None` | Path to `.sql` file (relative to `sql/` directory) |
| `transform` | `None` | Transform function reference: `path/to/file.py::function` |
| `pass_on_error` | `false` | Continue pipeline on this table's failure |
## Incremental Replication

mkpipe uses an append-only strategy for incremental replication:

- **Extract**: reads rows where `iterate_column >= last_point` (inclusive, no boundary loss)
- **Load**: appends to the target table (never overwrites or deletes)
- **Dedup**: if `dedup_columns` is set, a `mkpipe_id` (xxhash64 hash) is generated for downstream deduplication

An `etl_time` timestamp is always added to every row.
```yaml
- name: public.users
  target_name: stg_users
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
  dedup_columns: [id, updated_at]   # mkpipe_id = xxhash64(id, updated_at)
```
Downstream dedup query example:

```sql
SELECT * FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY mkpipe_id ORDER BY etl_time DESC) AS rn
    FROM stg_users
) WHERE rn = 1
```
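For readers who prefer to see the mechanics outside SQL, the same keep-latest-row-per-`mkpipe_id` logic can be sketched in plain Python on a toy dataset:

```python
# Toy rows as appended by repeated incremental runs: two versions of the
# same logical record (same mkpipe_id "a") plus one unique record.
rows = [
    {"mkpipe_id": "a", "etl_time": "2024-01-01", "name": "old"},
    {"mkpipe_id": "a", "etl_time": "2024-02-01", "name": "new"},
    {"mkpipe_id": "b", "etl_time": "2024-01-15", "name": "only"},
]

# Keep only the row with the latest etl_time for each mkpipe_id,
# mirroring ROW_NUMBER() ... ORDER BY etl_time DESC ... WHERE rn = 1.
latest = {}
for row in rows:
    current = latest.get(row["mkpipe_id"])
    if current is None or row["etl_time"] > current["etl_time"]:
        latest[row["mkpipe_id"]] = row

deduped = sorted(latest.values(), key=lambda r: r["mkpipe_id"])
print([r["name"] for r in deduped])  # ['new', 'only']
```

This is why the append-only load is idempotent in practice: re-running an overlapping window only adds rows that the dedup step discards.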
## Inline Transformations

Add a `transform` field to any table:

```yaml
tables:
  - name: public.products
    target_name: stg_products
    replication_method: full
    transform: transforms/clean_products.py::transform
```

The transform function receives and returns a PySpark DataFrame:

```python
# transforms/clean_products.py
def transform(df):
    df = df.filter(df.status != "deleted")
    return df
```
## Orchestrator Integration

### Dagster

```python
from dagster import asset, Definitions
import mkpipe

@asset
def api_tables():
    mkpipe.run(config="mkpipe_project.yaml", tags=["api"])

@asset
def critical_tables():
    mkpipe.run(config="mkpipe_project.yaml", tags=["critical"])

defs = Definitions(assets=[api_tables, critical_tables])
```

### Airflow

```python
from airflow.decorators import task

@task
def sync_api_tables():
    import mkpipe
    mkpipe.run(config="/path/to/mkpipe_project.yaml", tags=["api"])

@task
def sync_user_domain():
    import mkpipe
    mkpipe.run(config="/path/to/mkpipe_project.yaml", tags=["user-domain"])
```
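### cron

Because mkpipe exposes a plain CLI, a cron entry is enough for simple schedules. A sample crontab line (the schedule, working directory, and log path are placeholders, not mkpipe requirements):

```cron
# Run all "api"-tagged tables every day at 02:00
0 2 * * *  cd /opt/mkpipe && mkpipe run --tags api >> /var/log/mkpipe.log 2>&1
```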
## Backend (State Tracking)

mkpipe tracks pipeline state (last sync point, status) in a manifest database. Default is SQLite (zero-config). PostgreSQL, DuckDB, and ClickHouse are also supported:

```yaml
settings:
  backend:
    variant: postgresql
    host: localhost
    port: 5432
    database: mkpipe_db
    user: mkpipe
    password: ${BACKEND_PASSWORD}
```

Install optional backend dependencies:

```bash
pip install mkpipe[postgres-backend]
pip install mkpipe[duckdb-backend]
pip install mkpipe[clickhouse-backend]
pip install mkpipe[all-backends]
```
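Conceptually, the manifest is a small table keyed by target table that stores the last sync point and run status. A minimal sketch of the idea using SQLite (the table and column names below are illustrative assumptions, not mkpipe's actual schema):

```python
import sqlite3

# Hypothetical manifest schema for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE manifest (table_name TEXT PRIMARY KEY, last_point TEXT, status TEXT)"
)
conn.execute(
    "INSERT INTO manifest VALUES (?, ?, ?)",
    ("stg_users", "2024-06-01T00:00:00", "completed"),
)

# A subsequent incremental run reads last_point and extracts rows where
# iterate_column >= last_point.
row = conn.execute(
    "SELECT last_point FROM manifest WHERE table_name = ?", ("stg_users",)
).fetchone()
print(row[0])  # 2024-06-01T00:00:00
```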
## Custom Exceptions

mkpipe provides specific exception classes for clean error handling:

```python
from mkpipe import (
    MkpipeError,          # base exception
    ConfigError,          # YAML or configuration issues
    ExtractionError,      # data extraction failures
    LoadError,            # data loading failures
    TransformError,       # transformation failures
    PluginNotFoundError,  # missing plugin
    BackendError,         # backend manifest failures
)
```
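Since `MkpipeError` is the base exception, callers can catch narrowly first and fall back to the base class. A sketch of that pattern using stand-in classes (in real code, import the real ones from mkpipe as above):

```python
# Stand-ins mirroring the documented hierarchy, for illustration only.
class MkpipeError(Exception): pass
class ExtractionError(MkpipeError): pass
class LoadError(MkpipeError): pass

def run_with_handling(fn):
    """Catch specific failures first, then fall back to the base class."""
    try:
        return fn()
    except ExtractionError as e:
        return f"extract failed: {e}"
    except MkpipeError as e:
        return f"pipeline failed: {e}"

def bad_extract():
    raise ExtractionError("connection refused")

print(run_with_handling(bad_extract))  # extract failed: connection refused
```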
## Available Plugins

For the full list, visit the mkpipe-hub.

### Extractors

PostgreSQL, MySQL, MariaDB, SQL Server, Oracle, SQLite, Redshift, ClickHouse, MongoDB, File (S3/GCS/local), Snowflake

### Loaders

PostgreSQL, MySQL, MariaDB, SQLite, File (S3/GCS/local), Snowflake, ClickHouse

## License

Apache 2.0 — see LICENSE.