Spark-based modular ETL pipeline framework.
Project description
mkpipe
mkpipe is a Spark-based, modular ETL framework. It provides a Python-first API for building data pipelines with pluggable extractors, loaders, and inline transformations.
Key Features
- PySpark engine — parallel, partitioned JDBC reads/writes for high-throughput data movement
- Modular plugins — pip install mkpipe-extractor-postgres to add a source, pip install mkpipe-loader-postgres to add a destination
- Single YAML config — define connections, pipelines, and tables in one file
- Tag-based execution — assign tags to tables and run only what you need across all pipelines
- Orchestrator-agnostic — use with Dagster, Airflow, cron, or any Python scheduler
- Inline transformations — reference a Python function (df → df) directly in YAML
- Dependency Injection — pass your own SparkSession (Glue, EMR, Dataproc) or let mkpipe create one
- Incremental & full replication — append-only incremental with mkpipe_id for idempotent deduplication
Quick Start
pip install mkpipe mkpipe-extractor-postgres mkpipe-loader-postgres
Create mkpipe_project.yaml:
version: 2
default_environment: prod

prod:
  settings:
    timezone: UTC
    log_dir: /path/to/logs
    backend:
      variant: sqlite

  connections:
    source_pg:
      variant: postgres
      host: localhost
      port: 5432
      database: source_db
      user: ${PG_USER}
      password: ${PG_PASSWORD}
      schema: public
    target_pg:
      variant: postgres
      host: localhost
      port: 5432
      database: dwh_db
      user: ${PG_USER}
      password: ${PG_PASSWORD}
      schema: staging

  pipelines:
    - name: my_pipeline
      source: source_pg
      destination: target_pg
      tables:
        - name: public.users
          target_name: stg_users
          tags: [api, user-domain]
          replication_method: incremental
          iterate_column: updated_at
          iterate_column_type: datetime
          write_strategy: upsert
          write_key: [id]
        - name: public.orders
          target_name: stg_orders
          tags: [api, order-domain]
          replication_method: full
Run it:
# Run all tables
mkpipe run
# Run only tables tagged "api"
mkpipe run --tags api
Python API
import mkpipe
# Run all pipelines
mkpipe.run(config="mkpipe_project.yaml")
# Run a specific pipeline
mkpipe.run(config="mkpipe_project.yaml", pipeline="my_pipeline")
# Run a specific table
mkpipe.run(config="mkpipe_project.yaml", table="stg_users")
# Run by tags — runs matching tables across ALL pipelines
mkpipe.run(config="mkpipe_project.yaml", tags=["api"])
mkpipe.run(config="mkpipe_project.yaml", tags=["api", "order-domain"])
# Combine filters: pipeline + tags
mkpipe.run(config="mkpipe_project.yaml", pipeline="my_pipeline", tags=["api"])
# Pass a custom SparkSession (e.g. AWS Glue, EMR, Dataproc)
mkpipe.run(config="mkpipe_project.yaml", spark=my_spark_session)
# Extract only — returns ExtractResult with a Spark DataFrame
result = mkpipe.extract(config="mkpipe_project.yaml", table="stg_users")
df = result.df
# Load only — pass your own DataFrame
mkpipe.load(config="mkpipe_project.yaml", table="stg_users", df=my_df)
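These calls compose. As a sketch, an extract, transform, load round trip in your own code could look like the following (only mkpipe.extract and mkpipe.load above are mkpipe APIs; the filter and column are illustrative):

import mkpipe
from pyspark.sql import functions as F

# Extract the table, apply an ad-hoc PySpark transformation, then load the result back.
result = mkpipe.extract(config="mkpipe_project.yaml", table="stg_users")
cleaned = result.df.filter(F.col("updated_at").isNotNull())  # illustrative filter
mkpipe.load(config="mkpipe_project.yaml", table="stg_users", df=cleaned)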
CLI Reference
mkpipe run [OPTIONS]
mkpipe install-jars
| Command / Option | Short | Description |
|---|---|---|
| mkpipe run | | Run pipelines from config file |
| --config | -c | Path to config file. Default: mkpipe_project.yaml in current dir |
| --pipeline | -p | Run only the named pipeline |
| --table | -t | Run only the named table (source name or target name) |
| --tags | | Comma-separated tags to filter tables, e.g. --tags api,ingestion |
| mkpipe install-jars | | Download Maven JARs for all installed plugins (offline/Docker use) |
Examples:
# Run everything
mkpipe run
# Specific config file
mkpipe run --config /path/to/config.yaml
# Single pipeline
mkpipe run -p my_pipeline
# Single table
mkpipe run -t stg_users
# By tags (OR logic: any matching tag)
mkpipe run --tags api
mkpipe run --tags api,ingestion
# Combine: pipeline + tags
mkpipe run -p my_pipeline --tags api
Tags
Tags let you group tables by business domain, team, priority, or any criteria. When you pass tags, mkpipe runs all matching tables across all pipelines (OR logic).
pipelines:
  - name: pg_to_pg
    source: source_pg
    destination: target_pg
    tables:
      - name: public.users
        target_name: stg_users
        tags: [api, user-domain, critical]
      - name: public.sessions
        target_name: stg_sessions
        tags: [api, user-domain]
  - name: mysql_to_pg
    source: source_mysql
    destination: target_pg
    tables:
      - name: orders
        target_name: stg_orders
        tags: [api, order-domain, critical]
# Runs stg_users + stg_sessions + stg_orders (all have "api")
mkpipe.run(config="config.yaml", tags=["api"])
# Runs stg_users + stg_orders (both have "critical")
mkpipe.run(config="config.yaml", tags=["critical"])
# Runs stg_users + stg_sessions (both have "user-domain")
mkpipe.run(config="config.yaml", tags=["user-domain"])
# OR logic: runs anything tagged "critical" OR "order-domain"
mkpipe.run(config="config.yaml", tags=["critical", "order-domain"])
YAML Configuration Reference
Top-level Structure
version: 2
default_environment: prod   # which environment block to use

prod:                        # environment name
  settings: ...
  connections: ...
  pipelines: ...

staging:                     # you can define multiple environments
  settings: ...
  connections: ...
  pipelines: ...
Settings
settings:
  timezone: UTC                      # Spark session timezone (default: UTC)
  log_dir: ./logs                    # Log file directory (optional, logs to console if not set)
  ingested_at_column: _ingested_at   # Column name for ingestion timestamp (default: _ingested_at)
  ingestion_id_column: mkpipe_id     # Column name for dedup hash ID (default: mkpipe_id)

  spark:
    master: "local[*]"               # Spark master URL (default: local[*])
    driver_memory: "4g"              # default: auto-detected from system
    executor_memory: "4g"            # default: auto-detected from system
    extra_config:                    # any additional Spark config
      spark.sql.shuffle.partitions: "200"
      spark.dynamicAllocation.enabled: "true"

  backend:
    variant: sqlite                  # sqlite (default), postgres, duckdb, clickhouse
    host: localhost
    port: 5432
    database: mkpipe_db
    user: mkpipe
    password: ${BACKEND_PASSWORD}
Connections
connections:
  my_postgres:
    variant: postgres
    host: ${PG_HOST}
    port: 5432
    database: mydb
    user: ${PG_USER}
    password: ${PG_PASSWORD}
    schema: public

  my_mongodb:
    variant: mongodb
    mongo_uri: ${MONGO_URI}
    database: mydb

  my_s3:
    variant: file
    extra:
      storage: s3
      format: parquet
      path: s3a://my-bucket/data
      aws_access_key: ${AWS_ACCESS_KEY}
      aws_secret_key: ${AWS_SECRET_KEY}
      region: eu-west-1
Environment variables are referenced with ${VAR_NAME} syntax and resolved at load time.
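The substitution itself is plain string interpolation against the process environment. A minimal sketch of how ${VAR_NAME} resolution can be implemented (illustrative only, not mkpipe's actual loader code):

import os
import re

def resolve_env_vars(text):
    # Replace each ${VAR_NAME} with its value from the environment; fail fast if unset.
    def repl(match):
        name = match.group(1)
        value = os.environ.get(name)
        if value is None:
            raise KeyError(f"Environment variable {name} is not set")
        return value
    return re.sub(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}", repl, text)

raw = open("mkpipe_project.yaml").read()
resolved = resolve_env_vars(raw)   # ready to be parsed as YAML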
Connection Parameters
| Parameter | Description |
|---|---|
| variant | Required. Plugin type: postgres, mysql, mongodb, file, etc. |
| host | Database host |
| port | Database port |
| database | Database name |
| user | Username |
| password | Password |
| schema | Schema name |
| warehouse | Warehouse (Snowflake) |
| private_key_file | Path to private key file (RSA auth) |
| private_key_file_pwd | Private key passphrase |
| mongo_uri | Full MongoDB connection URI |
| bucket_name | S3/GCS bucket name |
| s3_prefix | S3 key prefix |
| aws_access_key | AWS access key |
| aws_secret_key | AWS secret key |
| region | Cloud region |
| credentials_file | Path to credentials file (GCS service account) |
| api_key | API key |
| oauth_token | OAuth token |
| client_id | OAuth client ID |
| client_secret | OAuth client secret |
| extra | Dict of additional options (storage, format, path, etc.) |
Pipelines & Tables
pipelines:
  - name: my_pipeline         # unique pipeline name
    source: source_pg         # connection name for extraction
    destination: target_pg    # connection name for loading
    pass_on_error: false      # if true, continue on table failure
    tables:
      - name: public.users
        target_name: stg_users
        tags: [api, user-domain]
        replication_method: incremental
        iterate_column: updated_at
        iterate_column_type: datetime
        partitions_column: id
        partitions_count: 10
        fetchsize: 100000
        batchsize: 10000
        write_partitions: 4
        dedup_columns: [id, updated_at]
        write_strategy: upsert
        write_key: [id]
        custom_query: "(SELECT id, name, updated_at FROM users {query_filter}) q"
        transform: transforms/clean_users.py::transform
        pass_on_error: false
Table Parameters
| Parameter | Default | Description |
|---|---|---|
| name | required | Source table/collection name |
| target_name | required | Destination table name |
| tags | [] | List of tags for filtering (--tags api,ingestion) |
| replication_method | full | full or incremental |
| iterate_column | None | Column for incremental tracking (required if incremental) |
| iterate_column_type | None | datetime or int |
| partitions_column | iterate_column | Column for Spark JDBC partitioning |
| partitions_column_type | auto | Type of partition column: int or datetime. Defaults to int if partitions_column is specified, otherwise inherits iterate_column_type |
| partitions_count | 10 | Number of JDBC read partitions |
| fetchsize | 100000 | JDBC fetch size (rows per network round trip) |
| batchsize | 10000 | JDBC write batch size |
| write_partitions | None | Number of write partitions (coalesce before writing) |
| dedup_columns | None | Columns for dedup hash generation (xxhash64). Column name configurable via settings.ingestion_id_column |
| custom_query | None | Custom SQL query with {query_filter} placeholder |
| custom_query_file | None | Path to .sql file (relative to config directory) |
| transform | None | Transform function reference: path/to/file.py::function |
| write_strategy | None | Write strategy: append, replace, upsert, merge (see below) |
| write_key | None | Key columns for upsert/merge (required when strategy is upsert or merge) |
| pass_on_error | false | Continue pipeline on this table's failure |
Write Strategy
write_strategy controls how data is written to the destination. If not set, it is inferred automatically: overwrite → replace, append → append.
| Strategy | Behavior |
|---|---|
| append | Insert new rows. No deduplication. |
| replace | Drop/overwrite all existing data, then insert. |
| upsert | Insert new rows, update existing rows by write_key. Uses MERGE/ON CONFLICT for SQL databases. |
| merge | Full MERGE with matched update + not-matched insert (JDBC loaders only, same as upsert for most targets). |
Usage
tables:
  # Upsert: update existing rows by primary key, insert new ones
  - name: public.users
    target_name: stg_users
    replication_method: incremental
    iterate_column: updated_at
    write_strategy: upsert
    write_key: [id]

  # Replace: full overwrite every run
  - name: public.orders
    target_name: stg_orders
    replication_method: full
    write_strategy: replace

  # Append (default for incremental): just insert
  - name: public.events
    target_name: stg_events
    replication_method: incremental
    iterate_column: created_at
    write_strategy: append
Supported Strategies per Loader
| Loader | append | replace | upsert | merge |
|---|---|---|---|---|
| PostgreSQL, MySQL, MariaDB, SQL Server, Oracle, Redshift, SQLite, TimescaleDB | Y | Y | Y | Y |
| Snowflake | Y | Y | Y | Y |
| BigQuery | Y | Y | Y | Y |
| MongoDB | Y | Y | Y | — |
| ClickHouse | Y | Y | Y | — |
| Elasticsearch | Y | Y | Y | — |
| DynamoDB | Y | Y | Y | — |
| Cassandra | Y | Y | Y | — |
| InfluxDB | Y | Y | Y | — |
| Redis | — | Y | Y | — |
| File (Parquet/CSV/Iceberg/Delta) | Y | Y | — | — |
Validation Rules
- write_strategy: upsert or merge requires write_key — raises ConfigError if missing.
- write_key is ignored when strategy is append or replace.
- If write_strategy is not set, the strategy is inferred from the extractor's write mode.
Incremental Replication
mkpipe uses an append-only strategy for incremental replication:
- Extract: reads rows where iterate_column >= last_point (inclusive, no boundary loss)
- Load: appends to the target table (never overwrites or deletes)
- Dedup: if dedup_columns is set, a mkpipe_id (xxhash64 hash) is generated for downstream deduplication
An ingestion timestamp column is always added to every row. The column name defaults to _ingested_at and can be customized via settings.ingested_at_column.
- name: public.users
  target_name: stg_users
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
  dedup_columns: [id, updated_at]   # mkpipe_id = xxhash64(id, updated_at)
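The hash value corresponds to Spark's built-in xxhash64 over the dedup columns. mkpipe adds the column for you; the sketch below only illustrates what it is equivalent to, assuming df is the extracted DataFrame:

from pyspark.sql import functions as F

# Equivalent of dedup_columns: [id, updated_at]; mkpipe_id is the xxhash64 of those columns.
df_with_id = df.withColumn("mkpipe_id", F.xxhash64("id", "updated_at"))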
Partition Column Type Behavior
The partitions_column_type parameter controls how partition bounds are converted for Spark JDBC partitioning:
Scenario 1: No partition column specified (default)
- name: public.orders
  replication_method: incremental
  iterate_column: created_at
  iterate_column_type: datetime
  # partitions_column defaults to created_at
  # partitions_column_type inherits datetime from iterate_column_type
Scenario 2: Integer partition column specified
- name: public.customers
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
  partitions_column: customer_id
  # partitions_column_type defaults to 'int' (most partition keys are integers)
  # Handles PostgreSQL NUMERIC/DECIMAL types correctly
Scenario 3: Explicit datetime partition column
- name: public.events
  replication_method: incremental
  iterate_column: event_id
  iterate_column_type: int
  partitions_column: event_timestamp
  partitions_column_type: datetime   # Must be explicit if different from iterate_column_type
Downstream dedup query example:
SELECT * FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY mkpipe_id ORDER BY _ingested_at DESC) AS rn
    FROM stg_users
) AS deduped
WHERE rn = 1
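The same deduplication can be done in PySpark for downstream jobs. A sketch, assuming the default mkpipe_id and _ingested_at column names and an active SparkSession named spark:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the most recently ingested row per mkpipe_id.
w = Window.partitionBy("mkpipe_id").orderBy(F.col("_ingested_at").desc())
deduped = (
    spark.table("stg_users")
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn")
)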
Inline Transformations
Add a transform field to any table:
tables:
  - name: public.products
    target_name: stg_products
    replication_method: full
    transform: transforms/clean_products.py::transform
The transform function receives and returns a PySpark DataFrame:
# transforms/clean_products.py
def transform(df):
    df = df.filter(df.status != "deleted")
    return df
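The function can use the full PySpark API. A slightly richer, hypothetical variant (the status and name columns are assumptions; the df-in/df-out contract is the same):

# transforms/clean_products_extended.py (hypothetical example)
from pyspark.sql import functions as F

def transform(df):
    df = df.filter(df.status != "deleted")                      # drop soft-deleted rows
    df = df.withColumn("name", F.trim(F.lower(F.col("name"))))  # normalize the name column
    df = df.withColumn("load_date", F.current_date())           # stamp the load date
    return df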
Orchestrator Integration
Dagster
from dagster import asset, Definitions
import mkpipe

@asset
def api_tables():
    mkpipe.run(config="mkpipe_project.yaml", tags=["api"])

@asset
def critical_tables():
    mkpipe.run(config="mkpipe_project.yaml", tags=["critical"])

defs = Definitions(assets=[api_tables, critical_tables])
Airflow
from airflow.decorators import task

@task
def sync_api_tables():
    import mkpipe
    mkpipe.run(config="/path/to/mkpipe_project.yaml", tags=["api"])

@task
def sync_user_domain():
    import mkpipe
    mkpipe.run(config="/path/to/mkpipe_project.yaml", tags=["user-domain"])
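Cron or a plain Python script works the same way, since a run is just a function call. A minimal stdlib-only sketch:

import time
import mkpipe

# Run the "api" tables once an hour from a long-running process.
while True:
    mkpipe.run(config="/path/to/mkpipe_project.yaml", tags=["api"])
    time.sleep(3600)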
Backend (State Tracking)
mkpipe tracks pipeline state (last sync point, status) in a manifest database. Default is SQLite (zero-config). PostgreSQL, DuckDB, and ClickHouse are also supported:
settings:
  backend:
    variant: postgres
    host: localhost
    port: 5432
    database: mkpipe_db
    user: mkpipe
    password: ${BACKEND_PASSWORD}
Install optional backend dependencies:
pip install mkpipe[postgres-backend]
pip install mkpipe[duckdb-backend]
pip install mkpipe[clickhouse-backend]
pip install mkpipe[all-backends]
Custom Exceptions
mkpipe provides specific exception classes for clean error handling:
from mkpipe import (
    MkpipeError,          # base exception
    ConfigError,          # YAML or configuration issues
    ExtractionError,      # data extraction failures
    LoadError,            # data loading failures
    TransformError,       # transformation failures
    PluginNotFoundError,  # missing plugin
    BackendError,         # backend manifest failures
)
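A retry-oriented wrapper might treat configuration errors as fatal and extraction/load errors as retryable. A sketch (only the exception classes and mkpipe.run come from mkpipe; the logging setup is illustrative):

import logging
import mkpipe
from mkpipe import ConfigError, ExtractionError, LoadError, MkpipeError

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("etl")

try:
    mkpipe.run(config="mkpipe_project.yaml", tags=["critical"])
except ConfigError as e:
    log.error("Bad configuration, fix the YAML before retrying: %s", e)
    raise
except (ExtractionError, LoadError) as e:
    log.warning("Data movement failed, safe to retry: %s", e)
    raise
except MkpipeError as e:
    log.error("Unexpected mkpipe failure: %s", e)
    raise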
JAR Management
mkpipe plugins that depend on JDBC drivers or Spark connectors need JAR files. mkpipe handles this automatically — no manual steps required for most users.
Online (Default) — Lazy Download
When mkpipe starts, it detects which plugins are installed and resolves their Maven dependencies via spark.jars.packages. JARs are downloaded on first run and cached by Spark's Ivy resolver.
# Nothing to do — just run your pipeline
mkpipe run
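Under the hood this relies on Spark's standard mechanism of listing Maven coordinates in spark.jars.packages, which Ivy downloads and caches on first use. A minimal sketch of that mechanism (the coordinate is an example, not mkpipe's actual resolution code):

from pyspark.sql import SparkSession

# Ivy resolves and caches any coordinates listed in spark.jars.packages on first use.
spark = (
    SparkSession.builder
    .appName("mkpipe")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")  # example coordinate
    .getOrCreate()
)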
Offline / Docker — Pre-download JARs
For air-gapped or on-premise environments without internet access, pre-download all JARs during the Docker build:
mkpipe install-jars
This command:
- Discovers all installed plugins and their Maven dependencies
- Downloads JARs via Spark's Ivy resolver into a temp directory
- Copies them into each plugin's jars/ directory
- Cleans up the temp Ivy cache
Dockerfile example:
FROM python:3.11-slim
# Install Java (required for PySpark)
RUN apt-get update && apt-get install -y default-jdk && rm -rf /var/lib/apt/lists/*
# Install mkpipe and plugins
RUN pip install mkpipe mkpipe-extractor-postgres mkpipe-loader-clickhouse
# Pre-download JARs (no internet needed at runtime)
RUN mkpipe install-jars
COPY mkpipe_project.yaml .
CMD ["mkpipe", "run"]
How It Works
| Scenario | Local JARs in jars/ | Maven resolution |
|---|---|---|
| Fresh install, online | No | Yes — spark.jars.packages |
| After mkpipe install-jars | Yes | No — local JARs used |
| Plugin with custom JAR (e.g. MongoDB mkpipe-tls-helper.jar) | Yes (custom) | Yes — only for missing Maven deps |
CLI Reference
mkpipe install-jars # Download all Maven JARs for installed plugins
Available Plugins
For the full list, visit the mkpipe-hub.
Extractors
- PostgreSQL, MySQL, MariaDB, SQL Server, Oracle, SQLite, Redshift, ClickHouse, MongoDB, Snowflake, BigQuery, Cassandra, TimescaleDB, DynamoDB, Elasticsearch, InfluxDB, Redis, File (S3/GCS/local/Iceberg/Delta)
Loaders
- PostgreSQL, MySQL, MariaDB, SQL Server, Oracle, SQLite, Redshift, ClickHouse, MongoDB, Snowflake, BigQuery, Cassandra, TimescaleDB, DynamoDB, Elasticsearch, InfluxDB, Redis, File (S3/GCS/local/Iceberg/Delta)
License
Apache 2.0 — see LICENSE.
Download files
File details
Details for the file mkpipe-0.10.0.tar.gz.
File metadata
- Download URL: mkpipe-0.10.0.tar.gz
- Upload date:
- Size: 39.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 1c196c1dbc9556709ce2ad34bb195b8737e8e59d81117f43f590a68dce841822 |
| MD5 | 2d33c045110fb563765da47c5802059d |
| BLAKE2b-256 | 0fcea17d68f8eeaf6316f5467648857a195617e6531764dee1c6b5d6ebfa1ca7 |
|
File details
Details for the file mkpipe-0.10.0-py3-none-any.whl.
File metadata
- Download URL: mkpipe-0.10.0-py3-none-any.whl
- Upload date:
- Size: 39.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 69ab1cd1bb4127025269b46ec957ea034ed04aa28801c2fe446f260d6b9daec9 |
| MD5 | 1b4b77da9dcd8de887899fb8e40b2fe7 |
| BLAKE2b-256 | 8a8bfb36a44ac66d04d01cdb8e64e703d45c64b07606d4d8e2e434fb5f7dbcc6 |