Your code is the graph. Async, streaming pipelines for AI.
Project description
justpipe
Your code is the graph. A zero-dependency, async orchestration library for building event-driven AI pipelines.
Why justpipe?
Building production AI agents often leads to a mess of nested asyncio loops, manual state management, and brittle error handling. justpipe provides a lightweight, Pythonic abstraction that lets you focus on logic while it handles the orchestration.
- Zero Dependencies: Pure Python. No transitive deps in the core library.
- Async-First: Built from the ground up for modern Python concurrency.
- Developer Experience: "The code is the graph." No complex YAML or UI builders -- your decorators define the topology.
- Battle-Tested: 560+ tests across unit, integration, functional, and fuzzing layers. Built-in retries, backpressure, and structured observability.
Installation
pip install justpipe # Core (zero dependencies)
pip install "justpipe[cli]" # With CLI tools (click, rich)
pip install "justpipe[retry]" # With retry support (tenacity)
Quickstart: AI Search Pipeline
Define a type-safe pipeline that fetches search results, generates a prompt, and streams an LLM response. Steps execute top-to-bottom -- to= wires the flow.
import asyncio
from dataclasses import dataclass, field
from justpipe import Pipe, EventType
@dataclass
class AIState:
query: str
context: list[str] = field(default_factory=list)
response: str = ""
pipe = Pipe(AIState)
@pipe.step(to="generate_prompt")
async def search_vectors(state: AIState):
"""Simulate a RAG vector search."""
state.context.append("Vector DB result: Python is fun.")
@pipe.step(to="call_llm")
async def generate_prompt(state: AIState):
"""Format the prompt using gathered context."""
state.response = f"Context: {state.context}\nQuery: {state.query}"
@pipe.step()
async def call_llm(state: AIState):
"""A streaming step yielding tokens from an LLM."""
for chunk in ["Based ", "on ", "your ", "data..."]:
yield chunk
async def main():
state = AIState(query="Tell me about Python")
async for event in pipe.run(state):
if event.type == EventType.TOKEN:
print(event.payload, end="", flush=True)
print("\nPipeline Complete!")
if __name__ == "__main__":
asyncio.run(main())
The Imports You Need
from justpipe import Pipe, EventType # Core
from justpipe import TestPipe # Testing
from justpipe import Skip, Retry, Raise # Error control
from justpipe.types import StepContext # Advanced: middleware
from justpipe.failures import FailureKind # Advanced: failure classification
Built-in Testing Harness
TestPipe is a first-class testing tool that ships with the library. Mock steps, capture events, and make precise assertions -- no external test utilities required.
from justpipe import TestPipe
async def test_my_workflow():
with TestPipe(pipe) as tester:
# Mock a streaming LLM call
async def mock_llm(state):
yield "Mocked answer"
tester.mock("call_llm", side_effect=mock_llm)
# Execute the pipeline
result = await tester.run(AIState(query="test"))
# Precise assertions
assert result.was_called("search_vectors")
assert "Mocked answer" in result.tokens
Your Code is the Graph
In justpipe, decorators do the heavy lifting. They register your functions into a Directed Acyclic Graph (DAG) during import time. The to= parameter wires the flow.
@pipe.step(to="respond") # Forward reference via string
async def greet(state):
state.message = "Hello"
@pipe.step()
async def respond(state):
yield f"{state.message}, World!"
- Type-Safe DI: Automatically injects
state,context, or even specific items from map workers based on type hints or parameter names. - Mutable State: Designed around Python
dataclasses. No complex state reduction -- just mutate your state object directly. - Context-Aware Routing: Use
@pipe.switchfor dynamic branching or@pipe.mapfor massive fan-out parallelism.
Common Patterns
Parallel Execution
Run steps concurrently by linking a single step to multiple targets. barrier_timeout prevents hangs if a branch fails.
@pipe.step(barrier_timeout=5.0)
async def combine(state):
# Runs only after BOTH fetch_a and fetch_b complete
state.result = state.a + state.b
@pipe.step(to=combine)
async def fetch_a(state):
state.a = await fetch_api_a()
@pipe.step(to=combine)
async def fetch_b(state):
state.b = await fetch_api_b()
Dynamic Mapping (Fan-Out)
Process lists in parallel. justpipe manages the workers and results for you.
@pipe.step("worker")
async def process_item(item: int, state):
# 'item' is automatically injected from the list below
print(f"Processing {item}")
@pipe.map(each="worker", max_concurrency=5)
async def create_batch(state):
# Spawns a 'worker' step for every item returned
return [1, 2, 3, 4, 5]
Conditional Logic (Switch)
Route execution dynamically based on return values.
@pipe.switch(to={
"ok": "process_success",
"error": "handle_failure"
})
async def validate(state) -> str:
return "ok" if state.is_valid else "error"
Sub-pipelines & Composition
Compose complex workflows by nesting pipelines as single steps, allowing for modular and reusable logic.
Suspension & Human-in-the-Loop
Pause execution to wait for external input or human approval, then resume exactly where it left off.
Command Line Interface (CLI)
justpipe ships with a CLI for debugging and performance analysis. Install it with pip install "justpipe[cli]".
# List recent pipeline runs
justpipe list
# Show detailed events for a specific run
justpipe show <run-id>
# Generate an interactive HTML timeline
justpipe timeline <run-id> --format html
# Compare two runs to detect performance regressions
justpipe compare <run-id-1> <run-id-2>
Comparison
| Feature | justpipe | LangGraph | Prefect | Hamilton | Raw asyncio |
|---|---|---|---|---|---|
| Zero dependencies | Yes | No | No | No | Yes |
| Async-native | Yes | Partial | Yes | No | Yes |
| Token streaming | Built-in | Manual | No | No | Manual |
| Graph-from-code | Decorators | State machine | Decorators | Functions | N/A |
| Built-in test harness | TestPipe |
No | No | No | No |
| Backpressure | Built-in | No | No | No | Manual |
| Human-in-the-loop | Suspend |
interrupt |
pause |
No | Manual |
Production-Grade Reliability
A workflow engine is only as good as its test suite. justpipe is built with a rigorous testing philosophy, ensuring your pipelines behave predictably under load and failure.
Multi-Layer Testing
Our suite follows a strict separation of concerns:
- Unit Tests: Isolated testing of internal components using mocks and fakes.
- Integration Tests: Verification of interactions across storage boundaries and module interfaces.
- Functional Tests: End-to-end validation of pipeline contracts, event ordering, and concurrency.
- Fuzzing: Randomized property-based testing to uncover edge cases in graph execution.
Visualization
See your logic come to life. Generate beautiful Mermaid diagrams directly from your pipe instance.
print(pipe.graph())
graph TD
Start(["▶ Start"])
subgraph parallel_n3[Parallel]
direction LR
n8["Search Knowledge Graph"]
n9(["Search Vectors ⚡"])
n10(["Search Web ⚡"])
end
n1["Build Context"]
n3["Embed Query"]
n4(["Format Output ⚡"])
n5(["Generate Response ⚡"])
n6["Parse Query"]
n7["Rank Results"]
End(["■ End"])
n4 --> End
Start --> n6
n1 --> n5
n3 --> n8
n3 --> n9
n3 --> n10
n5 --> n4
n6 --> n3
n7 --> n1
n8 --> n7
n9 --> n7
n10 --> n7
subgraph utilities[Utilities]
direction TB
n0(["Analytics Logger ⚡"]):::isolated
n2["Cache Manager"]:::isolated
end
class n1,n3,n6,n7,n8 step;
class n10,n4,n5,n9 streaming;
class n0,n2 isolated;
class Start,End startEnd;
%% Styling
classDef default fill:#f8f9fa,stroke:#dee2e6,stroke-width:1px;
classDef step fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1;
classDef streaming fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100;
classDef map fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#1b5e20;
classDef switch fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c;
classDef sub fill:#f1f8e9,stroke:#558b2f,stroke-width:2px,color:#33691e;
classDef isolated fill:#fce4ec,stroke:#c2185b,stroke-width:2px,stroke-dasharray: 5 5,color:#880e4f;
classDef startEnd fill:#e8f5e9,stroke:#388e3c,stroke-width:3px,color:#1b5e20;
Development
justpipe uses uv for dependency management.
# Install development dependencies
uv sync --all-extras --dev
# Run full suite
uv run pytest
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file justpipe-0.11.0.tar.gz.
File metadata
- Download URL: justpipe-0.11.0.tar.gz
- Upload date:
- Size: 1.6 MB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d2f67f000e75e54279f14600c1498792ad5b345d1e579da004d7d3de788ddc04
|
|
| MD5 |
6ade837c753c269f122dc76c3da675bd
|
|
| BLAKE2b-256 |
56a30c9b536a691aa12e61035e453ed4a0c8e8ca3f743fd2b51e4f3347988a29
|
Provenance
The following attestation bundles were made for justpipe-0.11.0.tar.gz:
Publisher:
release.yml on plar/justpipe
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
justpipe-0.11.0.tar.gz -
Subject digest:
d2f67f000e75e54279f14600c1498792ad5b345d1e579da004d7d3de788ddc04 - Sigstore transparency entry: 975828910
- Sigstore integration time:
-
Permalink:
plar/justpipe@e0b0f972f3032b89f3d656e79d5815e8f6dd1ca8 -
Branch / Tag:
refs/tags/v0.11.0 - Owner: https://github.com/plar
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@e0b0f972f3032b89f3d656e79d5815e8f6dd1ca8 -
Trigger Event:
push
-
Statement type:
File details
Details for the file justpipe-0.11.0-py3-none-any.whl.
File metadata
- Download URL: justpipe-0.11.0-py3-none-any.whl
- Upload date:
- Size: 1.1 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5e9bd5cffba195c58dda3dbd4d129d5ab7850c413531018b6e656cd5f9f1ec36
|
|
| MD5 |
ed722ede82d2f57e03c6a3e6d374a6cf
|
|
| BLAKE2b-256 |
35bdfc8e7b952032481d6ee9540d42415b79a81c25d01d91384aab55b6843877
|
Provenance
The following attestation bundles were made for justpipe-0.11.0-py3-none-any.whl:
Publisher:
release.yml on plar/justpipe
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
justpipe-0.11.0-py3-none-any.whl -
Subject digest:
5e9bd5cffba195c58dda3dbd4d129d5ab7850c413531018b6e656cd5f9f1ec36 - Sigstore transparency entry: 975828911
- Sigstore integration time:
-
Permalink:
plar/justpipe@e0b0f972f3032b89f3d656e79d5815e8f6dd1ca8 -
Branch / Tag:
refs/tags/v0.11.0 - Owner: https://github.com/plar
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@e0b0f972f3032b89f3d656e79d5815e8f6dd1ca8 -
Trigger Event:
push
-
Statement type: