Python bindings for the Blazen workflow engine
Blazen
Event-driven AI workflow engine, powered by Rust.
Blazen lets you build multi-step AI workflows as composable, event-driven graphs. Define steps with a decorator, wire them together with typed events, and run everything on a native Rust engine with async Python bindings.
Installation
# Recommended
uv add blazen
# Or with pip
pip install blazen
Requires Python 3.10+.
Quick Start
import asyncio
from blazen import Workflow, step, Event, StartEvent, StopEvent, Context

class GreetEvent(Event):
    name: str

@step
async def parse(ctx: Context, ev: Event):
    return GreetEvent(name=ev.name)

@step
async def greet(ctx: Context, ev: GreetEvent):
    return StopEvent(result={"greeting": f"Hello, {ev.name}!"})

async def main():
    wf = Workflow("hello", [parse, greet])
    handler = await wf.run(name="Blazen")
    result = await handler.result()
    print(result.result)  # {"greeting": "Hello, Blazen!"}

asyncio.run(main())
How it works
- class GreetEvent(Event) -- Subclassing Event auto-sets event_type to the class name ("GreetEvent"). Annotations like name: str are for documentation only; at runtime all keyword arguments are stored as JSON.
- @step reads type annotations -- ev: GreetEvent on a step function automatically sets accepts=["GreetEvent"]. The step will only receive events of that type.
- @step with no type hint or ev: Event -- defaults to accepting StartEvent (the event emitted by wf.run()).
- ev.name -- Direct attribute access on events. No need for ev.to_dict()["name"].
- wf.run(name="Blazen") -- Keyword arguments become the StartEvent payload. Steps that accept StartEvent receive an event where ev.name == "Blazen".
- result.result -- Preserves is-identity for non-JSON Python objects. You can pass class instances, Pydantic models, and even live DB connections through StopEvent.result and get the same object back on the other side.
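The annotation-driven routing described above can be illustrated with a small pure-Python toy. This is only a sketch of the idea, not Blazen's implementation: the Event, StartEvent, and step names below are local stand-ins defined in the snippet (nothing is imported from blazen), and the steps are plain functions rather than coroutines to keep it short.

```python
import inspect


class Event:
    """Toy event: stores keyword arguments and exposes them as attributes."""

    def __init__(self, **kwargs):
        self._data = dict(kwargs)

    @property
    def event_type(self):
        # The class name doubles as the event type.
        return type(self).__name__

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return self.__dict__["_data"][name]
        except KeyError:
            raise AttributeError(name) from None

    def to_dict(self):
        return dict(self._data)


class StartEvent(Event):
    pass


class GreetEvent(Event):
    name: str  # documentation only in this sketch


def step(fn):
    """Toy decorator: derive the accepted event type from the `ev` annotation."""
    annotation = inspect.signature(fn).parameters["ev"].annotation
    if annotation is inspect.Parameter.empty or annotation is Event:
        fn.accepts = ["StartEvent"]  # no hint, or the base class
    else:
        fn.accepts = [annotation.__name__]
    return fn


@step
def parse(ctx, ev: Event):
    return GreetEvent(name=ev.name)


@step
def greet(ctx, ev: GreetEvent):
    return f"Hello, {ev.name}!"


greeting = greet(None, parse(None, StartEvent(name="Blazen")))
```

Here parse.accepts is ["StartEvent"] and greet.accepts is ["GreetEvent"], mirroring the two annotation rules in the list.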
Multi-Step Workflows
Chain steps together using custom event subclasses. Each step declares which events it accepts via its type annotation.
import asyncio
from blazen import Workflow, step, Event, StopEvent, Context

class FetchedEvent(Event):
    text: str
    source: str

class AnalyzedEvent(Event):
    summary: str

@step
async def fetch(ctx: Context, ev: Event):
    # ev is a StartEvent with url=...
    text = f"Content from {ev.url}"
    return FetchedEvent(text=text, source=ev.url)

@step
async def analyze(ctx: Context, ev: FetchedEvent):
    summary = f"Analysis of: {ev.text}"
    return AnalyzedEvent(summary=summary)

@step
async def report(ctx: Context, ev: AnalyzedEvent):
    return StopEvent(result={"summary": ev.summary})

async def main():
    wf = Workflow("pipeline", [fetch, analyze, report])
    handler = await wf.run(url="https://example.com")
    result = await handler.result()
    print(result.result)  # {"summary": "Analysis of: Content from https://example.com"}

asyncio.run(main())
Event Streaming
Stream intermediate events from a running workflow in real time using ctx.write_event_to_stream().
import asyncio
from blazen import Workflow, step, Event, StopEvent, Context

class ProgressEvent(Event):
    step_num: int
    message: str

@step
async def work(ctx: Context, ev: Event):
    for i in range(3):
        ctx.write_event_to_stream(ProgressEvent(step_num=i, message=f"Processing {i}"))
    return StopEvent(result="done")

async def main():
    wf = Workflow("streamer", [work])
    handler = await wf.run()
    async for event in handler.stream_events():
        print(event.event_type, event.step_num, event.message)
    result = await handler.result()
    print(result.result)  # "done"

asyncio.run(main())
write_event_to_stream() publishes to an external broadcast stream. Consumers read it with async for event in handler.stream_events(). These events are not routed through the step graph -- they are for external observation only.
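To make the "external observation" idea concrete, here is a minimal sketch of a synchronous publish feeding an async consumer, built on asyncio.Queue. ToyHandler, finish, and the _DONE sentinel are hypothetical names invented for this illustration; Blazen's engine is implemented in Rust and may work quite differently. The sketch also serves a single consumer only, whereas a real broadcast stream would fan out to every subscriber.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream


class ToyHandler:
    def __init__(self):
        self._stream = asyncio.Queue()
        self.final = None

    def write_event_to_stream(self, event):
        # Synchronous publish: put_nowait never blocks on an unbounded queue.
        self._stream.put_nowait(event)

    def finish(self, result):
        self.final = result
        self._stream.put_nowait(_DONE)

    async def stream_events(self):
        # Async generator: yields events until the sentinel arrives.
        while True:
            event = await self._stream.get()
            if event is _DONE:
                return
            yield event


async def work(handler):
    for i in range(3):
        handler.write_event_to_stream({"step_num": i, "message": f"Processing {i}"})
        await asyncio.sleep(0)  # let the consumer run between events
    handler.finish("done")


async def run_demo():
    handler = ToyHandler()
    task = asyncio.create_task(work(handler))
    seen = [event async for event in handler.stream_events()]
    await task
    return seen, handler.final


seen, final = asyncio.run(run_demo())
```

The events in seen never pass through any step; they are only observed by the caller, which is the distinction the paragraph above draws.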
LLM Integration
Blazen includes a built-in multi-provider LLM client. All providers share the same CompletionModel / ChatMessage interface. Responses are returned as typed CompletionResponse objects.
ChatMessage, Role, and CompletionResponse
import os
from blazen import CompletionModel, ChatMessage, Role, CompletionResponse, ProviderOptions
model = CompletionModel.openrouter(options=ProviderOptions(api_key=os.environ["OPENROUTER_API_KEY"], model="openai/gpt-4o"))
response: CompletionResponse = await model.complete([
    ChatMessage.system("You are helpful."),
    ChatMessage.user("What is 2+2?"),
], temperature=0.7, max_tokens=256)
# Typed attribute access
print(response.content) # "4"
print(response.model) # model name used
print(response.finish_reason) # "stop", "tool_calls", etc.
print(response.tool_calls) # list[ToolCall] or None
print(response.usage) # TokenUsage with .prompt_tokens, .completion_tokens, .total_tokens
# Dict-style access also works for backwards compatibility
print(response["content"])
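Supporting both attribute access and dict-style access on one object is a common compatibility pattern. The sketch below shows one way it could be done with a dataclass; ToyResponse is a hypothetical stand-in written for this example and says nothing about how CompletionResponse is actually implemented.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class ToyResponse:
    content: str
    model: str
    finish_reason: Optional[str] = None

    def __getitem__(self, key):
        # Allow response["content"] for the declared fields only.
        if key not in {f.name for f in fields(self)}:
            raise KeyError(key)
        return getattr(self, key)


response = ToyResponse(content="4", model="example-model", finish_reason="stop")
same_value = response.content == response["content"]

try:
    response["missing"]
    missing_raises = False
except KeyError:
    missing_raises = True
```

Restricting __getitem__ to declared fields keeps the dict-style view from exposing methods or private attributes.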
Role Enum
from blazen import ChatMessage, Role
Role.SYSTEM # "system"
Role.USER # "user"
Role.ASSISTANT # "assistant"
Role.TOOL # "tool"
# Use with ChatMessage constructor
msg = ChatMessage(role=Role.USER, content="Hello")
Multimodal Messages
Send images alongside text using multimodal factory methods:
from blazen import ChatMessage, ContentPart
# Image from URL
msg = ChatMessage.user_image_url("https://example.com/photo.jpg", "What's in this image?")
# Image from base64
msg = ChatMessage.user_image_base64(base64_data, "image/png", "Describe this.")
# Multiple content parts
msg = ChatMessage.user_parts([
    ContentPart.text(text="Compare these two images:"),
    ContentPart.image_url(url="https://example.com/a.jpg", media_type="image/jpeg"),
    ContentPart.image_url(url="https://example.com/b.jpg", media_type="image/jpeg"),
])
Supported Providers
| Provider | Constructor | Default Model |
|---|---|---|
| OpenAI | CompletionModel.openai(options=ProviderOptions(api_key=key, model="gpt-4o")) | gpt-4o |
| Anthropic | CompletionModel.anthropic(options=ProviderOptions(api_key=key, model="claude-sonnet-4-20250514")) | claude-sonnet-4-20250514 |
| Google Gemini | CompletionModel.gemini(options=ProviderOptions(api_key=key, model="gemini-2.0-flash")) | gemini-2.0-flash |
| Azure OpenAI | CompletionModel.azure(options=AzureOptions(api_key=key, resource_name="...", deployment_name="...")) | (deployment) |
| OpenRouter | CompletionModel.openrouter(options=ProviderOptions(api_key=key, model="...")) | -- |
| Groq | CompletionModel.groq(options=ProviderOptions(api_key=key, model="...")) | -- |
| Together AI | CompletionModel.together(options=ProviderOptions(api_key=key, model="...")) | -- |
| Mistral | CompletionModel.mistral(options=ProviderOptions(api_key=key, model="...")) | -- |
| DeepSeek | CompletionModel.deepseek(options=ProviderOptions(api_key=key, model="...")) | -- |
| Fireworks | CompletionModel.fireworks(options=ProviderOptions(api_key=key, model="...")) | -- |
| Perplexity | CompletionModel.perplexity(options=ProviderOptions(api_key=key, model="...")) | -- |
| xAI (Grok) | CompletionModel.xai(options=ProviderOptions(api_key=key, model="...")) | -- |
| Cohere | CompletionModel.cohere(options=ProviderOptions(api_key=key, model="...")) | -- |
| AWS Bedrock | CompletionModel.bedrock(options=BedrockOptions(api_key=key, region="...", model="...")) | -- |
| fal.ai | CompletionModel.fal(options=FalOptions(api_key=key, model="...")) | -- |
Using LLMs in Workflows
import asyncio
import os
from blazen import Workflow, step, Event, StopEvent, Context, CompletionModel, ChatMessage, ProviderOptions

class AnswerEvent(Event):
    answer: str

@step
async def ask_llm(ctx: Context, ev: Event):
    model = CompletionModel.anthropic(options=ProviderOptions(api_key=os.environ["ANTHROPIC_API_KEY"], model="claude-sonnet-4-20250514"))
    response = await model.complete([
        ChatMessage.system("Answer concisely."),
        ChatMessage.user(ev.prompt),
    ], max_tokens=256)
    return AnswerEvent(answer=response.content)  # typed attribute access

@step
async def format_answer(ctx: Context, ev: AnswerEvent):
    return StopEvent(result={"answer": ev.answer})

async def main():
    wf = Workflow("llm-pipeline", [ask_llm, format_answer])
    handler = await wf.run(prompt="Explain gravity in one sentence.")
    result = await handler.result()
    print(result.result)

asyncio.run(main())
Branching / Fan-Out
Return a list of events from a step to dispatch multiple events simultaneously. Each event is routed independently to steps that accept its type.
from blazen import Workflow, step, Event, StopEvent, Context

class TaskEvent(Event):
    task_id: int
    payload: str

@step
async def fan_out(ctx: Context, ev: Event):
    return [
        TaskEvent(task_id=1, payload="first"),
        TaskEvent(task_id=2, payload="second"),
        TaskEvent(task_id=3, payload="third"),
    ]

@step
async def process_task(ctx: Context, ev: TaskEvent):
    # Called once per TaskEvent
    return StopEvent(result={"task_id": ev.task_id, "done": True})
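The fan-out rule (a returned list means each event is routed on its own) can be sketched with a simple queue-based dispatcher. Everything here is a toy written for illustration: events are plain dicts, the run function and the steps mapping are hypothetical, and the loop keeps draining the queue after a StopEvent only so that every TaskEvent is visibly handled once. The real engine terminates the workflow on StopEvent, so its behaviour after the first one may differ.

```python
from collections import deque


def run(steps, start_event):
    """Dispatch events by type; `steps` maps an event-type name to a handler."""
    queue = deque([start_event])
    results = []
    while queue:
        event = queue.popleft()
        kind = event["type"]
        if kind == "StopEvent":
            results.append(event["result"])
            continue
        output = steps[kind](event)
        # A list return value fans out: each event is queued independently.
        if isinstance(output, list):
            queue.extend(output)
        elif output is not None:
            queue.append(output)
    return results


def fan_out(ev):
    return [{"type": "TaskEvent", "task_id": i} for i in (1, 2, 3)]


def process_task(ev):
    # Called once per TaskEvent.
    return {"type": "StopEvent", "result": {"task_id": ev["task_id"], "done": True}}


results = run(
    {"StartEvent": fan_out, "TaskEvent": process_task},
    {"type": "StartEvent"},
)
```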
Side-Effect Steps
A step can return None and use ctx.send_event() to route events through the internal step graph without returning them. This is useful for steps that perform side effects (logging, saving state) before forwarding.
from blazen import Workflow, step, Event, StopEvent, Context

class ProcessedEvent(Event):
    data: str

@step
async def log_and_forward(ctx: Context, ev: Event):
    ctx.set("received_at", "2025-01-01T00:00:00Z")
    ctx.send_event(ProcessedEvent(data=ev.payload))
    return None  # no direct return -- event sent via ctx

@step
async def finish(ctx: Context, ev: ProcessedEvent):
    received = ctx.get("received_at")
    return StopEvent(result={"data": ev.data, "received_at": received})
ctx.send_event() routes the event through the internal step registry (to steps whose accepts matches the event type). This is different from ctx.write_event_to_stream() which publishes to the external broadcast stream.
Pause and Resume
Snapshot a running workflow and resume it later -- useful for long-running processes, human-in-the-loop patterns, or persisting state across restarts.
# Pause: signal pause, then capture workflow state as JSON
handler = await wf.run(prompt="Hello")
handler.pause()
snapshot_json = await handler.snapshot()
# Save snapshot_json to disk, database, etc.
# Resume: restore from snapshot with the same steps
handler = await Workflow.resume(snapshot_json, [step1, step2])
await handler.resume_in_place()
result = await handler.result()
Note on ctx.session and pause/resume. Values in ctx.session are live references and are deliberately excluded from snapshots. If you store live objects there and then call handler.snapshot(), the workflow's session_pause_policy decides what happens: the default (pickle_or_error) attempts to pickle each entry into the snapshot and raises a clear error if any entry can't be serialised. For workflows that explicitly want ephemeral runs, use ctx.state for anything that must survive pause/resume, and ctx.session for everything else.
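As a rough illustration of what a "pickle or error" policy means, the sketch below tries to pickle every entry of a plain dict and raises an error naming the offending key when one fails. The snapshot_session function and SessionSnapshotError class are hypothetical names made up for this example; they are not Blazen APIs, and the real policy may report failures differently.

```python
import pickle
import threading


class SessionSnapshotError(RuntimeError):
    """Hypothetical error type for entries that cannot be pickled."""


def snapshot_session(session):
    """Pickle every session entry, or fail naming the offending key."""
    blobs = {}
    for key, value in session.items():
        try:
            blobs[key] = pickle.dumps(value)
        except Exception as exc:  # pickle raises several exception types
            raise SessionSnapshotError(
                f"session entry {key!r} cannot be pickled: {exc}"
            ) from exc
    return blobs


# A picklable entry round-trips through the snapshot.
ok = snapshot_session({"settings": {"retries": 3}})

# A lock is a live in-process object and cannot be pickled.
try:
    snapshot_session({"lock": threading.Lock()})
    error_message = None
except SessionSnapshotError as err:
    error_message = str(err)
```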
Context API
Steps share state through the Context object. Every method on Context is synchronous -- no await needed.
Values are stored using a 4-tier dispatch:
- bytes / bytearray -- raw binary (survives snapshots)
- JSON-serializable (dict, list, str, int, float, bool, None) -- JSON (survives snapshots)
- Picklable objects (Pydantic models, dataclasses, etc.) -- pickled automatically (survives snapshots)
- Unpicklable objects (DB connections, file handles, sockets) -- live in-process reference (same-process only, excluded from snapshots)
ctx.get returns the original Python type for all four tiers.
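The four tiers can be read as a sequence of checks tried in order. The following sketch classifies a value the way the list describes; storage_tier and is_json_value are hypothetical helpers written for this illustration, and the exact checks the engine performs are an assumption.

```python
import datetime
import pickle
import threading


def is_json_value(value):
    """True for None, str, int, float, bool, and lists/dicts built from them."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return True
    if isinstance(value, list):
        return all(is_json_value(v) for v in value)
    if isinstance(value, dict):
        return all(
            isinstance(k, str) and is_json_value(v) for k, v in value.items()
        )
    return False


def storage_tier(value):
    """Return which of the four tiers a value would fall into."""
    if isinstance(value, (bytes, bytearray)):
        return "bytes"
    if is_json_value(value):
        return "json"
    try:
        pickle.dumps(value)
    except Exception:
        return "live"  # unpicklable: kept as an in-process reference
    return "pickle"


tiers = [
    storage_tier(b"\x00\x01"),
    storage_tier({"a": [1, 2.0, None]}),
    storage_tier(datetime.date(2025, 1, 1)),
    storage_tier(threading.Lock()),
]
```

The order matters: bytes are checked first so they are never treated as JSON or pickled, and the live-reference tier is only the fallback when pickling fails.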
| Method | Description |
|---|---|
| ctx.set(key, value) | Store a value, routed through the 4-tier dispatch above. |
| ctx.get(key) | Retrieve a value (returns None if missing). |
| ctx.set_bytes(key, data) | Store raw binary data (bytes). No serialization requirement. |
| ctx.get_bytes(key) | Retrieve raw binary data (returns None if missing). |
| ctx.send_event(event) | Route an event through the internal step graph. |
| ctx.write_event_to_stream(event) | Publish an event to the external broadcast stream. |
| ctx.run_id() | Get the UUID string for the current workflow run. |
@step
async def example(ctx: Context, ev: Event):
    ctx.set("counter", 42)                     # synchronous
    val = ctx.get("counter")                   # synchronous, returns 42
    run = ctx.run_id()                         # synchronous, returns UUID string
    ctx.send_event(SomeEvent(x=1))             # synchronous, routes internally
    ctx.write_event_to_stream(SomeEvent(x=1))  # synchronous, broadcasts externally
    return None
State vs Session namespaces
Alongside the smart-routing ctx.set / ctx.get shortcuts, Context exposes two explicit namespaces so you can make intent clear at the call site:
- ctx.state -- persistable values (survives pause() / resume() and checkpoint stores). Routes through the same 4-tier dispatch as ctx.set.
- ctx.session -- live in-process references. Identity is preserved within a single workflow run -- ctx.session["conn"] returns the same Python object across steps. Deliberately excluded from snapshots.
import sqlite3
from blazen import step, Context, StartEvent, StopEvent

@step
async def setup(ctx: Context, ev: StartEvent) -> StopEvent:
    # Persistable JSON state
    ctx.state["input_path"] = "data.csv"
    ctx.state["row_count"] = 0
    # Live in-process references -- identity preserved
    conn = sqlite3.connect(":memory:")
    ctx.session["db"] = conn
    # Same object on every access
    assert ctx.session["db"] is conn
    return StopEvent(result={"ok": True})
Both namespaces support the dict protocol (__setitem__, __getitem__, __contains__, keys).
Binary Storage
set_bytes / get_bytes let you store raw binary data with no serialization requirement. Any type can be stored by converting to bytes yourself (e.g., pickle, msgpack, protobuf). Binary data persists through pause/resume/checkpoint.
import pickle
from blazen import step, Event, StopEvent, Context

class NextEvent(Event):
    pass

@step
async def store_model(ctx: Context, ev: Event):
    # Store arbitrary data as bytes
    model_data = pickle.dumps({"weights": [1.0, 2.0, 3.0]})
    ctx.set_bytes("model", model_data)
    return NextEvent()

@step
async def load_model(ctx: Context, ev: NextEvent):
    # Read the bytes back and rebuild the original object
    raw = ctx.get_bytes("model")
    model = pickle.loads(raw)
    return StopEvent(result=model)
API Reference
| Class / Function | Description |
|---|---|
| Event(event_type, **kwargs) | Base event class. Subclass it: class MyEvent(Event) auto-sets event_type to class name. Direct attribute access: ev.name. Also has ev.to_dict() and ev.event_type. |
| StartEvent(**kwargs) | Emitted by wf.run(**kwargs). Steps with ev: Event or no annotation accept this. |
| StopEvent(**kwargs) | Terminates the workflow. Access the result via result.result. |
| Context | Shared typed storage, event emission, and stream publishing. Use ctx.state for persistable values, ctx.session for live in-process references. Smart-routing ctx.set / ctx.get shortcuts still work. Methods: set, get, set_bytes, get_bytes, send_event, write_event_to_stream, run_id. All synchronous. |
| @step | Decorator for workflow steps. Infers accepts from the ev parameter type annotation. Supports async def and plain def. May also be called as @step(accepts=[...], emits=[...], max_concurrency=N). |
| Workflow(name, steps, timeout=None) | Validated workflow graph. timeout is in seconds (default: 300). |
| await wf.run(**kwargs) | Execute the workflow. Returns a WorkflowHandler. Kwargs become the StartEvent payload. |
| WorkflowHandler | Handle to a running workflow: await handler.result(), async for ev in handler.stream_events(), handler.pause(), await handler.snapshot(), await handler.resume_in_place(), await handler.respond_to_input(request_id, response), await handler.abort(). |
| await Workflow.resume(snapshot_json, steps, timeout=None) | Resume a paused workflow from a JSON snapshot. Returns a WorkflowHandler. |
| CompletionModel.<provider>(options=ProviderOptions(...)) | LLM provider. Pass a typed options struct (ProviderOptions, AzureOptions, BedrockOptions, or FalOptions) via options=. Providers: openai, anthropic, gemini, azure, openrouter, groq, together, mistral, deepseek, fireworks, perplexity, xai, cohere, bedrock, fal. |
| await model.complete(messages, ...) | Chat completion. Returns a typed CompletionResponse. |
| ChatMessage(role=, content=, parts=) | Chat message. Constructor with keyword args (role defaults to "user"). Static factories: .system(), .user(), .assistant(), .tool(), .user_image_url(), .user_image_base64(), .user_parts(). |
| Role | Role enum: Role.SYSTEM, Role.USER, Role.ASSISTANT, Role.TOOL. |
| CompletionResponse | Typed response: .content, .model, .finish_reason, .tool_calls, .usage. Also supports dict-style response["content"]. |
| ToolCall | Tool call object: .id, .name, .arguments. |
| TokenUsage | Token usage: .prompt_tokens, .completion_tokens, .total_tokens. |
| ContentPart | Multimodal content part: .text(text=...), .image_url(url=..., media_type=...), .image_base64(data=..., media_type=...). |
Documentation
Full docs: blazen.dev
Source: github.com/ZachHandley/Blazen
License
AGPL-3.0 -- see LICENSE for details.
Author: Zach Handley