Distributed DAG pipeline engine with Rust core and Python bindings
Oxidizer ALPHA
A distributed DAG execution engine built with Python and Rust (via PyO3 / Maturin). Oxidizer orchestrates multi-tier data pipelines across workers using Redis Streams for messaging, Redis JSON for state, and S3/MinIO for config storage.
Supports both batch DAG runs and live (streaming) topologies. Live nodes use a stateless batch+republish model — each worker processes one batch, re-publishes the task back to the stream, and any worker can claim it next. Redistribution across workers is automatic; scaling requires zero coordination.
Architecture
| Component | Role |
|---|---|
| Oxidizer | DAG controller — builds execution tiers, dispatches nodes to workers |
| Reagent | Worker — consumes tasks from a stream and runs user-defined processing |
| Microscope | FastMCP control plane — HTTP API, MCP tools, resources, and prompts |
| Catalyst | Redis/Valkey cache client (Rust core) — streams, state, locking |
| Alloy | Pipeline config manager — loads, validates, and stores YAML configs in S3/MinIO |
| Anvil | S3 operations toolkit — static helpers for object storage via Rust engine |
| Formula | DAG engine — constructs runs from cached configs, manages DAG lifecycle |
| Residue | Structured logging — structlog + Rust pyo3_log bridge, optional Redis log sink |
All I/O (Redis, S3, stream reads/writes) runs in async Rust via Tokio. Python only handles business logic.
Project Structure
oxidizer/
├── examples/ # Self-contained pipeline examples
│ ├── Dockerfile # Shared multi-stage build
│ ├── main.py # Controller entrypoint
│ ├── api.py # API entrypoint
│ ├── skeleton/ # Minimal starter template (batch)
│ ├── batch_etl/ # E-Commerce ETL pipeline (batch)
│ ├── live_events/ # IoT sensor monitoring (live)
│ └── mixed_pipeline/ # Fintech transactions (mixed)
├── oxidizer/ # Python package
│ ├── ui.html # Dashboard UI (served by Microscope)
│ └── *.py # Core modules
├── rust/src/ # Rust source (PyO3 extension)
├── tests/ # Pytest + Rust unit tests
├── Cargo.toml
├── pyproject.toml
└── requirements.txt
Each example folder contains its own docker-compose.yml, worker file(s), and YAML config. See examples/README.md for details.
Prerequisites
- Python 3.10+
- Rust 1.70+ (for local builds)
- Docker & Docker Compose (for containerized runs)
- Redis (with JSON module) and MinIO (or S3-compatible store)
Quickstart — Docker Compose
The fastest way to run the full stack (using the skeleton example):
docker compose -f examples/skeleton/docker-compose.yml up --build -d
Each example starts the following core services:
| Service | Description |
|---|---|
| main | Oxidizer controller — executes DAG runs |
| worker (×N) | Reagent workers — process nodes from streams |
| api | Microscope — FastMCP server on port 8000 (HTTP + MCP + Dashboard) |
| redis | Redis Stack (with JSON module) on port 6379 |
| minio | MinIO object store on ports 9000/9001 |
| minio-init | Sidecar — creates the bucket and uploads alloy configs |
Some examples add additional workers (e.g. dedicated stream workers). See each example's docker-compose.yml for specifics.
Once running, the dashboard is at http://localhost:8000, the API health check at http://localhost:8000/health, the MCP endpoint at http://localhost:8000/mcp, and the MinIO console at http://localhost:9001.
To stop:
docker compose -f examples/skeleton/docker-compose.yml down
See examples/README.md for all four examples and their deployment instructions.
Quickstart — Local Development
1. Install
python -m venv .venv && source .venv/bin/activate
pip install maturin
maturin develop # builds the Rust extension in-place
pip install -r requirements.txt
2. Controller (Oxidizer)
The controller builds a DAG from an alloy config, splits it into execution tiers, and dispatches each node to a worker stream.
from oxidizer import configure_logging
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.oxidizer import Oxidizer
configure_logging() # structlog + Rust pyo3_log bridge
catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))
oxidizer = Oxidizer(catalyst=catalyst, alloy=alloy)
# Submit a DAG run by alloy name (non-blocking — queued to the alloy stream)
oxidizer.submit_run("example")
# Start the controller loop (blocks forever, dispatches runs as they arrive)
oxidizer.start()
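To make "execution tiers" concrete, here is a minimal sketch of tiering a DAG with the standard-library `graphlib` module. It is not Oxidizer's implementation (the controller's tiering runs inside the framework); the `build_tiers` helper and the node names are hypothetical and exist only for illustration.

```python
from graphlib import TopologicalSorter


def build_tiers(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group DAG nodes into tiers; each tier depends only on earlier tiers."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    tiers = []
    while sorter.is_active():
        # Nodes whose predecessors have all been marked done.
        ready = sorted(sorter.get_ready())
        tiers.append(ready)
        sorter.done(*ready)
    return tiers


# Hypothetical node names: each key maps to the set of nodes it depends on.
example_dag = {
    "bronze_orders": set(),
    "bronze_customers": set(),
    "silver_orders": {"bronze_orders", "bronze_customers"},
    "gold_summary": {"silver_orders"},
}
tiers = build_tiers(example_dag)
print(tiers)
```

Nodes within one tier have no dependencies on each other, so they can be dispatched to workers in parallel, while each tier waits for the previous one to finish.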
3. Worker (Reagent)
Workers consume tasks from the oxidizer stream. Define your processing logic with the @reagent.react() decorator:
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.reagent import Reagent
catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
reagent = Reagent(catalyst=catalyst)
@reagent.react()
def process(data: dict, context: dict):
    node_id = context["node_id"]
    run_id = context["run_id"]
    print(f"Processing {node_id} for run {run_id}")
    # data: dict mapping alias → resolved upstream records
    # context: run_id, node_id, alloy, layer, node_config, connections
    return {"data": data}
The decorated function receives (data, context):
- data: dict mapping alias → resolved upstream records (tiered retrieval handled by the framework)
- context: dict with keys run_id, node_id, alloy, layer, node_config, connections
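The function body is plain Python, so the processing logic can be written and checked as an undecorated function before wiring it to a Reagent. The sketch below assumes each alias maps to a list of records and that the return value keeps the `{"data": ...}` shape from the example above; the `summarize` name and the sample payloads are hypothetical.

```python
def summarize(data: dict, context: dict) -> dict:
    """Count upstream records per alias and tag the result with run metadata."""
    counts = {alias: len(records) for alias, records in data.items()}
    return {
        "data": {
            "run_id": context["run_id"],
            "node_id": context["node_id"],
            "record_counts": counts,
        }
    }


# Hypothetical payloads shaped like the (data, context) arguments described above.
sample_data = {"orders": [{"id": 1}, {"id": 2}], "customers": [{"id": 7}]}
sample_context = {
    "run_id": "run-001",
    "node_id": "gold_summary",
    "alloy": "example",
    "layer": "gold",
    "node_config": {},
    "connections": {},
}
result = summarize(sample_data, sample_context)
print(result)
```

Keeping the logic free of framework calls means it can be covered by ordinary pytest cases with no Redis or MinIO running.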
For dedicated streams (e.g. routing specific nodes to specialized workers):
@reagent.react(dedicated_stream="gold_summary_stream")
def process(data: dict, context: dict):
    ...
4. UI / API / MCP (Microscope)
Microscope is a FastMCP server that exposes HTTP endpoints for dashboards and curl, plus MCP tools, resources, and prompts for AI agent integration.
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.microscope import Microscope
catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))
microscope = Microscope(catalyst=catalyst, alloy=alloy)
microscope.run(host="0.0.0.0", port=8000)
See API Reference for the full list of HTTP endpoints, MCP tools, resources, and prompts.
Live Topologies
Oxidizer supports live (streaming) topologies alongside batch DAG runs. A live topology deploys long-running nodes that continuously process data from Redis Streams using a batch+republish model — each worker processes one batch, re-publishes the task, and any worker can claim it next.
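The batch+republish idea can be illustrated with a small in-memory simulation. This is only a sketch: a `deque` stands in for the Redis Stream, round-robin selection stands in for workers competing to claim the task, and storing a `cursor` inside the task is an assumption made here to show that workers hold no state between batches. None of these names come from Oxidizer itself.

```python
from collections import deque


def run_live_node(records, batch_size, worker_names, max_rounds=100):
    """Simulate batch+republish: one task, re-queued after every batch."""
    # The task stream holds a single task for one hypothetical live node.
    stream = deque([{"node_id": "sensor_filter", "cursor": 0}])
    history = []  # (worker, batch) for every processed round
    rounds = 0
    while stream and rounds < max_rounds:
        task = stream.popleft()  # any worker may claim the task
        worker = worker_names[rounds % len(worker_names)]
        start = task["cursor"]
        batch = records[start:start + batch_size]
        if not batch:
            break  # nothing left; a real live node would wait for new data
        history.append((worker, batch))
        # The worker keeps no state: progress travels with the task itself.
        stream.append({**task, "cursor": start + len(batch)})
        rounds += 1
    return history


history = run_live_node(list(range(5)), batch_size=2, worker_names=["w1", "w2"])
print(history)
```

Because each round ends with the task back on the stream, adding or removing workers changes only who claims the next batch, not how the work is tracked.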
# Deploy a live topology
curl -X POST http://localhost:8000/topology/deploy \
-H 'Content-Type: application/json' \
-d '{"alloy_name": "example"}'
# Check status
curl http://localhost:8000/topology/{run_id}/status
# Graceful stop
curl -X POST http://localhost:8000/topology/{run_id}/stop
See Live Topologies for the full architecture, node pause/unpause, rebalancing, and crash recovery details.
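The same three calls can be made from Python with the standard library. The sketch below only builds the requests, mirroring the curl commands above; the helper names are hypothetical, and the response format is not described in this README, so no response parsing is shown.

```python
import json
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # Microscope address from the quickstart


def deploy_request(alloy_name: str) -> Request:
    """POST /topology/deploy with a JSON body naming the alloy."""
    body = json.dumps({"alloy_name": alloy_name}).encode("utf-8")
    return Request(
        f"{BASE_URL}/topology/deploy",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(run_id: str) -> Request:
    """GET /topology/{run_id}/status."""
    return Request(f"{BASE_URL}/topology/{run_id}/status", method="GET")


def stop_request(run_id: str) -> Request:
    """POST /topology/{run_id}/stop for a graceful stop."""
    return Request(f"{BASE_URL}/topology/{run_id}/stop", method="POST")


deploy = deploy_request("example")
print(deploy.get_method(), deploy.full_url)
```

With the stack running, each request can be sent with `urllib.request.urlopen(request)`.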
Documentation
Detailed documentation lives in the docs/ folder:
| Document | Description |
|---|---|
| Architecture | Component overview, data flow, node state machines, locking, error handling |
| API Reference | All HTTP endpoints, MCP tools, resources, prompts, metrics schemas |
| Configuration | Alloy YAML reference — layers, nodes, output blocks, scheduling, retention |
| Live Topologies | Batch+republish model, topology entity, deploy/stop/rebalance, pause/unpause |
| Batching | XRANGE cursor loop, per-record fan-out, auto-flatten, batch_size config |
| Retention | Task stream cleanup, producer-owned output, hierarchical data_retention |
| Plugins | Python + Rust plugin system with Maturin/PyO3, entry points |
| Testing | Pytest suite, integration tests, Docker Compose test process, pass/fail criteria |
| Examples | Four self-contained pipeline examples (skeleton, batch_etl, live_events, mixed_pipeline) |
| PyPI Publishing | Publishing to PyPI with GitHub Actions, trusted publisher, versioning strategies |
Testing
# Rust unit tests (no external services needed)
cargo test
# Python tests (no external services needed)
python -m pytest tests/ -v
License
TBD