justpipe
Your code is the graph. Async, streaming pipelines for AI.
Installation
pip install justpipe
# With retry support (tenacity)
pip install "justpipe[retry]"
Quick Start
import asyncio
from dataclasses import dataclass

from justpipe import Pipe, EventType

@dataclass
class State:
    message: str = ""

# Type-safe pipeline definition
pipe = Pipe[State, None]()

@pipe.step()
async def respond(state: State):
    yield f"{state.message}, World!"

@pipe.step(to=respond)
async def greet(state: State):
    state.message = "Hello"

async def main():
    state = State()
    async for event in pipe.run(state):
        if event.type == EventType.TOKEN:
            print(event.data)  # "Hello, World!"

asyncio.run(main())
Features
- Code-as-Graph - Define complex workflows using simple decorators (@step, @map, @switch).
- Type-Safe - Full generic type support Pipe[StateT, ContextT] with static analysis.
- Visualization - Generate beautiful Mermaid diagrams with pipe.graph().
- Resilience - Built-in backpressure, retries, and timeouts.
- Async & Streaming - Native asyncio support with generator streaming.
- Zero dependencies - Core library is lightweight (dependencies only for extras).
- Parallel execution - Fan-out with implicit barrier synchronization.
- Validated - Graph integrity checks (cycles, broken references) with pipe.validate().
graph TD
Start(["▶ Start"])
subgraph parallel_n3[Parallel]
direction LR
n8["Search Knowledge Graph"]
n9(["Search Vectors ⚡"])
n10(["Search Web ⚡"])
end
n1["Build Context"]
n3["Embed Query"]
n4(["Format Output ⚡"])
n5(["Generate Response ⚡"])
n6["Parse Query"]
n7["Rank Results"]
End(["■ End"])
Start --> n6
n1 --> n5
n3 --> n8
n3 --> n9
n3 --> n10
n5 --> n4
n6 --> n3
n7 --> n1
n8 --> n7
n9 --> n7
n10 --> n7
n4 --> End
subgraph utilities[Utilities]
direction TB
n0(["Analytics Logger ⚡"]):::isolated
n2["Cache Manager"]:::isolated
end
%% Styling
classDef default fill:#f8f9fa,stroke:#dee2e6,stroke-width:1px;
classDef step fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1;
classDef streaming fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100;
classDef map fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#1b5e20;
classDef switch fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c;
classDef isolated fill:#fce4ec,stroke:#c2185b,stroke-width:2px,stroke-dasharray: 5 5,color:#880e4f;
classDef startEnd fill:#e8f5e9,stroke:#388e3c,stroke-width:3px,color:#1b5e20;
class n1,n3,n6,n7,n8 step;
class n10,n4,n5,n9 streaming;
class n0,n2 isolated;
class Start,End startEnd;
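The feature list mentions graph integrity checks for cycles and broken references. As a rough illustration of what such checks involve, here is a standalone sketch over a plain adjacency dict. It does not call justpipe and is not how pipe.validate() is implemented; the validate function and the edges format are assumptions made for this example.

```python
def validate(edges: dict) -> list:
    """Return a list of problems found in a step graph (empty if none)."""
    problems = []

    # Broken references: a target that is not itself a defined step.
    for source, targets in edges.items():
        for target in targets:
            if target not in edges:
                problems.append(f"broken reference: {source} -> {target}")

    # Cycles: depth-first search with three node colors.
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {name: WHITE for name in edges}

    def visit(node) -> bool:
        color[node] = GRAY  # node is on the current DFS path
        for target in edges[node]:
            if target not in edges:
                continue  # already reported as a broken reference
            if color[target] == GRAY:
                return True  # back edge: target is an ancestor of node
            if color[target] == WHITE and visit(target):
                return True
        color[node] = BLACK  # fully explored, no cycle through node
        return False

    for name in edges:
        if color[name] == WHITE and visit(name):
            problems.append("cycle detected")
            break
    return problems


if __name__ == "__main__":
    print(validate({"start": ["fetch"], "fetch": ["start", "missing"]}))
```

Running a check like this before execution turns a pipeline that would hang or crash halfway through into an error reported up front.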
Parallel Execution (DAG)
Static parallelism is defined by linking one step to multiple targets. Use barrier_timeout to prevent the pipeline from hanging if a parallel branch fails silently.
@pipe.step(barrier_timeout=5.0)
async def combine(state):
    # Implicit Barrier: Runs only after BOTH fetch_a and fetch_b complete
    state.result = state.a + state.b

@pipe.step(to=combine)
async def fetch_a(state):
    state.a = await fetch_from_api_a()

@pipe.step(to=combine)
async def fetch_b(state):
    state.b = await fetch_from_api_b()

@pipe.step(to=[fetch_a, fetch_b])
async def start(state):
    pass
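To make the barrier idea concrete, the same fan-out/fan-in shape can be written in plain asyncio. This sketch does not use justpipe and makes no claim about the library's internals; run_with_barrier and the stub fetchers are hypothetical names, and the timeout argument is only an analogue of barrier_timeout.

```python
import asyncio


async def fetch_a() -> int:
    # Stand-in for a real API call (hypothetical).
    await asyncio.sleep(0.01)
    return 1


async def fetch_b() -> int:
    # Second branch, slightly slower than the first.
    await asyncio.sleep(0.02)
    return 2


async def run_with_barrier(timeout: float) -> int:
    # Fan out: both branches run concurrently.
    # Barrier: gather() resolves only after both have finished.
    # wait_for() bounds the wait so a stuck branch cannot hang the caller.
    a, b = await asyncio.wait_for(asyncio.gather(fetch_a(), fetch_b()), timeout)
    return a + b


if __name__ == "__main__":
    print(asyncio.run(run_with_barrier(timeout=5.0)))
```

With a timeout shorter than the slowest branch, wait_for raises a timeout error instead of waiting forever, which is the failure mode a barrier timeout is meant to surface.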
Dynamic Parallelism (Map)
Use @pipe.map to process a list of items in parallel. The decorated function must return an iterable.
@pipe.step("worker")
async def worker(item: int, state):
    # 'item' is injected automatically because it's not a state/context arg
    print(f"Processing {item}")

@pipe.map(using=worker)
async def process_batch(state):
    # Spawns 'worker' step for each item in the returned list
    return [1, 2, 3]
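The map pattern amounts to spawning one worker per item and waiting for all of them. A minimal asyncio-only sketch of that idea follows; it is an illustration under that assumption, not justpipe's scheduler, and the results list stands in for shared pipeline state.

```python
import asyncio


async def worker(item: int, results: list) -> None:
    # One invocation per item; each appends its output to shared state.
    await asyncio.sleep(0)  # yield control so workers interleave
    results.append(item * 2)


async def process_batch(items: list) -> list:
    results: list = []
    # Spawn one worker per item and wait for every one to finish.
    await asyncio.gather(*(worker(item, results) for item in items))
    # Completion order is not guaranteed in general, so sort for a stable result.
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(process_batch([1, 2, 3])))
```

Because workers may finish in any order, code that consumes their output should not rely on the order of the input list.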
Dynamic Routing (Switch)
Use @pipe.switch to route execution based on the return value.
@pipe.step("positive_handler")
async def handle_pos(state): ...

@pipe.step("negative_handler")
async def handle_neg(state): ...

@pipe.switch(routes={
    "pos": "positive_handler",
    "neg": "negative_handler",
})
async def decide(state) -> str:
    return "pos" if state.value > 0 else "neg"
Alternatively, any step can simply return the name of the next step (as a string) to jump dynamically.
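Routing on a return value is essentially a lookup in a route table. The sketch below shows that dispatch in plain Python without justpipe; the ROUTES dict, the State fields, and the run helper are hypothetical, and raising on an unknown key is a design choice of this example rather than documented library behavior.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class State:
    value: int = 0
    log: list = field(default_factory=list)


async def handle_pos(state: State) -> None:
    state.log.append("positive")


async def handle_neg(state: State) -> None:
    state.log.append("negative")


# Route table: decision key -> handler (mirrors the routes mapping above).
ROUTES = {"pos": handle_pos, "neg": handle_neg}


async def decide(state: State) -> str:
    return "pos" if state.value > 0 else "neg"


async def run(state: State) -> State:
    key = await decide(state)
    handler = ROUTES.get(key)
    if handler is None:
        # An unknown key is a routing bug; fail loudly rather than stop silently.
        raise KeyError(f"no route for {key!r}")
    await handler(state)
    return state


if __name__ == "__main__":
    print(asyncio.run(run(State(value=5))).log)
```

Keeping the table explicit makes every possible branch visible in one place, which is also what lets a graph renderer draw the switch.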
Suspension
Use Suspend to pause execution. The event stream will yield a SUSPEND event and then stop.
from justpipe import Suspend

@pipe.step("validate")
async def validate(state):
    if not state.is_ready:
        return Suspend(reason="wait_for_human")
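The "yield a SUSPEND event and then stop" behavior can be pictured as an async generator that returns early. This is a standalone sketch, not justpipe code: the Suspend dataclass, the event tuples, and the STEP_DONE label are stand-ins invented for the example.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Suspend:
    # Hypothetical stand-in for a suspension marker.
    reason: str


async def validate(ready: bool):
    if not ready:
        return Suspend(reason="wait_for_human")
    return None


async def publish(ready: bool):
    return None


async def run(steps, ready: bool):
    # Yield (event_type, payload) tuples; stop at the first Suspend.
    for step in steps:
        result = await step(ready)
        if isinstance(result, Suspend):
            yield ("SUSPEND", result.reason)
            return  # nothing after the suspending step runs
        yield ("STEP_DONE", step.__name__)


async def collect(ready: bool) -> list:
    return [event async for event in run([validate, publish], ready)]


if __name__ == "__main__":
    print(asyncio.run(collect(False)))
```

A caller that sees the SUSPEND event can persist the state, wait for the external condition, and start a new run later.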
Sub-pipelines (Composition)
Compose complex workflows by running other pipelines as steps.
sub_pipe = Pipe()
# ... define sub_pipe steps ...

@pipe.sub("execute_sub", using=sub_pipe)
async def delegate(state):
    # Pass the state (or a transformation of it) to the sub-pipeline
    return state
Streaming Tokens
@pipe.step("stream")
async def stream(state):
    for chunk in generate_response():
        yield chunk  # Yields TOKEN events
Reliability & Retries
Retries are backed by tenacity, an optional dependency. Install it with the retry extra:
pip install "justpipe[retry]"
@pipe.step("flaky_api", retries=3, retry_wait_min=0.1)
async def flaky_api(state):
    # Will automatically retry on exception
    response = await unreliable_api_call()
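For readers unfamiliar with retry policies, the following stdlib-only sketch shows roughly what retrying with a minimum wait means. It uses neither tenacity nor justpipe; call_with_retries and make_flaky are hypothetical helpers, and both the exponential backoff and the reading of retries as "re-attempts after the first failure" are assumptions, not documented semantics of the retries parameter.

```python
import asyncio


async def call_with_retries(func, retries: int = 3, wait_min: float = 0.1):
    # Assumption: `retries` counts re-attempts after the first failure.
    attempt = 0
    while True:
        try:
            return await func()
        except Exception:
            if attempt >= retries:
                raise  # out of retries: surface the last error
            # Exponential backoff starting at wait_min.
            await asyncio.sleep(wait_min * (2 ** attempt))
            attempt += 1


def make_flaky(failures: int):
    # Build a coroutine function that fails `failures` times, then succeeds.
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise ConnectionError("transient")
        return calls["n"]  # number of the call that finally succeeded

    return flaky


if __name__ == "__main__":
    print(asyncio.run(call_with_retries(make_flaky(2), wait_min=0.01)))
```

Retrying only helps with transient failures, so steps that are retried should be safe to run more than once.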
Backpressure
Protect your application from memory exhaustion by limiting the event queue size. When the queue is full, producer steps will automatically block.
# Set a global limit for the pipe
pipe = Pipe(queue_size=100)

# Or override it at runtime
async for event in pipe.run(state, queue_size=10):
    ...
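The blocking behavior described above is the standard bounded-queue pattern. Here is a small sketch using asyncio.Queue with a maxsize; it illustrates the mechanism only and is not taken from justpipe, and the producer, consumer, and run names are made up for the example.

```python
import asyncio


async def producer(queue: asyncio.Queue, n: int) -> None:
    for i in range(n):
        # put() suspends while the queue is full, so the producer
        # cannot run ahead of the consumer by more than maxsize items.
        await queue.put(i)
    await queue.put(None)  # sentinel: no more items


async def consumer(queue: asyncio.Queue, peak: list) -> list:
    received = []
    while True:
        peak[0] = max(peak[0], queue.qsize())  # track the largest backlog seen
        item = await queue.get()
        if item is None:
            return received
        received.append(item)
        await asyncio.sleep(0)  # simulate a consumer that yields between items


async def run(n: int, queue_size: int):
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
    peak = [0]
    task = asyncio.create_task(producer(queue, n))
    received = await consumer(queue, peak)
    await task
    return received, peak[0]


if __name__ == "__main__":
    items, peak = asyncio.run(run(50, 5))
    print(len(items), peak)
```

The backlog never exceeds the configured size, so memory use stays bounded no matter how fast the producer is.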
Middleware
Middleware wraps every step execution. Useful for logging, tracing, or error handling.
from justpipe import simple_logging_middleware, StepContext

pipe.add_middleware(simple_logging_middleware)

def custom_middleware(func, ctx: StepContext):
    async def wrapped(**kwargs):
        print(f"Entering {ctx.name}")
        return await func(**kwargs)
    return wrapped
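A middleware with the (func, ctx) shape shown above is a function that returns a wrapped coroutine. The sketch below applies such wrappers by hand so the mechanics are visible. The StepContext dataclass here is a simplified stand-in, apply_middleware is hypothetical, and the wrapping order (last added is outermost) is an assumption of this example.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StepContext:
    # Simplified stand-in carrying only the step name.
    name: str


def logging_middleware(log: list):
    # Factory so the example can record calls in a list instead of printing.
    def middleware(func, ctx: StepContext):
        async def wrapped(**kwargs):
            log.append(f"enter {ctx.name}")
            try:
                return await func(**kwargs)
            finally:
                log.append(f"exit {ctx.name}")  # recorded even on failure
        return wrapped
    return middleware


def apply_middleware(func, ctx: StepContext, middlewares: list):
    # Wrap in order, so the last middleware added is the outermost layer.
    for mw in middlewares:
        func = mw(func, ctx)
    return func


async def double(value: int) -> int:
    return value * 2


if __name__ == "__main__":
    log: list = []
    step = apply_middleware(double, StepContext("double"), [logging_middleware(log)])
    print(asyncio.run(step(value=21)), log)
```

Using try/finally inside the wrapper ensures the exit record is written whether the step returns or raises.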
Lifecycle Hooks
Hooks are useful for managing external resources like database connections or API clients.
@pipe.on_startup
async def setup(context):
    context.db = await connect_to_database()

@pipe.on_shutdown
async def cleanup(context):
    await context.db.close()
Visualization & Introspection
Inspect registered steps or generate Mermaid diagrams.
# Generate Mermaid graph
print(pipe.graph())

# Programmatic introspection
for step in pipe.steps():
    print(f"{step.name} -> {step.targets}")
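Since introspection exposes each step's name and targets, the same data is enough to emit a basic Mermaid flowchart by hand. The sketch below works on a plain dict rather than on a Pipe, and to_mermaid is a hypothetical helper; the output of pipe.graph() is richer (subgraphs, styling) and is not reproduced here.

```python
def to_mermaid(edges: dict) -> str:
    # edges maps a step name to the list of step names it links to.
    lines = ["graph TD"]
    for source in sorted(edges):  # sort for deterministic output
        for target in edges[source]:
            lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


if __name__ == "__main__":
    print(to_mermaid({
        "start": ["fetch_a", "fetch_b"],
        "fetch_a": ["combine"],
        "fetch_b": ["combine"],
    }))
```

Sorting the sources keeps the generated text stable between runs, which makes diagrams easier to diff in version control.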
Error Handling
Define how to recover from failures at the step or pipeline level.
from justpipe import Retry, Skip

# 1. Step-level handler
# TransientError stands for an exception type defined by your application.
async def handle_api_error(error, state):
    return Retry() if isinstance(error, TransientError) else Skip()

@pipe.step(on_error=handle_api_error)
async def call_api(state):
    ...

# 2. Global handler
@pipe.on_error
async def global_handler(error, state, step_name):
    print(f"Global catch: {step_name} failed with {error}")
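One way to picture how a runner might act on a handler's Retry or Skip decision is the loop below. It is a self-contained sketch with its own Retry, Skip, and TransientError classes, not justpipe's implementation; run_step, the max_retries cap, and returning None for a skipped step are all assumptions made for the example.

```python
import asyncio


class Retry:
    """Hypothetical marker: run the step again."""


class Skip:
    """Hypothetical marker: give up on this step and continue."""


class TransientError(Exception):
    """Example of an application-defined recoverable error."""


async def handle_api_error(error, state):
    return Retry() if isinstance(error, TransientError) else Skip()


async def run_step(step, state, on_error, max_retries: int = 3):
    for _ in range(max_retries + 1):
        try:
            return await step(state)
        except Exception as error:
            action = await on_error(error, state)
            if isinstance(action, Skip):
                return None  # step is skipped; the caller moves on
            if not isinstance(action, Retry):
                raise  # unknown action: re-raise the original error
    raise RuntimeError("retry budget exhausted")


async def flaky(state):
    # Fails twice with a transient error, then succeeds.
    state["calls"] += 1
    if state["calls"] < 3:
        raise TransientError("try again")
    return "ok"


if __name__ == "__main__":
    print(asyncio.run(run_step(flaky, {"calls": 0}, handle_api_error)))
```

Capping the number of retries keeps a handler that always answers Retry from looping forever.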
Development
justpipe uses uv for dependency management.
# Install development dependencies
uv sync --all-extras --dev
# Run tests
uv run pytest
# Run linting
uv run ruff check .
# Run type checks
uv run mypy justpipe
License
MIT