Move data between databases, files, and streams from Python. Rust + Apache Arrow under the hood.

███████╗███╗   ███╗ █████╗ ████████╗██╗██╗  ██╗
██╔════╝████╗ ████║██╔══██╗╚══██╔══╝██║╚██╗██╔╝
█████╗  ██╔████╔██║███████║   ██║   ██║ ╚███╔╝
██╔══╝  ██║╚██╔╝██║██╔══██║   ██║   ██║ ██╔██╗
███████╗██║ ╚═╝ ██║██║  ██║   ██║   ██║██╔╝ ██╗
╚══════╝╚═╝     ╚═╝╚═╝  ╚═╝   ╚═╝   ╚═╝╚═╝  ╚═╝

ematix-flow

Declarative Python data pipelines. Rust + Apache Arrow under the hood.

Move data between databases, files, and streams with one decorator. Cron schedules, DAG dependencies, watermarks, schema evolution, restart-safe state, and at-least-once delivery are all built in — no extra scheduler service to deploy.

Project site: ematix.dev.

from ematix_flow import ematix, ManagedTable, Annotated, BigInt, Text, TimestampTZ, pk

# 1. Connection — which database. Credentials redacted in repr();
#    the framework looks this up by name from any pipeline.
@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_URL}"   # resolved from env at run time

# 2. Target table — which schema + which table. Column types are
#    annotated Python; the framework creates / migrates the table
#    on first run.
class Events(ManagedTable):
    __schema__ = "analytics"
    __tablename__ = "events"

    event_id: Annotated[BigInt, pk()]
    name: Text | None
    received_at: TimestampTZ

# 3. Pipeline — source SQL into the target table on a cron.
@ematix.pipeline(
    target=Events,
    target_connection="warehouse",
    schedule="*/5 * * * *",
    mode="append",
)
def ingest_events(conn):
    return "SELECT event_id, name, received_at FROM raw.events"

pip install ematix-flow
flow run-due --module my_pipelines    # cron-style; drop into systemd / cron / k8s CronJob

Why ematix-flow

  • Fast. On TPC-H (22 queries, single Apple M4 Max), measured at three scale factors through the production preset (fresh context per query, each engine in its own process, no bench-only tricks), ematix-flow takes 22 / 22 queries at SF=1 (2.35× DuckDB, 3.89× Polars, 17× single-node PySpark), 22 / 22 at SF=10 (1.58× DuckDB), and 14 / 22 at SF=100 (1.18× DuckDB), and leads the geomean at every scale. Head-to-head against DuckDB under the same isolated protocol, that is 22, 22, and 14 wins out of 22; at SF=100, Q8 and Q10 are consistent losses and 6 more queries sit within run-to-run noise. Full numbers, losses included, and the reproducer are in Benchmarks.
  • Scheduling + DAG, no service to operate. Pipelines carry their own cron schedule and depends_on= edges (with cycle detection and exponential-backoff retries). Run flow run-due from cron, systemd, a k8s CronJob, GitHub Actions, or the bundled long-running scheduler: same code, same topological order, same retry semantics. Already on Airflow / Dagster / Prefect? Call .sync() directly.
  • Batteries included. Postgres, MySQL, SQLite, DuckDB, Snowflake, BigQuery, Redshift, Kafka, RabbitMQ, Kinesis, Pub/Sub, S3, Delta Lake. Schema Registry + Avro / Protobuf. CDC source mode dispatches per-op transactionally to your existing target.
  • Operationally honest. Restart-safe state, watermarks, at-least-once delivery, credential redaction, structured run history, Prometheus + OpenTelemetry metrics, Slack alerts.

Status: v0.11.0 on PyPI as ematix-flow (alpha). All four surfaces — declarative pipelines, multi-backend, streaming, stream processing — are shipped end-to-end, with warehouse pipelines (cron + retries + DAG), Schema Registry (Confluent + AWS Glue), and timezone-aware cron schedules. See CHANGELOG for the release history.


Table of contents

  1. Install
  2. Connections
  3. Backends
  4. Pipelines
  5. Modes
  6. Scheduling
  7. Streaming pipelines
  8. Stream processing
  9. Configuration reference
  10. CLI
  11. Python API
  12. Benchmarks and comparisons
  13. What's shipped
  14. Development
  15. License

Install

pip install ematix-flow

The core install ships every backend, the flow CLI binary, and the run_pipeline / run_streaming_pipeline Python entrypoints.

Optional extras

Extra | What it adds | Install
df | DataFrame interop helpers (polars / pandas) for to_polars() / to_pandas() materialization. | pip install "ematix-flow[df]", then pip install polars (or pandas).
spark | PySpark interop helpers (to_pyspark() / from_pyspark()). Heavy: pulls in PySpark + its JDBC dependency. | pip install "ematix-flow[spark]"
pyarrow | Required for the streaming-backend pyclass wrappers (KafkaBackend, KinesisBackend, …) when you want batch-by-batch iteration in Python. | pip install pyarrow

The flow binary, run_pipeline, and the typed-Python streaming API work without any extras. To build from source, see Development at the bottom.


Connections

Connections are the first thing to set up. Every pipeline references one or more connections by name; ematix-flow resolves that name through a layered chain so you can keep secrets out of code.

Resolution chain

A connection name like "warehouse" resolves through these sources, highest priority first:

  1. Env var EMATIX_FLOW_DSN_<NAME> — uppercase the name (EMATIX_FLOW_DSN_WAREHOUSE=postgres://...).
  2. Env var EMATIX_FLOW_DSN — only matches the literal name default. Convenient for one-off scripts.
  3. ./.ematix-flow.toml in the current directory.
  4. ~/.ematix-flow/connections.toml for user-wide defaults.
  5. In-process registration: register_connection(...), @ematix.connection, or inline config.connect(url=...).

flow connections list             # what's resolved + from where
flow connections check warehouse  # connect + report
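
The resolution chain above can be sketched in plain Python. This is an illustration only, not the library's implementation: resolve_dsn, the file_layers argument, and the registered dict are hypothetical stand-ins for the TOML files and the in-process registry.

```python
import os

def resolve_dsn(name, file_layers=(), registered=None, env=None):
    """Return (dsn, source) for a connection name, highest priority first."""
    env = os.environ if env is None else env
    registered = registered or {}

    # 1. Per-connection env var: EMATIX_FLOW_DSN_<NAME>, name uppercased.
    key = f"EMATIX_FLOW_DSN_{name.upper()}"
    if key in env:
        return env[key], key

    # 2. Bare EMATIX_FLOW_DSN only matches the literal name "default".
    if name == "default" and "EMATIX_FLOW_DSN" in env:
        return env["EMATIX_FLOW_DSN"], "EMATIX_FLOW_DSN"

    # 3 and 4. File layers, project-local before user-wide.
    for label, connections in file_layers:
        if name in connections:
            return connections[name], label

    # 5. In-process registration comes last.
    if name in registered:
        return registered[name], "in-process"

    raise KeyError(f"no connection named {name!r}")


# Dicts stand in for the parsed TOML files (hypothetical contents).
layers = [
    ("./.ematix-flow.toml", {"warehouse": "postgres://project/wh"}),
    ("~/.ematix-flow/connections.toml", {"warehouse": "postgres://user/wh",
                                         "lake": "s3://bucket"}),
]
env = {"EMATIX_FLOW_DSN_WAREHOUSE": "postgres://env/wh"}
print(resolve_dsn("warehouse", layers, env=env))   # env var wins
print(resolve_dsn("lake", layers, env=env))        # falls through to the user-wide file
```

Passing env explicitly keeps the sketch testable without touching the real process environment.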

Declaring connections

Three interchangeable forms — pick whichever fits the workflow.

TOML file

# ~/.ematix-flow/connections.toml
[connections.warehouse]
url = "postgres://user:${WAREHOUSE_PASSWORD}@host/wh"

[connections.kafka_prod]
kind = "kafka"
bootstrap_servers = "${KAFKA_BOOTSTRAP}"
group_id = "ematix-flow"

${VAR} interpolation lets secrets stay out of the file. Bare ${VAR} resolves against os.environ; prefixed forms route to pluggable resolvers — ${vault:secret/db#password} (HashiCorp Vault), ${aws:prod/db#password} (AWS Secrets Manager), ${gcp:db-password} (GCP Secret Manager). Register additional resolvers via ematix_flow.secrets.register_resolver.
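
To make the two placeholder forms concrete, here is a minimal sketch of how such interpolation could work. The interpolate function, the RESOLVERS dict, and the fake vault are all hypothetical; the real hook is ematix_flow.secrets.register_resolver, whose signature is not shown here.

```python
import os
import re

# ${VAR} or ${prefix:reference}; the prefix is optional.
_PLACEHOLDER = re.compile(r"\$\{(?:(?P<prefix>[a-z]+):)?(?P<ref>[^}]+)\}")

# Hypothetical resolver table: prefix -> callable(reference) -> secret string.
RESOLVERS = {}

def interpolate(value, env=None):
    env = os.environ if env is None else env

    def replace(match):
        prefix, ref = match.group("prefix"), match.group("ref")
        if prefix is None:
            return env[ref]              # bare ${VAR}: environment lookup
        return RESOLVERS[prefix](ref)    # ${prefix:ref}: pluggable resolver

    return _PLACEHOLDER.sub(replace, value)

# A fake "vault" resolver backed by a dict, for illustration only.
_fake_vault = {"secret/db#password": "s3cr3t"}
RESOLVERS["vault"] = _fake_vault.__getitem__

url = interpolate("postgres://user:${vault:secret/db#password}@${DB_HOST}/wh",
                  env={"DB_HOST": "db.internal"})
print(url)
```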

@ematix.connection decorator

from ematix_flow import ematix

@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"          # ${VAR} interpolates at use time

@ematix.connection
class kafka_prod:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    group_id = "ematix-flow"
    sasl_plain_username = "${KAFKA_USER}"
    sasl_plain_password = "${KAFKA_PASS}"

After import, warehouse and kafka_prod are typed Connection instances registered in the runtime. Pass them by reference, or look them up by name string.

Typed instance + register_connection

from ematix_flow import PostgresConnection, register_connection

warehouse = register_connection(
    PostgresConnection(name="warehouse", url="postgres://localhost/wh"),
)

Useful when the connection has to be built dynamically (e.g. from an environment-driven dict).

Credential redaction

Every typed connection redacts secrets in repr() by field-name match (password, secret, secret_access_key, anything matching _password, AMQP URL passwords, etc.). Printing a connection in a notebook will not spill credentials.
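
A minimal sketch of field-name-based redaction, assuming a dataclass-style connection. DemoConnection and both regular expressions are hypothetical; the library's actual matching rules may be broader.

```python
import re
from dataclasses import dataclass, fields

# Field names treated as sensitive (assumed pattern, not the library's list).
_SENSITIVE = re.compile(r"(password|secret)", re.IGNORECASE)
# Password embedded in a URL: scheme://user:PASSWORD@host
_URL_PASSWORD = re.compile(r"(://[^:/@]+:)[^@]+(@)")

@dataclass(repr=False)
class DemoConnection:
    name: str
    url: str
    sasl_plain_password: str = ""

    def __repr__(self):
        parts = []
        for f in fields(self):
            value = getattr(self, f.name)
            if _SENSITIVE.search(f.name):
                value = "***"                                  # mask by field name
            elif isinstance(value, str):
                value = _URL_PASSWORD.sub(r"\1***\2", value)   # mask URL password
            parts.append(f"{f.name}={value!r}")
        return f"{type(self).__name__}({', '.join(parts)})"

conn = DemoConnection("warehouse", "postgres://flow:hunter2@db/wh", "kafka-pass")
print(conn)
```

Only the repr() is masked; the attribute values themselves stay intact so the connection can still be used.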

Schema Registry as a connection

Avro / Protobuf pipelines reference a Schema Registry the same way:

@ematix.connection
class sr_prod:
    kind = "schema_registry"
    url = "${SR_URL}"

@ematix.connection
class kafka_avro:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    payload_format = "avro"
    schema_registry = "sr_prod"      # name reference

kind = "schema_registry" covers the Confluent-style wire format (Confluent SR, Apicurio in Confluent-compat mode). AWS Glue Schema Registry uses separate framing (0x03 header byte + UUID + compression byte) and has been fully wired since v0.5.0: use GlueSchemaRegistryConnection(kind = "glue_schema_registry", …), and the Rust Kafka backend dispatches consumer + producer paths to Glue automatically. An end-to-end recipe (LocalStack quickstart + production wiring + IAM template) is in docs/DEPLOYMENT.md, Recipe 10.
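
For orientation, the Confluent-style wire format prefixes each payload with a zero magic byte followed by a 4-byte big-endian schema ID. The helper below is a hypothetical sketch of splitting such a frame; it is not part of ematix-flow, and the Glue framing is not shown.

```python
import struct

def split_confluent_frame(message: bytes):
    """Split a Confluent-framed message into (schema_id, payload)."""
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack(">I", message[1:5])   # big-endian uint32
    return schema_id, message[5:]

framed = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
print(split_confluent_frame(framed))
```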


Backends

Every source and target lives behind one Backend trait. Switch a pipeline's target by changing one line.

Backend Batch source Streaming source Target DDL planning Strategy executors (append / merge / scd2 / truncate) CDC target
Postgres ✅ (native + COPY BINARY)
MySQL ✅ (ON DUPLICATE KEY)
SQLite
DuckDB
Snowflake n/a append (write_pandas) + truncate + merge (staged MERGE)
BigQuery n/a append (load_table_from_dataframe) + truncate + merge (MERGE INTO from staging)
Redshift n/a append (S3 → COPY) + truncate + merge (MERGE INTO from staging)
Delta Lake (local + S3) n/a ✅ (DataFusion-backed MERGE)
Object stores (Parquet / CSV / ORC / JSONL, local + S3) n/a append + truncate (see Δ.X3)
Kafka n/a append (cross-backend) source role only
RabbitMQ n/a append (cross-backend)
GCP Pub/Sub n/a append (cross-backend)
AWS Kinesis n/a append (cross-backend)

Batch source = readable by @ematix.pipeline (the function returns a SQL string; the framework executes it against the source connection). Streaming source = tailable by flow consume / @ematix.streaming_pipeline (long-running consumer with manual offset commit / ack). Target = writable by either pipeline shape. Cross-backend moves stream Apache Arrow batches end-to-end — same-DB pairs take the INSERT … SELECT fast path automatically.

Streaming source guarantees

  • Manual offset commit / ack. Pipelines call commit_offsets() on the source only after a durable target write — at-least-once end-to-end. Kafka offsets, RabbitMQ basic_ack, Pub/Sub handler acks, and Kinesis per-shard sequence numbers all flow through the same surface.
  • Exactly-once. Kafka producer-side via transactions; consumer-coordinated end-to-end via KafkaToKafkaEosPipeline.
  • DLQ. Both app-level (dead_letter_topic routes failed batch rows to a separate target) and broker-level (RabbitMQ x-dead-letter-exchange, Pub/Sub subscription dead_letter_policy).
  • Schema Registry. Avro and Protobuf decode/encode via Confluent SR or Apicurio, plus AWS Glue Schema Registry via GlueSchemaRegistryConnection (see Schema Registry as a connection).
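
The commit-after-write ordering behind the at-least-once guarantee can be sketched with an in-memory source. FakeSource, drain, and the commit_offsets(count) signature are hypothetical simplifications, not the library's API.

```python
class FakeSource:
    """In-memory stand-in for a broker: redelivers from the committed offset."""
    def __init__(self, records):
        self.records = records
        self.committed = 0

    def poll(self, max_records):
        # Always read from the last committed position.
        return self.records[self.committed:self.committed + max_records]

    def commit_offsets(self, count):
        self.committed += count


def drain(source, write_batch, batch_size=2):
    """Write each batch, and commit only after the write succeeded."""
    while True:
        batch = source.poll(batch_size)
        if not batch:
            return
        write_batch(batch)                  # may raise: offsets stay put
        source.commit_offsets(len(batch))   # reached only after a successful write


target, failures = [], {"left": 1}

def flaky_write(batch):
    # Fail once on the batch starting with "c", then succeed.
    if batch[0] == "c" and failures["left"]:
        failures["left"] -= 1
        raise IOError("target unavailable")
    target.extend(batch)

source = FakeSource(["a", "b", "c", "d"])
try:
    drain(source, flaky_write)
except IOError:
    pass                                    # a supervisor would restart here
drain(source, flaky_write)                  # resumes from the committed offset
print(target, source.committed)
```

If a crash lands between the write and the commit, the batch is delivered again on restart, which is why this ordering gives at-least-once rather than exactly-once delivery.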

Cross-backend reads + writes

When source and target are on the same database, ematix-flow uses an INSERT … SELECT fast path. When they differ, the framework streams Apache Arrow batches between them — no row-by-row serialization, no intermediate file roundtrip. Switching from Postgres → Postgres to Postgres → Delta Lake is a one-line change.


Pipelines

A pipeline binds a source query to a target table and a load strategy. The minimum surface:

from typing import Annotated
from ematix_flow import ematix, pk
from ematix_flow.types import BigInt, Text, TimestampTZ

@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"

@ematix.table(schema="analytics")
class Events:
    event_id: Annotated[BigInt, pk()]
    name: Text | None
    received_at: TimestampTZ

@ematix.pipeline(
    target=Events,
    target_connection="warehouse",
    schedule="*/5 * * * *",
    mode="append",
)
def ingest_events(conn):
    return "SELECT event_id, name, received_at FROM raw.events"

That's the full file. Run it:

flow run --module my_pipelines ingest_events     # one-shot
flow run-due --module my_pipelines               # cron-style
flow status --module my_pipelines                # operator view
flow preview --module my_pipelines ingest_events # what would it do?
flow validate --module my_pipelines ingest_events # EXPLAIN against the DB

Dependencies + retry

Pipelines can declare upstream depends_on and a per-pipeline retry policy. flow run-due honors both: it topologically orders fires, skips downstream work when upstreams fail, and applies exponential backoff between attempts.

@ematix.pipeline(
    target=DailyRollups,
    target_connection="warehouse",
    schedule="0 2 * * *",
    mode="merge",
    depends_on=["ingest_events"],          # must succeed first today
    retry={"max_attempts": 5, "backoff_seconds": 30, "backoff_factor": 2.0},
)
def daily_rollups(conn):
    return "SELECT ... FROM analytics.events GROUP BY ..."

Cycles are detected at module load time. Attempt state survives process restarts when a durable Run history backend is configured.
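
As a rough sketch of the ordering and retry behavior described above, the standard library's graphlib can order a depends_on graph and reject cycles. The backoff formula (backoff_seconds × backoff_factor^n) is an assumption about how the retry dict is interpreted; run_order and backoff_delays are hypothetical helpers.

```python
from graphlib import CycleError, TopologicalSorter

def run_order(depends_on):
    """depends_on maps pipeline name -> list of upstream pipeline names."""
    return list(TopologicalSorter(depends_on).static_order())

def backoff_delays(max_attempts, backoff_seconds, backoff_factor):
    """Seconds to wait before attempts 2..max_attempts (assumed formula)."""
    return [backoff_seconds * backoff_factor ** i for i in range(max_attempts - 1)]

graph = {"ingest_events": [], "daily_rollups": ["ingest_events"]}
print(run_order(graph))             # upstreams come before downstreams
print(backoff_delays(5, 30, 2.0))   # waits between the five attempts

try:
    run_order({"a": ["b"], "b": ["a"]})
except CycleError as exc:
    print("cycle detected:", exc.args[1])
```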

Tables

@ematix.table declares the target schema. Columns are typed with PEP-593 annotations + markers:

from typing import Annotated
from ematix_flow import ematix, pk
from ematix_flow.normalize import lower, trim, empty_to_null, parse_timestamp
from ematix_flow.types import BigInt, String, Text, TimestampTZ

@ematix.table(schema="analytics")
class CustomerDim:
    customer_id: Annotated[BigInt, pk()]                          # primary key
    email: Annotated[String[256] | None, lower(), trim(), empty_to_null()]
    name: Text | None
    updated_at: Annotated[TimestampTZ, parse_timestamp()]

The class name maps to the table name (CustomerDim → customer_dim); override with @ematix.table(schema=..., name="..."). pk() and natural_key() markers control merge keys (see Modes below). Normalization markers (lower, trim, empty_to_null, parse_timestamp, parse_int, regex_replace, default, derive, raw sql) compile to in-database SQL, with no Python row loop.
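
The class-name mapping can be pictured as a plain CamelCase to snake_case conversion. The helper below is a hypothetical sketch; how the library treats acronyms or digits in class names is not stated here, so treat edge cases as unknown.

```python
import re

def default_table_name(class_name: str) -> str:
    """CamelCase -> snake_case, splitting before each interior capital."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()

print(default_table_name("CustomerDim"))
print(default_table_name("Events"))
```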

Source / target wiring

@ematix.pipeline(
    target=CustomerDim,
    source_connection="raw_db",       # if omitted, defaults to target_connection
    target_connection="warehouse",    # required (or set via EMATIX_FLOW_DSN_<NAME>)
    schedule="0 * * * *",
    mode="scd2",
    compare_columns=["email", "name"],
)
def sync_customers(conn):
    # `conn` is the source connection.
    return "SELECT customer_id, email, name, updated_at FROM customers"

Element | Where it's configured
Target table | schema= on @ematix.table + the class name (snake-cased).
Source query | The SQL string returned from the decorated function. Can be a join, subquery, parametrised WHERE, etc.
Connections | source_connection= and target_connection= reference names from the Connections registry.
Source/target separation | Set both to cross databases. ematix-flow auto-switches between same-DB INSERT … SELECT and cross-DB Arrow streaming.

Two function signatures

def sync(conn):  return "SELECT …"   # dynamic SQL: filters, dates, etc.
def sync():      pass                # static — pair with source_table=

The static form skips boilerplate when the source is a single table:

@ematix.pipeline(
    target=CustomerDim,
    target_connection="warehouse",
    source_table="raw.customers",      # framework synthesizes SELECT
    column_map={"target_col": "source_col"},  # optional rename map
    mode="merge",
    schedule="0 * * * *",
)
def sync_customers():
    pass

Multi-target fan-out

One source query, N targets:

from ematix_flow import Target

@ematix.pipeline(
    targets=[
        Target(table=CustomerDim,    connection="warehouse"),
        Target(table=CustomerArchive, connection="lake"),
    ],
    source_connection="raw_db",
    schedule="0 * * * *",
    mode="merge",
)
def sync_customers(conn):
    return "SELECT customer_id, email, name, updated_at FROM customers"

Each target gets its own strategy executor; failures on one target don't block the others (configurable via continue_on_target_failure=True).


Modes

The load strategy. Set on @ematix.pipeline(mode=...).

append

Insert all source rows. No deduplication, no updates.

@ematix.pipeline(target=Events, mode="append", schedule="*/5 * * * *")
def ingest(conn):
    return "SELECT event_id, name, received_at FROM raw.events"

Use with incremental=True and a watermark_column= to filter to new rows on each run; ematix-flow tracks the last-loaded watermark in ematix_flow.watermarks.

truncate

TRUNCATE (or DELETE FROM) the target, then load. Useful for small reference / dimension tables that get reloaded fully each run.

merge (a.k.a. scd1)

Insert new rows; update existing rows on PK match. Same as upserting on conflict. Compare columns control which non-key fields participate in the update:

@ematix.pipeline(
    target=CustomerDim,
    mode="merge",
    compare_columns=["email", "name"],   # only update if these change
    schedule="0 * * * *",
)
def sync(conn):
    return "SELECT customer_id, email, name FROM raw.customers"
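
In SQL terms, merge with compare_columns behaves like an upsert that only rewrites a row when one of the compared columns differs. The sketch below uses SQLite from the standard library (upsert syntax needs SQLite 3.24 or newer) purely as an illustration; the statement ematix-flow actually generates per backend is not shown in this document.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE customer_dim "
           "(customer_id INTEGER PRIMARY KEY, email TEXT, name TEXT)")
db.execute("INSERT INTO customer_dim VALUES (1, 'a@example.com', 'Ada')")

# Insert new keys; on a key match, update only when a compare column changed.
# IS NOT is SQLite's NULL-safe inequality.
UPSERT = """
INSERT INTO customer_dim (customer_id, email, name)
VALUES (?, ?, ?)
ON CONFLICT (customer_id) DO UPDATE
SET email = excluded.email, name = excluded.name
WHERE customer_dim.email IS NOT excluded.email
   OR customer_dim.name  IS NOT excluded.name
"""

source_rows = [
    (1, "ada@example.com", "Ada"),   # existing key, changed email: update
    (2, "b@example.com", "Bob"),     # new key: insert
]
with db:                             # one transaction for the whole batch
    db.executemany(UPSERT, source_rows)

rows = db.execute("SELECT * FROM customer_dim ORDER BY customer_id").fetchall()
print(rows)
```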

scd2

Slowly-changing dimension type 2. Closes the previous version of each row and inserts a new one when the compare columns change. ematix-flow auto-augments the table with valid_from, valid_to, is_current, row_hash:

@ematix.pipeline(
    target=CustomerDim,
    mode="scd2",
    compare_columns=["email", "name"],
    event_timestamp_column="updated_at",  # use updated_at as valid_from
    ttl_seconds=86_400 * 30,              # auto-close versions older than 30d
    schedule="0 * * * *",
)
def sync(conn):
    return "SELECT customer_id, email, name, updated_at FROM raw.customers"
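
The close-and-insert behavior can be sketched over plain dicts. This is a simplified model under stated assumptions: the SHA-256 row hash and the apply_scd2 helper are hypothetical, and ttl_seconds handling is left out; only the column names valid_from, valid_to, is_current, and row_hash come from the description above.

```python
import hashlib

def row_hash(row, compare_columns):
    joined = "|".join(str(row[c]) for c in compare_columns)
    return hashlib.sha256(joined.encode()).hexdigest()

def apply_scd2(history, incoming, key, compare_columns, ts_column):
    """Close the current version and append a new one when compare columns change."""
    current = {r[key]: r for r in history if r["is_current"]}
    for row in incoming:
        new_hash = row_hash(row, compare_columns)
        old = current.get(row[key])
        if old is not None and old["row_hash"] == new_hash:
            continue                             # unchanged: keep current version
        if old is not None:
            old["valid_to"] = row[ts_column]     # close the previous version
            old["is_current"] = False
        version = dict(row, valid_from=row[ts_column], valid_to=None,
                       is_current=True, row_hash=new_hash)
        history.append(version)
        current[row[key]] = version
    return history

history, cols = [], ["email", "name"]
v1 = {"customer_id": 1, "email": "a@x.io", "name": "Ada", "updated_at": "2024-01-01"}
v2 = {"customer_id": 1, "email": "ada@x.io", "name": "Ada", "updated_at": "2024-02-01"}
apply_scd2(history, [v1], "customer_id", cols, "updated_at")
apply_scd2(history, [v2], "customer_id", cols, "updated_at")
for r in history:
    print(r["email"], r["valid_from"], r["valid_to"], r["is_current"])
```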

How merge keys are resolved

For merge and scd2, keys= is optional. Resolution priority:

  1. Explicit keys=("col_a", "col_b") on @ematix.pipeline.
  2. __merge_keys__ = ("col_a", "col_b") class dunder on the table.
  3. First natural_key() group on the table — for SCD2 where the business key differs from the versioned PK.
  4. Columns marked pk().

A UserWarning fires when steps 2 or 3 resolve to keys that differ from pk(). Pass keys= explicitly to silence.
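
The priority order and the warning rule can be expressed as a small function. resolve_merge_keys is a hypothetical sketch that takes the four candidate sources as plain tuples; it is not the library's internal resolver.

```python
import warnings

def resolve_merge_keys(explicit=None, dunder=None, natural_key=None, pk=()):
    """Pick merge keys using the four-step priority described above."""
    if explicit:
        return tuple(explicit)                   # 1. keys= on the pipeline
    for candidate in (dunder, natural_key):      # 2. __merge_keys__, 3. natural_key()
        if candidate:
            if tuple(candidate) != tuple(pk):
                warnings.warn(
                    f"merge keys {tuple(candidate)} differ from pk() {tuple(pk)}; "
                    "pass keys= explicitly to silence",
                    UserWarning,
                )
            return tuple(candidate)
    return tuple(pk)                             # 4. columns marked pk()

print(resolve_merge_keys(pk=("customer_sk",)))
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    keys = resolve_merge_keys(natural_key=("customer_id",), pk=("customer_sk",))
print(keys, len(caught))
```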

Incremental loads + watermarks

@ematix.pipeline(
    target=Events,
    mode="append",
    incremental=True,
    watermark_column="received_at",    # filter source rows > last watermark
    schedule="*/5 * * * *",
)
def ingest(conn):
    return "SELECT event_id, name, received_at FROM raw.events"

Watermarks live in ematix_flow.watermarks and advance only after a successful target commit — restart-safe.
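
A minimal sketch of the watermark pattern using SQLite from the standard library: rows newer than the stored watermark are loaded, and the watermark is written in the same transaction as the load. The table layout and run_incremental are hypothetical; the real bookkeeping lives in ematix_flow.watermarks.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
CREATE TABLE raw_events (event_id INTEGER, received_at TEXT);
CREATE TABLE events     (event_id INTEGER, received_at TEXT);
CREATE TABLE watermarks (pipeline TEXT PRIMARY KEY, value TEXT);
INSERT INTO raw_events VALUES (1, '2024-01-01T00:00'), (2, '2024-01-01T00:05');
""")

def run_incremental(pipeline="ingest"):
    row = db.execute("SELECT value FROM watermarks WHERE pipeline = ?",
                     (pipeline,)).fetchone()
    last = row[0] if row else ""        # empty string sorts before any timestamp
    with db:                            # load and watermark commit together
        new_rows = db.execute(
            "SELECT event_id, received_at FROM raw_events "
            "WHERE received_at > ? ORDER BY received_at", (last,)).fetchall()
        if not new_rows:
            return 0
        db.executemany("INSERT INTO events VALUES (?, ?)", new_rows)
        db.execute("INSERT OR REPLACE INTO watermarks VALUES (?, ?)",
                   (pipeline, new_rows[-1][1]))
    return len(new_rows)

first = run_incremental()               # loads both existing rows
second = run_incremental()              # nothing newer than the watermark
db.execute("INSERT INTO raw_events VALUES (3, '2024-01-01T00:10')")
third = run_incremental()               # picks up only the new row
print(first, second, third)
```

Because the watermark moves in the same transaction as the insert, a failed load leaves it where it was and the next run retries the same rows.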

Pre- and post-load transforms

from ematix_flow.transforms import deduplicate_by, filter_where

@ematix.pipeline(
    target=Events,
    mode="merge",
    transforms_pre=[
        deduplicate_by(keys=("event_id",), order_by="received_at desc"),
        filter_where("name IS NOT NULL"),
    ],
    transforms_post=[
        "ANALYZE analytics.events",         # raw SQL string
        ematix.transform_ref("update_mart"), # named transform
    ],
    continue_on_failure_post=True,
    schedule="*/5 * * * *",
)
def ingest(conn): ...

transforms_pre compile to in-database SQL ahead of the load. transforms_post each run in their own transaction.


Scheduling

Three ways to fire a pipeline.

Cron (schedule= on the decorator)

@ematix.pipeline(target=Events, mode="append", schedule="*/5 * * * *")
def ingest(conn): ...

Then run on any cadence (cron / k8s CronJob / GitHub Actions):

flow run-due --module my_pipelines

run-due fires every pipeline whose schedule came due within the last interval. It is idempotent: running it twice in the same window re-fires nothing, because watermarks advance only after success.

One-shot

flow run     --module my_pipelines ingest        # run now
flow preview --module my_pipelines ingest        # what would it do? (dry-run, no commit)
flow validate --module my_pipelines ingest       # EXPLAIN against the DB

Programmatic

from my_pipelines import ingest
ingest.sync(keys=("event_id",))     # pipelines expose a `.sync()` method

Useful for tests, notebooks, or wrapping a pipeline inside another orchestrator.

Web UI (flow web)

For visual ops, flow web serves a local SPA over the same RunLog. Lists pipelines + run history, shows per-run task graphs, and exposes one-click restart from failed step / resume from watermark / rerun from beginning / pause / resume on in-flight runs.

pip install "ematix-flow[web]"
flow web --port 8080
# open http://127.0.0.1:8080/

Localhost-only by default. For remote access, pass --token <value> to enable bearer-token auth on every /api/* route (/api/health stays open for load-balancer probes); without a token, binding a non-loopback address logs a warning — SSH tunnel or front with a reverse proxy.

flow web --port 8080 --bind 0.0.0.0 --token "$EMATIX_FLOW_WEB_TOKEN"
# clients send: Authorization: Bearer $EMATIX_FLOW_WEB_TOKEN
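
The route rules above (token on every /api/* route, /api/health left open) can be sketched as a single check. is_authorized is a hypothetical helper, and the constant-time comparison via hmac.compare_digest is a general good practice rather than a confirmed detail of flow web.

```python
import hmac

def is_authorized(path, headers, token):
    """Return True when the request may proceed."""
    if token is None or not path.startswith("/api/"):
        return True                      # auth disabled, or not an API route
    if path == "/api/health":
        return True                      # stays open for load-balancer probes
    supplied = headers.get("Authorization", "")
    expected = f"Bearer {token}"
    # Constant-time comparison avoids leaking the token through timing.
    return hmac.compare_digest(supplied.encode(), expected.encode())

print(is_authorized("/api/runs", {"Authorization": "Bearer s3cret"}, "s3cret"))
print(is_authorized("/api/runs", {}, "s3cret"))
print(is_authorized("/api/health", {}, "s3cret"))
```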

The landing view is Pipelines — one card per pipeline with a last-10-execution strip (teal = succeeded, red = failed, amber-pulse = running; a retried-then-succeeded run counts as green). Bar height is proportional to run duration, scaled per-pipeline so variance is visible at a glance — a tall bar means that run took unusually long. The right side shows "Next: <UTC>" for batch pipelines or "LIVE STREAMING" for streaming. Batch cards have a median-duration footer; streaming cards swap that for live throughput (1m / 5m rolling) + average batch cycle, snapshotted from the daemon's /metrics endpoint every ~30s:

[Screenshot: Pipelines view, batch median duration vs streaming live throughput]

Clicking any square drills into the run-detail page, which renders the task DAG live — solid teal for succeeded, pulsing amber for running, dashed dim teal for pending, solid red for failed:

[Screenshot: workflow DAG with parallel branch]

Reading left-to-right: extract_orders (done) fans out into two parallel branches — transform_orders (done) and transform_payments (running) — both feeding merge_payments and ultimately load_warehouse. Sibling steps at the same rank stack vertically so parallelism is visible at a glance.

The action buttons map directly to scheduler-loop pickup: clicking Restart from step "merge_payments" writes a requested row to the RunLog with extras["restart_from_step"]; the next scheduler tick claims it and the worker honors EMATIX_FLOW_RESTART_FROM_STEP to resume the DAG from that node — upstream artifacts get reused rather than recomputed.

A third top-level view, DAG, renders the cross-pipeline depends_on graph: each node carries its schedule + timezone + fan-out count, and topological-rank columns guarantee upstreams sit left of their downstreams. Useful for spotting orphan pipelines, unintended cycles, or fan-out hotspots before they bite at scheduler tick time.

More screenshots + walkthrough: ematix.dev/specs/04-web-ui-screenshots.

Run history

Every run lands in the configured RunLog with a run_id, status, row counts, attempt count, error message (if any), and metrics JSON. Inspect via SQL, the backend's tooling, or flow runs list.

RunLog backends

Choose a backend via URL — same form for the CLI flag, the @ematix.connection decorator, and run_due_with_dag_detailed.

Scheme | Backend | Notes
sqlite://path/to/run_log.db | SQLite (default) | Single-process; zero config.
memory:// | In-memory | Tests only; lost on exit.
postgres://user:pw@host/db | Postgres | Multi-host; auto-creates ematix_flow schema unless create_tables=false.
mysql://user:pw@host/db | MySQL | Same shape as Postgres.
duckdb://path/to/run_log.duckdb | DuckDB | Single-file analytical store.
s3://bucket/prefix?region=... | S3 (AWS) | JSONL append; good for serverless.
azureblob://account/container/prefix | Azure Blob | Append-block log.
gcs://bucket/prefix | GCS | JSONL append.

flow run-due --module my_pipelines \
    --run-log postgres://flow:pw@logdb/flow_history \
    --alerter slack://hooks.slack.com/services/... \
    --metrics prometheus://:9100

When the configured RunLog location is unwritable (for example a read-only Lambda filesystem or missing credentials), flow warns and continues, so orchestration stays alive even with the durable-history layer down.
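
Since every RunLog backend is selected by URL scheme, dispatch can be pictured as a lookup on the parsed scheme. The mapping below copies the schemes from the table above; describe_run_log is a hypothetical helper, not the library's loader.

```python
from urllib.parse import urlsplit

# Scheme -> backend label, taken from the RunLog backends table.
BACKENDS = {
    "sqlite": "SQLite", "memory": "In-memory", "postgres": "Postgres",
    "mysql": "MySQL", "duckdb": "DuckDB", "s3": "S3 (AWS)",
    "azureblob": "Azure Blob", "gcs": "GCS",
}

def describe_run_log(url):
    parts = urlsplit(url)
    backend = BACKENDS.get(parts.scheme)
    if backend is None:
        raise ValueError(f"unsupported RunLog scheme: {parts.scheme!r}")
    # Keep only host and path so credentials are never echoed back.
    location = f"{parts.hostname or ''}{parts.path}"
    return backend, location

print(describe_run_log("postgres://flow:pw@logdb/flow_history"))
print(describe_run_log("memory://"))
```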

Alerters

--alerter <url> (repeatable) attaches one or more sinks for failure / recovery events.

Scheme | Effect
stdout:// | Human-readable lines on stderr.
slack://hooks.slack.com/services/... | Posts to a Slack incoming webhook.
email://user:pass@host:port?from=…&to=…&starttls=1 | Sends email via stdlib smtplib (default port 587 STARTTLS / 465 implicit SSL).
pagerduty://<routing_key>?service=…&severity=… | PagerDuty Events API v2 trigger/resolve; dedup_key="<service>:<pipeline>" auto-resolves on recovery.

Buggy alerters are fault-isolated: any exception is logged and swallowed, never crashes the orchestrator.

Metrics sinks

--metrics <url> exports per-pipeline run counts, durations, and current attempt state.

Scheme | Effect
null:// | Drop everything (default).
stdout:// | Pretty-print on flush.
memory:// | In-process counters; readable from Python.
prometheus://:9100 | /metrics endpoint on the given port.
otlp://collector:4318 | OTel HTTP exporter.

Full operator-deployment recipes (per environment, with example URLs and the right pyproject extras): docs/DEPLOYMENT.md.


Streaming pipelines

A long-running consumer that drains a source and writes batches to a target with manual at-least-once offset commits, Prometheus metrics, and exponential-backoff supervised restart.

TOML form

# pipeline.toml
pipeline_name = "events-to-pg"
source_query  = "events"
idle_pause_ms = 500

[source]
kind = "kafka"
bootstrap_servers = "localhost:9092"
group_id = "ematix-flow"

[target]
kind = "postgres"
url  = "postgres://localhost/mydb"

[target.table]
schema = "public"
name   = "events"

Run from Python:

from ematix_flow import run_pipeline
run_pipeline(config="pipeline.toml", metrics_port=9100)

Or from the CLI binary:

flow consume pipeline.toml \
    --metrics-port 9100 \
    --restart-on-error \
    --max-backoff-ms 30000

Typed-Python form (decorator, no TOML)

# my_pipelines.py
from ematix_flow import ematix

@ematix.connection
class kafka_prod:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    group_id = "ematix-flow"

@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"

@ematix.streaming_pipeline(
    name="events_to_pg",
    source=kafka_prod,
    source_query="events",
    target=warehouse,
    target_table=("public", "events"),
)
def events_to_pg():
    pass

flow consume      --module my_pipelines events_to_pg --metrics-port 9100
flow consume-list --module my_pipelines              # show registered pipelines

The framework renders the equivalent TOML internally and hands it to the same Rust runtime — same at-least-once guarantees, same metrics, same supervisor.

Multi-target fan-out

from ematix_flow import Target, ObjectStoreS3Connection

@ematix.connection
class lake:
    kind = "object_store_s3"
    endpoint = "https://s3.amazonaws.com"
    bucket = "ematix-archive"
    region = "us-east-1"
    access_key_id = "${AWS_ACCESS_KEY_ID}"
    secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
    format = "parquet"

@ematix.streaming_pipeline(
    name="events_fanout",
    source=kafka_prod,
    source_query="events",
    targets=[
        Target(connection=warehouse, table=("public", "events")),
        Target(connection=lake, prefix="events/raw", parquet_compression="zstd"),
    ],
)
def events_fanout(): pass

Each target gets its own write path; failures on one don't stop the others (configurable).


Stream processing

Stateful transforms layered onto a streaming pipeline.

SQL transforms

Mid-stream SELECT via DataFusion — filter / project / cast / lookup-join. Static lookups load from any backend at startup; refresh_interval_ms per lookup runs a background refresh task.

run_streaming_pipeline(
    name="events-clean",
    source=kafka_prod, source_query="events",
    target=warehouse,  target_table=("public", "events_clean"),
    transform_sql="""
      SELECT s.user_id, s._event_ts, u.tier
      FROM source s
      LEFT JOIN users u ON u.user_id = s.user_id
    """,
    lookups={"users": Lookup(connection=warehouse,
                             query="SELECT user_id, tier FROM dim_users",
                             refresh_interval_ms=60_000)},
)

Aggregating many source rows into one JSON-shaped target row

When the target schema isn't 1:1 with the source — e.g. an option_chain_snapshots(minute, strikes_json) table where every row carries a JSON array of N contracts for that minute — DataFusion's array_agg(named_struct(...)) does the pivot inside a single transform_sql pass. The framework's Postgres write path serializes Arrow List<Struct<...>> straight into a JSONB column, so no staging table or two-stage pipeline is needed.

run_streaming_pipeline(
    name="option-chain-snapshots",
    source=s3,                                # CSV files under a watched prefix
    source_query="raw/options/",
    target=warehouse,                         # postgres
    target_table=("marketdata", "option_chain_snapshots"),
    transform_sql="""
      SELECT
        date_trunc('minute', ts) AS minute,
        array_agg(named_struct(
          'strike', strike,
          'bid',    bid,
          'ask',    ask
        )) AS strikes_json
      FROM source
      WHERE underlying = 'SPXW' AND days_to_expiry = 0
      GROUP BY 1
    """,
)

The matching target table:

CREATE TABLE marketdata.option_chain_snapshots (
  minute        TIMESTAMPTZ NOT NULL,
  strikes_json  JSONB NOT NULL  -- [{"strike": 4500, "bid": 1.25, "ask": 1.30}, ...]
);

Same shape works for MERGE / SCD2 / Truncate strategies via the strategy executor — the JSONB column round-trips like any other type.

For functions DataFusion's stdlib doesn't cover (cumulative-normal CDF, custom hashing, day-count conventions), register a Python scalar UDF — covered in the next section.

Python UDFs (@ematix_flow.udf)

Wrap a Python callable as a DataFusion scalar UDF and call it from transform_sql. Per-batch dispatch through PyArrow zero-copy: one GIL acquisition + PyArrow round-trip per batch (typically thousands of rows), so vectorised numpy / pyarrow.compute inside the callable amortises the overhead.

import math

import numpy as np
import pyarrow as pa

from ematix_flow import run_streaming_pipeline, udf


@udf(args=("Float64", "Float64", "Float64", "Float64", "Float64"),
     returns="Float64")
def bs_call_delta(strike, spot, vol, rate, expiry):
    # All five inputs arrive as PyArrow Float64 Arrays; convert
    # once, do the math vectorised, ship a PyArrow Array back.
    k = strike.to_numpy(zero_copy_only=False)
    s = spot.to_numpy(zero_copy_only=False)
    v = vol.to_numpy(zero_copy_only=False)
    r = rate.to_numpy(zero_copy_only=False)
    t = expiry.to_numpy(zero_copy_only=False)
    d1 = (np.log(s / k) + (r + 0.5 * v * v) * t) / (v * np.sqrt(t))
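    # Call delta is N(d1), the standard normal CDF at d1. np.vectorize is a
    # Python-level loop over math.erf; scipy.special.erf is a true ufunc
    # if SciPy is available.
    cdf = 0.5 * (1.0 + np.vectorize(math.erf)(d1 / np.sqrt(2)))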
    return pa.array(cdf, type=pa.float64())


run_streaming_pipeline(
    name="option-chain-with-greeks",
    source=s3, source_query="raw/options/",
    target=warehouse, target_table=("marketdata", "option_chain_snapshots"),
    transform_sql="""
        SELECT
          date_trunc('minute', ts) AS minute,
          array_agg(named_struct(
            'strike', strike,
            'bid',    bid,
            'ask',    ask,
            'delta',  bs_call_delta(strike, spot, vol, rate, expiry)
          )) AS strikes_json
        FROM source
        GROUP BY 1
    """,
    udfs=[bs_call_delta],
)

Argument and return types are DataFusion DataType strings — "Int64", "Float64", "Utf8", "Boolean", etc. Mismatched call sites surface at plan-compile time as DataFusion type errors. Two UDFs registering under the same name is a config-load error — no silent shadowing.

Python aggregate UDFs (@ematix_flow.udaf)

For per-group reductions DataFusion's stdlib doesn't ship (VWAP, custom percentiles, distinct-by-cardinality), decorate a Python class with @udaf and pass it as aggregate_udfs=. The class must expose four methods — update_batch, merge_batch, evaluate, state — mirroring DataFusion's Accumulator trait. PyArrow zero-copy on the inputs, length-1 PyArrow Arrays on the outputs.

import pyarrow as pa
import pyarrow.compute as pc

from ematix_flow import run_streaming_pipeline, udaf


@udaf(args=("Float64", "Float64"),
      state=("Float64", "Float64"),
      returns="Float64", name="vwap")
class Vwap:
    def __init__(self):
        self.num = 0.0
        self.den = 0.0

    def update_batch(self, prices, qtys):
        self.num += pc.sum(pc.multiply(prices, qtys)).as_py() or 0.0
        self.den += pc.sum(qtys).as_py() or 0.0

    def merge_batch(self, num_states, den_states):
        self.num += pc.sum(num_states).as_py() or 0.0
        self.den += pc.sum(den_states).as_py() or 0.0

    def evaluate(self):
        if self.den == 0:
            return pa.array([None], type=pa.float64())
        return pa.array([self.num / self.den], type=pa.float64())

    def state(self):
        return (pa.array([self.num], type=pa.float64()),
                pa.array([self.den], type=pa.float64()))


run_streaming_pipeline(
    name="vwap-per-minute",
    source=kafka_quotes, source_query="quotes",
    target=warehouse, target_table=("marketdata", "vwap_per_minute"),
    transform_sql="""
      SELECT date_trunc('minute', ts) AS minute,
             vwap(price, qty)          AS vwap
      FROM source GROUP BY 1
    """,
    aggregate_udfs=[Vwap],
)

Dispatch is per batch: one GIL acquisition and one PyArrow round-trip per batch, with the accumulator instance surviving across batches within a group. Vectorise with pyarrow.compute or numpy inside the methods. See the user guide for the full state-shape contract and the pure-Rust escape hatch for when GIL contention dominates.
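
To make the four-method contract concrete, here is a dependency-free sketch of how partial accumulators might be combined. It uses plain Python lists in place of PyArrow arrays, and the class name VwapSketch and the two-partition setup are assumptions for illustration; the real engine decides how groups are partitioned and merged.

```python
class VwapSketch:
    """Plain-Python stand-in for the Vwap accumulator (lists instead of PyArrow)."""

    def __init__(self):
        self.num = 0.0
        self.den = 0.0

    def update_batch(self, prices, qtys):
        self.num += sum(p * q for p, q in zip(prices, qtys))
        self.den += sum(qtys)

    def state(self):
        # One length-1 column per state field, mirroring the contract above.
        return ([self.num], [self.den])

    def merge_batch(self, num_states, den_states):
        self.num += sum(num_states)
        self.den += sum(den_states)

    def evaluate(self):
        return None if self.den == 0 else self.num / self.den


# Two partial accumulators see different batches of the same group.
left, right = VwapSketch(), VwapSketch()
left.update_batch([10.0, 20.0], [1.0, 3.0])   # num = 70, den = 4
right.update_batch([30.0], [1.0])             # num = 30, den = 1

# A final accumulator merges the partial states, then evaluates once.
final = VwapSketch()
for partial in (left, right):
    final.merge_batch(*partial.state())

vwap = final.evaluate()
print(vwap)  # 20.0
```

The point of the state / merge_batch pair is that the result does not depend on how rows were split across partial accumulators.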

Tumbling / hopping windows

Nine aggregators are available, including HLL+ approximate count_distinct. Options include late_data = "drop" | "reopen" | "dlq", idle-tick emission, and a per-window max_groups_per_window cap that fails loudly when exceeded.

from ematix_flow import Window, Aggregation

run_streaming_pipeline(
    name="events-per-min",
    source=kafka_prod, source_query="events",
    target=warehouse,  target_table=("public", "events_per_min"),
    transform_sql="SELECT user_id, _event_ts FROM source",
    window=Window(
        kind="tumbling",
        duration_ms=60_000,
        group_by=("user_id",),
        max_groups_per_window=1_000_000,
        aggregations=[Aggregation(agg="count", as_="n")],
    ),
)
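
As a rough mental model of window assignment, the sketch below maps an event timestamp to its tumbling window and to the set of hopping windows that cover it, then counts per (window, user_id) with a fail-loud group cap. All function names, the hop_ms parameter, and the half-open [start, start + duration) convention are assumptions for illustration, not the engine's implementation.

```python
from collections import Counter


def tumbling_start(ts_ms: int, duration_ms: int) -> int:
    """Start of the single tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % duration_ms


def hopping_starts(ts_ms: int, duration_ms: int, hop_ms: int) -> list[int]:
    """Starts of every hopping window [start, start + duration) containing ts_ms."""
    starts = []
    start = ts_ms - ts_ms % hop_ms        # most recently opened window
    while start > ts_ms - duration_ms:    # window still covers ts_ms
        starts.append(start)
        start -= hop_ms
    return sorted(starts)


def count_per_window(events, duration_ms, max_groups_per_window):
    """events: iterable of (user_id, ts_ms). Returns {(window_start, user_id): n}."""
    counts = Counter()
    groups_in_window = Counter()
    for user_id, ts_ms in events:
        key = (tumbling_start(ts_ms, duration_ms), user_id)
        if key not in counts:
            groups_in_window[key[0]] += 1
            if groups_in_window[key[0]] > max_groups_per_window:
                raise RuntimeError(f"window {key[0]}: too many groups")
        counts[key] += 1
    return dict(counts)


events = [("a", 1_000), ("a", 59_999), ("b", 30_000), ("a", 60_000)]
result = count_per_window(events, duration_ms=60_000, max_groups_per_window=10)
print(result)
print(hopping_starts(70_000, duration_ms=60_000, hop_ms=30_000))
```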

Session windows

Sessions are gap-based and per-key, with a mandatory max_session_duration_ms hard cap. Under Reopen, out-of-order events can merge existing sessions. A StateStore is mandatory for restart safety.

from ematix_flow import StateStore

run_streaming_pipeline(
    name="user-sessions",
    source=kafka_prod, source_query="events",
    target=warehouse,  target_table=("public", "user_sessions"),
    transform_sql="SELECT user_id, page, _event_ts FROM source",
    window=Window(
        kind="session",
        gap_ms=300_000,                       # 5-min idle = session boundary
        max_session_duration_ms=86_400_000,   # 24h hard cap
        group_by=("user_id",),
        max_groups_per_window=1_000_000,
        aggregations=[
            Aggregation(agg="count", as_="events"),
            Aggregation(agg="first", column="page", as_="entry_page"),
            Aggregation(agg="last",  column="page", as_="exit_page"),
        ],
    ),
    state_store=StateStore(kind="postgres", url="postgres://localhost/ematix_state"),
)
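
The gap and hard-cap rules can be sketched for a single key as follows. This is a simplified illustration: the function name sessionize, the choice of strict versus inclusive comparisons at the boundaries, and the decision to start a fresh session when the cap is hit are all assumptions, and late-data handling is left out entirely.

```python
def sessionize(timestamps_ms, gap_ms, max_session_duration_ms):
    """Split one key's event timestamps into sessions.

    A new session starts when the idle gap reaches gap_ms, or when adding the
    event would stretch the session past max_session_duration_ms.
    Returns a list of (start_ms, end_ms, event_count) tuples.
    """
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions:
            start, end, n = sessions[-1]
            within_gap = ts - end < gap_ms
            within_cap = ts - start <= max_session_duration_ms
            if within_gap and within_cap:
                sessions[-1] = (start, ts, n + 1)
                continue
        sessions.append((ts, ts, 1))
    return sessions


# A 450 s pause exceeds the 300 s gap, so the events split into two sessions.
by_gap = sessionize([0, 100_000, 250_000, 700_000, 750_000],
                    gap_ms=300_000, max_session_duration_ms=86_400_000)
# Steady traffic never idles, so only the cap closes the first session.
by_cap = sessionize([0, 200_000, 400_000, 600_000],
                    gap_ms=300_000, max_session_duration_ms=500_000)
print(by_gap)
print(by_cap)
```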

Stream-stream joins

Keyed time-windowed inner / outer joins. Per-side per-key buffers, watermark-driven retention, retained-buffer reopen for late matches.

from ematix_flow import Join, Source

run_streaming_pipeline(
    name="orders-payments",
    sources=[
        Source(connection=kafka_prod, query="orders"),
        Source(connection=kafka_prod, query="payments"),
    ],
    target=warehouse,
    target_table=("public", "orders_with_payments"),
    join=Join(
        left_source="orders",
        right_source="payments",
        left_keys=("order_id",),
        right_keys=("order_id",),
        time_window_ms=300_000,                # ±5 min symmetric
        # kind="left_outer",                   # or "right_outer" / "full_outer"
        # min_delta_ms=-60_000, max_delta_ms=300_000,  # asymmetric
        # late_data="reopen", allowed_lateness_ms=10_000,
    ),
    state_store=StateStore(kind="postgres", url="postgres://localhost/ematix_state"),
)
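
The matching condition behind time_window_ms and the min_delta_ms / max_delta_ms pair can be sketched as a batch join over two lists. The sign convention used here (delta = right timestamp minus left timestamp) and the function name are assumptions; buffering, watermarks, and outer-join padding are omitted.

```python
def windowed_inner_join(left, right, min_delta_ms, max_delta_ms):
    """left / right: lists of (key, ts_ms, payload).

    A pair matches when the keys are equal and
    min_delta_ms <= right_ts - left_ts <= max_delta_ms.
    """
    by_key = {}
    for key, ts, payload in right:
        by_key.setdefault(key, []).append((ts, payload))
    out = []
    for key, l_ts, l_payload in left:
        for r_ts, r_payload in by_key.get(key, []):
            if min_delta_ms <= r_ts - l_ts <= max_delta_ms:
                out.append((key, l_payload, r_payload))
    return out


orders = [(1, 0, "order-1"), (2, 0, "order-2")]
payments = [(1, 120_000, "pay-1"), (2, 400_000, "pay-2"), (1, -200_000, "pay-early")]

# Symmetric window: time_window_ms=300_000 read as [-300_000, +300_000].
symmetric = windowed_inner_join(orders, payments, -300_000, 300_000)
# Asymmetric window: payment may precede the order by at most one minute.
asymmetric = windowed_inner_join(orders, payments, -60_000, 300_000)
print(symmetric)
print(asymmetric)
```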

Change-data-capture

Apply Debezium / Maxwell / custom CDC envelopes to a target table with proper per-op semantics (insert on c, update on u, delete or soft-delete on d):

from ematix_flow import CDC

@ematix.streaming_pipeline(
    name="mirror_customers",
    source=kafka_prod,
    source_query="dbserver1.public.customers",
    target=customer_mirror,
    target_connection=warehouse,
    cdc=CDC(envelope="debezium"),
)
def mirror_customers(): pass

Per-PK idempotency gate (ematix_flow.cdc_idempotency) keeps Kafka redeliveries from double-applying. Schema-evolution policy (Skip / Fail) controls drift behaviour. Hard or soft delete (delete_mode="soft", soft_delete_column="deleted_at"). Supported targets: Postgres, Delta Lake (local + S3), DuckDB, SQLite, MySQL.
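
A minimal in-memory sketch of per-op apply with a per-key idempotency gate is shown below. The event shape is a simplified stand-in for a Debezium envelope (only op, a primary key, an offset, ts_ms, and after), and gating on a monotonically increasing offset is an assumption; the actual gate in ematix_flow.cdc_idempotency may track something different.

```python
def apply_cdc(table, last_applied, event, delete_mode="hard"):
    """Apply one change event to an in-memory table.

    table: {pk: row dict}; last_applied: {pk: highest offset applied so far}.
    Returns True if the event was applied, False if the gate skipped it.
    """
    pk, offset = event["pk"], event["offset"]
    # Idempotency gate: a redelivered or older event for this key is a no-op.
    if offset <= last_applied.get(pk, -1):
        return False
    op = event["op"]
    if op in ("c", "u"):
        table[pk] = dict(event["after"])
    elif op == "d":
        if delete_mode == "soft":
            if pk in table:
                table[pk]["deleted_at"] = event["ts_ms"]
        else:
            table.pop(pk, None)
    else:
        raise ValueError(f"unhandled op {op!r}")
    last_applied[pk] = offset
    return True


events = [
    {"op": "c", "pk": 1, "offset": 0, "ts_ms": 1000, "after": {"id": 1, "name": "Ada"}},
    {"op": "u", "pk": 1, "offset": 1, "ts_ms": 2000, "after": {"id": 1, "name": "Ada L."}},
    {"op": "u", "pk": 1, "offset": 1, "ts_ms": 2000, "after": {"id": 1, "name": "Ada L."}},  # redelivery
    {"op": "d", "pk": 1, "offset": 2, "ts_ms": 3000, "after": None},
]
table, seen = {}, {}
applied = [apply_cdc(table, seen, e, delete_mode="soft") for e in events]
print(applied)
print(table)
```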

Raw object stores (S3 / GCS / Azure Parquet, CSV, JSON) are intentionally not CDC targets — those file formats are immutable, so per-event UPDATE / DELETE has no clean shape. For CDC into object storage, use DeltaS3Backend: Delta sits on top of the same object stores and gives you transactional MERGE for free. The append-only "event log" pattern (one row per change event, materialise current state downstream) remains buildable ad-hoc with the existing transform stage + ObjectStore target.

Distributed batch SQL (opt-in)

Set engine = "distributed" + peers = [...] and the SQL fans out across a peer-to-peer mesh of flow-worker processes via Apache Arrow Flight. mTLS for the mesh, cross-pod lookup broadcast, no separate cluster service. SQL dialect translator (Spark / DuckDB → DataFusion) makes existing queries portable without rewrites.

Peer auto-detection. peers = [...] accepts three schemes:

  • http://host:port — static, unchanged.
  • dns://host:port — resolves the A-record at startup and expands to every IP behind the name (good for headless services).
  • k8s://service.namespace:port — sugar for dns://service.namespace.svc.cluster.local:port.

Mix and match freely. Static peers added by the user are kept as-is; auto-detected peers are appended on top.

Engine default — engine = "auto". When unspecified (or set to "auto"), the runtime picks distributed if peers expands to ≥1 URL, otherwise falls back to in-process with an info log. Existing engine = "single" / engine = "distributed" selections are honored verbatim.
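
The expansion and engine-selection rules above can be sketched in a few lines. The resolver is injected so the example runs without real DNS, and rendering resolved peers as http://ip:port is an assumption about the output shape, as are the function names.

```python
def expand_peers(peers, resolve):
    """Expand peer URLs. resolve(host) -> list of IPs (injected for testing)."""
    static, detected = [], []
    for url in peers:
        scheme, _, rest = url.partition("://")
        if scheme == "k8s":
            # k8s://service.namespace:port is sugar for the cluster-local DNS name.
            host, _, port = rest.rpartition(":")
            scheme, rest = "dns", f"{host}.svc.cluster.local:{port}"
        if scheme == "dns":
            host, _, port = rest.rpartition(":")
            detected += [f"http://{ip}:{port}" for ip in resolve(host)]
        else:
            static.append(url)          # static peers are kept as-is
    return static + detected            # auto-detected peers are appended


def pick_engine(engine, expanded_peers):
    """Explicit choices win; "auto" or None depends on whether any peer exists."""
    if engine in ("single", "distributed"):
        return engine
    return "distributed" if expanded_peers else "single"


fake_dns = {"workers.flow.svc.cluster.local": ["10.0.0.1", "10.0.0.2"]}
peers = expand_peers(
    ["http://a:50051", "k8s://workers.flow:50051"],
    resolve=lambda host: fake_dns.get(host, []),
)
print(peers)
print(pick_engine("auto", peers), pick_engine(None, []))
```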


Configuration reference

Selected knobs that don't fit any single section above. Every knob is reachable from typed Python; most also have a TOML equivalent.

Watermark behaviour

from ematix_flow import Watermark

run_streaming_pipeline(
    ...,
    watermark=Watermark(
        lateness_ms=5_000,           # how late an event can arrive
        source_idleness_ms=120_000,  # advance watermark when source is idle
    ),
)
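
One common way to read lateness_ms is "the watermark trails the largest event time seen so far by this much, and anything older than the watermark is late". The sketch below implements only that reading. The class name is hypothetical, idle-source advancement (source_idleness_ms) is not modelled, and the engine's exact rule may differ.

```python
class WatermarkSketch:
    """Watermark = max event time seen so far minus the allowed lateness."""

    def __init__(self, lateness_ms):
        self.lateness_ms = lateness_ms
        self.max_event_ts = None

    @property
    def value(self):
        if self.max_event_ts is None:
            return None
        return self.max_event_ts - self.lateness_ms

    def observe(self, event_ts_ms):
        """Return True if the event is on time, False if it is late."""
        wm = self.value
        late = wm is not None and event_ts_ms < wm
        if self.max_event_ts is None or event_ts_ms > self.max_event_ts:
            self.max_event_ts = event_ts_ms
        return not late


wm = WatermarkSketch(lateness_ms=5_000)
on_time = [wm.observe(ts) for ts in (10_000, 7_000, 20_000, 12_000)]
print(on_time)    # [True, True, True, False]
print(wm.value)   # 15000
```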

Per-batch error policy

run_streaming_pipeline(
    ...,
    transform_on_error="dlq",         # "fail" (default) | "drop" | "dlq"
    dead_letter_topic="events-failed",
)
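
The three policies differ only in what happens when a batch's transform raises. A minimal sketch of that dispatch, with an in-memory list standing in for the dead-letter topic, might look like this; process_batches and safe_divide are illustrative names, not ematix-flow APIs.

```python
def process_batches(batches, transform, policy="fail"):
    """Run transform over each batch under a "fail" / "drop" / "dlq" policy."""
    if policy not in ("fail", "drop", "dlq"):
        raise ValueError(f"unknown policy {policy!r}")
    out, dead_letters = [], []
    for batch in batches:
        try:
            out.append(transform(batch))
        except Exception as exc:
            if policy == "fail":
                raise                       # stop the pipeline
            if policy == "dlq":
                dead_letters.append((batch, repr(exc)))
            # "drop": discard the batch and keep going
    return out, dead_letters


def safe_divide(batch):
    return 10 // batch


dlq_out, dlq_dead = process_batches([1, 0, 5], safe_divide, policy="dlq")
drop_out, drop_dead = process_batches([1, 0, 5], safe_divide, policy="drop")
print(dlq_out, [batch for batch, _ in dlq_dead])
print(drop_out, drop_dead)
```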

Object-store file formats

Object-store backends support parquet, csv, json_lines, and orc. The format is set on the connection and steers both the reader (schema inference + decode) and the writer (per-format encoder).

from ematix_flow.connections import format_from_path

format_from_path("s3://bucket/year=2026/events.csv.gz")  # → "csv"
format_from_path("logs.ndjson")                          # → "json_lines"
format_from_path("data.parquet")                         # → "parquet"

Recognized extensions cover .parquet / .pq, .csv / .tsv, .json / .jsonl / .ndjson, and .orc; the matcher strips .gz / .bz2 / .zst / .lz4 / .snappy first.
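
The matching rule described above (strip one compression suffix, then look up the remaining extension) can be approximated as follows. This is a stand-alone sketch, not the library's code: guess_format is a hypothetical name, the mapping of .tsv to csv and .json to json_lines is inferred from the grouping in the text, and returning None for unknown extensions is an assumption.

```python
COMPRESSION_SUFFIXES = (".gz", ".bz2", ".zst", ".lz4", ".snappy")
FORMAT_BY_EXTENSION = {
    ".parquet": "parquet", ".pq": "parquet",
    ".csv": "csv", ".tsv": "csv",
    ".json": "json_lines", ".jsonl": "json_lines", ".ndjson": "json_lines",
    ".orc": "orc",
}


def guess_format(path: str):
    """Return the format name for a path, or None if the extension is unknown."""
    name = path.rsplit("/", 1)[-1].lower()
    # Strip at most one trailing compression suffix before matching.
    for suffix in COMPRESSION_SUFFIXES:
        if name.endswith(suffix):
            name = name[: -len(suffix)]
            break
    dot = name.rfind(".")
    if dot == -1:
        return None
    return FORMAT_BY_EXTENSION.get(name[dot:])


for path in ("s3://bucket/year=2026/events.csv.gz", "logs.ndjson", "data.parquet"):
    print(path, "->", guess_format(path))
```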

Object-store write options

Target(
    connection=lake,
    prefix="events/raw",
    parquet_compression="zstd",       # or "snappy" / "gzip" / "uncompressed"
)

Target(
    connection=lake,
    prefix="events/csv",
    csv_delimiter=";",                # single ASCII char
    csv_header=False,
    csv_quote="'",                    # defaults to `"`
    csv_escape="\\",                  # defaults to doubled-quote
    csv_null_value="\\N",             # how null cells render on write
)

Object-store read options

CSV decode and JSON-lines decode honor a matching set of options. Schema inference and the row decoder both see the same settings, so a file written with one dialect can be read back with the same one.

Target(
    connection=lake,
    prefix="events/csv",
    csv_read_options={
        "has_header": True,
        "delimiter": ",",
        "quote": '"',
        "escape": "\\",
        "comment": "#",
        "null_regex": r"^(NA|NULL|\\N)$",  # in addition to empty string
        "truncated_rows_ok": False,
        "schema_infer_max_records": 4096,
    },
    json_read_options={
        "schema_infer_max_records": 4096,
        "batch_size": 8192,
    },
)

The typed-Python boundary catches mis-shaped combos (e.g. setting parquet_compression on a CSV target, or csv_read_options on a Parquet target) before TOML round-trip.

State store

Every stateful transform (sessions, joins) requires a StateStore:

StateStore(
    kind="postgres",
    url="postgres://localhost/ematix_state",
    checkpoint_interval_ms=60_000,    # periodic dirty-state flush cadence
)

In-memory mode is for tests only. Pipeline config-load emits a loud warning if kind="in_memory" is paired with a stateful transform.

Schema Registry + payload format

@ematix.connection
class kafka_avro:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    payload_format = "avro"           # or "protobuf" / "json"
    schema_registry = "sr_prod"       # name reference

Connection introspection

flow connections list
flow connections check warehouse
flow connections set warehouse url=postgres://...

CLI

flow list                # registered pipelines
flow run <name>          # one-shot
flow run-due             # cron-style fire of all due pipelines (DAG-aware)
flow scheduler --executor <url>   # long-running daemon: claim pipelines, dispatch to workers
flow status              # operator view: per-pipeline status / next-due / attempts
flow preview <name>      # dry-run, no commit
flow validate <name>     # EXPLAIN against the target
flow runs list           # recent runs from the configured RunLog
flow logs <run_id>       # tail captured stdout/stderr for a run (opt-in capture: EMATIX_FLOW_CAPTURE_LOGS=1)
flow connections list / check / set
flow transform list / run

flow init <dir>          # scaffold a runnable starter project (pipelines.py + connections.toml + Dockerfile + flow.service)
flow doctor              # connection-health probes per kind (pg / kafka / glue / s3 / snowflake / …); non-zero exit on any failure
flow secrets test [path] # resolve every ${…} in connections.toml + report per-secret OK/missing/error (no values printed)

flow consume <toml>      # streaming daemon (TOML form)
flow consume --module my_pipelines <name>   # typed-Python form
flow consume-list --module my_pipelines     # registered streaming pipelines

flow run-due is the cron-tick model (external scheduler fires the process each minute). flow scheduler is the central-daemon model: one long-running process holds a leader lease, walks the DAG every --poll-interval seconds, and hands eligible pipelines to an Executor (subprocess://, k8s://<ns>?image=..., lambda://<fn>). See USER_GUIDE.md § Central scheduler and DEPLOYMENT.md recipe 8.

--module points at any importable Python module.

Observability flags (work on run-due, status, runs list):

--run-log <url>     # any RunLog scheme (sqlite/postgres/mysql/duckdb/s3/azureblob/gcs/memory)
--alerter <url>    # repeatable; stdout:// / slack://... / email://... / pagerduty://...
--metrics  <url>   # null:// / stdout:// / memory:// / prometheus://:port / otlp://endpoint
--traces   <url>   # OTEL spans for every pipeline run: otel://stdout / otel+otlp+grpc://… / otel+otlp+http://…

Streaming-daemon flags: --metrics-port exposes Prometheus metrics; --restart-on-error --max-backoff-ms enables the supervised-restart loop.


Python API

For when you want to bypass the pipeline orchestration and use a streaming backend directly — e.g. in a notebook for ad-hoc exploration:

from ematix_flow._core import KafkaBackend

backend = KafkaBackend.open(
    "localhost:9092",
    group_id="ematix-flow",
    payload_format="avro",
    schema_registry_url="http://localhost:8081",
    sasl_plain_username="alice",
    sasl_plain_password="secret",
)
backend.ping()

for batch in backend.iter_arrow_stream("events"):
    process(batch)               # batch is pyarrow.RecordBatch

backend.commit_offsets()         # at-least-once: ack only after success

The same shape works for RabbitMQBackend, PubSubBackend, and KinesisBackend (each in ematix_flow._core).

For polars / pandas / pyspark interop after a pipeline run, see the Install extras and the docs/USER_GUIDE.md walkthrough.


Benchmarks and comparisons

TPC-H — SF=1 / SF=10 / SF=100, all 22 queries

Five engines, the same Apple M4 Max, the same Parquet files (2026-06-18 refresh). ematix-flow numbers are production-faithful: the harness (examples/tpch_preset_rebench.rs) builds a fresh SessionContext per measurement through the shipped preset — no plan cache, no context reuse. ematix-flow and DuckDB are each timed in their own process (medians of 5 trials after 2 warmups); at SF=100 that isolation is essential — the ~30 GB working set exceeds the 36 GB box, so co-running both engines in one process makes them contend for RAM and understates whichever uses more memory. Polars runs the in-process harness (Q14/Q15 re-measured 2026-06-11, other columns carried from 2026-06-10 / 2026-05-29-31); PySpark is local[*] on the JVM; Postgres 14 is EXPLAIN ANALYZE Execution Time.

Scale | Wins vs DuckDB | DuckDB geomean | vs Polars | vs PySpark | vs Postgres
SF=1 (~1 GB, in-cache) | 22 / 22 | 2.35× | 3.89× | 16.8× | 15.5×
SF=10 (~10 GB) | 22 / 22 | 1.58× | 3.72× | 11.1× | 20.2×
SF=100 (~100 GB) | 14 / 22 | 1.18× | 4.70× | 7.9× | 62× †

The Polars / PySpark / Postgres columns are geomean of competitor ÷ ematix-flow across the 22 queries (Polars n=21 at SF=10 / n=17 at SF=100; Postgres ran 6 / 22 at SF=100 under a 90 s cap) — carried from earlier runs, not co-measured today. The wins vs DuckDB column is head-to-head in the same isolated protocol — 22 / 22 / 14 of 22. The queries ematix-flow doesn't win are named, not hidden: at SF=1 and SF=10 it wins every query vs DuckDB (Q05 flipped via the #159 transitive-dim-semi splice — the 22nd SF=10 win); at SF=100, across 3 independent sweeps DuckDB consistently takes Q10 (~0.73×) and Q8 (~0.95×) — the wide-string / high-RSS queries — and 6 more (Q3/Q5/Q9/Q16/Q18/Q21) swing ±10-30% on cache/thermal state and aren't individually decidable. ematix-flow leads the DuckDB geomean at every scale — 2.35× (SF=1), 1.58× (SF=10), 1.18× (SF=100). Full tables, tabbed by scale, are on the docs site: ematix.dev/reference/benchmarks. SF=1 in full (the README's headline scale):

Query ematix-flow DuckDB Polars PySpark Postgres
Q01 18.2 47.4 38.6 167 411
Q02 7.8 18.3 47.9 190 123
Q03 14.2 33.5 46.5 257 163
Q04 11.7 22.6 23.8 184 94.6
Q05 17.1 32.0 8,949 335 226
Q06 1.9 12.8 10.5 41.5 218
Q07 26.5 33.1 118 260 1,262
Q08 14.0 39.3 96.7 182 99.7
Q09 18.8 55.3 47.5 583 820
Q10 28.0 60.9 111 362 355
Q11 6.0 10.1 8.9 119 36.3
Q12 15.0 24.8 19.2 269 361
Q13 10.4 142 118 684 871
Q14 11.5 22.0 12.3 114 69.8
Q15 10.9 13.8 11.5 127 140
Q16 10.0 21.7 21.2 205 113
Q17 15.8 25.6 39.0 233 398
Q18 19.0 44.9 56.6 560 1,154
Q19 18.0 35.3 105 86.0 31.9
Q20 13.1 29.0 22.4 106 147
Q21 34.8 76.5 721 628 609
Q22 8.7 21.1 13.6 354 24.1

Median ms; ematix-flow is the fastest engine in every row. ematix-flow and DuckDB were measured in isolated passes through the production preset (fresh SessionContext per query). Polars runs hand-translated .polars.sql variants where its planner rejects the canonical shape (semantically identical); its Q05 SF=1 outlier (~9 s) is a known planner blowup. ematix-flow takes all 22 at SF=1, with the biggest margins on the join-heavy queries (Q06 7.1×, Q13 14×, Q18 2.4×) where the fused-aggregate + push-LeftSemi + runtime-bloom rules do the work.

Full methodology, per-engine reproducers, and the SF=10 / SF=100 tables are in docs/BENCHMARKS.md.
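
For readers who want to check how a geomean column relates to the per-query table, the following snippet recomputes the DuckDB ÷ ematix-flow geometric mean and the win count from the SF=1 medians listed above. Only the two columns shown in that table are used; the variable names are arbitrary.

```python
import math

# SF=1 median milliseconds, Q01..Q22, copied from the table above.
ematix = [18.2, 7.8, 14.2, 11.7, 17.1, 1.9, 26.5, 14.0, 18.8, 28.0, 6.0,
          15.0, 10.4, 11.5, 10.9, 10.0, 15.8, 19.0, 18.0, 13.1, 34.8, 8.7]
duckdb = [47.4, 18.3, 33.5, 22.6, 32.0, 12.8, 33.1, 39.3, 55.3, 60.9, 10.1,
          24.8, 142, 22.0, 13.8, 21.7, 25.6, 44.9, 35.3, 29.0, 76.5, 21.1]

# Per-query speedup: competitor time divided by ematix-flow time.
ratios = [d / e for d, e in zip(duckdb, ematix)]

# Geometric mean = exp of the arithmetic mean of the log ratios.
geomean = math.exp(sum(math.log(x) for x in ratios) / len(ratios))
wins = sum(r > 1.0 for r in ratios)

print(f"geomean speedup: {geomean:.2f}x, wins: {wins} / {len(ratios)}")
```

The result lands between 2.3× and 2.4×, in line with the 2.35× headline figure; small differences are expected because the table values are rounded.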

How the speedups land

Two layers do the work, both transparent at the SQL level:

  1. EmatixFastParquetTableProvider — the default parquet scan, backed by ematix-parquet 0.16.3 (a hand-rolled decoder we ship as a sibling crate). Adds row-group-parallel decode, parallel per-page Snappy decompression, dict-preserved Utf8View, buffer-reuse on the hot path, and small-bit-width NEON+AVX2 SIMD kernels for RLE-dictionary indices. Replaces DataFusion's parquet-rs scan in-place — register_dict_aware_parquet(ctx, name, path) is the whole API.

  2. Physical-optimiser rules — pattern-match canonical aggregate shapes on the default plan tree and rewrite the matching subtree to a specialised operator over the scan. No exec construction at the user's level — SessionContext::sql(...) is the whole API.

    Rule | Shape it matches | Replaces with
    InjectFilterMultiAggRule | Aggregate(Final/Partial) → Projection(CSE) → Filter → scan with a small-cardinality group-by | Template-specialised FilterMultiAggSpec (perfect-hash agg over typed-slice predicate eval)
    InjectFilterSumRule | Aggregate(Final, sum/sum-product) → CoalescePartitions → Aggregate(Partial) → Filter(N-AND chain) → scan | FusedFilterSumExec
    EnableDictGroupCountRule | count(*) group-by on a dictionary-encoded string column | DictGroupCountExec (counts directly over dict keys)

    Each rule fires on shape — no query-specific gating. When anything diverges (different aggregate, wrong column types, missing filter, extra wrappers we don't recognise) the rule passes the node through unchanged.

Register them like any other physical-optimiser rule:

use ematix_flow_core::dict_aggregate_rule::EnableDictGroupCountRule;
use ematix_flow_core::ematix_fast_parquet::EmatixFastParquetTableProvider;
use ematix_flow_core::fused_aggregate_filter_multi_agg_rule::InjectFilterMultiAggRule;
use ematix_flow_core::fused_aggregate_filter_sum_rule::InjectFilterSumRule;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use std::sync::Arc;

let state = SessionStateBuilder::new()
    .with_config(SessionConfig::new().with_target_partitions(14))
    .with_default_features()
    .with_physical_optimizer_rule(Arc::new(EnableDictGroupCountRule))
    .with_physical_optimizer_rule(Arc::new(InjectFilterMultiAggRule))
    .with_physical_optimizer_rule(Arc::new(InjectFilterSumRule))
    .build();
let ctx = SessionContext::new_with_state(state);
for t in ["lineitem", "part" /* … */] {
    let prov = EmatixFastParquetTableProvider::try_new(format!("{t}.parquet"))?;
    ctx.register_table(t, Arc::new(prov))?;
}
// SQL goes through the fast scan and the matching rule fires
// automatically. No per-query code.
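
The "fire on shape, otherwise pass through unchanged" behaviour of these rules can be illustrated with a toy plan tree. The sketch below is in Python for brevity and is not how the Rust rules are written: Node, shape, and fuse_filter_sum are hypothetical names, and the matched shape is deliberately much simpler than the real ones.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    op: str
    children: list = field(default_factory=list)


def shape(node):
    """Operator names along the single-child spine starting at node."""
    ops = [node.op]
    while len(node.children) == 1:
        node = node.children[0]
        ops.append(node.op)
    return ops


def fuse_filter_sum(node):
    """Rewrite Aggregate -> Filter -> Scan into one fused node; else pass through."""
    if shape(node) == ["Aggregate", "Filter", "Scan"]:
        return Node("FusedFilterSum")
    # No match here: keep this node and try the rule on each child.
    return Node(node.op, [fuse_filter_sum(c) for c in node.children])


matching = Node("Sort", [Node("Aggregate", [Node("Filter", [Node("Scan")])])])
diverging = Node("Aggregate", [Node("Projection", [Node("Scan")])])

print(shape(fuse_filter_sum(matching)))    # ['Sort', 'FusedFilterSum']
print(shape(fuse_filter_sum(diverging)))   # ['Aggregate', 'Projection', 'Scan']
```

Because the rule keys on structure alone, an extra wrapper node such as the Projection above is enough to leave the plan untouched.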

Reproduce the bench with:

cargo run --release -p ematix-flow-core \
    --example tpch_triangulation_bench --features triangulation

22/22 PASS on TPC-H SF=1; 103/103 PASS on the canonical Apache Spark TPC-DS plan-time audit (Spark dialect → DataFusion via the built-in translator).

Distributed batch SQL across multiple ematix-flow processes is available via the bundled flow-worker peer mesh. Cross-host scaling has not been benchmarked and no claims are made about it: the project currently has no cluster hardware to measure on.

How it compares

If you're running… | …ematix-flow replaces…
One-off pandas / SQL scripts | Adds correctness guarantees (watermarks, atomic state, schema evolution) without the operational weight of Airflow + Spark.
Airflow + dbt | Handles the load logic and streaming sources without a scheduler tier. Cron / k8s CronJob / GitHub Actions all fire flow run-due: no Airflow worker, no scheduler stub, no DAG plumbing.
Kafka Connect + Debezium + custom sinks | First-class CDC source mode dispatches per-op transactionally to your existing target. No separate connector tier to operate.
PySpark Structured Streaming (single-node) | Same SQL surface (DataFusion + Spark dialect translator). No cluster manager, no JVM startup tax, no driver/executor split. For single-node workloads the operational weight gap is the bigger win than any one query's wall-clock.
Polars read_* + custom load logic | Comparable per-query performance on shared workloads, plus the load tier on top: watermarks, atomic state, schema evolution, multi-target fan-out, CDC sources, streaming.

What's shipped

All four surfaces are stable. See docs/ROADMAP.md for the consolidated "what's left" punch list (release polish, deferred-feature extensions, open design questions).

Test surface (workspace-wide, every PR)

  • 459 Rust core unit tests
  • 124 Rust CLI unit tests + 27 backend-config scaffold round-trip tests across all 10 backends + distributed + TLS config
  • ~80 Rust testcontainers integration tests (Docker-gated)
  • 622 default Python tests + ~196 testcontainers-gated Python tests
  • 22-query TPC-H audit: 22/22 PASS at SF=1
  • 103-query TPC-DS Spark-dialect audit: 103/103 PASS plan-time

clippy + fmt clean on stable Rust; cargo audit green against RustSec; ruff + bandit + pip-audit green on the Python side.

Phase plans (for the curious)

→ Full step-by-step walkthrough: docs/USER_GUIDE.md.
→ Operator deployment recipes: docs/DEPLOYMENT.md.
→ Runnable examples: examples/ (one per strategy + streaming + windowed + session + join + CDC).
→ Cutting a release: docs/RELEASE.md.


Development

# Build the Rust workspace (core + CLI + Python extension crate)
cargo build --release

# Build + install the Python extension into a venv
python -m venv .venv && source .venv/bin/activate
pip install maturin
maturin develop --release

# The flow consume binary is built into target/release/flow
target/release/flow --help

# Run tests
make test                                 # both fast suites (no Docker)
make test-integration                     # Docker-gated; auto-cleans testcontainers after

# Or run them directly:
cargo test --workspace --lib              # default (no Docker)
cargo test --workspace -- --ignored       # Docker integration tests
                                          # (Kafka, RabbitMQ, Pub/Sub
                                          # emulator, Kinesis via
                                          # LocalStack, MinIO,
                                          # Schema Registry, etc.)

pytest                                    # default Python suite
pytest -m integration                     # full integration (Docker)
pytest -m spark                           # opt-in Spark E2E

# If a test run is SIGKILL'd or OOM'd before testcontainers' Drop
# fires, leaked containers + volumes can accumulate. The Makefile
# target prunes by label and won't touch unrelated containers:
make clean-testcontainers

See make help for the full target list (fmt / lint / security).


License

Apache-2.0

Download files

Source Distribution

ematix_flow-0.12.0.tar.gz (2.1 MB), source

Built Distributions

ematix_flow-0.12.0-cp314-cp314-manylinux_2_28_x86_64.whl (71.0 MB), CPython 3.14, manylinux (glibc 2.28+), x86-64
ematix_flow-0.12.0-cp314-cp314-macosx_11_0_arm64.whl (61.7 MB), CPython 3.14, macOS 11.0+, ARM64
ematix_flow-0.12.0-cp313-cp313-manylinux_2_28_x86_64.whl (71.0 MB), CPython 3.13, manylinux (glibc 2.28+), x86-64
ematix_flow-0.12.0-cp313-cp313-macosx_11_0_arm64.whl (61.7 MB), CPython 3.13, macOS 11.0+, ARM64
ematix_flow-0.12.0-cp312-cp312-manylinux_2_28_x86_64.whl (71.0 MB), CPython 3.12, manylinux (glibc 2.28+), x86-64
ematix_flow-0.12.0-cp312-cp312-macosx_11_0_arm64.whl (61.7 MB), CPython 3.12, macOS 11.0+, ARM64
ematix_flow-0.12.0-cp311-cp311-manylinux_2_28_x86_64.whl (71.0 MB), CPython 3.11, manylinux (glibc 2.28+), x86-64
ematix_flow-0.12.0-cp311-cp311-macosx_11_0_arm64.whl (61.7 MB), CPython 3.11, macOS 11.0+, ARM64

File details

Details for the file ematix_flow-0.12.0.tar.gz.

File metadata

  • Download URL: ematix_flow-0.12.0.tar.gz
  • Upload date:
  • Size: 2.1 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for ematix_flow-0.12.0.tar.gz
Algorithm Hash digest
SHA256 af2b8a3e1de4bf67500fc86ba732d043fa576004cd396195e2a74a7192491a51
MD5 936aca2b6019077ab8bb8ae0ca9f6666
BLAKE2b-256 d20f5c6d0bb5d2ea9a2399ef248aceceaf813bf793f1e1032de245664d4cd4b4

See more details on using hashes here.

Provenance

The following attestation bundles were made for ematix_flow-0.12.0.tar.gz:

Publisher: release.yml on ryan-evans-git/ematix-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
