Distributed DAG pipeline engine with Rust core and Python bindings

Oxidizer ALPHA

A distributed DAG execution engine built with Python and Rust (via PyO3 / Maturin). Oxidizer orchestrates multi-tier data pipelines across workers using Redis Streams for messaging, Redis JSON for state, and S3/MinIO for config storage.

Supports both batch DAG runs and live (streaming) topologies. Live nodes use a stateless batch+republish model — each worker processes one batch, re-publishes the task back to the stream, and any worker can claim it next. Redistribution across workers is automatic; scaling requires zero coordination.
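The batch+republish model can be pictured with a small in-memory simulation. This is only a sketch under stated assumptions, not Oxidizer's implementation: a `deque` stands in for the Redis stream, the task carries a hypothetical `cursor` field, and workers take turns round-robin, whereas in a real deployment whichever worker reads the stream first would claim the task.

```python
from collections import deque


def run_live_node(records, batch_size, workers):
    """Simulate one live node: process a batch, re-publish the task, repeat."""
    # Hypothetical task shape; the real task payload is not documented here.
    stream = deque([{"node_id": "sensor_node", "cursor": 0}])
    processed = []  # (worker, batch) pairs in processing order
    turn = 0
    while stream:
        task = stream.popleft()                # any worker may claim the task
        worker = workers[turn % len(workers)]  # round-robin is a simplification
        turn += 1
        start = task["cursor"]
        batch = records[start:start + batch_size]
        if not batch:
            break  # no data left; a real live node would keep waiting
        processed.append((worker, batch))
        # Re-publish with an advanced cursor so no state stays on the worker.
        stream.append({**task, "cursor": start + len(batch)})
    return processed


history = run_live_node(list(range(7)), batch_size=3, workers=["w1", "w2"])
for worker, batch in history:
    print(worker, batch)
```

Because the task itself carries everything needed to continue, adding or removing a worker in this sketch only changes who claims the next batch.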

Architecture

Component   Role
Oxidizer    DAG controller: builds execution tiers, dispatches nodes to workers
Reagent     Worker: consumes tasks from a stream and runs user-defined processing
Microscope  FastMCP control plane: HTTP API, MCP tools, resources, and prompts
Catalyst    Redis/Valkey cache client (Rust core): streams, state, locking
Alloy       Pipeline config manager: loads, validates, and stores YAML configs in S3/MinIO
Anvil       S3 operations toolkit: static helpers for object storage via Rust engine
Formula     DAG engine: constructs runs from cached configs, manages DAG lifecycle
Residue     Structured logging: structlog + Rust pyo3_log bridge, optional Redis log sink

All I/O (Redis, S3, stream reads/writes) runs in async Rust via Tokio. Python only handles business logic.

Project Structure

oxidizer/
├── examples/                   # Self-contained pipeline examples
│   ├── Dockerfile              # Shared multi-stage build
│   ├── main.py                 # Controller entrypoint
│   ├── api.py                  # API entrypoint
│   ├── skeleton/               # Minimal starter template (batch)
│   ├── batch_etl/              # E-Commerce ETL pipeline (batch)
│   ├── live_events/            # IoT sensor monitoring (live)
│   └── mixed_pipeline/         # Fintech transactions (mixed)
├── oxidizer/                   # Python package
│   ├── ui.html                 # Dashboard UI (served by Microscope)
│   └── *.py                    # Core modules
├── rust/src/                   # Rust source (PyO3 extension)
├── tests/                      # Pytest + Rust unit tests
├── Cargo.toml
├── pyproject.toml
└── requirements.txt

Each example folder contains its own docker-compose.yml, worker file(s), and YAML config. See examples/README.md for details.

Prerequisites

  • Python 3.10+
  • Rust 1.70+ (for local builds)
  • Docker & Docker Compose (for containerized runs)
  • Redis (with JSON module) and MinIO (or S3-compatible store)

Quickstart — Docker Compose

The fastest way to run the full stack (using the skeleton example):

docker compose -f examples/skeleton/docker-compose.yml up --build -d

Each example starts the following core services:

Service      Description
main         Oxidizer controller: executes DAG runs
worker (×N)  Reagent workers: process nodes from streams
api          Microscope: FastMCP server on port 8000 (HTTP + MCP + Dashboard)
redis        Redis Stack (with JSON module) on port 6379
minio        MinIO object store on ports 9000/9001
minio-init   Sidecar: creates the bucket and uploads alloy configs

Some examples add additional workers (e.g. dedicated stream workers). See each example's docker-compose.yml for specifics.

Once the stack is running, the dashboard is at http://localhost:8000, the API health check at http://localhost:8000/health, the MCP endpoint at http://localhost:8000/mcp, and the MinIO console at http://localhost:9001.
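When scripting against the stack, it can help to wait until the API answers before submitting work. The helper below is a minimal sketch using only the standard library. It assumes the /health endpoint returns HTTP 200 once the service is ready, which this README does not state explicitly; `wait_for_health` and its parameters are hypothetical names.

```python
import time
import urllib.error
import urllib.request

BASE_URL = "http://localhost:8000"  # API port from the services table


def wait_for_health(base_url=BASE_URL, attempts=30, delay=1.0,
                    opener=urllib.request.urlopen):
    """Return True once GET {base_url}/health answers with HTTP 200."""
    url = f"{base_url}/health"
    for _ in range(attempts):
        try:
            with opener(url, timeout=2) as response:
                if response.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            pass  # service is not accepting connections yet
        time.sleep(delay)
    return False
```

Calling `wait_for_health()` right after `docker compose ... up` returns False if the API never responds within the given attempts. The `opener` parameter exists only so the function can be exercised without a running server.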

To stop:

docker compose -f examples/skeleton/docker-compose.yml down

See examples/README.md for all four examples and their deployment instructions.

Quickstart — Local Development

1. Install

python -m venv .venv && source .venv/bin/activate
pip install maturin
maturin develop          # builds the Rust extension in-place
pip install -r requirements.txt

2. Controller (Oxidizer)

The controller builds a DAG from an alloy config, splits it into execution tiers, and dispatches each node to a worker stream.

from oxidizer import configure_logging
from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.oxidizer import Oxidizer

configure_logging()  # structlog + Rust pyo3_log bridge

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))

oxidizer = Oxidizer(catalyst=catalyst, alloy=alloy)

# Submit a DAG run by alloy name (non-blocking — queued to the alloy stream)
oxidizer.submit_run("example")

# Start the controller loop (blocks forever, dispatches runs as they arrive)
oxidizer.start()
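The tier-splitting step can be illustrated with a generic topological layering, where every node lands in the first tier after all of its upstream nodes. This is a sketch of the general technique, not Oxidizer's actual code; `build_tiers` and the node names are hypothetical.

```python
def build_tiers(dependencies):
    """Group DAG nodes into tiers; nodes in one tier have no mutual dependencies.

    `dependencies` maps each node to the list of its upstream nodes.
    Every upstream node must itself appear as a key.
    """
    remaining = {node: set(ups) for node, ups in dependencies.items()}
    tiers = []
    while remaining:
        # Nodes whose upstreams have all been placed are ready for this tier.
        ready = sorted(node for node, ups in remaining.items() if not ups)
        if not ready:
            raise ValueError("cycle or unknown upstream node; not a valid DAG")
        tiers.append(ready)
        for node in ready:
            del remaining[node]
        for ups in remaining.values():
            ups.difference_update(ready)
    return tiers


# Hypothetical node names for illustration only.
dag = {
    "raw_orders": [],
    "raw_customers": [],
    "clean_orders": ["raw_orders"],
    "order_summary": ["clean_orders", "raw_customers"],
}
tiers = build_tiers(dag)
print(tiers)
# [['raw_customers', 'raw_orders'], ['clean_orders'], ['order_summary']]
```

Nodes within a tier could be dispatched to workers in parallel, while a later tier waits for the earlier ones to finish.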

3. Worker (Reagent)

Workers consume tasks from the oxidizer stream. Define your processing logic with the @reagent.react() decorator:

from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.reagent import Reagent

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
reagent = Reagent(catalyst=catalyst)

@reagent.react()
def process(data: dict, context: dict):
    node_id = context["node_id"]
    run_id = context["run_id"]
    print(f"Processing {node_id} for run {run_id}")
    # data: dict mapping alias → resolved upstream records
    # context: run_id, node_id, alloy, layer, node_config, connections
    return {"data": data}

The decorated function receives (data, context):

  • data: dict mapping alias → resolved upstream records (tiered retrieval handled by the framework)
  • context: dict with keys run_id, node_id, alloy, layer, node_config, connections
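Since a handler is an ordinary function of (data, context), its logic can be tried out without Redis or a running worker by calling it directly with hand-built dictionaries. The sketch below leaves out the decorator and uses made-up sample values; the shapes of `layer`, `node_config`, and `connections`, and the assumption that each alias maps to a list of records, are guesses for illustration only.

```python
def process(data: dict, context: dict) -> dict:
    """Count upstream records per alias, using the (data, context) signature."""
    counts = {alias: len(records) for alias, records in data.items()}
    return {"data": {"node_id": context["node_id"], "counts": counts}}


# Hypothetical sample inputs; at runtime the framework supplies these.
sample_data = {
    "orders": [{"id": 1}, {"id": 2}],
    "customers": [{"id": 7}],
}
sample_context = {
    "run_id": "run-001",
    "node_id": "order_summary",
    "alloy": "example",
    "layer": "gold",
    "node_config": {},
    "connections": {},
}

result = process(sample_data, sample_context)
print(result)
# {'data': {'node_id': 'order_summary', 'counts': {'orders': 2, 'customers': 1}}}
```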

For dedicated streams (e.g. routing specific nodes to specialized workers):

@reagent.react(dedicated_stream="gold_summary_stream")
def process(data: dict, context: dict):
    ...

4. UI / API / MCP (Microscope)

Microscope is a FastMCP server that exposes HTTP endpoints for dashboards and curl, plus MCP tools, resources, and prompts for AI agent integration.

from oxidizer.catalyst import Catalyst, CatalystConnection
from oxidizer.alloy import Alloy, AlloyConnection
from oxidizer.microscope import Microscope

catalyst = Catalyst(connection=CatalystConnection(url="localhost"))
alloy = Alloy(connection=AlloyConnection(endpoint="localhost:9000"))

microscope = Microscope(catalyst=catalyst, alloy=alloy)
microscope.run(host="0.0.0.0", port=8000)

See the API Reference document in docs/ for the full list of HTTP endpoints, MCP tools, resources, and prompts.

Live Topologies

Oxidizer supports live (streaming) topologies alongside batch DAG runs. A live topology deploys long-running nodes that continuously process data from Redis Streams using a batch+republish model — each worker processes one batch, re-publishes the task, and any worker can claim it next.

# Deploy a live topology
curl -X POST http://localhost:8000/topology/deploy \
  -H 'Content-Type: application/json' \
  -d '{"alloy_name": "example"}'

# Check status (replace {run_id} with the run ID of the deployed topology)
curl http://localhost:8000/topology/{run_id}/status

# Graceful stop
curl -X POST http://localhost:8000/topology/{run_id}/stop
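The same three calls can be made from Python with the standard library. This is a minimal sketch: the endpoint paths and the `alloy_name` payload come from the curl commands above, while the helper names are hypothetical. The response format is not described in this README, so `send` returns the raw body text.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"


def deploy_request(alloy_name, base_url=BASE_URL):
    """Build the POST /topology/deploy request for the given alloy."""
    body = json.dumps({"alloy_name": alloy_name}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/topology/deploy",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(run_id, base_url=BASE_URL):
    """Build the GET /topology/{run_id}/status request."""
    return urllib.request.Request(
        f"{base_url}/topology/{run_id}/status", method="GET"
    )


def stop_request(run_id, base_url=BASE_URL):
    """Build the POST /topology/{run_id}/stop request."""
    return urllib.request.Request(
        f"{base_url}/topology/{run_id}/stop", method="POST"
    )


def send(request, timeout=10):
    """Send a prepared request and return the response body as text."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

For example, `send(deploy_request("example"))` issues the deploy call. Building the request separately from sending it keeps the URL and payload easy to inspect or log.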

See the Live Topologies document in docs/ for the full architecture, node pause/unpause, rebalancing, and crash recovery details.

Documentation

Detailed documentation lives in the docs/ folder:

Document         Description
Architecture     Component overview, data flow, node state machines, locking, error handling
API Reference    All HTTP endpoints, MCP tools, resources, prompts, metrics schemas
Configuration    Alloy YAML reference: layers, nodes, output blocks, scheduling, retention
Live Topologies  Batch+republish model, topology entity, deploy/stop/rebalance, pause/unpause
Batching         XRANGE cursor loop, per-record fan-out, auto-flatten, batch_size config
Retention        Task stream cleanup, producer-owned output, hierarchical data_retention
Plugins          Python + Rust plugin system with Maturin/PyO3, entry points
Testing          Pytest suite, integration tests, Docker Compose test process, pass/fail criteria
Examples         Four self-contained pipeline examples (skeleton, batch_etl, live_events, mixed_pipeline)
PyPI Publishing  Publishing to PyPI with GitHub Actions, trusted publisher, versioning strategies

Testing

# Rust unit tests (no external services needed)
cargo test

# Python tests (no external services needed)
python -m pytest tests/ -v

License

TBD

Download files

Source Distribution

  • oxidizer-0.0.2.tar.gz (550.3 kB)

Built Distributions

  • oxidizer-0.0.2-cp313-cp313-win_amd64.whl (8.3 MB): CPython 3.13, Windows x86-64
  • oxidizer-0.0.2-cp313-cp313-musllinux_1_1_x86_64.whl (10.5 MB): CPython 3.13, musllinux (musl 1.1+) x86-64
  • oxidizer-0.0.2-cp313-cp313-musllinux_1_1_aarch64.whl (10.3 MB): CPython 3.13, musllinux (musl 1.1+) ARM64
  • oxidizer-0.0.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (10.2 MB): CPython 3.13, manylinux (glibc 2.17+) x86-64
  • oxidizer-0.0.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (10.1 MB): CPython 3.13, manylinux (glibc 2.17+) ARM64
  • oxidizer-0.0.2-cp313-cp313-macosx_11_0_arm64.whl (8.7 MB): CPython 3.13, macOS 11.0+ ARM64
  • oxidizer-0.0.2-cp313-cp313-macosx_10_12_x86_64.whl (9.6 MB): CPython 3.13, macOS 10.12+ x86-64

File details

Details for the file oxidizer-0.0.2.tar.gz.

File metadata

  • Download URL: oxidizer-0.0.2.tar.gz
  • Upload date:
  • Size: 550.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for oxidizer-0.0.2.tar.gz
Algorithm Hash digest
SHA256 b74a42af7409e00672fb93fc06326a4836ca63b80362b1b29350bceb399e0bd0
MD5 2286c324c237ab6923589281e3ab4b3d
BLAKE2b-256 a4e9140e45eca080ae3ff0ef6729c0a8d7174ab88b54c1bbd10f877a7fc5375e

See more details on using hashes here.

Provenance

The following attestation bundles were made for oxidizer-0.0.2.tar.gz:

Publisher: release.yml on moos-engineering/oxidizer

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
