
Spark-based modular ETL pipeline framework.


mkpipe

mkpipe is a Spark-based, modular ETL framework. It provides a Python-first API for building data pipelines with pluggable extractors, loaders, and inline transformations.

Key Features

  • PySpark engine — parallel, partitioned JDBC reads/writes for high-throughput data movement
  • Modular plugins — pip install mkpipe-extractor-postgres to add a source, pip install mkpipe-loader-postgres to add a destination
  • Single YAML config — define connections, pipelines, and tables in one file
  • Tag-based execution — assign tags to tables and run only what you need across all pipelines
  • Orchestrator-agnostic — use with Dagster, Airflow, cron, or any Python scheduler
  • Inline transformations — reference a Python function (df → df) directly in YAML
  • Dependency injection — pass your own SparkSession (Glue, EMR, Dataproc) or let mkpipe create one
  • Incremental & full replication — append-only incremental with mkpipe_id for idempotent deduplication

Quick Start

pip install mkpipe mkpipe-extractor-postgres mkpipe-loader-postgres

Create mkpipe_project.yaml:

version: 2
default_environment: prod

prod:
  settings:
    timezone: UTC
    backend:
      variant: sqlite

  connections:
    source_pg:
      variant: postgresql
      host: localhost
      port: 5432
      database: source_db
      user: ${PG_USER}
      password: ${PG_PASSWORD}
      schema: public

    target_pg:
      variant: postgresql
      host: localhost
      port: 5432
      database: dwh_db
      user: ${PG_USER}
      password: ${PG_PASSWORD}
      schema: staging

  pipelines:
    - name: my_pipeline
      source: source_pg
      destination: target_pg
      tables:
        - name: public.users
          target_name: stg_users
          tags: [api, user-domain]
          replication_method: incremental
          iterate_column: updated_at
          iterate_column_type: datetime
          dedup_columns: [id, updated_at]

        - name: public.orders
          target_name: stg_orders
          tags: [api, order-domain]
          replication_method: full

Run it:

# Run all tables
mkpipe run

# Run only tables tagged "api"
mkpipe run --tags api

Python API

import mkpipe

# Run all pipelines
mkpipe.run(config="mkpipe_project.yaml")

# Run a specific pipeline
mkpipe.run(config="mkpipe_project.yaml", pipeline="my_pipeline")

# Run a specific table
mkpipe.run(config="mkpipe_project.yaml", table="stg_users")

# Run by tags — runs matching tables across ALL pipelines
mkpipe.run(config="mkpipe_project.yaml", tags=["api"])
mkpipe.run(config="mkpipe_project.yaml", tags=["api", "order-domain"])

# Combine filters: pipeline + tags
mkpipe.run(config="mkpipe_project.yaml", pipeline="my_pipeline", tags=["api"])

# Pass a custom SparkSession (e.g. AWS Glue, EMR, Dataproc)
mkpipe.run(config="mkpipe_project.yaml", spark=my_spark_session)

# Extract only — returns ExtractResult with a Spark DataFrame
result = mkpipe.extract(config="mkpipe_project.yaml", table="stg_users")
df = result.df

# Load only — pass your own DataFrame
mkpipe.load(config="mkpipe_project.yaml", table="stg_users", df=my_df)
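
When running on a managed Spark platform, build the session yourself and hand it to mkpipe.run via the spark argument. A minimal sketch, assuming a plain local SparkSession (on Glue, EMR, or Dataproc you would pass the session those runtimes provide instead); the app name and extra config are illustrative:

from pyspark.sql import SparkSession
import mkpipe

# Build (or obtain) a SparkSession with whatever settings your platform needs
spark = (
    SparkSession.builder
    .appName("mkpipe-job")                        # illustrative name
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

# mkpipe uses the injected session instead of creating its own
mkpipe.run(config="mkpipe_project.yaml", pipeline="my_pipeline", spark=spark)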

CLI Reference

mkpipe run [OPTIONS]
Option      Short  Description
--config    -c     Path to config file. Default: mkpipe_project.yaml in the current directory
--pipeline  -p     Run only the named pipeline
--table     -t     Run only the named table (source name or target name)
--tags             Comma-separated tags to filter tables, e.g. --tags api,ingestion

Examples:

# Run everything
mkpipe run

# Specific config file
mkpipe run --config /path/to/config.yaml

# Single pipeline
mkpipe run -p my_pipeline

# Single table
mkpipe run -t stg_users

# By tags (OR logic: any matching tag)
mkpipe run --tags api
mkpipe run --tags api,ingestion

# Combine: pipeline + tags
mkpipe run -p my_pipeline --tags api

Tags

Tags let you group tables by business domain, team, priority, or any other criterion. When you pass tags, mkpipe runs all matching tables across all pipelines (OR logic).

pipelines:
  - name: pg_to_pg
    source: source_pg
    destination: target_pg
    tables:
      - name: public.users
        target_name: stg_users
        tags: [api, user-domain, critical]

      - name: public.sessions
        target_name: stg_sessions
        tags: [api, user-domain]

  - name: mysql_to_pg
    source: source_mysql
    destination: target_pg
    tables:
      - name: orders
        target_name: stg_orders
        tags: [api, order-domain, critical]

# Runs stg_users + stg_sessions + stg_orders (all have "api")
mkpipe.run(config="config.yaml", tags=["api"])

# Runs stg_users + stg_orders (both have "critical")
mkpipe.run(config="config.yaml", tags=["critical"])

# Runs stg_users + stg_sessions (both have "user-domain")
mkpipe.run(config="config.yaml", tags=["user-domain"])

# OR logic: runs anything tagged "critical" OR "order-domain"
mkpipe.run(config="config.yaml", tags=["critical", "order-domain"])

YAML Configuration Reference

Top-level Structure

version: 2
default_environment: prod    # which environment block to use

prod:                        # environment name
  settings: ...
  connections: ...
  pipelines: ...

staging:                     # you can define multiple environments
  settings: ...
  connections: ...
  pipelines: ...

Settings

settings:
  timezone: UTC              # Spark session timezone (default: UTC)

  spark:
    master: "local[*]"       # Spark master URL (default: local[*])
    driver_memory: "4g"      # default: auto-detected from system
    executor_memory: "4g"    # default: auto-detected from system
    extra_config:            # any additional Spark config
      spark.sql.shuffle.partitions: "200"
      spark.dynamicAllocation.enabled: "true"

  backend:
    variant: sqlite          # sqlite (default), postgresql, duckdb, clickhouse
    host: localhost
    port: 5432
    database: mkpipe_db
    user: mkpipe
    password: ${BACKEND_PASSWORD}

Connections

connections:
  my_postgres:
    variant: postgresql
    host: ${PG_HOST}
    port: 5432
    database: mydb
    user: ${PG_USER}
    password: ${PG_PASSWORD}
    schema: public

  my_mongodb:
    variant: mongodb
    mongo_uri: ${MONGO_URI}
    database: mydb

  my_s3:
    variant: file
    extra:
      storage: s3
      format: parquet
      path: s3a://my-bucket/data
    aws_access_key: ${AWS_ACCESS_KEY}
    aws_secret_key: ${AWS_SECRET_KEY}
    region: eu-west-1

Environment variables are referenced with ${VAR_NAME} syntax and resolved at load time.

Connection Parameters

Parameter             Description
variant               Required. Plugin type: postgresql, mysql, mongodb, file, etc.
host                  Database host
port                  Database port
database              Database name
user                  Username
password              Password
schema                Schema name
warehouse             Warehouse (Snowflake)
private_key_file      Path to private key file (RSA auth)
private_key_file_pwd  Private key passphrase
mongo_uri             Full MongoDB connection URI
bucket_name           S3/GCS bucket name
s3_prefix             S3 key prefix
aws_access_key        AWS access key
aws_secret_key        AWS secret key
region                Cloud region
credentials_file      Path to credentials file (GCS service account)
api_key               API key
oauth_token           OAuth token
client_id             OAuth client ID
client_secret         OAuth client secret
extra                 Dict of additional options (storage, format, path, etc.)
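
The Snowflake-oriented parameters above (warehouse, private_key_file, private_key_file_pwd) combine into a connection like the following. This is a hedged sketch: the variant name, the use of host for the account URL, and the environment variable names are assumptions that depend on the installed Snowflake plugin:

connections:
  my_snowflake:
    variant: snowflake                  # assumes a Snowflake extractor/loader plugin is installed
    host: ${SNOWFLAKE_ACCOUNT_URL}      # assumption: account URL goes in the host field
    user: ${SNOWFLAKE_USER}
    warehouse: COMPUTE_WH
    database: ANALYTICS
    schema: RAW
    private_key_file: /secrets/snowflake_rsa_key.p8
    private_key_file_pwd: ${SNOWFLAKE_KEY_PASSPHRASE}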

Pipelines & Tables

pipelines:
  - name: my_pipeline        # unique pipeline name
    source: source_pg         # connection name for extraction
    destination: target_pg    # connection name for loading
    pass_on_error: false      # if true, continue on table failure
    tables:
      - name: public.users
        target_name: stg_users
        tags: [api, user-domain]
        replication_method: incremental
        iterate_column: updated_at
        iterate_column_type: datetime
        partitions_column: id
        partitions_count: 10
        fetchsize: 100000
        batchsize: 10000
        write_partitions: 4
        dedup_columns: [id, updated_at]
        custom_query: "(SELECT id, name, updated_at FROM users {query_filter}) q"
        transform: transforms/clean_users.py::transform
        pass_on_error: false

Table Parameters

Parameter            Default         Description
name                 required        Source table/collection name
target_name          required        Destination table name
tags                 []              List of tags for filtering (--tags api,ingestion)
replication_method   full            full or incremental
iterate_column       None            Column for incremental tracking (required if incremental)
iterate_column_type  None            datetime or int
partitions_column    iterate_column  Column for Spark JDBC partitioning
partitions_count     10              Number of JDBC read partitions
fetchsize            100000          JDBC fetch size (rows per network round trip)
batchsize            10000           JDBC write batch size
write_partitions     None            Number of write partitions (coalesce before writing)
dedup_columns        None            Columns for mkpipe_id hash generation (xxhash64)
custom_query         None            Custom SQL query with {query_filter} placeholder
custom_query_file    None            Path to .sql file (relative to the sql/ directory)
transform            None            Transform function reference: path/to/file.py::function
pass_on_error        false           Continue pipeline on this table's failure
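
custom_query_file keeps longer queries out of the YAML. A minimal sketch, assuming a hypothetical sql/active_users.sql that follows the same parenthesized-subquery-with-{query_filter} shape as the inline custom_query example above:

# table entry in mkpipe_project.yaml
- name: public.users
  target_name: stg_active_users
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
  custom_query_file: active_users.sql   # resolved relative to the sql/ directory

-- sql/active_users.sql
(SELECT id, name, updated_at FROM users WHERE status = 'active' {query_filter}) q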

Incremental Replication

mkpipe uses an append-only strategy for incremental replication:

  1. Extract: reads rows where iterate_column >= last_point (inclusive, no boundary loss)
  2. Load: appends to the target table (never overwrites or deletes)
  3. Dedup: if dedup_columns is set, a mkpipe_id (xxhash64 hash) is generated for downstream deduplication

An etl_time timestamp is always added to every row.

- name: public.users
  target_name: stg_users
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
  dedup_columns: [id, updated_at]  # mkpipe_id = xxhash64(id, updated_at)

Downstream dedup query example:

SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY mkpipe_id ORDER BY etl_time DESC) AS rn
  FROM stg_users
) WHERE rn = 1

Inline Transformations

Add a transform field to any table:

tables:
  - name: public.products
    target_name: stg_products
    replication_method: full
    transform: transforms/clean_products.py::transform

The transform function receives and returns a PySpark DataFrame:

# transforms/clean_products.py
def transform(df):
    df = df.filter(df.status != "deleted")
    return df

Orchestrator Integration

Dagster

from dagster import asset, Definitions
import mkpipe

@asset
def api_tables():
    mkpipe.run(config="mkpipe_project.yaml", tags=["api"])

@asset
def critical_tables():
    mkpipe.run(config="mkpipe_project.yaml", tags=["critical"])

defs = Definitions(assets=[api_tables, critical_tables])

Airflow

from airflow.decorators import task

@task
def sync_api_tables():
    import mkpipe
    mkpipe.run(config="/path/to/mkpipe_project.yaml", tags=["api"])

@task
def sync_user_domain():
    import mkpipe
    mkpipe.run(config="/path/to/mkpipe_project.yaml", tags=["user-domain"])
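
Cron

Because the CLI is a single command, plain cron can drive it too. A sketch (path, schedule, and log location are placeholders) that runs the api-tagged tables every hour:

# m h dom mon dow  command
0 * * * * cd /path/to/project && mkpipe run --tags api >> /var/log/mkpipe.log 2>&1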

Backend (State Tracking)

mkpipe tracks pipeline state (last sync point, status) in a manifest database. Default is SQLite (zero-config). PostgreSQL, DuckDB, and ClickHouse are also supported:

settings:
  backend:
    variant: postgresql
    host: localhost
    port: 5432
    database: mkpipe_db
    user: mkpipe
    password: ${BACKEND_PASSWORD}

Install optional backend dependencies:

pip install mkpipe[postgres-backend]
pip install mkpipe[duckdb-backend]
pip install mkpipe[clickhouse-backend]
pip install mkpipe[all-backends]

Custom Exceptions

mkpipe provides specific exception classes for clean error handling:

from mkpipe import (
    MkpipeError,          # base exception
    ConfigError,          # YAML or configuration issues
    ExtractionError,      # data extraction failures
    LoadError,            # data loading failures
    TransformError,       # transformation failures
    PluginNotFoundError,  # missing plugin
    BackendError,         # backend manifest failures
)
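
A usage sketch wrapping a run call with the documented exception types:

import mkpipe
from mkpipe import ConfigError, ExtractionError, LoadError, MkpipeError

try:
    mkpipe.run(config="mkpipe_project.yaml", tags=["critical"])
except ConfigError as exc:
    # Bad YAML or a missing connection/pipeline definition
    raise SystemExit(f"Fix the config: {exc}")
except (ExtractionError, LoadError) as exc:
    # Source read or target write failed; investigate, then retry
    raise SystemExit(f"Data movement failed: {exc}")
except MkpipeError as exc:
    # Any other mkpipe failure (transform, plugin, backend)
    raise SystemExit(f"Pipeline failed: {exc}")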

Available Plugins

For the full list, visit the mkpipe-hub.

Extractors

  • PostgreSQL, MySQL, MariaDB, SQL Server, Oracle, SQLite, Redshift, ClickHouse, MongoDB, File (S3/GCS/local), Snowflake

Loaders

  • PostgreSQL, MySQL, MariaDB, SQLite, File (S3/GCS/local), Snowflake, ClickHouse

License

Apache 2.0 — see LICENSE.
