
Oxidizer-Lite

A distributed, event-driven DAG orchestration engine built on Redis Streams. Oxidizer-Lite coordinates data pipelines using a controller-worker pattern — configs define the topology, Redis handles dispatch, and workers scale horizontally.

Installation

pip install oxidizer-lite

Or install from source:

git clone https://github.com/moos-engineering/oxidizer-lite.git
cd oxidizer-lite
pip install -e .

Quick Start

1. Start infrastructure:

# Redis (with JSON module)
docker run -d --name redis -p 6379:6379 redis/redis-stack:latest

# MinIO (S3-compatible storage)
docker run -d --name minio -p 9000:9000 -p 9001:9001 \
  -e MINIO_ROOT_USER=minioadmin -e MINIO_ROOT_PASSWORD=minioadmin \
  minio/minio server /data --console-address ":9001"

2. Create a worker:

from oxidizer_lite import Reagent, CatalystConnection

catalyst = CatalystConnection(host="localhost", port=6379, db=0)
reagent = Reagent(catalyst)

@reagent.react()
def process(data: dict, context: dict):
    # Your processing logic here
    return data.get("input", [])

3. Run the controller:

from oxidizer_lite import Oxidizer, CatalystConnection, CrucibleConnection

catalyst = CatalystConnection(host="localhost", port=6379, db=0)
crucible = CrucibleConnection(
    s3_bucket="oxidizer-configs",
    s3_url="http://localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin"
)

oxidizer = Oxidizer(catalyst, crucible)
oxidizer.oxidize()

4. Start the API server:

from oxidizer_lite import Microscope, Catalyst, Crucible, CatalystConnection, CrucibleConnection

crucible = Crucible(CrucibleConnection(...))
catalyst = Catalyst(CatalystConnection(...))
microscope = Microscope(crucible=crucible, catalyst=catalyst)
microscope.run(host="0.0.0.0", port=8000)

Architecture

YAML Config (S3)
      │
      ▼
┌──────────────┐   Redis Streams    ┌─────────────┐
│  Controller  │◄──────────────────►│  Worker(s)  │
│  (Oxidizer)  │   task dispatch /  │  (Reagent)  │
└──────┬───────┘   checkpoints      └─────┬───────┘
       │                                  │
       ▼                                  ▼
┌──────────────┐                    ┌─────────────┐
│    Redis     │                    │ SQL / API / │
│  State Cache │                    │  Streams    │
└──────────────┘                    └─────────────┘

Controller reads invocations, compiles the DAG, dispatches ready nodes, and processes worker updates. Workers fetch inputs, execute user functions, write outputs, and report checkpoints — all through Redis Streams.
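The heart of the dispatch step is dependency resolution: a node becomes ready once all of its upstream nodes have succeeded. A minimal sketch of that check (the function and state names here are illustrative, not Oxidizer-Lite's internal API):

```python
def ready_nodes(edges: dict[str, set[str]], states: dict[str, str]) -> set[str]:
    """Return pending nodes whose upstream dependencies have all succeeded.

    edges maps node -> set of upstream node names; states maps
    node -> lifecycle state (e.g. "PENDING", "SUCCESS").
    """
    return {
        node
        for node, upstream in edges.items()
        if states[node] == "PENDING"
        and all(states[dep] == "SUCCESS" for dep in upstream)
    }

# A three-node DAG: extract -> transform -> load
edges = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
states = {"extract": "SUCCESS", "transform": "PENDING", "load": "PENDING"}
print(ready_nodes(edges, states))  # {'transform'}
```

In the real engine this check runs against JSON state in Redis rather than local dicts, and the ready set is pushed onto a stream for workers to claim.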

Project Structure

oxidizer-lite/
├── oxidizer_lite/          # Core package
│   ├── oxidizer.py         # DAG controller
│   ├── reagent.py          # Worker decorator
│   ├── catalyst.py         # Redis client
│   ├── anvil.py            # SQL/API engines
│   ├── crucible.py         # S3 storage
│   ├── microscope.py       # REST API + MCP
│   ├── lattice.py          # Config parser
│   ├── topology.py         # DAG compiler
│   ├── phase.py            # Data models
│   ├── incubation.py       # Scheduler
│   └── residue.py          # Logging
├── examples/
│   ├── skeleton/           # Minimal starter
│   └── scd2/               # SCD Type 2 pipeline
├── tests/
├── pyproject.toml
└── LICENSE

Components

| Component | Module | Role |
| --- | --- | --- |
| Oxidizer | oxidizer.py | Core orchestrator — topology lifecycle, node dispatch, checkpoint processing |
| Reagent | reagent.py | Distributed worker — input fetching, user function execution, output handling |
| Catalyst | catalyst.py | Redis cache and streams — consumer groups, JSON state, TTL management |
| Anvil | anvil.py | SQL engine (DuckDB, Iceberg) and REST API engine for data I/O |
| Crucible | crucible.py | S3-compatible object storage — configs, artifacts, multi-auth (IAM, SSO, keys) |
| Lattice | lattice.py | YAML config parsing, validation, and S3 persistence |
| Topology | topology.py | DAG compiler — multi-layer nodes, edges, dependency resolution |
| Phase | phase.py | Dataclass models — task messages, I/O methods, checkpoints, errors |
| Microscope | microscope.py | REST API + MCP server — invoke topologies, query logs, manage configs |
| Incubation | incubation.py | Cron-based scheduler for scheduled nodes |
| Residue | residue.py | Structured logging with Redis persistence |

Features

I/O Methods

| Method | Input | Output |
| --- | --- | --- |
| Streams | Redis Streams with batching, windowing, consumer groups | Write records to downstream streams |
| SQL | SELECT queries with pagination, SCD Type 2 support | INSERT, MERGE, UPDATE with auto-schema creation |
| API | HTTP GET with auth (bearer token, API key) | HTTP POST with JSON payload |
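Pagination on the SQL input side amounts to wrapping the user's query in LIMIT/OFFSET pages until a short page signals the end. A rough sketch of the idea (not the actual Anvil implementation; the executor here is a stand-in):

```python
from collections.abc import Callable, Iterator

def paginate(query: str, batch_size: int,
             run: Callable[[str], list[dict]]) -> Iterator[list[dict]]:
    """Yield result batches by appending LIMIT/OFFSET to a SELECT.

    `run` executes a SQL string and returns rows as dicts; a page
    shorter than batch_size marks the final batch.
    """
    offset = 0
    while True:
        page = run(f"{query} LIMIT {batch_size} OFFSET {offset}")
        if page:
            yield page
        if len(page) < batch_size:
            break
        offset += batch_size

# Fake executor over 5 in-memory rows, for demonstration only
rows = [{"id": i} for i in range(5)]

def fake_run(sql: str) -> list[dict]:
    parts = sql.split()
    limit, offset = int(parts[-3]), int(parts[-1])
    return rows[offset:offset + limit]

batches = list(paginate("SELECT * FROM t", 2, fake_run))
print([len(b) for b in batches])  # [2, 2, 1]
```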

DAG Orchestration

  • YAML-driven topology — multi-layer DAG with dependency resolution
  • 15 node lifecycle states — PENDING, READY, DISPATCHED, RUNNING, LIVE, PAUSED, SUCCESS, FAILED, RETRYING, SKIPPED, etc.
  • Control signals — pause, resume, and shutdown topologies during execution
  • Live streaming nodes — run indefinitely until explicitly stopped
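The lifecycle states above imply a transition graph in which only some moves are legal (DISPATCHED can become RUNNING, but SUCCESS cannot). A toy validator over a subset of those states — the transition table here is an assumption for illustration, not the engine's actual rules:

```python
from enum import Enum

class NodeState(Enum):
    PENDING = "PENDING"
    READY = "READY"
    DISPATCHED = "DISPATCHED"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    RETRYING = "RETRYING"

# Hypothetical legal transitions; the real engine defines its own table.
TRANSITIONS = {
    NodeState.PENDING: {NodeState.READY},
    NodeState.READY: {NodeState.DISPATCHED},
    NodeState.DISPATCHED: {NodeState.RUNNING},
    NodeState.RUNNING: {NodeState.PAUSED, NodeState.SUCCESS, NodeState.FAILED},
    NodeState.PAUSED: {NodeState.RUNNING},
    NodeState.FAILED: {NodeState.RETRYING},
    NodeState.RETRYING: {NodeState.DISPATCHED},
    NodeState.SUCCESS: set(),  # terminal
}

def advance(current: NodeState, target: NodeState) -> NodeState:
    """Move to target if the transition is legal; raise otherwise."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target

print(advance(NodeState.RUNNING, NodeState.SUCCESS).name)  # SUCCESS
```

Control signals (pause, resume, shutdown) map onto the same machinery: they request a transition, and the controller rejects ones that are not legal from the node's current state.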

Batch Processing

  • Stateful checkpoints — cursor tracking, batch index, accumulated runtimes and memory per stage
  • Resumable execution — workers resume from the last checkpoint on failure or restart
  • Auto-pagination — SQL and stream inputs paginate automatically based on batch size
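Resumable execution can be pictured as a checkpoint carrying a cursor: on restart, the worker skips everything at or below the last committed position. A minimal sketch, with names invented for illustration (the real checkpoint also tracks runtimes and memory per stage):

```python
from dataclasses import dataclass

@dataclass
class Checkpoint:
    cursor: int = -1      # index of the last committed record
    batch_index: int = 0  # batches completed so far

def process_resumable(records: list[dict], ckpt: Checkpoint,
                      batch_size: int = 2) -> list[list[dict]]:
    """Process records in batches, starting after ckpt.cursor."""
    done: list[list[dict]] = []
    pending = records[ckpt.cursor + 1:]
    for start in range(0, len(pending), batch_size):
        batch = pending[start:start + batch_size]
        done.append(batch)
        # Commit the checkpoint after each batch so a crash resumes here
        ckpt.cursor += len(batch)
        ckpt.batch_index += 1
    return done

records = [{"i": i} for i in range(5)]
ckpt = Checkpoint()
first = process_resumable(records, ckpt)    # all 5 records in 3 batches
resumed = process_resumable(records, ckpt)  # nothing left to do
print(ckpt.cursor, ckpt.batch_index, len(resumed))  # 4 3 0
```

In Oxidizer-Lite the checkpoint lives in Redis rather than in process memory, which is what lets a different worker pick up where a crashed one left off.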

Monitoring & Control

  • REST API — health checks, topology invocation, log queries
  • MCP Server (FastMCP) — AI-friendly tools for topology management
  • Structured logging — per-component logs persisted to Redis with TTL
  • Performance profiling — runtime and memory tracking per processing stage

Storage

  • Redis — topology state, stream coordination, JSON cache, log persistence
  • S3-compatible — configs and artifacts via MinIO or AWS S3
  • Iceberg — via AWS Glue Data Catalog with DuckDB

Examples

See the examples/ folder for complete working pipelines:

  • skeleton/ — Minimal starter template (API → Streams → SQL)
  • scd2/ — SCD Type 2 pipeline with Open-Meteo weather API (18 US cities)

API Usage

Invoke topologies via the MCP client:

import asyncio
from fastmcp import Client

async def main():
    async with Client("http://localhost:8000/mcp") as client:
        # List available operations
        tools = await client.list_tools()
        
        # Invoke a topology
        result = await client.call_tool("invoke_topology", {"lattice_id": "scd2"})
        print(result)

asyncio.run(main())

TODO:

[ ] Update Controller to Not Exit On Redis Removal
[ ] Add Pause / Resume Functionality
[ ] Scheduling - This will come with Rust Integration
[ ] Node Retry Logic (How to Test)
[ ] Data Retrieval Fallback Updates
[ ] Update DuckLake to Use Catalog (Remove USE ___)
[ ] Improve Memory Metric Capture (Maybe Switch to Rust? Maybe Update CheckpointMetadata?)
[ ] Add Default Override to Config - If Defaults Set in Config at Global, Layer, or Node Level, Override the In-Code Defaults
[ ] Ensure TTL on All Necessary Keys / Streams / etc.

Ideas:

  • Ack Node Logs (Reduce Existing TTL)
  • Add Lineage View to Lattice Page in UI

License

MIT License — see LICENSE for details.
