Skip to main content

Your code is the graph. Async, streaming pipelines for AI.

Project description

justpipe

CI Coverage PyPI

Your code is the graph. Async, streaming pipelines for AI.

Installation

pip install justpipe

# With retry support (tenacity)
pip install "justpipe[retry]"

Quick Start

View Example Code

import asyncio
from dataclasses import dataclass
from justpipe import Pipe, EventType

@dataclass
class State:
    message: str = ""

# Type-safe pipeline definition
pipe = Pipe[State, None]()

@pipe.step()
async def respond(state: State):
    yield f"{state.message}, World!"

@pipe.step(to=respond)
async def greet(state: State):
    state.message = "Hello"

async def main():
    state = State()
    async for event in pipe.run(state):
        if event.type == EventType.TOKEN:
            print(event.data)  # "Hello, World!"

asyncio.run(main())

Features

  • Code-as-Graph - Define complex workflows using simple decorators (@step, @map, @switch).
  • Type-Safe - Full generic type support Pipe[StateT, ContextT] with static analysis.
  • Visualization - Generate beautiful Mermaid diagrams with pipe.graph().
  • Resilience - Built-in backpressure, retries, and timeouts.
  • Async & Streaming - Native asyncio support with generator streaming.
  • Zero dependencies - Core library is lightweight (dependencies only for extras).
  • Parallel execution - Fan-out with implicit barrier synchronization.
  • Validated - Graph integrity checks (cycles, broken references) with pipe.validate().
graph TD
    Start(["▶ Start"])

    subgraph parallel_n3[Parallel]
        direction LR
        n8["Search Knowledge Graph"]
        n9(["Search Vectors ⚡"])
        n10(["Search Web ⚡"])
    end

    n1["Build Context"]
    n3["Embed Query"]
    n4(["Format Output ⚡"])
    n5(["Generate Response ⚡"])
    n6["Parse Query"]
    n7["Rank Results"]
    End(["■ End"])

    Start --> n6
    n1 --> n5
    n3 --> n8
    n3 --> n9
    n3 --> n10
    n5 --> n4
    n6 --> n3
    n7 --> n1
    n8 --> n7
    n9 --> n7
    n10 --> n7
    n4 --> End

    subgraph utilities[Utilities]
        direction TB
        n0(["Analytics Logger ⚡"]):::isolated
        n2["Cache Manager"]:::isolated
    end

    %% Styling
    classDef default fill:#f8f9fa,stroke:#dee2e6,stroke-width:1px;
    classDef step fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1;
    classDef streaming fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100;
    classDef map fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#1b5e20;
    classDef switch fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c;
    classDef isolated fill:#fce4ec,stroke:#c2185b,stroke-width:2px,stroke-dasharray: 5 5,color:#880e4f;
    classDef startEnd fill:#e8f5e9,stroke:#388e3c,stroke-width:3px,color:#1b5e20;
    class n1,n3,n6,n7,n8 step;
    class n10,n4,n5,n9 streaming;
    class n0,n2 isolated;
    class Start,End startEnd;

Parallel Execution (DAG)

View Example Code

Static parallelism is defined by linking one step to multiple targets. Use barrier_timeout to prevent the pipeline from hanging if a parallel branch fails silently.

@pipe.step(barrier_timeout=5.0)
async def combine(state):
    # Implicit Barrier: Runs only after BOTH fetch_a and fetch_b complete
    state.result = state.a + state.b

@pipe.step(to=combine)
async def fetch_a(state):
    state.a = await fetch_from_api_a()

@pipe.step(to=combine)
async def fetch_b(state):
    state.b = await fetch_from_api_b()

@pipe.step(to=[fetch_a, fetch_b])
async def start(state):
    pass

Dynamic Parallelism (Map)

View Example Code

Use @pipe.map to process a list of items in parallel. The decorated function must return an iterable.

@pipe.step("worker")
async def worker(item: int, state):
    # 'item' is injected automatically because it's not a state/context arg
    print(f"Processing {item}")

@pipe.map(using=worker)
async def process_batch(state):
    # Spawns 'worker' step for each item in the returned list
    return [1, 2, 3]

Dynamic Routing (Switch)

View Example Code

Use @pipe.switch to route execution based on the return value.

@pipe.step("positive_handler")
async def handle_pos(state): ...

@pipe.step("negative_handler")
async def handle_neg(state): ...

@pipe.switch(routes={
    "pos": "positive_handler", 
    "neg": "negative_handler"
})
async def decide(state) -> str:
    return "pos" if state.value > 0 else "neg"

Alternatively, any step can simply return the name of the next step (as a string) to jump dynamically.

Suspension

View Example Code

Use Suspend to pause execution. The event stream will yield a SUSPEND event and then stop.

from justpipe import Suspend

@pipe.step("validate")
async def validate(state):
    if not state.is_ready:
        return Suspend(reason="wait_for_human")

Sub-pipelines (Composition)

View Example Code

Compose complex workflows by running other pipelines as steps.

sub_pipe = Pipe()
# ... define sub_pipe steps ...

@pipe.sub("execute_sub", using=sub_pipe)
async def delegate(state):
    # Pass the state (or a transformation of it) to the sub-pipeline
    return state

Streaming Tokens

View Example Code

@pipe.step("stream")
async def stream(state):
    for chunk in generate_response():
        yield chunk  # Yields TOKEN events

Reliability & Retries

View Example Code

justpipe has built-in support for tenacity if installed.

pip install "justpipe[retry]"
@pipe.step("flaky_api", retries=3, retry_wait_min=0.1)
async def flaky_api(state):
    # Will automatically retry on exception
    response = await unreliable_api_call()

Backpressure

Protect your application from memory exhaustion by limiting the event queue size. When the queue is full, producer steps will automatically block.

# Set a global limit for the pipe
pipe = Pipe(queue_size=100)

# Or override it at runtime
async for event in pipe.run(state, queue_size=10):
    ...

Middleware

View Example Code

Middleware wraps every step execution. Useful for logging, tracing, or error handling.

from justpipe import simple_logging_middleware, StepContext

pipe.add_middleware(simple_logging_middleware)

def custom_middleware(func, ctx: StepContext):
    async def wrapped(**kwargs):
        print(f"Entering {ctx.name}")
        return await func(**kwargs)
    return wrapped

Lifecycle Hooks

View Example Code

Hooks are useful for managing external resources like database connections or API clients.

@pipe.on_startup
async def setup(context):
    context.db = await connect_to_database()

@pipe.on_shutdown
async def cleanup(context):
    await context.db.close()

Visualization & Introspection

View Example Code

Inspect registered steps or generate Mermaid diagrams.

# Generate Mermaid graph
print(pipe.graph())

# Programmatic introspection
for step in pipe.steps():
    print(f"{step.name} -> {step.targets}")

Error Handling

Define how to recover from failures at the step or pipeline level.

from justpipe import Retry, Skip

# 1. Step-level handler
async def handle_api_error(error, state):
    return Retry() if isinstance(error, TransientError) else Skip()

@pipe.step(on_error=handle_api_error)
async def call_api(state):
    ...

# 2. Global handler
@pipe.on_error
async def global_handler(error, state, step_name):
    print(f"Global catch: {step_name} failed with {error}")

Development

justpipe uses uv for dependency management.

# Install development dependencies
uv sync --all-extras --dev

# Run tests
uv run pytest

# Run linting
uv run ruff check .

# Run type checks
uv run mypy justpipe

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

justpipe-0.5.0.tar.gz (96.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

justpipe-0.5.0-py3-none-any.whl (20.9 kB view details)

Uploaded Python 3

File details

Details for the file justpipe-0.5.0.tar.gz.

File metadata

  • Download URL: justpipe-0.5.0.tar.gz
  • Upload date:
  • Size: 96.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for justpipe-0.5.0.tar.gz
Algorithm Hash digest
SHA256 7db44c36f2a1670ef4f5744a6fab73f524b39d50e2b7723da8240e492ecb748f
MD5 50ee58277e71aac95d835adf24eb819d
BLAKE2b-256 cf96ee8ffd72fa5898ea95dd74e7ff935a7f97876e17b8c371457b226669c86e

See more details on using hashes here.

Provenance

The following attestation bundles were made for justpipe-0.5.0.tar.gz:

Publisher: release.yml on plar/justpipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file justpipe-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: justpipe-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 20.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for justpipe-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b9cc946c2811208726c2fbd1c8de787d635c5364bda598af8799867b49259be0
MD5 1b083e96468fe55763e37bf4ec66c556
BLAKE2b-256 4d3878a1f326400d51fa34c75efd22aade64ce408f8345eeb2eebdea522e4236

See more details on using hashes here.

Provenance

The following attestation bundles were made for justpipe-0.5.0-py3-none-any.whl:

Publisher: release.yml on plar/justpipe

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page