
Oxidizer-Lite

A distributed, event-driven DAG orchestration engine built on Redis Streams. Oxidizer-Lite coordinates data pipelines using a controller-worker pattern — configs define the topology, Redis handles dispatch, and workers scale horizontally.

Installation

pip install oxidizer-lite

Or install from source:

git clone https://github.com/moos-engineering/oxidizer-lite.git
cd oxidizer-lite
pip install -e .

Quick Start

1. Start infrastructure:

# Redis (with JSON module)
docker run -d --name redis -p 6379:6379 redis/redis-stack:latest

# MinIO (S3-compatible storage)
docker run -d --name minio -p 9000:9000 -p 9001:9001 \
  -e MINIO_ROOT_USER=minioadmin -e MINIO_ROOT_PASSWORD=minioadmin \
  minio/minio server /data --console-address ":9001"
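
The controller in step 3 loads configs from an oxidizer-configs bucket, so create it once MinIO is up. One way to do that, assuming boto3 is installed (any S3 client works; boto3 is not a project dependency):

import boto3

# Point an S3 client at the local MinIO container started above
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
)
s3.create_bucket(Bucket="oxidizer-configs")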

2. Create a worker:

from oxidizer_lite import Reagent, CatalystConnection

catalyst = CatalystConnection(host="localhost", port=6379, db=0)
reagent = Reagent(catalyst)

@reagent.react()
def process(data: dict, context: dict):
    # Your processing logic here
    return data.get("input", [])

3. Run the controller:

from oxidizer_lite import Oxidizer, CatalystConnection, CrucibleConnection

catalyst = CatalystConnection(host="localhost", port=6379, db=0)
crucible = CrucibleConnection(
    s3_bucket="oxidizer-configs",
    s3_url="http://localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin"
)

oxidizer = Oxidizer(catalyst, crucible)
oxidizer.oxidize()

4. Start the API server:

from oxidizer_lite import Microscope, Catalyst, Crucible, CatalystConnection, CrucibleConnection

crucible = Crucible(CrucibleConnection(...))
catalyst = Catalyst(CatalystConnection(...))
microscope = Microscope(crucible=crucible, catalyst=catalyst)
microscope.run(host="0.0.0.0", port=8000)

Architecture

YAML Config (S3)
      │
      ▼
┌──────────────┐   Redis Streams    ┌─────────────┐
│  Controller  │◄──────────────────►│  Worker(s)  │
│  (Oxidizer)  │   task dispatch /  │  (Reagent)  │
└──────┬───────┘   checkpoints      └─────┬───────┘
       │                                  │
       ▼                                  ▼
┌──────────────┐                    ┌─────────────┐
│    Redis     │                    │ SQL / API / │
│  State Cache │                    │  Streams    │
└──────────────┘                    └─────────────┘

The controller reads invocations, compiles the DAG, dispatches ready nodes, and processes worker updates. Workers fetch inputs, execute user functions, write outputs, and report checkpoints — all through Redis Streams.
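
That coordination is ordinary Redis Streams consumer-group traffic. The sketch below illustrates the pattern with redis-py; the stream and group names are made up for illustration and are not Oxidizer-Lite's internal keys.

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

# Controller side: publish a task message onto a dispatch stream
r.xadd("tasks", {"node": "extract", "lattice_id": "scd2"})

# Worker side: join a consumer group, process messages, acknowledge them
try:
    r.xgroup_create("tasks", "workers", id="0", mkstream=True)
except redis.exceptions.ResponseError:
    pass  # group already exists
for _stream, messages in r.xreadgroup("workers", "worker-1", {"tasks": ">"}, count=10, block=5000):
    for msg_id, fields in messages:
        ...  # run the node's user function, then report a checkpoint
        r.xack("tasks", "workers", msg_id)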

Project Structure

oxidizer-lite/
├── oxidizer_lite/          # Core package
│   ├── oxidizer.py         # DAG controller
│   ├── reagent.py          # Worker decorator
│   ├── catalyst.py         # Redis client
│   ├── anvil.py            # SQL/API engines
│   ├── crucible.py         # S3 storage
│   ├── microscope.py       # REST API + MCP
│   ├── lattice.py          # Config parser
│   ├── topology.py         # DAG compiler
│   ├── phase.py            # Data models
│   ├── incubation.py       # Scheduler
│   └── residue.py          # Logging
├── examples/
│   ├── skeleton/           # Minimal starter
│   └── scd2/               # SCD Type 2 pipeline
├── tests/
├── pyproject.toml
└── LICENSE

Components

| Component  | Module        | Role                                                                        |
|------------|---------------|-----------------------------------------------------------------------------|
| Oxidizer   | oxidizer.py   | Core orchestrator — topology lifecycle, node dispatch, checkpoint processing |
| Reagent    | reagent.py    | Distributed worker — input fetching, user function execution, output handling |
| Catalyst   | catalyst.py   | Redis cache and streams — consumer groups, JSON state, TTL management       |
| Anvil      | anvil.py      | SQL engine (DuckDB, Iceberg) and REST API engine for data I/O               |
| Crucible   | crucible.py   | S3-compatible object storage — configs, artifacts, multi-auth (IAM, SSO, keys) |
| Lattice    | lattice.py    | YAML config parsing, validation, and S3 persistence                         |
| Topology   | topology.py   | DAG compiler — multi-layer nodes, edges, dependency resolution              |
| Phase      | phase.py      | Dataclass models — task messages, I/O methods, checkpoints, errors          |
| Microscope | microscope.py | REST API + MCP server — invoke topologies, query logs, manage configs       |
| Incubation | incubation.py | Cron-based scheduler for scheduled nodes                                    |
| Residue    | residue.py    | Structured logging with Redis persistence                                   |

Features

I/O Methods

| Method  | Input                                                    | Output                                          |
|---------|----------------------------------------------------------|-------------------------------------------------|
| Streams | Redis streams with batching, windowing, consumer groups  | Write records to downstream streams             |
| SQL     | SELECT queries with pagination, SCD Type 2 support       | INSERT, MERGE, UPDATE with auto-schema creation |
| API     | HTTP GET with auth (bearer token, API key)               | HTTP POST with JSON payload                     |
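
The API method above is a plain HTTP round trip. A rough sketch with requests follows; the function names are illustrative, and the real Anvil engine also handles API-key auth and error fallback.

import requests

def fetch_input(url: str, token: str) -> list[dict]:
    # HTTP GET with bearer-token auth, as described for the API input method
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    resp.raise_for_status()
    return resp.json()

def write_output(url: str, records: list[dict]) -> None:
    # HTTP POST with a JSON payload, as described for the API output method
    requests.post(url, json=records, timeout=30).raise_for_status()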

DAG Orchestration

  • YAML-driven topology — multi-layer DAG with dependency resolution
  • 15 node lifecycle states — PENDING, READY, DISPATCHED, RUNNING, LIVE, PAUSED, SUCCESS, FAILED, RETRYING, SKIPPED, etc.
  • Control signals — pause, resume, and shutdown topologies during execution
  • Live streaming nodes — run indefinitely until explicitly stopped

Batch Processing

  • Stateful checkpoints — cursor tracking, batch index, accumulated runtimes and memory per stage
  • Resumable execution — workers resume from the last checkpoint on failure or restart
  • Auto-pagination — SQL and stream inputs paginate automatically based on batch size (a rough sketch follows this list)
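
Concretely, resumable pagination comes down to a cursor loop of roughly this shape. This is an illustrative sketch, not the engine's actual code; the checkpoint dict stands in for the Redis-persisted checkpoint state.

import duckdb  # the SQL engine used by Anvil

BATCH_SIZE = 500  # illustrative default

def paginated_batches(query: str, checkpoint: dict):
    """Yield result batches, resuming from the last recorded offset."""
    con = duckdb.connect()
    offset = checkpoint.get("offset", 0)  # 0 on first run, last cursor on restart
    while True:
        rows = con.execute(f"{query} LIMIT {BATCH_SIZE} OFFSET {offset}").fetchall()
        if not rows:
            break
        yield rows
        offset += len(rows)
        checkpoint["offset"] = offset  # the real engine persists this to Redis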

Monitoring & Control

  • REST API — health checks, topology invocation, log queries
  • MCP Server (FastMCP) — AI-friendly tools for topology management
  • Structured logging — per-component logs persisted to Redis with TTL
  • Performance profiling — runtime and memory tracking per processing stage

Storage

  • Redis — topology state, stream coordination, JSON cache, log persistence
  • S3-compatible — configs and artifacts via MinIO or AWS S3
  • Iceberg — via AWS Glue Data Catalog with DuckDB

Examples

See the examples/ folder for complete working pipelines:

  • skeleton/ — Minimal starter template (API → Streams → SQL)
  • scd2/ — SCD Type 2 pipeline with Open-Meteo weather API (18 US cities)

API Usage

Invoke topologies via the MCP client:

import asyncio
from fastmcp import Client

async def main():
    async with Client("http://localhost:8000/mcp") as client:
        # List available operations
        tools = await client.list_tools()
        
        # Invoke a topology
        result = await client.call_tool("invoke_topology", {"lattice_id": "scd2"})
        print(result)

asyncio.run(main())

TODO:

[ ] Add pause / resume functionality
[ ] Scheduling (will come with Rust integration)
[ ] Node retry logic (and how to test it)
[ ] Data retrieval fallback updates
[ ] Improve memory metric capture (maybe switch to Rust, or update CheckpointMetadata?)
[ ] Config-level default overrides: defaults set at the global, layer, or node level in the config override the in-code defaults
[ ] Ensure TTL on all necessary keys, streams, etc.

Ideas:

  • Ack Node Logs (Reduce Existing TTL)
  • Add Lineage View to Lattice Page in UI

License

MIT License — see LICENSE for details.
