Otter

dbt-inspired streaming transforms. Consume events from a message queue, transform them with SQL, write structured Parquet output.

RabbitMQ  ──▶  DuckDB SQL models  ──▶  Parquet files

Otter connects to a message queue, pulls batches of JSON messages, loads each one into a DuckDB table called incoming, runs your SQL SELECT model against it, and writes the results as Parquet files. Failed messages go to a dead-letter queue. That's it.

Install

uv add otter-stream
# or with pip
pip install otter-stream

Quick start

# Scaffold a new project
otter init my-pipeline
cd my-pipeline

# Edit config and models
vim config/sources.yml
vim models/example.sql

# Validate everything parses
otter validate

# Test against a fixture
otter test --fixture tests/sample.json

# Run the pipeline
otter run

How it works

Project structure

my-pipeline/
  config/
    sources.yml       # Queue connection + batch settings
    sinks.yml         # Output path + format
    models.yml        # Per-model flush thresholds
  models/
    orders.sql        # SQL SELECT models — one per transformation
    purchases.sql

The incoming table

Every message consumed from the queue is loaded into a single-row DuckDB table called incoming. Your SQL model SELECTs from it. DuckDB handles nested JSON natively — access nested fields with dot notation (incoming.user.id) and arrays with unnest().

SQL models

Models are plain SQL SELECT statements. If it runs in DuckDB, it works in Otter (because it uses DuckDB!).

-- models/orders.sql
SELECT
    incoming.order_id,
    incoming.user.id            AS user_id,
    incoming.user.name          AS user_name,
    len(incoming.items)         AS item_count,
    list_sum([
        item.price * item.quantity
        FOR item IN incoming.items
    ])                          AS order_total,
    incoming.currency,
    incoming.placed_at
FROM incoming

One message can produce zero, one, or many output rows. Use unnest() to explode arrays:

-- models/purchases.sql
WITH exploded AS (
    SELECT
        incoming.order_id,
        incoming.user.id       AS user_id,
        incoming.placed_at,
        unnest(incoming.items) AS item
    FROM incoming
)
SELECT
    order_id,
    item.sku                              AS sku,
    item.name                             AS product_name,
    item.quantity                         AS quantity,
    item.price                            AS unit_price,
    round(item.price * item.quantity, 2)  AS line_total,
    user_id,
    placed_at
FROM exploded
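
To poke at a model outside the pipeline, the same steps can be reproduced with DuckDB's Python API. This is only a sketch (the message below is invented, and Otter's own loader may differ), but it is conceptually what otter test does with a fixture:

import duckdb
import pyarrow as pa

message = {
    "order_id": "o-1001",
    "user": {"id": 7, "name": "Ada"},
    "currency": "EUR",
    "placed_at": "2024-05-01T12:00:00Z",
    "items": [
        {"sku": "A1", "name": "Widget", "quantity": 2, "price": 9.99},
        {"sku": "B2", "name": "Gadget", "quantity": 1, "price": 24.50},
    ],
}

con = duckdb.connect()
# One message -> a one-row table with nested struct/list columns, like Otter's `incoming`.
con.register("incoming", pa.Table.from_pylist([message]))

with open("models/orders.sql") as f:
    print(con.execute(f.read()).fetch_arrow_table())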

Processing contract

Outcome                         Behaviour
Good message                    Transformed row(s) appended to the WAL, fsynced, then acked. Later coalesced into Parquet.
Bad SQL transform               Original message + error sent to the DLQ; message acked.
Unparseable JSON                Raw bytes + parse error sent to the DLQ; message acked.
Worker crash (WAL intact)       On restart, WAL segments are replayed to Parquet before polling resumes.
Worker crash (WAL disk lost)    Un-flushed rows are lost; the messages were already acked to RabbitMQ. Use a durable volume in production.
Parquet write failure           WAL segment stays on disk; the next flush retries. /readyz goes red via sink.is_healthy().
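
In code, this contract is a parse step and a transform step, each with its own failure route. The sketch below is runnable but heavily simplified: the queue, WAL, and DLQ are stood in for by plain Python lists, and the model is a throwaway one-liner; only the routing and the "every outcome ends in an ack" rule mirror Otter.

import json

import duckdb
import pyarrow as pa

MODEL_SQL = "SELECT incoming.order_id, len(incoming.items) AS item_count FROM incoming"

deliveries = [
    b'{"order_id": "o-1", "items": [{"sku": "A1"}]}',  # good message
    b'{"order_id": "o-2"}',                            # bad transform: no `items` field
    b'not json at all',                                # unparseable
]

wal, dlq, acked = [], [], []

for body in deliveries:
    try:
        message = json.loads(body)
        con = duckdb.connect()
        con.register("incoming", pa.Table.from_pylist([message]))
        rows = con.execute(MODEL_SQL).fetch_arrow_table()
    except json.JSONDecodeError as exc:
        dlq.append({"body": body, "error": f"parse error: {exc}"})
    except Exception as exc:
        dlq.append({"body": body, "error": f"transform error: {exc}"})
    else:
        wal.append(rows)   # in Otter: appended to the WAL and fsynced before the ack
    acked.append(body)     # every outcome ends in an ack

print(f"{len(wal)} transformed, {len(dlq)} dead-lettered, {len(acked)} acked")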

Message flow and guarantees

Otter coalesces many source batches into fewer, larger Parquet files using a per-worker write-ahead log. The main loop looks like this:

rabbit.poll ─▶ deserialize ─▶ transform (per model) ─▶ wal.append (per model)
                                                              │ fsync
                                                              ▼
                                                          rabbit.ack
                                                              │
                                              (size or age trigger per model)
                                                              ▼
                                      wal.read ─▶ sink.write (atomic) ─▶ wal.truncate
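
In code, the same ordering looks roughly like the sketch below, written directly against pyarrow. The class, file names, and thresholds here are illustrative (they mirror the defaults described later), not Otter's internals:

import os
import time

import pyarrow as pa
import pyarrow.ipc as ipc
import pyarrow.parquet as pq

MAX_FILE_BYTES = 128 * 1024 * 1024
MAX_AGE_SECONDS = 300

class WalSegment:
    """One WAL segment per model, written as an Arrow IPC stream."""

    def __init__(self, path: str, schema: pa.Schema):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.path = path
        self._file = open(path, "wb")
        self._writer = ipc.new_stream(self._file, schema)
        self.nbytes = 0                   # in-memory Arrow bytes, not bytes on disk
        self.opened_at = time.monotonic()

    def append(self, batch: pa.RecordBatch) -> None:
        # fsync before the source batch is acked, so an acked message is never lost
        self._writer.write_batch(batch)
        self._file.flush()
        os.fsync(self._file.fileno())
        self.nbytes += batch.nbytes

    def should_flush(self) -> bool:
        age = time.monotonic() - self.opened_at
        return self.nbytes >= MAX_FILE_BYTES or age >= MAX_AGE_SECONDS

    def drain_to_parquet(self, out_path: str) -> None:
        # read the segment back, publish the Parquet file atomically, drop the segment
        self._writer.close()
        self._file.close()
        with open(self.path, "rb") as f:
            table = ipc.open_stream(f).read_all()
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        tmp = os.path.join(os.path.dirname(out_path), "." + os.path.basename(out_path) + ".tmp")
        pq.write_table(table, tmp)
        with open(tmp, "rb") as f:
            os.fsync(f.fileno())          # data is on disk before the rename
        os.replace(tmp, out_path)         # atomic publish: readers never see a partial file
        os.remove(self.path)              # wal.truncate

# One poll's worth of transformed rows, then a forced drain.
batch = pa.RecordBatch.from_pydict({"order_id": ["o-1001"], "order_total": [44.48]})
seg = WalSegment("wal/orders/segment.arrow", batch.schema)
seg.append(batch)                         # ...and only now would the source batch be acked
seg.drain_to_parquet("output/orders/part-000001.parquet")
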
  • WAL format. Each model gets a segment file at <wal_path>/<model_name>/segment.arrow in the Arrow IPC stream format. Every append is fsynced before the source ack, so at most the last in-flight batch can be lost on a mid-append crash.
  • Flush triggers. Per model, you configure max_file_bytes and max_age_seconds. Either threshold triggers a drain. The loop checks after every poll, including None polls, so idle streams still flush by age. max_file_bytes is measured against in-memory Arrow Table.nbytes (column-buffer bytes), not bytes on disk — the WAL segment file itself can be noticeably larger once the Arrow IPC stream's per-record-batch flatbuffer headers and alignment padding are accounted for (easily 1.5–2× for small, frequent batches). The compressed Parquet output is then typically 2–10× smaller than the in-memory figure. Size the threshold against in-memory nbytes, not ls -lh on the WAL file.
  • Atomic Parquet writes. Non-partitioned writes land in a .<name>.tmp file and are os.replace-d into place after fsync. Partitioned writes stage into a .staging_<uuid>/ directory and are committed with a single rename to commit_<ts>_<uuid>/. Readers scan the output tree recursively, so the added directory layer is invisible to Trino / Athena / DuckDB / Spark.
  • Crash recovery. On startup the worker walks its WAL root and replays any surviving segments through the sink before polling resumes. Corrupted segment tails (detectable as pyarrow.ArrowInvalid) are logged and deleted. A startup sweep also removes any .staging_* / .*.tmp leftovers from a prior crashed write. This recovery pass is sketched in code after this list.
  • Delivery semantics. At-least-once. A crash after sink.write but before wal.truncate will re-emit the segment as a new Parquet file on recovery. Deduplicate downstream if you need exactly-once.
  • Sink failures. If the sink rejects a flush, the WAL segment stays on disk, the loop retries on the next flush tick, and /readyz reports unhealthy via sink.is_healthy().
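
A sketch of that recovery pass follows. The paths, the output file name, and the direct pq.write_table call are stand-ins for the real sink:

import glob
import os
import shutil

import pyarrow as pa
import pyarrow.ipc as ipc
import pyarrow.parquet as pq

def recover(wal_root: str = "wal", output_root: str = "output") -> None:
    # 1. Sweep leftovers from a write that crashed between staging and commit.
    for leftover in glob.glob(os.path.join(output_root, "**", ".*.tmp"), recursive=True):
        os.remove(leftover)
    for staging in glob.glob(os.path.join(output_root, "**", ".staging_*"), recursive=True):
        shutil.rmtree(staging)

    # 2. Replay surviving WAL segments before polling resumes.
    for segment in glob.glob(os.path.join(wal_root, "*", "segment.arrow")):
        model = os.path.basename(os.path.dirname(segment))
        batches = []
        try:
            with open(segment, "rb") as f:
                for batch in ipc.open_stream(f):
                    batches.append(batch)
        except pa.ArrowInvalid as exc:
            # corrupted tail: keep the batches read so far, log the rest
            print(f"{model}: corrupted tail in {segment}: {exc}")
        if batches:
            # stand-in for sink.write(); may re-emit rows already written (at-least-once)
            out_dir = os.path.join(output_root, model)
            os.makedirs(out_dir, exist_ok=True)
            pq.write_table(pa.Table.from_batches(batches),
                           os.path.join(out_dir, "recovered-000001.parquet"))
        os.remove(segment)

recover()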

Per-model flush thresholds (config/models.yml)

max_file_bytes: 134217728    # 128 MB default
max_age_seconds: 300         # 5 min default

overrides:
  orders:
    max_file_bytes: 536870912   # 512 MB
    max_age_seconds: 60
  purchases:
    max_file_bytes: 10485760    # 10 MB
    max_age_seconds: 1500
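
Assuming per-model overrides simply fall back to the top-level defaults (the obvious reading of this file, though the exact merge is up to Otter), the resolution looks like:

import yaml

with open("config/models.yml") as f:
    cfg = yaml.safe_load(f)

defaults = {k: cfg[k] for k in ("max_file_bytes", "max_age_seconds")}

def thresholds(model: str) -> dict:
    # per-model override wins; otherwise the top-level default applies
    return {**defaults, **cfg.get("overrides", {}).get(model, {})}

print(thresholds("orders"))    # {'max_file_bytes': 536870912, 'max_age_seconds': 60}
print(thresholds("example"))   # no override: falls back to the 128 MB / 5 min defaults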

Deployment: durable WAL storage

Otter workers are stateful — the WAL lives on local disk. Production deployments should point wal.path at a cloud-managed durable volume via a Kubernetes StatefulSet + PersistentVolumeClaim (EBS gp3, GCP Persistent Disk, Azure Managed Disks). Semantics are identical to local disk — fsync, atomic rename — but survive host failure with ~99.999% durability.

Otter does not replicate the WAL to object storage. The right answer for cross-host durability is infrastructure (a durable PV), not application-layer S3 mirroring. Local ephemeral disk is fine for dev and for workloads that can tolerate losing the in-flight WAL on host failure.

Configuration

config/sources.yml

source:
  type: rabbitmq
  host: ${RABBITMQ_HOST:-localhost}
  port: 5672
  vhost: /
  username: guest
  password: ${RABBITMQ_PASSWORD}
  queue: orders.raw
  dlq: orders.dlq
  batch_size: 100          # Messages per batch
  batch_timeout_ms: 5000   # Flush partial batch after this timeout

config/sinks.yml

sink:
  type: parquet
  path: ./output
  partition_by: [event_type]   # Optional: partition into subdirectories
  dedupe_key: order_id         # Optional: downstream deduplication key
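
Because delivery is at-least-once, a recovered WAL segment can be re-emitted as an extra Parquet file, so downstream readers should collapse duplicates on dedupe_key. For example, with DuckDB over the output tree (a sketch, assuming the order_id and placed_at columns from the earlier orders model):

import duckdb

# keep one row per order_id across all committed Parquet files
latest = duckdb.sql("""
    SELECT *
    FROM read_parquet('output/**/*.parquet')
    QUALIFY row_number() OVER (PARTITION BY order_id ORDER BY placed_at DESC) = 1
""")
print(latest.limit(10))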

Environment variables

Use ${VAR} or ${VAR:-default} anywhere in YAML config. Values are interpolated before parsing.
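
A rough Python equivalent of that rule, for reference only (Otter's actual interpolation and error handling may differ):

import os
import re

def interpolate(text: str) -> str:
    def repl(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        value = os.environ.get(name, default)
        if value is None:
            # no default and the variable is unset; Otter's behaviour here may differ
            raise KeyError(f"missing environment variable: {name}")
        return value
    return re.sub(r"\$\{(\w+)(?::-([^}]*))?\}", repl, text)

print(interpolate("host: ${RABBITMQ_HOST:-localhost}"))   # -> host: localhost (when unset)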

CLI overrides

CLI flags override config file values (e.g. --log-level DEBUG).

CLI reference

otter run

Start the pipeline. Discovers models and config in the current directory.

otter run                              # Run all models
otter run --model models/orders.sql    # Run a specific model
otter run --dry-run                    # Consume one batch, print output, no writes
otter run --log-json                   # JSON log output (for production)
otter run --log-level DEBUG            # Verbose logging
otter run --config path/to/config      # Custom config directory

otter validate

Check config files and SQL models without connecting to anything.

otter validate
otter validate --config path/to/config --models path/to/models

otter test

Run a model against a local JSON fixture file. No queue connection needed.

otter test --fixture fixtures/sample_order.json
otter test --fixture fixtures/sample_order.json --model models/orders.sql

otter init

Scaffold a new project with example config and models.

otter init my-pipeline

otter models list

Show all discovered SQL models.

otter models list
otter models list --models path/to/models

otter dlq inspect

Peek at messages in the dead-letter queue (non-destructive).

otter dlq inspect
otter dlq inspect --limit 5

otter dlq replay

Replay all DLQ messages back to the source queue for reprocessing.

otter dlq replay
otter dlq replay --target another.queue

Development

make install-dev    # Install dev dependencies
make test           # Run all tests
make lint           # Run ruff check
make format         # Run ruff format
