Skip to main content

Your code is the graph. Async, streaming pipelines for AI.

Project description

justpipe

CI Coverage PyPI

Your code is the graph. Async, streaming pipelines for AI.

Installation

pip install justpipe

# With retry support (tenacity)
pip install "justpipe[retry]"

Quick Start

View Example Code

import asyncio
from dataclasses import dataclass
from justpipe import Pipe, EventType

@dataclass
class State:
    message: str = ""

# Type-safe pipeline definition
pipe = Pipe[State, None]()

@pipe.step()
async def respond(state: State):
    yield f"{state.message}, World!"

@pipe.step(to=respond)
async def greet(state: State):
    state.message = "Hello"

async def main():
    state = State()
    async for event in pipe.run(state):
        if event.type == EventType.TOKEN:
            print(event.data)  # "Hello, World!"

asyncio.run(main())

Features

  • Code-as-Graph - Define complex workflows using simple decorators (@step, @map, @switch).
  • Type-Safe - Full generic type support Pipe[StateT, ContextT] with static analysis.
  • Visualization - Generate beautiful Mermaid diagrams with pipe.graph().
  • Resilience - Built-in backpressure, retries, and timeouts.
  • Async & Streaming - Native asyncio support with generator streaming.
  • Zero dependencies - Core library is lightweight (dependencies only for extras).
  • Parallel execution - Fan-out with implicit barrier synchronization.
  • Validated - Graph integrity checks (cycles, broken references) with pipe.validate().
graph TD
    Start(["▶ Start"])

    subgraph parallel_n3[Parallel]
        direction LR
        n8["Search Knowledge Graph"]
        n9(["Search Vectors ⚡"])
        n10(["Search Web ⚡"])
    end

    n1["Build Context"]
    n3["Embed Query"]
    n4(["Format Output ⚡"])
    n5(["Generate Response ⚡"])
    n6["Parse Query"]
    n7["Rank Results"]
    End(["■ End"])

    Start --> n6
    n1 --> n5
    n3 --> n8
    n3 --> n9
    n3 --> n10
    n5 --> n4
    n6 --> n3
    n7 --> n1
    n8 --> n7
    n9 --> n7
    n10 --> n7
    n4 --> End

    subgraph utilities[Utilities]
        direction TB
        n0(["Analytics Logger ⚡"]):::isolated
        n2["Cache Manager"]:::isolated
    end

    %% Styling
    classDef default fill:#f8f9fa,stroke:#dee2e6,stroke-width:1px;
    classDef step fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1;
    classDef streaming fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100;
    classDef map fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#1b5e20;
    classDef switch fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c;
    classDef isolated fill:#fce4ec,stroke:#c2185b,stroke-width:2px,stroke-dasharray: 5 5,color:#880e4f;
    classDef startEnd fill:#e8f5e9,stroke:#388e3c,stroke-width:3px,color:#1b5e20;
    class n1,n3,n6,n7,n8 step;
    class n10,n4,n5,n9 streaming;
    class n0,n2 isolated;
    class Start,End startEnd;

Parallel Execution (DAG)

View Example Code

Static parallelism is defined by linking one step to multiple targets. Use barrier_timeout to prevent the pipeline from hanging if a parallel branch fails silently.

@pipe.step(barrier_timeout=5.0)
async def combine(state):
    # Implicit Barrier: Runs only after BOTH fetch_a and fetch_b complete
    state.result = state.a + state.b

@pipe.step(to=combine)
async def fetch_a(state):
    state.a = await fetch_from_api_a()

@pipe.step(to=combine)
async def fetch_b(state):
    state.b = await fetch_from_api_b()

@pipe.step(to=[fetch_a, fetch_b])
async def start(state):
    pass

Dynamic Parallelism (Map)

View Example Code

Use @pipe.map to process a list of items in parallel. The decorated function must return an iterable.

@pipe.step("worker")
async def worker(item: int, state):
    # 'item' is injected automatically because it's not a state/context arg
    print(f"Processing {item}")

@pipe.map(using=worker)
async def process_batch(state):
    # Spawns 'worker' step for each item in the returned list
    return [1, 2, 3]

Dynamic Routing (Switch)

View Example Code

Use @pipe.switch to route execution based on the return value.

@pipe.step("positive_handler")
async def handle_pos(state): ...

@pipe.step("negative_handler")
async def handle_neg(state): ...

@pipe.switch(routes={
    "pos": "positive_handler", 
    "neg": "negative_handler"
})
async def decide(state) -> str:
    return "pos" if state.value > 0 else "neg"

Alternatively, any step can simply return the name of the next step (as a string) to jump dynamically.

Suspension

View Example Code

Use Suspend to pause execution. The event stream will yield a SUSPEND event and then stop.

from justpipe import Suspend

@pipe.step("validate")
async def validate(state):
    if not state.is_ready:
        return Suspend(reason="wait_for_human")

Sub-pipelines (Composition)

View Example Code

Compose complex workflows by running other pipelines as steps.

sub_pipe = Pipe()
# ... define sub_pipe steps ...

@pipe.sub("execute_sub", using=sub_pipe)
async def delegate(state):
    # Pass the state (or a transformation of it) to the sub-pipeline
    return state

Streaming Tokens

View Example Code

@pipe.step("stream")
async def stream(state):
    for chunk in generate_response():
        yield chunk  # Yields TOKEN events

Reliability & Retries

View Example Code

justpipe has built-in support for tenacity if installed.

pip install "justpipe[retry]"
@pipe.step("flaky_api", retries=3, retry_wait_min=0.1)
async def flaky_api(state):
    # Will automatically retry on exception
    response = await unreliable_api_call()

Backpressure

Protect your application from memory exhaustion by limiting the event queue size. When the queue is full, producer steps will automatically block.

# Set a global limit for the pipe
pipe = Pipe(queue_size=100)

# Or override it at runtime
async for event in pipe.run(state, queue_size=10):
    ...

Middleware

View Example Code

Middleware wraps every step execution. Useful for logging, tracing, or error handling.

from justpipe import simple_logging_middleware, StepContext

pipe.add_middleware(simple_logging_middleware)

def custom_middleware(func, ctx: StepContext):
    async def wrapped(**kwargs):
        print(f"Entering {ctx.name}")
        return await func(**kwargs)
    return wrapped

Lifecycle Hooks

View Example Code

Hooks are useful for managing external resources like database connections or API clients. Hooks use the same dependency-injection rules as steps, so you can type or name parameters for state/context.

@pipe.on_startup
async def setup(context):
    context.db = await connect_to_database()

@pipe.on_shutdown
async def cleanup(context):
    await context.db.close()

Visualization & Introspection

View Example Code

Inspect registered steps or generate Mermaid diagrams.

# Generate Mermaid graph
print(pipe.graph())

# Programmatic introspection
for step in pipe.steps():
    print(f"{step.name} -> {step.targets}")

Error Handling

Define how to recover from failures at the step or pipeline level.

from justpipe import Retry, Skip

# 1. Step-level handler
async def handle_api_error(error, state):
    return Retry() if isinstance(error, TransientError) else Skip()

@pipe.step(on_error=handle_api_error)
async def call_api(state):
    ...

# 2. Global handler
@pipe.on_error
async def global_handler(error, state, step_name):
    print(f"Global catch: {step_name} failed with {error}")

Development

justpipe uses uv for dependency management.

# Install development dependencies
uv sync --all-extras --dev

# Run tests
uv run pytest

# Run linting
uv run ruff check .

# Run type checks
uv run mypy justpipe

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

justpipe-0.5.4.tar.gz (101.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

justpipe-0.5.4-py3-none-any.whl (30.1 kB view details)

Uploaded Python 3

File details

Details for the file justpipe-0.5.4.tar.gz.

File metadata

  • Download URL: justpipe-0.5.4.tar.gz
  • Upload date:
  • Size: 101.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for justpipe-0.5.4.tar.gz
Algorithm Hash digest
SHA256 96707172bd2a5edbd298097a40c1e80804ea30008ce0def41e323b7ad8960410
MD5 0999f47f52e9ed311e29658136a9446a
BLAKE2b-256 f48c95227731a424ac99b9ad4a2008ce3ab2b406706f0769e29aee2934c1c48c

See more details on using hashes here.

Provenance

The following attestation bundles were made for justpipe-0.5.4.tar.gz:

Publisher: release.yml on plar/justpipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file justpipe-0.5.4-py3-none-any.whl.

File metadata

  • Download URL: justpipe-0.5.4-py3-none-any.whl
  • Upload date:
  • Size: 30.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for justpipe-0.5.4-py3-none-any.whl
Algorithm Hash digest
SHA256 1ca9b70b87fbf1054631219230cab091ca62dda3b8be03c2d180e8b66869a0b9
MD5 758e9c4f076d4767c3a153f0d2bd7e1a
BLAKE2b-256 483474cd9d551ed0d05f01a384c337a432a3e12e020f90321c1b244ef21171af

See more details on using hashes here.

Provenance

The following attestation bundles were made for justpipe-0.5.4-py3-none-any.whl:

Publisher: release.yml on plar/justpipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page