DagPipe

Zero-cost, crash-proof LLM pipeline orchestrator.

Building with LLMs is too expensive and too fragile. Pipelines break mid-run. Rate limits waste completed work. Paying for GPT-4 on every node is overkill. DagPipe fixes all three.

It turns any multi-step LLM workflow into a resilient, checkpointed DAG that routes tasks to the right free-tier model — and resumes from the last successful step if anything goes wrong.


Why DagPipe

| Problem | DagPipe Solution |
|---|---|
| Pipeline crashes = start over | JSON checkpointing: resume from last successful node |
| Paying for large models on simple tasks | Complexity-based routing to free-tier LLMs |
| LLM returns malformed JSON | Pydantic validation + automatic retry with error feedback |
| Tight coupling to one LLM provider | Provider-agnostic: wire any callable as your model |
| Fragile sequential chains | Explicit DAG with topological sort and cycle detection |

Installation

pip install dagpipe-core

Requirements: Python 3.12+ · pydantic ≥ 2.0 · pyyaml


Quickstart

from pathlib import Path
from dagpipe.dag import PipelineOrchestrator, DAGNode
from dagpipe.router import ModelRouter
from dagpipe.constrained import constrained_generate

# ── 1. Define your node functions ─────────────────────────────
def research(context, model):
    # model is whatever callable your router selected
    prompt = [{"role": "user", "content": f"Research: {context['topic']}"}]
    raw = model(prompt)
    return {"summary": raw}

def write_draft(context, model):
    summary = context["research"]["summary"]
    prompt = [{"role": "user", "content": f"Write an article based on: {summary}"}]
    raw = model(prompt)
    return {"draft": raw}

def publish(context, model):
    # Deterministic node — no LLM needed
    print(f"Publishing: {context['write_draft']['draft'][:100]}...")
    return {"status": "published", "url": "https://example.com/article"}


# ── 2. Wire your LLM providers ────────────────────────────────
import groq  # or any OpenAI-compatible client

client = groq.Groq()

def groq_70b(messages):
    return client.chat.completions.create(
        model="llama-3.3-70b-versatile", messages=messages
    ).choices[0].message.content

def groq_8b(messages):
    return client.chat.completions.create(
        model="llama3-8b-8192", messages=messages
    ).choices[0].message.content


# ── 3. Build the router ───────────────────────────────────────
router = ModelRouter(
    low_complexity_fn=groq_8b,       label_low="groq_8b",
    high_complexity_fn=groq_70b,     label_high="groq_70b",
    fallback_fn=groq_8b,             label_fallback="groq_8b_fallback",
    complexity_threshold=0.6,
)


# ── 4. Define the DAG ─────────────────────────────────────────
nodes = [
    DAGNode(id="research",    fn_name="research",    complexity=0.4),
    DAGNode(id="write_draft", fn_name="write_draft", complexity=0.7,
            depends_on=["research"]),
    DAGNode(id="publish",     fn_name="publish",
            depends_on=["write_draft"], is_deterministic=True),
]


# ── 5. Run it ─────────────────────────────────────────────────
orchestrator = PipelineOrchestrator(
    nodes=nodes,
    node_registry={
        "research":    research,
        "write_draft": write_draft,
        "publish":     publish,
    },
    router=router,
    checkpoint_dir=Path(".dagpipe/checkpoints"),
    max_retries=3,
    on_node_complete=lambda node_id, result, duration:
        print(f"  ✓ {node_id} ({duration:.1f}s)"),
)

result = orchestrator.run(initial_state={"topic": "AI in African fintech"})

Crash mid-run? Delete nothing. Just re-run. DagPipe reads the checkpoints and skips completed nodes automatically.
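
To make the resume behavior concrete, here is a minimal stdlib-only sketch of the idea. It is not DagPipe's implementation: `run_pipeline`, the one-file-per-node layout (`<node_id>.json`), and the simulated failure are all assumptions chosen for illustration.

```python
import json
import tempfile
from pathlib import Path


def run_pipeline(steps, checkpoint_dir, state):
    """Run (node_id, fn) pairs in order, skipping nodes that already have a checkpoint."""
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    context = dict(state)
    executed = []
    for node_id, fn in steps:
        path = checkpoint_dir / f"{node_id}.json"
        if path.exists():
            # Completed in an earlier run: load the saved output instead of re-running.
            context[node_id] = json.loads(path.read_text(encoding="utf-8"))
            continue
        result = fn(context)
        path.write_text(json.dumps(result), encoding="utf-8")
        context[node_id] = result
        executed.append(node_id)
    return context, executed


def research(context):
    return {"summary": f"notes on {context['topic']}"}


def flaky_draft(context):
    raise RuntimeError("simulated rate limit")


def draft(context):
    return {"draft": context["research"]["summary"].upper()}


workdir = Path(tempfile.mkdtemp())

# First run: "research" succeeds and is checkpointed, then the second node crashes.
try:
    run_pipeline([("research", research), ("write_draft", flaky_draft)], workdir, {"topic": "dags"})
except RuntimeError:
    pass

# Second run: "research" is restored from disk, only "write_draft" executes.
final_context, executed_nodes = run_pipeline(
    [("research", research), ("write_draft", draft)], workdir, {"topic": "dags"}
)
```

In this sketch the second run never calls `research` again, which is the property that saves completed work after a rate-limit error.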


How It Works

Your Tasks (YAML or Python list of DAGNodes)
                    │
                    ▼
         ┌──────────────────┐
         │  Topological     │  resolves execution order,
         │  Sort            │  detects cycles before running
         └────────┬─────────┘
                  │
        ┌─────────▼──────────┐
        │  Checkpoint        │  restores any completed nodes
        │  Restore           │  from previous runs
        └─────────┬──────────┘
                  │
          ┌───────▼────────┐
          │  For each node │◄─────────────────────────┐
          └───────┬────────┘                          │
                  │                                   │
        ┌─────────▼──────────┐    ┌────────────────┐  │
        │  ModelRouter       │───▶│ low / high /   │  │
        │  (complexity score)│    │ fallback fn    │  │
        └─────────┬──────────┘    └────────────────┘  │
                  │                                   │
        ┌─────────▼──────────┐                        │
        │  Constrained       │  forces valid output   │
        │  Generator         │  retries with error    │
        └─────────┬──────────┘  feedback on failure   │
                  │                                   │
        ┌─────────▼──────────┐                        │
        │  Checkpoint Save   │  writes result to disk │
        └─────────┬──────────┘                        │
                  │                                   │
          crash here = resume from ✓            next node

Core Modules

dagpipe.dag — The Orchestrator

The central engine. Loads a DAG from a Python list or YAML file, sorts nodes by dependency, and executes them in order with checkpointing and retry.

from pathlib import Path

from dagpipe.dag import PipelineOrchestrator, DAGNode, load_dag

# Load from YAML
nodes = load_dag(Path("my_pipeline.yaml"))

# Or define in Python
nodes = [DAGNode(id="step_a", fn_name="fn_a", complexity=0.3)]
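
The dependency sorting and cycle detection described for this module can be sketched with Python's standard `graphlib`. This is an illustration of the technique, not DagPipe's own code; the helper name `execution_order` and the choice to raise `ValueError` are assumptions.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies):
    """Return node ids in an order where every node follows its dependencies.

    `dependencies` maps node id -> list of ids it depends on.
    Raises ValueError if the graph contains a cycle.
    """
    sorter = TopologicalSorter(dependencies)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # exc.args[1] holds the nodes that form the cycle.
        raise ValueError(f"cycle detected: {exc.args[1]}") from exc


order = execution_order({
    "research": [],
    "write_draft": ["research"],
    "publish": ["write_draft"],
})
```

Checking for cycles before any node runs means a misconfigured pipeline fails immediately rather than after spending model calls.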

dagpipe.checkpoints — Crash Recovery

Saves node output to disk after every successful execution. On resume, completed nodes are skipped entirely.

from pathlib import Path

from dagpipe.checkpoints import checkpoint, restore, checkpoint_exists

checkpoint("node_id", {"output": "data"}, checkpoint_dir=Path(".dagpipe"))
data = restore("node_id", checkpoint_dir=Path(".dagpipe"))  # None if not found
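
One detail worth considering in any checkpointing scheme is what happens if the process dies while a checkpoint is being written. The sketch below shows one common answer, writing to a temporary file and renaming it into place. Whether DagPipe does this is not stated here; `save_checkpoint` and `load_checkpoint` are hypothetical names used only for this example.

```python
import json
import os
import tempfile
from pathlib import Path


def save_checkpoint(node_id, data, checkpoint_dir):
    """Write `data` as JSON so that readers never see a half-written file."""
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    target = checkpoint_dir / f"{node_id}.json"
    # Write to a temporary file in the same directory, then rename over the target.
    fd, tmp_name = tempfile.mkstemp(dir=checkpoint_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(data, handle)
        os.replace(tmp_name, target)  # atomic when both paths are on the same filesystem
    except BaseException:
        os.unlink(tmp_name)
        raise
    return target


def load_checkpoint(node_id, checkpoint_dir):
    """Return the saved output for `node_id`, or None if no checkpoint exists."""
    target = checkpoint_dir / f"{node_id}.json"
    if not target.exists():
        return None
    return json.loads(target.read_text(encoding="utf-8"))


demo_dir = Path(tempfile.mkdtemp())
save_checkpoint("node_id", {"output": "data"}, demo_dir)
restored = load_checkpoint("node_id", demo_dir)
missing = load_checkpoint("never_ran", demo_dir)
```

With this approach a crash mid-write leaves either the old checkpoint or none at all, so a resumed run cannot load truncated JSON.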

dagpipe.router — Intelligent Model Selection

Routes each task to the cheapest model that can handle it. Tracks rate-limit budgets and escalates to a stronger model on retry.

from dagpipe.router import ModelRouter, classify_complexity

score = classify_complexity("implement OAuth authentication", token_count=1200)
# → 0.8 (high — triggers high_complexity_fn)

# cheap_model, smart_model, and backup_model are placeholders for your own model callables
router = ModelRouter(
    low_complexity_fn=cheap_model,   label_low="7b",
    high_complexity_fn=smart_model,  label_high="70b",
    fallback_fn=backup_model,        label_fallback="backup",
)
fn, label = router.route(complexity=0.8)
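
The routing idea can be sketched in a few lines. This is a simplified stand-in rather than the `ModelRouter` source: the function `route`, the inclusive threshold comparison, and the specific escalation policy (high-complexity model after one failure, fallback after two) are assumptions made for illustration.

```python
def route(complexity, attempt, low_fn, high_fn, fallback_fn, threshold=0.6):
    """Pick a model callable from a complexity score and the current attempt number."""
    if attempt >= 2:
        # Two failures already: switch to the fallback provider.
        return fallback_fn, "fallback"
    if complexity >= threshold or attempt == 1:
        # Hard task, or the cheap model has failed once: escalate.
        return high_fn, "high"
    return low_fn, "low"


# Stand-in model callables; real ones would call a provider API.
def cheap_model(messages):
    return "cheap reply"


def smart_model(messages):
    return "smart reply"


def backup_model(messages):
    return "backup reply"


labels = [
    route(0.4, 0, cheap_model, smart_model, backup_model)[1],  # simple task, first try
    route(0.7, 0, cheap_model, smart_model, backup_model)[1],  # complex task, first try
    route(0.4, 1, cheap_model, smart_model, backup_model)[1],  # simple task, one failure
    route(0.4, 2, cheap_model, smart_model, backup_model)[1],  # simple task, two failures
]
```

Keeping the decision in a pure function like this makes the policy easy to test without calling any model.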

dagpipe.constrained — Guaranteed Structured Output

Wraps any LLM call with Pydantic schema validation. On failure, injects the error back into the prompt and retries automatically.

from pydantic import BaseModel
from dagpipe.constrained import constrained_generate

class ArticleOutput(BaseModel):
    title: str
    body: str
    word_count: int

# my_llm is a placeholder for your own model callable
result = constrained_generate(
    messages=[{"role": "user", "content": "Write a short article about AI."}],
    schema=ArticleOutput,
    llm_call_fn=my_llm,
    max_retries=3,
)
# result is a validated ArticleOutput instance — guaranteed
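
The retry-with-error-feedback loop can be illustrated without any dependencies. Note the substitutions: this sketch uses a hand-rolled type check in place of Pydantic, and `generate_validated`, `fake_llm`, and the wording of the correction message are all hypothetical, not part of DagPipe.

```python
import json


def generate_validated(messages, required_fields, llm_call_fn, max_retries=3):
    """Call the model until its reply parses as JSON with the required field types."""
    history = list(messages)
    last_error = None
    for _ in range(max_retries + 1):
        raw = llm_call_fn(history)
        try:
            parsed = json.loads(raw)
            if not isinstance(parsed, dict):
                raise ValueError("top-level value must be a JSON object")
            for name, expected_type in required_fields.items():
                if not isinstance(parsed.get(name), expected_type):
                    raise ValueError(f"field '{name}' must be {expected_type.__name__}")
            return parsed
        except ValueError as exc:  # json.JSONDecodeError is a subclass of ValueError
            last_error = exc
            # Feed the failure back so the next attempt can correct it.
            history.append({"role": "assistant", "content": raw})
            history.append(
                {"role": "user", "content": f"Invalid output: {exc}. Reply with valid JSON only."}
            )
    raise RuntimeError(f"no valid output after {max_retries} retries: {last_error}")


# A fake model that fails once, then returns valid JSON.
replies = iter(["not json", '{"title": "AI", "body": "text", "word_count": 1}'])
seen_prompts = []


def fake_llm(history):
    seen_prompts.append(len(history))  # record how many messages the model was shown
    return next(replies)


article = generate_validated(
    [{"role": "user", "content": "Write a short article about AI."}],
    {"title": str, "body": str, "word_count": int},
    fake_llm,
)
```

On the second call the fake model sees three messages instead of one: the original request, its own bad reply, and the validation error.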

YAML Pipeline Definition

# my_pipeline.yaml
nodes:
  - id: research
    fn: research_fn
    complexity: 0.4
    description: "Gather source material"

  - id: summarize
    fn: summarize_fn
    depends_on: [research]
    complexity: 0.5
    description: "Compress to key points"

  - id: publish
    fn: publish_fn
    depends_on: [summarize]
    complexity: 0.0
    is_deterministic: true
    description: "Push to CMS, no LLM needed"
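
A pipeline file like this is usually validated right after parsing. The sketch below assumes the file has already been read with something like `yaml.safe_load`, so a plain dict stands in for the parsed document. `NodeSpec`, `parse_nodes`, and the default values are hypothetical and do not describe what `load_dag` actually does.

```python
from dataclasses import dataclass, field


@dataclass
class NodeSpec:
    id: str
    fn: str
    depends_on: list = field(default_factory=list)
    complexity: float = 0.5  # assumed default, for illustration only
    is_deterministic: bool = False
    description: str = ""


def parse_nodes(document):
    """Build NodeSpec objects and reject duplicate ids or unknown dependencies."""
    specs = [NodeSpec(**entry) for entry in document["nodes"]]
    ids = [spec.id for spec in specs]
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate node id")
    for spec in specs:
        unknown = [dep for dep in spec.depends_on if dep not in ids]
        if unknown:
            raise ValueError(f"{spec.id} depends on unknown node(s): {unknown}")
    return specs


# Stand-in for the parsed YAML shown above (descriptions omitted for brevity).
document = {
    "nodes": [
        {"id": "research", "fn": "research_fn", "complexity": 0.4},
        {"id": "summarize", "fn": "summarize_fn", "depends_on": ["research"], "complexity": 0.5},
        {"id": "publish", "fn": "publish_fn", "depends_on": ["summarize"],
         "complexity": 0.0, "is_deterministic": True},
    ]
}
specs = parse_nodes(document)
```

Rejecting unknown dependencies at load time catches typos in `depends_on` before the pipeline starts.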

Use Cases

  • Content pipelines — Research → draft → edit → publish with zero loss on failure
  • Code generation — Spec → scaffold → implement → test across free models
  • Data extraction — Fetch → parse → validate → store with schema enforcement
  • API integrations — Multi-step workflows where any step can fail and retry
  • Automated reporting — Collect → analyze → format → deliver on a schedule

Zero-Cost Stack

DagPipe is designed to run entirely on free tiers:

| Provider | Model | Free Tier |
|---|---|---|
| Groq | Llama 3.3 70B | 30 req/min |
| Groq | Llama 3 8B | 30 req/min |
| Google | Gemini 2.0 Flash | 15 req/min |
| Modal | Any 7B model | 30 GPU-sec/day |
| Ollama | Any model | Local, unlimited |

Wire any of these as your low_complexity_fn, high_complexity_fn, or fallback_fn. DagPipe is provider-agnostic.
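
Staying inside a per-minute free-tier limit comes down to counting recent calls. The following sliding-window sketch shows one way to do that. `RateBudget` is a hypothetical helper, not DagPipe's internal budget tracker, and timestamps are passed in explicitly so the example is deterministic.

```python
from collections import deque


class RateBudget:
    """Track calls in a sliding window and report whether another call fits."""

    def __init__(self, max_calls, window_seconds=60.0):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def try_acquire(self, now):
        """Record a call at time `now` (seconds) if the budget allows it."""
        # Drop calls that have aged out of the window.
        while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) >= self.max_calls:
            return False
        self._timestamps.append(now)
        return True


budget = RateBudget(max_calls=30)  # e.g. a 30 req/min tier
# One call per second for 31 seconds: the 31st exceeds the budget.
granted = [budget.try_acquire(now=float(i)) for i in range(31)]
# At t=60 the call made at t=0 has left the window, so one slot is free again.
after_window = budget.try_acquire(now=60.0)
```

In real use you would pass `time.monotonic()` as `now`, and a router could switch to its fallback model whenever `try_acquire` returns `False`.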


Project Status

Phase 1 — Core Library         ████████████████████  COMPLETE
Phase 2 — PyPI Publish         ████░░░░░░░░░░░░░░░░  IN PROGRESS  
Phase 3 — MCP Servers          ░░░░░░░░░░░░░░░░░░░░  UPCOMING
Phase 4 — Auto-Migrator        ░░░░░░░░░░░░░░░░░░░░  UPCOMING

Test coverage: 37 tests · 4 modules · 0 regressions


Contributing

Issues and PRs welcome. Please read the contribution guidelines before submitting.


License

MIT License — Built for the global developer community.


Built by @devilsfave
