
Declarative ETL/ELT + streaming pipelines on a Rust + Apache Arrow core. Append / merge / SCD2 strategies across SQL databases (Postgres, MySQL, SQLite, DuckDB), object stores + Delta Lake (Parquet, CSV, JSON, ORC), and streaming sources (Kafka, RabbitMQ, Pub/Sub, Kinesis) — one decorator-driven API. 5.87× faster than single-node PySpark on TPC-H.


ematix-flow

Declarative ETL/ELT + streaming pipelines, multi-backend, on a Rust + Apache Arrow core. A single Python decorator declares a target table; another declares the load. Append, truncate, merge, and SCD2 strategies run against SQL databases, object stores, Delta Lake, or live streams without rewriting the pipeline. On TPC-H at SF=1 it is 5.87× faster than PySpark local[*] (geometric mean across all 22 queries); see docs/BENCHMARKS.md for the full method and per-query numbers.

Status: v0.1.0 on PyPI as ematix-flow (2026-05-05). All four surfaces below are shipped. See docs/ROADMAP.md for what's deferred.

Why ematix-flow

  • Declarative, not framework boilerplate. @ematix.table + @ematix.pipeline is the whole API surface for a typical pipeline. Schemas, primary keys, compare columns, SCD2 event-time, watermarks — all PEP-593 type-annotated. No DAG plumbing, no scheduler stub, no per-source connector wiring.

  • One binary, one dependency tree. Distributed as a Rust library with Python bindings (~150 MB image including all 10 backends). No JVM, no separate scheduler/executor processes, no cluster service to operate. Distributed batch SQL is opt-in via a peer-to-peer worker mesh — not a top-down cluster manager you have to deploy.

  • Multi-backend, write once. SQL databases (Postgres, MySQL, SQLite, DuckDB), object stores + Delta Lake (Parquet, CSV, JSON, ORC — local FS or S3), and streaming sources (Kafka, RabbitMQ, GCP Pub/Sub, AWS Kinesis) all live behind one Backend trait. Switch a pipeline's target by changing one line — either the decorator's target_connection= argument or, if you prefer config-as-data, a TOML field. Both surfaces compile to the same backend descriptor and run the identical Rust execution path; pick whichever fits the workflow. The SQL stays the same.

  • Correct by default. Watermarks are restart-safe via row-level advance-after-commit. Stateful streaming windows and joins serialize their per-key state to a durable StateStore (Postgres or in-memory) with atomic state + offset commits on every emit. Manual-ack at-least-once across all four streaming sources; Kafka exactly-once via transactions.

  • Faster than single-node PySpark, with a real distributed story when you need it. SF=1 TPC-H (M3 Pro, 22 queries): DataFusion via ematix-flow beats PySpark local[*] by a geomean of 5.87× (range 1.78× to 16.74×). At SF=10 on the representative set: 3.3× geomean. Distributed batch SQL across ematix-flow processes is available today via the bundled flow-worker peer mesh; cross-host scaling numbers are deferred because the project has no cluster hardware to measure on (see BENCHMARKS.md).
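The headline numbers are geometric means of per-query speedups. As a minimal sketch of that summary statistic, assuming speedup = PySpark seconds / ematix-flow seconds for each query (the helper name is hypothetical and not part of the project's benchmark harness):

```python
import math

def geomean_speedup(baseline_s: list[float], candidate_s: list[float]) -> float:
    """Geometric mean of per-query speedups (baseline time / candidate time)."""
    if not baseline_s or len(baseline_s) != len(candidate_s):
        raise ValueError("need one baseline and one candidate time per query")
    ratios = [b / c for b, c in zip(baseline_s, candidate_s)]
    # Averaging in log space means one outlier query moves the summary
    # far less than it would move an arithmetic mean.
    return math.exp(sum(math.log(r) for r in ratios) / len(ratios))

# Two queries that ran 2x and 8x faster: geometric mean 4x, arithmetic mean 5x.
speedup = geomean_speedup([2.0, 8.0], [1.0, 1.0])
print(f"{speedup:.2f}x")
```

This is why a range like 1.78× to 16.74× can summarize to a single figure without the fastest query dominating it.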

What it is

Four complementary surfaces in one repo, all sharing a Rust + Apache Arrow core. Data moves through Arrow record batches end-to-end: no row-by-row serialization, no JVM hop, no intermediate file roundtrip.

  1. Declarative table management (Phases 0–25). Decorator-driven schemas with PEP-593 type annotations + normalization markers (trim, lower, parse_timestamp, regex_replace, ...), SCD2 with event-time, run history, watermarks, post-load transforms, polars / pandas / pyspark interop, ML feature store. The original v0.1 scope.

  2. Multi-backend pipelines (Phases 30–38). Source from any of 4 streaming backends, write to any of 10 storage backends. Manual offset commits, app-level + broker-level dead-letter patterns, Confluent Schema Registry-aware Avro/Protobuf, long-running flow consume daemon with Prometheus metrics + supervised restart, flow consume --module typed-Python pipeline registry.

  3. Stream processing (Phase 39). DataFusion-backed mid-stream SQL transforms (filter / project / cast / lookup-join), tumbling / hopping / session windows with watermark-driven emit + late-data handling (drop / reopen / dlq), keyed time-windowed stream-stream joins (inner + outer + retained-buffer reopen for late matches). Per-key state persists to a durable StateStore with atomic state+offset commits across crashes.

  4. Distributed batch SQL (Σ.B). Optional peer-to-peer distributed execution via the bundled flow-worker binary; set [transform] engine = "distributed" + peers = [...] and the SQL fans out across processes via Apache Arrow Flight. mTLS for the worker mesh, cross-pod lookup broadcast, no separate cluster service. The SQL dialect translator (Spark / DuckDB → DataFusion) lets existing queries run without rewrites: 103/103 queries of the canonical Apache Spark TPC-DS suite pass at plan time.

Test surface (workspace-wide, every PR):

  • 459 Rust core unit tests
  • 124 Rust CLI unit tests + 27 backend-config scaffold round-trip tests across all 10 backends + distributed + TLS config
  • ~80 Rust testcontainers integration tests (Docker-gated)
  • 376 default Python tests + ~196 testcontainers-gated Python tests
  • 22-query TPC-H audit: 22/22 PASS at SF=1
  • 103-query TPC-DS Spark-dialect audit: 103/103 PASS plan-time

clippy + fmt clean on stable Rust; cargo audit green against RustSec; ruff + bandit + pip-audit green on the Python side.

→ For a step-by-step walkthrough of every surface, see docs/USER_GUIDE.md.

→ For runnable examples (one per strategy + streaming + windowed + session + join), see examples/.

→ For what's left to ship, see docs/ROADMAP.md.

→ For cutting a release, see docs/RELEASE.md.

Quickstart 1: declarative Postgres pipeline (v0.1)

from typing import Annotated
from ematix_flow import ematix, pk, register_connection
from ematix_flow.types import BigInt, String, Text, TimestampTZ
from ematix_flow.normalize import lower, trim, empty_to_null, parse_timestamp

# Connection: declared in code via the `@ematix.connection` class-
# body decorator. `${VAR}` interpolates from the environment at
# backend-build time (so changing the env between definition and
# run picks up the new value). `repr()` redacts secrets.
# Alternatives — env-var only, project TOML file, or
# `register_connection(PostgresConnection(...))` — are documented
# below. All three feed the same registry.
@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${EMATIX_FLOW_DSN}"

@ematix.table(schema="analytics")
class CustomerDim:
    customer_id: Annotated[BigInt, pk()]    # ← keys live on the table
    email: Annotated[String[256] | None, lower(), trim(), empty_to_null()]
    name: Text | None
    updated_at: Annotated[TimestampTZ, parse_timestamp()]

@ematix.pipeline(
    target=CustomerDim,
    target_connection="warehouse",   # ← references the @ematix.connection above
    schedule="0 * * * *",
    mode="scd2",
    compare_columns=["email", "name"],
    # `keys=` omitted — the pipeline infers ["customer_id"] from
    # `pk()` on the table. Override only if your merge keys
    # differ from the declared primary key.
)
def sync_customers(conn):
    return "SELECT customer_id, email, name, updated_at FROM raw.customers"

Where does the data come from / go to?

In the example above:

  • The target table, analytics.customer_dim: set by schema= on @ematix.table plus the class name, snake-cased (CustomerDim → customer_dim). Override the table name with @ematix.table(schema=..., name="...").
  • The source table, raw.customers: the SQL string returned from sync_customers(conn). It could equally well be a join, a subquery with filters, etc.
  • The database, the connection named warehouse: the @ematix.connection block at the top of the file registers it, and target_connection="warehouse" on @ematix.pipeline references it by name. The conn parameter passed to sync_customers is the resolved source connection (the same as the target unless source_connection= is set).
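The exact rule ematix-flow uses to snake-case class names (acronyms, digits) is not spelled out here. As a sketch, assuming a conventional two-pass regex, the CustomerDim → customer_dim derivation looks like this; snake_case is a hypothetical helper, not the library's function:

```python
import re

def snake_case(class_name: str) -> str:
    # Pass 1: split lowercase/digit followed by uppercase ("rD" -> "r_D").
    s = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", class_name)
    # Pass 2: split an acronym from the next word ("PL" in "HTTPLog" -> "P_L").
    s = re.sub(r"(?<=[A-Z])(?=[A-Z][a-z])", "_", s)
    return s.lower()

print(snake_case("CustomerDim"))   # customer_dim
print(snake_case("HTTPLogEvent"))  # http_log_event
```

If your class names contain acronyms, pass name="..." explicitly rather than relying on any particular casing rule.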

target_connection= is omittable. When omitted, the framework looks up the connection literally named default (which the env var EMATIX_FLOW_DSN populates) — handy for one-off scripts, but the explicit form above keeps the wiring obvious in code review.

By default, source and target use the same connection (same DB → same-DB fast path: INSERT … SELECT). To cross databases, name them explicitly:

@ematix.pipeline(
    target=CustomerDim,
    schedule="0 * * * *",
    mode="scd2",
    compare_columns=["email", "name"],
    source_connection="raw_db",       # ← named connection
    target_connection="warehouse",    # ← named connection
)
def sync_customers(conn):
    # `conn` here is the SOURCE connection (raw_db).
    return "SELECT customer_id, email, name, updated_at FROM customers"

When source_connection != target_connection, the framework switches to the cross-DB Arrow path: read source rows as Arrow batches, stream them into the target. No COPY BINARY shortcut, but no row-by-row INSERT either.
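The shape of the cross-DB path is "read a bounded batch, write it in one bulk call, repeat." ematix-flow does this with Arrow record batches; the sketch below substitutes two in-memory stdlib sqlite3 databases with fetchmany / executemany purely to show the batching pattern. The table names and copy_in_batches are hypothetical.

```python
import sqlite3

def copy_in_batches(src: sqlite3.Connection, dst: sqlite3.Connection,
                    select_sql: str, insert_sql: str, batch_size: int = 1000) -> int:
    """Stream rows from src to dst in fixed-size batches; return rows copied."""
    cur = src.execute(select_sql)
    copied = 0
    while True:
        batch = cur.fetchmany(batch_size)   # bounded memory: one batch at a time
        if not batch:
            break
        dst.executemany(insert_sql, batch)  # one bulk call per batch, not per row
        copied += len(batch)
    dst.commit()
    return copied

src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")
src.execute("CREATE TABLE customers (customer_id INTEGER, email TEXT)")
src.executemany("INSERT INTO customers VALUES (?, ?)",
                [(i, f"user{i}@example.com") for i in range(2500)])
dst.execute("CREATE TABLE customer_dim (customer_id INTEGER, email TEXT)")

n = copy_in_batches(src, dst, "SELECT customer_id, email FROM customers",
                    "INSERT INTO customer_dim VALUES (?, ?)")
print(n)
```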

Configuring connections

A connection name like "raw_db" resolves through this chain (highest priority first):

  1. Env var EMATIX_FLOW_DSN_RAW_DB (uppercased), e.g. EMATIX_FLOW_DSN_RAW_DB=postgres://user:pw@host/db.
  2. Env var EMATIX_FLOW_DSN — only for the connection literally named default.
  3. Project file ./.ematix-flow.toml in the working directory:
    [connections.raw_db]
    url = "postgres://user:${RAW_DB_PASSWORD}@host/raw"
    
    [connections.warehouse]
    url = "${WAREHOUSE_DSN}"
    
  4. User file ~/.ematix-flow/connections.toml (same shape).
  5. Inline config.connect(url=...) as a low-level escape hatch.
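The lookup order above is a first-match-wins chain. Here is a sketch of it, assuming both TOML files have already been parsed into dicts; resolve_url and its inline_url parameter are hypothetical, not ematix-flow's resolver:

```python
import os
from typing import Mapping, Optional

def resolve_url(name: str,
                project_toml: Mapping[str, dict],
                user_toml: Mapping[str, dict],
                inline_url: Optional[str] = None,
                env: Mapping[str, str] = os.environ) -> str:
    """First match wins, in the priority order listed above (hypothetical helper)."""
    # 1. Per-name env var, name uppercased: raw_db -> EMATIX_FLOW_DSN_RAW_DB.
    per_name = env.get(f"EMATIX_FLOW_DSN_{name.upper()}")
    if per_name:
        return per_name
    # 2. The bare EMATIX_FLOW_DSN applies only to the connection named "default".
    if name == "default" and env.get("EMATIX_FLOW_DSN"):
        return env["EMATIX_FLOW_DSN"]
    # 3-4. [connections.<name>] in the project file, then in the user file.
    for parsed in (project_toml, user_toml):
        entry = parsed.get("connections", {}).get(name)
        if entry and "url" in entry:
            return entry["url"]
    # 5. Inline URL as the escape hatch.
    if inline_url:
        return inline_url
    raise LookupError(f"no connection configured for {name!r}")

env = {"EMATIX_FLOW_DSN_RAW_DB": "postgres://user:pw@host/db"}
project = {"connections": {"warehouse": {"url": "postgres://wh-host/wh"}}}
print(resolve_url("raw_db", project, {}, env=env))     # postgres://user:pw@host/db
print(resolve_url("warehouse", project, {}, env=env))  # postgres://wh-host/wh
```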

TOML values support ${VAR} env-var interpolation, so secrets can stay out of files. Inspect what got picked with flow connections list / flow connections check warehouse.
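Python's string.Template uses the same ${NAME} syntax, so it is a convenient way to illustrate the interpolation. The source does not say whether ematix-flow errors or leaves the placeholder when a variable is unset; this sketch assumes it should fail loudly, and interpolate is a hypothetical helper.

```python
import os
from string import Template
from typing import Mapping

def interpolate(value: str, env: Mapping[str, str] = os.environ) -> str:
    # substitute() raises KeyError for an unset variable instead of silently
    # leaving "${RAW_DB_PASSWORD}" inside a connection URL.
    return Template(value).substitute(env)

url = interpolate("postgres://user:${RAW_DB_PASSWORD}@host/raw",
                  env={"RAW_DB_PASSWORD": "s3cret"})
print(url)  # postgres://user:s3cret@host/raw
```

One caveat of this particular technique: Template also treats a bare $name as a placeholder, so a literal dollar sign has to be written as $$.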

Declaring connections in code (decorator)

Beyond env vars and TOML files, connections can also live in your Python module — useful when you want the pipeline definition and its DB handle in the same file, or when you want IDE autocomplete on connection fields. Two interchangeable shapes:

from ematix_flow import (
    ematix,                     # the decorator namespace
    PostgresConnection,
    KafkaConnection,
    SchemaRegistryConnection,
    register_connection,
)

# 1. Decorator form — class body declares the fields, the decorator
#    builds + registers a typed connection instance under the class
#    name. Module-level `warehouse` is now a PostgresConnection
#    instance, registered as "warehouse" in the runtime registry.
@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"     # ${VAR} interpolates at use time

@ematix.connection
class kafka_prod:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    group_id = "ematix-flow"
    sasl_plain_username = "${KAFKA_USER}"
    sasl_plain_password = "${KAFKA_PASS}"

@ematix.connection
class sr_prod:
    kind = "schema_registry"
    url = "${SR_URL}"

# 2. Instance form — same effect; useful when the connection has to
#    be built dynamically (e.g. from an environment-driven dict).
warehouse_2 = register_connection(
    PostgresConnection(name="warehouse_2", url="postgres://localhost/wh"),
)

# Reference a registered connection from a pipeline (by name here):
@ematix.pipeline(
    target=CustomerDim,
    schedule="0 * * * *",
    mode="scd2",
    target_connection="warehouse",   # name reference; resolves through registry
)
def sync_customers(conn):
    return "SELECT customer_id, email, name, updated_at FROM raw.customers"

Credentials redact in repr() by field-name match (password, secret, secret_access_key, anything containing _password, AMQP URL passwords, etc.) — printing a connection in a notebook won't spill secrets. The same @ematix.connection shape supports every typed connection: KafkaConnection, RabbitMQConnection, PubSubConnection, KinesisConnection, SchemaRegistryConnection, PostgresConnection, MySQLConnection, SQLiteConnection, DuckDBConnection, DeltaLocalConnection, DeltaS3Connection, ObjectStoreLocalConnection, ObjectStoreS3Connection.
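Redaction by field-name match can be sketched as a custom __repr__ that masks any field whose name looks secret. DemoConnection and the match rules below are assumptions for illustration, not ematix-flow's typed connections; the real rules also cover AMQP URL passwords, which this sketch skips.

```python
from dataclasses import dataclass, fields

SECRET_NAMES = {"password", "secret", "secret_access_key"}

def _is_secret(field_name: str) -> bool:
    return field_name in SECRET_NAMES or "_password" in field_name

@dataclass(repr=False)
class DemoConnection:  # hypothetical stand-in for a typed connection
    name: str
    bootstrap_servers: str
    sasl_plain_username: str = ""
    sasl_plain_password: str = ""

    def __repr__(self) -> str:
        parts = []
        for f in fields(self):
            value = getattr(self, f.name)
            shown = "'***'" if _is_secret(f.name) and value else repr(value)
            parts.append(f"{f.name}={shown}")
        return f"{type(self).__name__}({', '.join(parts)})"

conn = DemoConnection("kafka_prod", "broker:9092", "alice", "hunter2")
print(conn)  # ... sasl_plain_username='alice', sasl_plain_password='***')
```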

Schema Registry as a connection (Π.1). KafkaConnection accepts either an inline schema_registry_url="..." shorthand or a schema_registry=sr_prod reference (instance or registered name) for Avro/Protobuf pipelines, so SR config lives in the same credential-redacting registry as everything else.

Two function signatures

The pipeline-decorated function can take 0 or 1 args:

  • def sync(conn): return "SELECT …": use when the source SQL must be computed dynamically (filters, dates, etc.). conn is the source connection (the active _core.Connection).
  • def sync(): pass: use for a static source. Pair it with source_table="raw.customers" on the decorator and an optional column_map={"target_col": "source_col", ...}. No conn is passed.

The static form lets you skip writing SELECT * boilerplate:

@ematix.pipeline(
    target=CustomerDim,
    schedule="0 * * * *",
    mode="scd2",
    source_table="raw.customers",  # framework synthesizes SELECT
    compare_columns=["email", "name"],
)
def sync_customers():
    pass

How merge keys are resolved

For merge and scd2 pipelines, keys= is optional. The decorator picks them in this priority order, falling through on absence:

  1. Explicit keys=("col_a", "col_b") on @ematix.pipeline / pipeline.sync(keys=...) — highest priority, silences any warnings.
  2. __merge_keys__ = ("col_a", "col_b") class dunder on the target — useful when the merge key isn't the primary key.
  3. First natural_key() group on the table — for SCD2 where the business key (e.g. customer_id) is distinct from the versioned primary key (e.g. (customer_id, valid_from)).
  4. Columns marked pk() — the default in the example above.

When 2 or 3 resolve to keys that differ from pk(), the pipeline emits a UserWarning so you know what got picked. Pass explicit keys= to silence.
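The priority order is a fall-through chain with a warning on the two "surprising" branches. A sketch, assuming the table metadata is already available as plain sequences (the real decorator reads it from the class and its annotations; resolve_merge_keys is hypothetical):

```python
import warnings
from typing import Optional, Sequence, Tuple

def resolve_merge_keys(explicit: Optional[Sequence[str]],
                       dunder: Optional[Sequence[str]],
                       natural_key_groups: Sequence[Sequence[str]],
                       pk_columns: Sequence[str]) -> Tuple[str, ...]:
    if explicit:                      # 1. keys= wins and never warns
        return tuple(explicit)
    if dunder:                        # 2. __merge_keys__ on the target class
        picked, source = tuple(dunder), "__merge_keys__"
    elif natural_key_groups:          # 3. first natural_key() group
        picked, source = tuple(natural_key_groups[0]), "natural_key()"
    else:                             # 4. pk() columns, the quiet default
        return tuple(pk_columns)
    if picked != tuple(pk_columns):
        warnings.warn(f"merge keys {picked} from {source} differ from pk() "
                      f"{tuple(pk_columns)}; pass keys= to silence", UserWarning)
    return picked

print(resolve_merge_keys(None, None, [], ["customer_id"]))  # ('customer_id',)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    keys = resolve_merge_keys(None, None, [["email"]], ["customer_id"])
print(keys, len(caught))  # ('email',) 1
```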

For SCD2 specifically, the natural pattern is to leave the table PK as the business key (customer_id here) — the framework augments the table with valid_from / valid_to / is_current / row_hash columns and merges on customer_id. The PK becomes (customer_id, valid_from) after augmentation. You don't need to think about that unless you're hand-rolling DDL.
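To make the augmentation concrete, here is an in-memory sketch of what an SCD2 merge does for one business key: hash the compare columns, and when the hash changes, close the current version and open a new one. SHA-256 over repr() values and valid_to=None for the open version are assumptions; ematix-flow's actual hash, timestamps, and SQL are not described here.

```python
import hashlib
from datetime import datetime, timezone
from typing import Dict, List, Optional

def row_hash(row: Dict, compare_columns: List[str]) -> str:
    # repr() keeps None and "" distinct; "\x1f" separates the column values.
    payload = "\x1f".join(repr(row[c]) for c in compare_columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()

def scd2_apply(history: List[Dict], incoming: Dict, key: str,
               compare_columns: List[str], now: datetime) -> None:
    """Apply one incoming row to the version history of its business key."""
    h = row_hash(incoming, compare_columns)
    current: Optional[Dict] = next(
        (r for r in history if r[key] == incoming[key] and r["is_current"]), None)
    if current is not None and current["row_hash"] == h:
        return                                    # unchanged: no new version
    if current is not None:                       # changed: close the open version
        current["valid_to"] = now
        current["is_current"] = False
    history.append({**incoming, "valid_from": now, "valid_to": None,
                    "is_current": True, "row_hash": h})

t0 = datetime(2026, 1, 1, tzinfo=timezone.utc)
t1 = datetime(2026, 2, 1, tzinfo=timezone.utc)
cols = ["email", "name"]
hist: List[Dict] = []
scd2_apply(hist, {"customer_id": 1, "email": "a@x.com", "name": "Ann"}, "customer_id", cols, t0)
scd2_apply(hist, {"customer_id": 1, "email": "a@x.com", "name": "Ann"}, "customer_id", cols, t1)
scd2_apply(hist, {"customer_id": 1, "email": "ann@x.com", "name": "Ann"}, "customer_id", cols, t1)
# Two versions; (customer_id, valid_from) is unique across them.
print([(r["customer_id"], r["valid_from"].month, r["is_current"]) for r in hist])
```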

natural_key() is for the orthogonal case where you have a non-PK column that should also be UNIQUE (e.g. email), or where you want SCD2 to key off something other than the declared pk() — see help(natural_key).

Run from cron, a Kubernetes CronJob, or GitHub Actions:

flow run-due --module my_pipelines           # fires schedules in last interval
flow run     --module my_pipelines sync_customers  # one-shot
flow preview --module my_pipelines sync_customers  # what would it do?
flow validate --module my_pipelines sync_customers # EXPLAIN against the DB

Quickstart 2: streaming pipeline (post-v0.1)

A long-running consumer that drains a Kafka topic and writes batches to Postgres, with manual at-least-once offset commits, Prometheus metrics on :9100, and exponential-backoff restart on error:

1. Write a TOML config:

# pipeline.toml
pipeline_name = "events-to-pg"
source_query = "events"
idle_pause_ms = 500

[source]
kind = "kafka"
bootstrap_servers = "localhost:9092"
group_id = "ematix-flow"

[target]
kind = "postgres"
url = "postgres://localhost/mydb"

[target.table]
schema = "public"
name = "events"

2. Run from Python:

from ematix_flow import run_pipeline

run_pipeline(config="pipeline.toml", metrics_port=9100)

3. Or run from the Rust binary (build from source for now — the binary is named flow so it shadows the Python CLI; we plan to namespace this in a future cleanup):

cargo run --release --bin flow -- consume pipeline.toml \
    --metrics-port 9100 \
    --restart-on-error \
    --max-backoff-ms 30000

Or skip the TOML entirely (Π.3 typed-Python form)

@ematix.streaming_pipeline declares the pipeline alongside its connections, then flow consume --module my_pipelines events_to_pg loads the module, looks the pipeline up by name, and runs it. No TOML round-trip in user code:

# my_pipelines.py
from ematix_flow import ematix

@ematix.connection
class kafka_prod:
    kind = "kafka"
    bootstrap_servers = "${KAFKA_BOOTSTRAP}"
    group_id = "ematix-flow"

@ematix.connection
class warehouse:
    kind = "postgres"
    url = "${WAREHOUSE_DSN}"

@ematix.streaming_pipeline(
    name="events_to_pg",
    source=kafka_prod,
    source_query="events",
    target=warehouse,
    target_table=("public", "events"),
)
def events_to_pg():
    pass

Then run it from the CLI:

flow consume      --module my_pipelines events_to_pg --metrics-port 9100
flow consume-list --module my_pipelines

The framework renders the equivalent TOML internally and hands it to the same Rust runtime the TOML form uses — same at-least-once guarantees, same Prometheus metrics, same --restart-on-error.

Quickstart 3: stream processing (windows + sessions + joins)

Phase 39 layers stateful transforms onto the streaming pipeline. The canonical shapes:

Tumbling window aggregation (count events per user per minute):

from ematix_flow import (
    Aggregation, Window, run_streaming_pipeline,
    KafkaConnection, PostgresConnection,
)

run_streaming_pipeline(
    name="events-per-min",
    source=KafkaConnection(name="src", bootstrap_servers="localhost:9092",
                           group_id="ematix-flow"),
    source_query="events",
    target=PostgresConnection(name="warehouse", url="postgres://localhost/wh"),
    target_table=("public", "events_per_min"),
    transform_sql="SELECT user_id, _event_ts FROM source",
    window=Window(
        kind="tumbling",
        duration_ms=60_000,
        group_by=("user_id",),
        max_groups_per_window=1_000_000,
        aggregations=[Aggregation(agg="count", as_="n")],
    ),
)
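A tumbling window assigns each event to exactly one fixed-size, non-overlapping bucket; with duration_ms=60_000 that is one bucket per minute. This plain-Python sketch over (user_id, event_ts_ms) tuples shows only the assignment and the count; it deliberately ignores watermarks, late data, and max_groups_per_window, which the real runtime handles.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

def tumbling_counts(events: Iterable[Tuple[str, int]],
                    duration_ms: int) -> Dict[Tuple[int, str], int]:
    """Count events per (window_start_ms, user_id) for fixed, non-overlapping windows."""
    counts: Counter = Counter()
    for user_id, ts_ms in events:
        window_start = ts_ms - (ts_ms % duration_ms)  # floor to the window boundary
        counts[(window_start, user_id)] += 1
    return dict(counts)

events = [("u1", 1_000), ("u1", 59_999), ("u1", 60_000), ("u2", 30_000)]
result = tumbling_counts(events, duration_ms=60_000)
print(result)
```

Note that 59_999 and 60_000 land in different windows: each window covers [start, start + duration_ms).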

Session window (per-user activity sessions, 5-minute idle gap):

from ematix_flow import StateStore  # new in 39.5a

run_streaming_pipeline(
    name="user-sessions",
    source=KafkaConnection(name="src", bootstrap_servers="localhost:9092",
                           group_id="ematix-flow"),
    source_query="events",
    target=PostgresConnection(name="warehouse", url="postgres://localhost/wh"),
    target_table=("public", "user_sessions"),
    transform_sql="SELECT user_id, page, _event_ts FROM source",
    window=Window(
        kind="session",
        gap_ms=300_000,                   # 5 min idle = session boundary
        max_session_duration_ms=86_400_000,  # 24h hard cap
        group_by=("user_id",),
        max_groups_per_window=1_000_000,
        aggregations=[
            Aggregation(agg="count", as_="events"),
            Aggregation(agg="first", column="page", as_="entry_page"),
            Aggregation(agg="last", column="page", as_="exit_page"),
        ],
    ),
    # State persistence is mandatory for sessions — Postgres-backed
    # `StateStore` handles atomic per-emit state + Kafka offsets
    # commits. Restart-safe out of the box.
    state_store=StateStore(
        kind="postgres",
        url="postgres://localhost/ematix_state",
    ),
)
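Session windows group a key's events until a silence longer than gap_ms, while max_session_duration_ms forces a split even if events keep arriving. Here is a sketch for one key's already-sorted timestamps. How ematix-flow splits at the cap, and how it merges out-of-order sessions under reopen, is not spelled out here, so the "open a new session once the cap would be exceeded" rule is an assumption.

```python
from typing import List, Tuple

def sessionize(ts_sorted: List[int], gap_ms: int,
               max_session_duration_ms: int) -> List[Tuple[int, int, int]]:
    """Split one key's sorted event times into (start_ms, end_ms, event_count) sessions."""
    sessions: List[List[int]] = []
    for ts in ts_sorted:
        if sessions:
            start, end, _count = sessions[-1]
            idle_gap = ts - end > gap_ms                     # quiet for too long
            over_cap = ts - start > max_session_duration_ms  # assumed cap rule
            if not (idle_gap or over_cap):
                sessions[-1][1] = ts      # extend the open session
                sessions[-1][2] += 1
                continue
        sessions.append([ts, ts, 1])      # open a new session
    return [(s[0], s[1], s[2]) for s in sessions]

five_min, one_day = 300_000, 86_400_000
# The 340 s silence between 60 s and 400 s exceeds the 5-minute gap.
print(sessionize([0, 60_000, 400_000, 500_000], five_min, one_day))
# [(0, 60000, 2), (400000, 500000, 2)]
```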

Stream-stream join (orders + payments within a 5-minute window). Driven from typed Python via sources=[Source(...)]:

from ematix_flow import Join, Source

run_streaming_pipeline(
    name="orders-payments",
    sources=[
        Source(connection=KafkaConnection(name="orders_k",   bootstrap_servers="localhost:9092", group_id="ematix-flow"),
               query="orders"),
        Source(connection=KafkaConnection(name="payments_k", bootstrap_servers="localhost:9092", group_id="ematix-flow"),
               query="payments"),
    ],
    target=PostgresConnection(name="warehouse", url="postgres://localhost/wh"),
    target_table=("public", "orders_with_payments"),
    join=Join(
        left_source="orders",
        right_source="payments",
        left_keys=("order_id",),
        right_keys=("order_id",),
        time_window_ms=300_000,                # ±5 min symmetric window
    ),
    state_store=StateStore(kind="postgres", url="postgres://localhost/ematix_state"),
)

LEFT / RIGHT / FULL outer joins (Join(... kind="left_outer")), late_data="reopen" for retained-buffer late-row matching, and asymmetric time windows (min_delta_ms / max_delta_ms) all work through the same shape.
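A keyed time-windowed inner join emits a pair when the keys match and the two timestamps are close enough. The sketch defaults to the symmetric ±time_window_ms form and accepts optional asymmetric bounds in the spirit of min_delta_ms / max_delta_ms; the sign convention (delta = right_ts − left_ts) is an assumption, and unlike the real runtime it buffers everything in memory with no watermark-driven eviction.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, int, str]  # (join key, event time ms, payload)

def windowed_join(left: List[Row], right: List[Row], time_window_ms: int,
                  min_delta_ms: Optional[int] = None,
                  max_delta_ms: Optional[int] = None) -> List[Tuple[str, str, str]]:
    """Inner join on key where lo <= right_ts - left_ts <= hi."""
    lo = -time_window_ms if min_delta_ms is None else min_delta_ms
    hi = time_window_ms if max_delta_ms is None else max_delta_ms
    # Per-key buffer of right-side rows (no eviction in this sketch).
    right_by_key: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    for key, ts, payload in right:
        right_by_key[key].append((ts, payload))
    out = []
    for key, lts, lpayload in left:
        for rts, rpayload in right_by_key.get(key, []):
            if lo <= rts - lts <= hi:
                out.append((key, lpayload, rpayload))
    return out

orders = [("o1", 0, "order-1"), ("o2", 0, "order-2")]
payments = [("o1", 120_000, "pay-1"), ("o2", 900_000, "pay-2")]
matched = windowed_join(orders, payments, time_window_ms=300_000)
print(matched)  # [('o1', 'order-1', 'pay-1')]: pay-2 arrived 15 min after its order
```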

Advanced knobs (Π.1)

Every advanced streaming knob is now drivable from typed Python — no TOML required:

from ematix_flow import Watermark

run_streaming_pipeline(
    name="events-clean",
    source=kafka_prod, source_query="events",
    target=warehouse, target_table=("public", "events"),
    transform_sql="SELECT user_id, payload, _event_ts FROM source",
    # Π.1: per-batch error policy. "fail" (default) | "drop" | "dlq".
    transform_on_error="dlq",
    dead_letter_topic="events-failed",
    # Π.1: tune per-source watermark slack + idleness without TOML.
    watermark=Watermark(lateness_ms=5_000, source_idleness_ms=120_000),
)
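Watermark(lateness_ms=..., source_idleness_ms=...) combines two ideas: a source's watermark trails its highest event time by lateness_ms, and with several sources the pipeline advances at the minimum over sources that are not idle. The tracker below is a hypothetical sketch of that computation, assuming idleness is measured against wall-clock "last seen" times and that the watermark never moves backwards.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class WatermarkTracker:
    """Hypothetical multi-source watermark: min over non-idle sources, minus lateness."""
    lateness_ms: int
    source_idleness_ms: int
    max_event_ts: Dict[str, int] = field(default_factory=dict)    # source -> max event time
    last_seen_wall: Dict[str, int] = field(default_factory=dict)  # source -> wall clock of last row
    current: Optional[int] = None                                 # never moves backwards

    def observe(self, source: str, event_ts_ms: int, wall_ms: int) -> None:
        prev = self.max_event_ts.get(source, event_ts_ms)
        self.max_event_ts[source] = max(prev, event_ts_ms)
        self.last_seen_wall[source] = wall_ms

    def watermark(self, now_wall_ms: int) -> Optional[int]:
        # A source silent for longer than source_idleness_ms is ignored,
        # so one quiet partition cannot stall every window.
        active = [ts for src, ts in self.max_event_ts.items()
                  if now_wall_ms - self.last_seen_wall[src] <= self.source_idleness_ms]
        if active:
            candidate = min(active) - self.lateness_ms
            if self.current is None or candidate > self.current:
                self.current = candidate
        return self.current  # all sources idle: hold the last watermark

    def is_late(self, event_ts_ms: int, now_wall_ms: int) -> bool:
        wm = self.watermark(now_wall_ms)
        return wm is not None and event_ts_ms < wm

tracker = WatermarkTracker(lateness_ms=5_000, source_idleness_ms=120_000)
tracker.observe("orders", 100_000, wall_ms=0)
tracker.observe("payments", 50_000, wall_ms=0)
wm_both = tracker.watermark(now_wall_ms=1_000)       # min(100 s, 50 s) - 5 s
tracker.observe("orders", 200_000, wall_ms=200_000)  # payments now idle for 200 s
wm_idle = tracker.watermark(now_wall_ms=200_000)     # payments ignored
print(wm_both, wm_idle, tracker.is_late(190_000, now_wall_ms=200_000))
```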

Object-store target with compression (Π.1.4)

from ematix_flow import ObjectStoreS3Connection, Target

lake = ObjectStoreS3Connection(
    name="lake",
    endpoint="https://s3.amazonaws.com",
    bucket="ematix-archive",
    region="us-east-1",
    access_key_id="${AWS_ACCESS_KEY_ID}",
    secret_access_key="${AWS_SECRET_ACCESS_KEY}",
    format="parquet",
)

run_streaming_pipeline(
    name="events_archive",
    source=kafka_prod, source_query="events",
    targets=[
        Target(
            connection=lake,
            prefix="events/raw",
            parquet_compression="zstd",   # or "snappy" / "gzip" / "uncompressed"
        ),
    ],
)

CSV targets accept csv_delimiter=";" and csv_header=False. The typed-Python boundary catches misconfigurations early (e.g. setting parquet_compression on a CSV target raises before TOML round-trip).
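Early rejection of mismatched options can be sketched as validation in a dataclass __post_init__. DemoTarget is hypothetical, not the real Target class: its field names mirror the options above, and the format is passed explicitly instead of being read from the connection.

```python
from dataclasses import dataclass
from typing import Optional

PARQUET_CODECS = {"uncompressed", "snappy", "gzip", "zstd"}

@dataclass(frozen=True)
class DemoTarget:
    """Hypothetical stand-in for a typed target; validates options at construction."""
    format: str                               # e.g. "parquet" or "csv"
    prefix: str
    parquet_compression: Optional[str] = None
    csv_delimiter: Optional[str] = None
    csv_header: Optional[bool] = None

    def __post_init__(self) -> None:
        if self.parquet_compression is not None:
            if self.format != "parquet":
                raise ValueError(
                    f"parquet_compression is not valid for a {self.format} target")
            if self.parquet_compression not in PARQUET_CODECS:
                raise ValueError(f"unknown Parquet codec {self.parquet_compression!r}")
        has_csv_opts = self.csv_delimiter is not None or self.csv_header is not None
        if has_csv_opts and self.format != "csv":
            raise ValueError(f"csv_* options are not valid for a {self.format} target")

ok = DemoTarget("parquet", "events/raw", parquet_compression="zstd")
try:
    DemoTarget("csv", "events/raw", parquet_compression="zstd")
except ValueError as exc:
    error = str(exc)
print(error)  # parquet_compression is not valid for a csv target
```

Validating at construction means the mistake surfaces at the line that built the object, long before any config is rendered or any data moves.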

Full walkthroughs for each shape (windows, sessions, joins) — including late-data semantics, recovery behavior on restart, and the Prometheus metrics emitted — live in docs/USER_GUIDE.md.

Backend matrix

Backend Source Target DDL planning Strategy executors (append/merge/scd2/truncate)
Postgres ✅ (native + COPY BINARY)
MySQL ✅ (native, ON DUPLICATE KEY)
SQLite
DuckDB
Delta Lake (local + S3) n/a ✅ (DataFusion-backed MERGE)
Object stores (parquet / csv / orc / jsonl, local + S3) n/a append + truncate
Kafka n/a append (cross-backend)
RabbitMQ n/a append (cross-backend)
GCP Pub/Sub n/a append (cross-backend)
AWS Kinesis n/a append (cross-backend)

Streaming-source semantics:

  • Manual offset commit / ack — pipelines call commit_offsets() on the source only after a durable target write, giving at-least-once. Mirrors Kafka offset commits, RabbitMQ basic_ack, Pub/Sub handler acks, Kinesis committed_sequence_number per-shard.
  • DLQ — both app-level (StreamingPipeline.dead_letter_topic, routes failed batch rows to a separate target) and broker-level (RabbitMQ nack_pending(requeue=False) + x-dead-letter-exchange, Pub/Sub nack_pending + subscription dead_letter_policy).
  • Schema Registry — Avro decode/encode (Phase 36h.3/.4) and Protobuf decode/encode (Phase 36h.5/.6) via Confluent SR or Apicurio. Validated against a live emulator container.
  • Exactly-once — Kafka producer-side via transactions (Phase 36j); consumer-coordinated end-to-end via KafkaToKafkaEosPipeline (Phase 36j.2).
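The commit-after-durable-write ordering is what gives at-least-once delivery: a crash between the write and the commit causes redelivery (a duplicate), never loss. FakeSource and run_once below are hypothetical in-memory stand-ins, not ematix-flow's backend classes; they only borrow the commit_offsets() name from the text.

```python
from typing import List

class FakeSource:
    """Hypothetical in-memory source: redelivers everything after the last commit."""

    def __init__(self, messages: List[str]) -> None:
        self.messages = list(messages)
        self.position = 0    # next offset to hand out
        self.committed = 0   # where a restarted consumer resumes

    def poll(self, max_n: int) -> List[str]:
        batch = self.messages[self.position:self.position + max_n]
        self.position += len(batch)
        return batch

    def commit_offsets(self) -> None:
        self.committed = self.position

    def restart(self) -> None:
        self.position = self.committed   # uncommitted rows come back

def run_once(source: FakeSource, sink: List[str], max_n: int,
             crash_before_commit: bool = False) -> None:
    batch = source.poll(max_n)
    sink.extend(batch)                   # 1. durable write to the target first
    if crash_before_commit:
        raise RuntimeError("crashed after the write, before the commit")
    source.commit_offsets()              # 2. acknowledge only after the write

src, sink = FakeSource(["a", "b", "c"]), []
run_once(src, sink, 2)                   # writes a, b and commits
try:
    run_once(src, sink, 2, crash_before_commit=True)   # writes c, never commits
except RuntimeError:
    src.restart()
run_once(src, sink, 2)                   # c is redelivered and written again
print(sink)  # ['a', 'b', 'c', 'c']: a duplicate, but nothing lost
```

Reversing the two steps would turn the same crash into silent data loss, which is why merge-keyed targets (idempotent upserts) pair well with at-least-once sources.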

Python API: streaming backends from a notebook

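from ematix_flow._core import KafkaBackend
import pyarrow as pa

def process(batch: pa.RecordBatch) -> None:
    # Placeholder handler: replace with your own per-batch logic.
    print(batch.num_rows)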

backend = KafkaBackend.open(
    "localhost:9092",
    group_id="ematix-flow",
    payload_format="avro",
    schema_registry_url="http://localhost:8081",
    sasl_plain_username="alice",
    sasl_plain_password="secret",
)
backend.ping()

# Lazy iterator — yields one batch at a time, no list materialization.
for batch in backend.iter_arrow_stream("events"):
    process(batch)  # batch is pyarrow.RecordBatch

backend.commit_offsets()  # at-least-once: ack only after success

The same pattern works for RabbitMQBackend, PubSubBackend, KinesisBackend (each in ematix_flow._core).

What's in it

v0.1 (declarative Postgres) — stable

  • Strategies: append, truncate, merge / scd1, scd2 (with optional event-time valid_from and TTL expiry).
  • Cross-DB: same-DB short-circuit + COPY BINARY staging path; auto-detected, force-overrideable.
  • Watermarks + run history: lazily created ematix_flow.run_history and watermarks tables. Restart-safe.
  • Declarative API: @ematix.table / @ematix.pipeline / pk() / natural_key() / PEP 593 Annotated markers.
  • Normalization markers (trim, lower, empty_to_null, parse_timestamp, default, parse_int, regex_replace, derive, raw sql) + pipeline-level transforms_pre=[deduplicate_by(...), filter_where(...), ...]. All compile to in-database SQL.
  • Post-load transforms: transforms_post=[sql_string, callable, ematix.transform_ref("name")]. Each runs in its own transaction, with optional continue_on_failure_post.
  • DataFrame interop: pip install ematix-flow[df] → polars or pandas. Spark interop: pip install ematix-flow[spark].
  • ML feature store: @ematix.feature_view, PIT helpers, online materialized view, training-set builder.
  • CLI: flow list / run / run-due / preview / dry-run / validate / transform list / transform run / connections {list, check, set}.
  • Connections: env vars (EMATIX_FLOW_DSN_<NAME>) + ~/.ematix-flow/connections.toml.
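Because the normalization markers compile to in-database SQL, a chain like lower(), trim(), empty_to_null() can fold into one column expression. The compile_column helper and the string marker names are hypothetical, and applying markers left to right is an assumption about ordering, not documented behavior. SQLite is used only to show that the generated expression runs.

```python
import sqlite3
from typing import Callable, Dict, List

# Each hypothetical marker wraps the SQL expression built so far.
MARKERS: Dict[str, Callable[[str], str]] = {
    "trim": lambda e: f"TRIM({e})",
    "lower": lambda e: f"LOWER({e})",
    "empty_to_null": lambda e: f"NULLIF({e}, '')",
}

def compile_column(column: str, markers: List[str]) -> str:
    quoted = '"' + column.replace('"', '""') + '"'  # SQL-quoted identifier
    expr = quoted
    for name in markers:                             # assumed: applied left to right
        expr = MARKERS[name](expr)
    return f"{expr} AS {quoted}"

select_list = compile_column("email", ["lower", "trim", "empty_to_null"])
print(select_list)  # NULLIF(TRIM(LOWER("email")), '') AS "email"

# The generated expression runs unchanged in SQLite, which has all three functions.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE raw_customers (email TEXT)")
db.executemany("INSERT INTO raw_customers VALUES (?)", [("  Ann@X.COM ",), ("   ",)])
rows = db.execute(f"SELECT {select_list} FROM raw_customers ORDER BY rowid").fetchall()
print(rows)  # [('ann@x.com',), (None,)]
```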

Post-v0.1 (multi-backend + streaming) — stable

  • DB backends (Phases 31-33): MySQL, SQLite, DuckDB — same strategy executor surface as Postgres; cross-DB Arrow streaming bridge between any pair.
  • Object stores (Phase 34): Parquet / CSV / ORC / JSONL on local FS or S3 (via MinIO in tests). Append + truncate.
  • Delta Lake (Phase 35): local FS or S3. DataFusion-backed MERGE.
  • Streaming (Phases 36-37): Kafka (with SASL/PLAIN, SASL/SCRAM, mTLS, AWS MSK IAM), RabbitMQ, GCP Pub/Sub, AWS Kinesis. Manual ack, DLQ patterns, Schema Registry.
  • CLI (Phase 38): flow consume <toml> long-running daemon with --metrics-port (Prometheus /metrics) and --restart-on-error (exponential-backoff supervisor).
  • Python streaming bindings (Phases Py.1-Py.6): run_pipeline in-process runner; pyclass wrappers for each streaming backend with PyArrow record-batch IO; sync iterator (ArrowBatchIter) for lazy batch consumption.

Stream processing (Phase 39) — recently shipped

  • SQL transforms (39.1–39.3): mid-stream SELECT via DataFusion. Filter / project / cast / lookup-join. Static lookups loaded from any DB backend at startup; refresh_interval_ms per lookup runs a background refresh task with atomic MemTable swap.
  • Tumbling + hopping windows (39.4): 9 aggregators including HLL+ approximate count_distinct. late_data = "drop" and "reopen" (with allowed_lateness_ms retention + re-emit on dirty). Idle-tick emission. Per-window max_groups_per_window fail-loud cap. Multi-source min-with-idleness watermark.
  • Session windows + durable StateStore (39.5a): gap-based per-key sessions with mandatory max_session_duration_ms hard cap; out-of-order session merging under Reopen; Postgres- or in-memory-backed StateStore with postcard wire format and forward-only state-version migrations; per-emit atomic state+offsets commit; seek_to on Kafka source for crash-safe resume. Each pipeline rehydrates per-key session state on startup via StateStore::load.
  • Stream-stream join (39.5b): keyed time-windowed inner join. Two [[sources]] (left + right) with per-side per-key buffers and watermark-driven retention; emit on every match within time_window_ms. Reuses the 39.5a StateStore (side-prefixed keys, postcard BufferedRow blobs). Per-source BatchContext::source_id routes batches to the correct side.
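Unlike tumbling windows, hopping windows overlap: an event belongs to every window of length size_ms that starts on a hop boundary and still covers it. A sketch of that assignment for non-negative timestamps; size_ms and hop_ms are illustrative names and may not match the fields ematix-flow's Window uses for kind="hopping".

```python
from typing import List

def hopping_windows(ts_ms: int, size_ms: int, hop_ms: int) -> List[int]:
    """Start times of every hopping window [start, start + size_ms) containing ts_ms."""
    if size_ms <= 0 or hop_ms <= 0:
        raise ValueError("size_ms and hop_ms must be positive")
    start = ts_ms - (ts_ms % hop_ms)        # latest window starting at or before ts
    starts = []
    # Walk back one hop at a time while the window still covers ts
    # (window starts are kept non-negative in this sketch).
    while start > ts_ms - size_ms and start >= 0:
        starts.append(start)
        start -= hop_ms
    return sorted(starts)

print(hopping_windows(65_000, size_ms=60_000, hop_ms=20_000))  # [20000, 40000, 60000]
print(hopping_windows(65_000, size_ms=60_000, hop_ms=60_000))  # [60000]: hop == size is tumbling
```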

Recent additions (Π.1 / Π.3 / Π.1.4)

  • SchemaRegistryConnection (Π.1): SR config lives in the typed-connection registry, redacted in repr(), resolvable by name. KafkaConnection.schema_registry=sr_prod for typed reference; the legacy inline schema_registry_url=... shorthand still works.
  • Kafka SR + payload_format plumbed through the streaming TOML emitter: the Avro / Protobuf path is now usable end-to-end via run_streaming_pipeline and @ematix.streaming_pipeline (was silently dropped before).
  • flow consume --module my_pipelines <name> (Π.3): load streaming pipelines from a Python module instead of TOML; the @ematix.streaming_pipeline decorator registers by name; the CLI imports the module, renders the equivalent TOML internally, and hands off to the same Rust runner. flow consume-list --module my_pipelines lists registered pipelines.
  • Watermark(lateness_ms=, source_idleness_ms=) (Π.1): tune watermark slack + per-source idleness from the typed-Python surface (was hardcoded to defaults). Maps to a [watermark] TOML block on the runner side.
  • transform_on_error = "fail" | "drop" | "dlq" (Π.1): per-batch error policy on run_streaming_pipeline / @ematix.streaming_pipeline (DLQ reuses the existing dead_letter_topic plumbing).
  • Object-store per-format write options (Π.1.4): Parquet compression (uncompressed | snappy | gzip | zstd), CSV delimiter, CSV header on Target — picks up production-grade compression without leaving Python. The typed-Python boundary rejects mis-shaped combos (e.g. parquet_compression on a CSV target) before TOML round-trip.

Install

# Core
pip install ematix-flow

# DataFrame helpers (polars or pandas, plus psycopg2)
pip install "ematix-flow[df]"
pip install polars            # or pandas

# Spark helpers (heavy: pulls in pyspark + JVM JDBC requirement)
pip install "ematix-flow[spark]"

# PyArrow (required for the streaming-backend pyclasses)
pip install pyarrow

The streaming backends, the flow consume binary, and the run_pipeline Python entrypoint are all part of the core install — no extras needed.

Development

# Build the Rust workspace (core + CLI + Python extension crate)
cargo build --release

# Build + install the Python extension into a venv
python -m venv .venv && source .venv/bin/activate
pip install maturin
maturin develop --release

# The flow consume binary is built into target/release/flow
target/release/flow --help

# Run tests
cargo test --workspace --lib              # default (no Docker)
cargo test --workspace -- --ignored       # Docker integration tests
                                          # (Kafka, RabbitMQ, Pub/Sub
                                          # emulator, Kinesis via
                                          # LocalStack, MinIO,
                                          # Schema Registry, etc.)

pytest                                    # default Python suite
pytest -m integration                     # full integration (Docker)
pytest -m spark                           # opt-in Spark E2E

Roadmap

Phases 0–14 (v0.1), 15–38 (multi-backend + streaming), Py.1–Py.6 (Python streaming bindings), and 39.1–39.5b (SQL transforms, windows, sessions, stream-stream join) are all shipped.

See docs/ROADMAP.md for the consolidated "what's left" punch list (release polish, deferred-feature extensions, open design questions).

Phase plans:

Deferred design docs (capture both the design and the "why we haven't built it"):

  • docs/UNIFIED_PIPELINE_API.md — consolidating the v0.1 decorator and streaming TOML onto one declaration surface. Design only.
  • docs/ICEBERG_PLAN.md — Iceberg backend. Deferred because iceberg-rust 0.x still pins arrow 57 vs our arrow 58. Delta covers the use case today.

License

Apache-2.0



Download files


Source Distribution

ematix_flow-0.1.2.tar.gz (611.8 kB)

Uploaded Source

Built Distributions


ematix_flow-0.1.2-cp313-cp313-manylinux_2_28_x86_64.whl (66.6 MB)

Uploaded: CPython 3.13, manylinux (glibc 2.28+), x86-64

ematix_flow-0.1.2-cp313-cp313-macosx_11_0_arm64.whl (57.8 MB)

Uploaded: CPython 3.13, macOS 11.0+, ARM64

ematix_flow-0.1.2-cp312-cp312-manylinux_2_28_x86_64.whl (66.6 MB)

Uploaded: CPython 3.12, manylinux (glibc 2.28+), x86-64

ematix_flow-0.1.2-cp312-cp312-macosx_11_0_arm64.whl (57.8 MB)

Uploaded: CPython 3.12, macOS 11.0+, ARM64

ematix_flow-0.1.2-cp311-cp311-manylinux_2_28_x86_64.whl (66.6 MB)

Uploaded: CPython 3.11, manylinux (glibc 2.28+), x86-64

ematix_flow-0.1.2-cp311-cp311-macosx_11_0_arm64.whl (57.8 MB)

Uploaded: CPython 3.11, macOS 11.0+, ARM64
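Each wheel name above follows the standard binary-distribution naming scheme from PEP 427: distribution, version, an optional build tag, then the Python tag, ABI tag, and platform tag, joined by hyphens. For example, `cp313-cp313-manylinux_2_28_x86_64` means CPython 3.13, the CPython 3.13 ABI, and glibc 2.28+ on x86-64. The sketch below is a minimal illustrative parser. The names `parse_wheel_name` and `WheelTags` are hypothetical. For real use, `packaging.utils.parse_wheel_filename` does the same job with full validation.

```python
from typing import NamedTuple, Optional


class WheelTags(NamedTuple):
    distribution: str
    version: str
    build: Optional[str]
    python_tag: str
    abi_tag: str
    platform_tag: str


def parse_wheel_name(filename: str) -> WheelTags:
    """Split a wheel file name into its PEP 427 fields.

    Illustrative only: tags are returned as-is (compressed tag sets such as
    "py2.py3" are not expanded) and names are not normalized or validated.
    """
    if not filename.endswith(".whl"):
        raise ValueError(f"not a wheel file: {filename!r}")
    parts = filename[: -len(".whl")].split("-")
    if len(parts) == 5:
        dist, version, py_tag, abi_tag, plat_tag = parts
        build = None
    elif len(parts) == 6:  # the optional build tag sits after the version
        dist, version, build, py_tag, abi_tag, plat_tag = parts
    else:
        raise ValueError(f"unexpected wheel name: {filename!r}")
    return WheelTags(dist, version, build, py_tag, abi_tag, plat_tag)
```

For example, `parse_wheel_name("ematix_flow-0.1.2-cp311-cp311-macosx_11_0_arm64.whl").platform_tag` returns `"macosx_11_0_arm64"`.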

File details

Details for the file ematix_flow-0.1.2.tar.gz.

File metadata

  • Download URL: ematix_flow-0.1.2.tar.gz
  • Upload date:
  • Size: 611.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for ematix_flow-0.1.2.tar.gz
Algorithm Hash digest
SHA256 38e3d602639342b066db1e826db729ae05ed10d772a4cf4bf04a8674f64d9cb7
MD5 dcdb1078ae0776b3224d09026c345edd
BLAKE2b-256 c53b6cd2e76746d13200f080665f9b1cad5d7d9b3550e447d6a4c661f06a8517
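You can use the SHA256 digests listed here to check a downloaded file before installing it. The sketch below uses only the standard library. The helpers `sha256_of` and `verify` are illustrative and not part of ematix-flow.

```python
import hashlib
from pathlib import Path

# SHA256 digest listed above for the source distribution.
EXPECTED_SHA256 = "38e3d602639342b066db1e826db729ae05ed10d772a4cf4bf04a8674f64d9cb7"


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large wheels (~60 MB) are never fully in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path, expected: str) -> bool:
    # hexdigest() is lowercase, so normalize the expected value to match.
    return sha256_of(path) == expected.strip().lower()
```

For example, `verify(Path("ematix_flow-0.1.2.tar.gz"), EXPECTED_SHA256)` should return `True` for an intact download. pip can enforce the same check at install time in hash-checking mode. To use it, pin `ematix-flow==0.1.2` in a requirements file with a `--hash=sha256:...` entry for each acceptable file, then install with `--require-hashes`.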

Provenance

The following attestation bundles were made for ematix_flow-0.1.2.tar.gz:

Publisher: release.yml on ryan-evans-git/ematix-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file ematix_flow-0.1.2-cp313-cp313-manylinux_2_28_x86_64.whl.

File hashes

Hashes for ematix_flow-0.1.2-cp313-cp313-manylinux_2_28_x86_64.whl
Algorithm Hash digest
SHA256 b9a2f0e309fc53f93a4de8087cb29613f0fdb6bf52709ba2edb21bf9cac47e7f
MD5 de593eee73fad21f96881163a1ddf5c7
BLAKE2b-256 8dcf8b373677dca6d87c724175d328dd3dfff2ea5c7716a50035568447d2d4ac

Provenance

The following attestation bundles were made for ematix_flow-0.1.2-cp313-cp313-manylinux_2_28_x86_64.whl:

Publisher: release.yml on ryan-evans-git/ematix-flow

File details

Details for the file ematix_flow-0.1.2-cp313-cp313-macosx_11_0_arm64.whl.

File hashes

Hashes for ematix_flow-0.1.2-cp313-cp313-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 b28f20fe4b3599d142cf5e7dcd006576abf3d399c0d719abf755a3bcbdb8ebca
MD5 8917829af107c9cd963178d7a0ea9e58
BLAKE2b-256 dbbe240639c137cad1bf6cb85b8ccd11db77b7bc023a28a7a9c3dc89a76c9fa6

Provenance

The following attestation bundles were made for ematix_flow-0.1.2-cp313-cp313-macosx_11_0_arm64.whl:

Publisher: release.yml on ryan-evans-git/ematix-flow

File details

Details for the file ematix_flow-0.1.2-cp312-cp312-manylinux_2_28_x86_64.whl.

File hashes

Hashes for ematix_flow-0.1.2-cp312-cp312-manylinux_2_28_x86_64.whl
Algorithm Hash digest
SHA256 3f16e5ec284a9198f97839eac40d98fb6e56a0349123b8dc62ac70d8dca44cf5
MD5 af430a9618f662587247bb8aadbb9ca1
BLAKE2b-256 5ba429d9f8155b4e2dd46a0b650a6fe026d0d56ba9976445ea8345f87ecfcde4

Provenance

The following attestation bundles were made for ematix_flow-0.1.2-cp312-cp312-manylinux_2_28_x86_64.whl:

Publisher: release.yml on ryan-evans-git/ematix-flow

File details

Details for the file ematix_flow-0.1.2-cp312-cp312-macosx_11_0_arm64.whl.

File hashes

Hashes for ematix_flow-0.1.2-cp312-cp312-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 24576acc1f0984d2b5bff416ca9a06b90e57d7e02a44c4d7fd21bd7b8453be4c
MD5 5738942df4630a06442b6746bc514f3d
BLAKE2b-256 f08ae0ef8ac9eb7c4f0c87d302b33b99473cc8ce8f9b5a1044efbcc3faaac760

Provenance

The following attestation bundles were made for ematix_flow-0.1.2-cp312-cp312-macosx_11_0_arm64.whl:

Publisher: release.yml on ryan-evans-git/ematix-flow

File details

Details for the file ematix_flow-0.1.2-cp311-cp311-manylinux_2_28_x86_64.whl.

File hashes

Hashes for ematix_flow-0.1.2-cp311-cp311-manylinux_2_28_x86_64.whl
Algorithm Hash digest
SHA256 40064f4078f949113a3554a690c72e3ac8cd008282b42bf798bc5864748e97b9
MD5 013b5ff065d447543633a48b935d4344
BLAKE2b-256 cb72ea3c504a3067abcdf603663432d4e597aaa85ff6cb0d39b357a5c5329ef9

Provenance

The following attestation bundles were made for ematix_flow-0.1.2-cp311-cp311-manylinux_2_28_x86_64.whl:

Publisher: release.yml on ryan-evans-git/ematix-flow

File details

Details for the file ematix_flow-0.1.2-cp311-cp311-macosx_11_0_arm64.whl.

File hashes

Hashes for ematix_flow-0.1.2-cp311-cp311-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 b2c591b61bbaa7f8d38139d827778964ebf363b56b3d56e9afadd85ddb1f1074
MD5 e6913b2d578e351151d384d62bfa9fad
BLAKE2b-256 e0da0070ce9af42556407081fc7049e2f8ae3923de3811f14b033a849767fe08

Provenance

The following attestation bundles were made for ematix_flow-0.1.2-cp311-cp311-macosx_11_0_arm64.whl:

Publisher: release.yml on ryan-evans-git/ematix-flow
