# Otter
dbt-inspired streaming transforms. Consume events from a message queue, transform them with SQL, write structured Parquet output.
```
RabbitMQ ──▶ DuckDB SQL models ──▶ Parquet files
```
Otter connects to a message queue, pulls batches of JSON messages, loads each one into a DuckDB table called `incoming`, runs your SQL SELECT model against it, and writes the results as Parquet files. Failed messages go to a dead-letter queue. That's it.
## Install

```bash
uv add otter-stream
# or with pip
pip install otter-stream
```
## Quick start

```bash
# Scaffold a new project
otter init my-pipeline
cd my-pipeline

# Edit config and models
vim config/sources.yml
vim models/example.sql

# Validate everything parses
otter validate

# Test against a fixture
otter test --fixture tests/sample.json

# Run the pipeline
otter run
```
## How it works

### Project structure

```
my-pipeline/
  config/
    sources.yml    # Queue connection + batch settings
    sinks.yml      # Output path + format
    models.yml     # Per-model flush thresholds (optional)
  models/
    orders.sql     # SQL SELECT models — one per transformation
    purchases.sql
```
### The `incoming` table

Every message consumed from the queue is loaded into a single-row DuckDB table called `incoming`. Your SQL model SELECTs from it. DuckDB handles nested JSON natively — access nested fields with dot notation (`incoming.user.id`) and arrays with `unnest()`.
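A quick way to get a feel for this is plain DuckDB, with a struct literal standing in for a parsed JSON message (the field names here are hypothetical):

```sql
-- Plain DuckDB, no Otter needed: simulate a one-message incoming table.
WITH incoming AS (
    SELECT
        42 AS order_id,
        {'id': 7, 'name': 'Ada'} AS customer,
        [{'sku': 'A1', 'price': 9.99}, {'sku': 'B2', 'price': 4.50}] AS items
)
SELECT
    incoming.customer.id   AS user_id,   -- dot notation into nested structs
    len(incoming.items)    AS item_count,
    unnest(incoming.items) AS item       -- one output row per list element
FROM incoming;
```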
### SQL models

Models are plain SQL SELECT statements. If it runs in DuckDB, it works in Otter (because it uses DuckDB!).

```sql
-- models/orders.sql
SELECT
    incoming.order_id,
    incoming.user.id AS user_id,
    incoming.user.name AS user_name,
    len(incoming.items) AS item_count,
    list_sum([
        item.price * item.quantity
        FOR item IN incoming.items
    ]) AS order_total,
    incoming.currency,
    incoming.placed_at
FROM incoming
```
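For reference, the model above expects each queue message to look roughly like this (a hypothetical sample; a file with this shape also works as an `otter test` fixture):

```json
{
  "order_id": 1042,
  "user": {"id": 7, "name": "Ada Lovelace"},
  "items": [
    {"sku": "A1", "name": "Widget", "price": 9.99, "quantity": 2},
    {"sku": "B2", "name": "Gadget", "price": 4.50, "quantity": 1}
  ],
  "currency": "GBP",
  "placed_at": "2024-05-01T12:00:00Z"
}
```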
One message can produce zero, one, or many output rows. Use `unnest()` to explode arrays. DuckDB applies `unnest()` in the SELECT list, repeating the sibling columns once per list element:

```sql
-- models/purchases.sql
SELECT
    order_id,
    item.sku,
    item.name AS product_name,
    item.quantity,
    item.price AS unit_price,
    round(item.price * item.quantity, 2) AS line_total,
    user_id,
    placed_at
FROM (
    SELECT
        incoming.order_id,
        unnest(incoming.items) AS item,
        incoming.user.id AS user_id,
        incoming.placed_at
    FROM incoming
)
```
### Processing contract

| Outcome | Behaviour |
|---|---|
| Good message | Transformed row(s) appended to the WAL, fsynced, then acked. Later coalesced into Parquet. |
| Bad SQL transform | Original message + error sent to DLQ. Message acked. |
| Unparseable JSON | Raw bytes + parse error sent to DLQ. Message acked. |
| Worker crash (WAL intact) | On restart, WAL segments are replayed to Parquet before polling resumes. |
| Worker crash (WAL disk lost) | Un-flushed rows are lost — RabbitMQ already acked them. Use a durable volume in prod. |
| Parquet write failure | WAL segment stays on disk; next flush retries. `/readyz` goes red via `sink.is_healthy()`. |
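The exact DLQ payload shape is an implementation detail, but per the contract above it carries the original message (or raw bytes) together with the error. Conceptually, something like this (an illustrative shape, not a stable schema):

```json
{
  "original": "{\"order_id\": 1042, \"items\": \"oops-not-a-list\"}",
  "error": "Binder Error: ...",
  "model": "orders"
}
```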
### Message flow and guarantees
Otter coalesces many source batches into fewer, larger Parquet files using a per-worker write-ahead log. The main loop looks like this:
```
rabbit.poll ─▶ deserialize ─▶ transform (per model) ─▶ wal.append (per model)
                                                             │ fsync
                                                             ▼
                                                         rabbit.ack
                                                             │
                                               (size or age trigger per model)
                                                             ▼
                                     wal.read ─▶ sink.write (atomic) ─▶ wal.truncate
```
- WAL format. Each model gets a segment file at `<wal_path>/<model_name>/segment.arrow` in the Arrow IPC stream format. Every append is fsynced before the source ack, so at most the last in-flight batch can be lost on a mid-append crash.
- Flush triggers. Per model, you configure `max_file_bytes` and `max_age_seconds`. Either threshold triggers a drain. The loop checks after every poll, including `None` polls, so idle streams still flush by age. `max_file_bytes` is measured against the in-memory Arrow `Table.nbytes` (column-buffer bytes), not bytes on disk — the WAL segment file itself can be noticeably larger once the Arrow IPC stream's per-record-batch flatbuffer headers and alignment padding are accounted for (easily 1.5–2× for small, frequent batches). The compressed Parquet output is then typically 2–10× smaller than the in-memory figure. Size the threshold against in-memory `nbytes`, not `ls -lh` on the WAL file.
- Atomic Parquet writes. Non-partitioned writes land in a `.<name>.tmp` file and are `os.replace`-d into place after `fsync`. Partitioned writes stage into a `.staging_<uuid>/` directory and are committed with a single rename to `commit_<ts>_<uuid>/`. Readers scan the output tree recursively, so the added directory layer is invisible to Trino / Athena / DuckDB / Spark.
- Crash recovery. On startup the worker walks its WAL root and replays any surviving segments through the sink before polling resumes. Corrupted segment tails (detectable as `pyarrow.ArrowInvalid`) are logged and deleted. A startup sweep also removes any `.staging_*` / `.*.tmp` leftovers from a prior crashed write.
- Delivery semantics. At-least-once. A crash after `sink.write` but before `wal.truncate` will re-emit the segment as a new Parquet file on recovery. Deduplicate downstream if you need exactly-once (see the sketch after this list).
- Sink failures. If the sink rejects a flush, the WAL segment stays on disk, the loop retries on the next flush tick, and `/readyz` reports unhealthy via `sink.is_healthy()`.
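Since delivery is at-least-once, a downstream reader can collapse replayed rows on the sink's `dedupe_key`. A minimal DuckDB sketch, assuming `order_id` as the key and the default `./output` sink path from the examples in this README:

```sql
-- Keep one row per order_id across any re-emitted Parquet segments.
SELECT * EXCLUDE (rn)
FROM (
    SELECT *,
           row_number() OVER (PARTITION BY order_id ORDER BY placed_at) AS rn
    FROM read_parquet('output/**/*.parquet')
)
WHERE rn = 1;
```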
### Per-model flush thresholds (`config/models.yml`)

```yaml
max_file_bytes: 134217728   # 128 MB default
max_age_seconds: 300        # 5 min default

overrides:
  orders:
    max_file_bytes: 536870912   # 512 MB
    max_age_seconds: 60
  purchases:
    max_file_bytes: 10485760    # 10 MB
    max_age_seconds: 1500
```
## Deployment: durable WAL storage

Otter workers are stateful — the WAL lives on local disk. Production deployments should point `wal.path` at a cloud-managed durable volume via a Kubernetes StatefulSet + PersistentVolumeClaim (EBS gp3, GCP Persistent Disk, Azure Managed Disks). Semantics are identical to local disk — fsync, atomic rename — but survive host failure with ~99.999% durability.
Otter does not replicate the WAL to object storage. The right answer for cross-host durability is infrastructure (a durable PV), not application-layer S3 mirroring. Local ephemeral disk is fine for dev and for workloads that can tolerate losing the in-flight WAL on host failure.
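A minimal StatefulSet sketch; the names, image, and mount path are hypothetical (point `wal.path` in your Otter config at the PVC-backed mount):

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: otter-worker
spec:
  serviceName: otter-worker
  replicas: 1
  selector:
    matchLabels: {app: otter-worker}
  template:
    metadata:
      labels: {app: otter-worker}
    spec:
      containers:
        - name: otter
          image: my-registry/otter-pipeline:latest   # hypothetical image
          command: ["otter", "run", "--log-json"]
          volumeMounts:
            - name: wal
              mountPath: /var/lib/otter/wal          # set wal.path to this
  volumeClaimTemplates:
    - metadata:
        name: wal
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi   # gp3 / PD / Managed Disk via your StorageClass
```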
## Configuration

### `config/sources.yml`

```yaml
source:
  type: rabbitmq
  host: ${RABBITMQ_HOST:-localhost}
  port: 5672
  vhost: /
  username: guest
  password: ${RABBITMQ_PASSWORD}
  queue: orders.raw
  dlq: orders.dlq
  batch_size: 100          # Messages per batch
  batch_timeout_ms: 5000   # Flush partial batch after this timeout
```
### `config/sinks.yml`

```yaml
sink:
  type: parquet
  path: ./output
  partition_by: [event_type]   # Optional: partition into subdirectories
  dedupe_key: order_id         # Optional: downstream deduplication key
```
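With `partition_by` set, output lands in hive-style subdirectories. Given the `commit_<ts>_<uuid>/` scheme described under atomic writes, a flushed tree might look like this (file names illustrative):

```
output/
  event_type=purchase/
    commit_<ts>_<uuid>/
      part-0.parquet
  event_type=refund/
    commit_<ts>_<uuid>/
      part-0.parquet
```

Readers that scan recursively (Trino, Athena, DuckDB, Spark) see an ordinary hive-partitioned dataset.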
### Environment variables

Use `${VAR}` or `${VAR:-default}` anywhere in YAML config. Values are interpolated before parsing.
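For example, supplying the variables referenced in `sources.yml` above at launch (the hostname here is a placeholder):

```bash
export RABBITMQ_PASSWORD=s3cret          # no secrets in the config file
RABBITMQ_HOST=rabbit.internal otter run
```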
### CLI overrides

CLI flags override config file values (e.g. `--log-level DEBUG`).
## CLI reference

### `otter run`

Start the pipeline. Discovers models and config in the current directory.

```bash
otter run                             # Run all models
otter run --model models/orders.sql   # Run a specific model
otter run --dry-run                   # Consume one batch, print output, no writes
otter run --log-json                  # JSON log output (for production)
otter run --log-level DEBUG          # Verbose logging
otter run --config path/to/config     # Custom config directory
```
### `otter validate`

Check config files and SQL models without connecting to anything.

```bash
otter validate
otter validate --config path/to/config --models path/to/models
```
### `otter test`

Run a model against a local JSON fixture file. No queue connection needed.

```bash
otter test --fixture fixtures/sample_order.json
otter test --fixture fixtures/sample_order.json --model models/orders.sql
```
### `otter init`

Scaffold a new project with example config and models.

```bash
otter init my-pipeline
```
### `otter models list`

Show all discovered SQL models.

```bash
otter models list
otter models list --models path/to/models
```
### `otter dlq inspect`

Peek at messages in the dead-letter queue (non-destructive).

```bash
otter dlq inspect
otter dlq inspect --limit 5
```
### `otter dlq replay`

Replay all DLQ messages back to the source queue for reprocessing.

```bash
otter dlq replay
otter dlq replay --target another.queue
```
## Development

```bash
make install-dev   # Install dev dependencies
make test          # Run all tests
make lint          # Run ruff check
make format        # Run ruff format
```