Python bindings for the Blazen workflow engine

Reason this release was yanked:

Ran out of space, wheels incomplete

Project description

Blazen

Event-driven AI workflow engine, powered by Rust.

Blazen lets you build multi-step AI workflows as composable, event-driven graphs. Define steps with a decorator, wire them together with typed events, and run everything on a native Rust engine with async Python bindings.

Installation

# Recommended
uv add blazen

# Or with pip
pip install blazen

Requires Python 3.10+.

Quick Start

import asyncio
from blazen import Workflow, step, Event, StartEvent, StopEvent, Context

class GreetEvent(Event):
    name: str

@step
async def parse(ctx: Context, ev: Event):
    return GreetEvent(name=ev.name)

@step
async def greet(ctx: Context, ev: GreetEvent):
    return StopEvent(result={"greeting": f"Hello, {ev.name}!"})

async def main():
    wf = Workflow("hello", [parse, greet])
    handler = await wf.run(name="Blazen")
    result = await handler.result()
    print(result.result)  # {"greeting": "Hello, Blazen!"}

asyncio.run(main())

How it works

  • class GreetEvent(Event) -- Subclassing Event auto-sets event_type to the class name ("GreetEvent"). Annotations like name: str are for documentation only; at runtime all keyword arguments are stored as JSON.
  • @step reads type annotations -- ev: GreetEvent on a step function automatically sets accepts=["GreetEvent"]. The step will only receive events of that type.
  • @step with no type hint or ev: Event -- defaults to accepting StartEvent (the event emitted by wf.run()).
  • ev.name -- Direct attribute access on events. No need for ev.to_dict()["name"].
  • wf.run(name="Blazen") -- Keyword arguments become the StartEvent payload. Steps that accept StartEvent receive an event where ev.name == "Blazen".
  • result.result -- preserves is-identity for non-JSON Python objects. You can pass class instances, Pydantic models, and even live DB connections through StopEvent.result and get the same object back on the other side.
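
The annotation-driven routing described above can be modelled in a few lines of plain Python. This is a toy sketch, not Blazen's implementation: the `Event`, `StartEvent`, `GreetEvent`, and `step` names below are local stand-ins defined in the snippet itself, and the only behavior they imitate is what the bullets state (class name as `event_type`, `accepts` inferred from the `ev` annotation, fallback to `StartEvent`).

```python
from typing import get_type_hints


class Event:
    """Stand-in base event: keyword arguments become attributes."""

    def __init__(self, **fields):
        self.event_type = type(self).__name__
        self.__dict__.update(fields)


class StartEvent(Event):
    pass


class GreetEvent(Event):
    name: str  # documentation only, as in the text above


def step(func):
    """Toy decorator: derive `accepts` from the annotation on `ev`."""
    ev_type = get_type_hints(func).get("ev", Event)
    if ev_type is Event:
        # A bare `Event` annotation (or none at all) falls back to StartEvent.
        func.accepts = ["StartEvent"]
    else:
        func.accepts = [ev_type.__name__]
    return func


@step
async def parse(ctx, ev: Event):
    return GreetEvent(name=ev.name)


@step
async def greet(ctx, ev: GreetEvent):
    return f"Hello, {ev.name}!"


print(parse.accepts)  # ['StartEvent']
print(greet.accepts)  # ['GreetEvent']
```

The real decorator presumably does more (validation, registration with the Rust engine); the sketch only shows why annotating `ev` is enough to wire steps together.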

Multi-Step Workflows

Chain steps together using custom event subclasses. Each step declares which events it accepts via its type annotation.

import asyncio
from blazen import Workflow, step, Event, StopEvent, Context

class FetchedEvent(Event):
    text: str
    source: str

class AnalyzedEvent(Event):
    summary: str

@step
async def fetch(ctx: Context, ev: Event):
    # ev is a StartEvent with url=...
    text = f"Content from {ev.url}"
    return FetchedEvent(text=text, source=ev.url)

@step
async def analyze(ctx: Context, ev: FetchedEvent):
    summary = f"Analysis of: {ev.text}"
    return AnalyzedEvent(summary=summary)

@step
async def report(ctx: Context, ev: AnalyzedEvent):
    return StopEvent(result={"summary": ev.summary})

async def main():
    wf = Workflow("pipeline", [fetch, analyze, report])
    handler = await wf.run(url="https://example.com")
    result = await handler.result()
    print(result.result)  # {"summary": "Analysis of: Content from https://example.com"}

asyncio.run(main())

Event Streaming

Stream intermediate events from a running workflow in real time using ctx.write_event_to_stream().

import asyncio
from blazen import Workflow, step, Event, StopEvent, Context

class ProgressEvent(Event):
    step_num: int
    message: str

@step
async def work(ctx: Context, ev: Event):
    for i in range(3):
        ctx.write_event_to_stream(ProgressEvent(step_num=i, message=f"Processing {i}"))
    return StopEvent(result="done")

async def main():
    wf = Workflow("streamer", [work])
    handler = await wf.run()

    async for event in handler.stream_events():
        print(event.event_type, event.step_num, event.message)

    result = await handler.result()
    print(result.result)  # "done"

asyncio.run(main())

write_event_to_stream() publishes to an external broadcast stream. Consumers read it with async for event in handler.stream_events(). These events are not routed through the step graph -- they are for external observation only.

LLM Integration

Blazen includes a built-in multi-provider LLM client. All providers share the same CompletionModel / ChatMessage interface. Responses are returned as typed CompletionResponse objects.

ChatMessage, Role, and CompletionResponse

import os
from blazen import CompletionModel, ChatMessage, Role, CompletionResponse, ProviderOptions

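model = CompletionModel.openrouter(options=ProviderOptions(api_key=os.environ["OPENROUTER_API_KEY"], model="openai/gpt-4o"))
# `await` must run inside an async function, such as a workflow step or an asyncio.run() entry point.
response: CompletionResponse = await model.complete([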
    ChatMessage.system("You are helpful."),
    ChatMessage.user("What is 2+2?"),
], temperature=0.7, max_tokens=256)

# Typed attribute access
print(response.content)        # "4"
print(response.model)          # model name used
print(response.finish_reason)  # "stop", "tool_calls", etc.
print(response.tool_calls)     # list[ToolCall] or None
print(response.usage)          # TokenUsage with .prompt_tokens, .completion_tokens, .total_tokens

# Dict-style access also works for backwards compatibility
print(response["content"])

Role Enum

from blazen import ChatMessage, Role

Role.SYSTEM     # "system"
Role.USER       # "user"
Role.ASSISTANT  # "assistant"
Role.TOOL       # "tool"

# Use with ChatMessage constructor
msg = ChatMessage(role=Role.USER, content="Hello")

Multimodal Messages

Send images alongside text using multimodal factory methods:

from blazen import ChatMessage, ContentPart

# Image from URL
msg = ChatMessage.user_image_url("https://example.com/photo.jpg", "What's in this image?")

# Image from base64 (base64_data holds the base64-encoded image contents)
msg = ChatMessage.user_image_base64(base64_data, "image/png", "Describe this.")

# Multiple content parts
msg = ChatMessage.user_parts([
    ContentPart.text(text="Compare these two images:"),
    ContentPart.image_url(url="https://example.com/a.jpg", media_type="image/jpeg"),
    ContentPart.image_url(url="https://example.com/b.jpg", media_type="image/jpeg"),
])
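
The `base64_data` value in the example above has to come from somewhere. A minimal sketch using only the standard library is shown below. The helper name `to_base64_text` is hypothetical, and whether `user_image_base64` expects a `str` or `bytes` is not stated here, so treat the `str` return type as an assumption.

```python
import base64


def to_base64_text(raw: bytes) -> str:
    """Encode raw image bytes as an ASCII base64 string."""
    return base64.b64encode(raw).decode("ascii")


# The 8-byte PNG signature, used here as stand-in image data.
png_signature = b"\x89PNG\r\n\x1a\n"
base64_data = to_base64_text(png_signature)
print(base64_data)  # iVBORw0KGgo=
```

In practice the bytes would come from reading an image file in binary mode, for example `open(path, "rb").read()`.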

Media Sources

For APIs that take generic media inputs (vision, OCR, audio-paired pipelines), use ImageSource directly or its alias MediaSource. The alias is provided so callers can spell intent at the call site without changing the underlying type.

from blazen import ImageSource, MediaSource  # MediaSource is ImageSource

src1 = ImageSource.url("https://example.com/photo.jpg")
src2 = MediaSource.path("/tmp/scan.png")        # same class, ergonomic alias
src3 = MediaSource.base64(b64_bytes, "image/png")

Supported Providers

Provider | Constructor | Default Model
OpenAI | CompletionModel.openai(options=ProviderOptions(api_key=key, model="gpt-4o")) | gpt-4o
Anthropic | CompletionModel.anthropic(options=ProviderOptions(api_key=key, model="claude-sonnet-4-20250514")) | claude-sonnet-4-20250514
Google Gemini | CompletionModel.gemini(options=ProviderOptions(api_key=key, model="gemini-2.0-flash")) | gemini-2.0-flash
Azure OpenAI | CompletionModel.azure(options=AzureOptions(api_key=key, resource_name="...", deployment_name="...")) | (deployment)
OpenRouter | CompletionModel.openrouter(options=ProviderOptions(api_key=key, model="...")) | --
Groq | CompletionModel.groq(options=ProviderOptions(api_key=key, model="...")) | --
Together AI | CompletionModel.together(options=ProviderOptions(api_key=key, model="...")) | --
Mistral | CompletionModel.mistral(options=ProviderOptions(api_key=key, model="...")) | --
DeepSeek | CompletionModel.deepseek(options=ProviderOptions(api_key=key, model="...")) | --
Fireworks | CompletionModel.fireworks(options=ProviderOptions(api_key=key, model="...")) | --
Perplexity | CompletionModel.perplexity(options=ProviderOptions(api_key=key, model="...")) | --
xAI (Grok) | CompletionModel.xai(options=ProviderOptions(api_key=key, model="...")) | --
Cohere | CompletionModel.cohere(options=ProviderOptions(api_key=key, model="...")) | --
AWS Bedrock | CompletionModel.bedrock(options=BedrockOptions(api_key=key, region="...", model="...")) | --
fal.ai | CompletionModel.fal(options=FalOptions(api_key=key, model="...")) | --

Using LLMs in Workflows

import asyncio
import os
from blazen import Workflow, step, Event, StopEvent, Context, CompletionModel, ChatMessage, ProviderOptions

class AnswerEvent(Event):
    answer: str

@step
async def ask_llm(ctx: Context, ev: Event):
    model = CompletionModel.anthropic(options=ProviderOptions(api_key=os.environ["ANTHROPIC_API_KEY"], model="claude-sonnet-4-20250514"))
    response = await model.complete([
        ChatMessage.system("Answer concisely."),
        ChatMessage.user(ev.prompt),
    ], max_tokens=256)
    return AnswerEvent(answer=response.content)  # typed attribute access

@step
async def format_answer(ctx: Context, ev: AnswerEvent):
    return StopEvent(result={"answer": ev.answer})

async def main():
    wf = Workflow("llm-pipeline", [ask_llm, format_answer])
    handler = await wf.run(prompt="Explain gravity in one sentence.")
    result = await handler.result()
    print(result.result)

asyncio.run(main())

Local Inference

Blazen ships first-class bindings for several local-inference backends. They share a common shape: build an engine, call generate(...) for one-shot output, or stream(...) for an async iterator of typed chunks. Each backend has its own message and result types so you can keep them straight in mixed pipelines.

mistral.rs

from blazen import ChatMessageInput, ChatRole, InferenceResult, InferenceChunkStream

messages = [
    ChatMessageInput(role=ChatRole.SYSTEM, content="You are concise."),
    ChatMessageInput(role=ChatRole.USER, content="Explain entropy."),
]

# `engine` is a mistral.rs engine built beforehand; its construction is not shown here.
result: InferenceResult = await engine.generate(messages, max_tokens=256)
print(result.text)

stream: InferenceChunkStream = await engine.stream(messages)
async for chunk in stream:
    if chunk.delta:
        print(chunk.delta, end="", flush=True)
    if chunk.reasoning_delta:
        ...  # optional reasoning trace
    if chunk.tool_calls:
        for call in chunk.tool_calls:  # list[InferenceToolCall]
            ...
    if chunk.finish_reason:
        break

Vision-capable models accept InferenceImage parts built from InferenceImageSource (URL, path, or base64). InferenceUsage is attached to both InferenceResult and the final chunk.
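
When the full text is needed after streaming, the deltas have to be accumulated by the caller. The sketch below shows that pattern against a fake stream so it runs without any backend. `FakeChunk` and `fake_stream` are hypothetical stand-ins that model only the `delta` and `finish_reason` fields used in the loop above; the real chunk type carries more.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class FakeChunk:
    """Stand-in for a streamed chunk; only the fields used below."""
    delta: Optional[str] = None
    finish_reason: Optional[str] = None


async def fake_stream() -> AsyncIterator[FakeChunk]:
    """Yield three text pieces followed by a terminating chunk."""
    for piece in ("Entropy ", "measures ", "disorder."):
        yield FakeChunk(delta=piece)
    yield FakeChunk(finish_reason="stop")


async def collect_text(stream) -> str:
    """Concatenate every non-empty delta until a finish_reason arrives."""
    parts = []
    async for chunk in stream:
        if chunk.delta:
            parts.append(chunk.delta)
        if chunk.finish_reason:
            break
    return "".join(parts)


text = asyncio.run(collect_text(fake_stream()))
print(text)  # Entropy measures disorder.
```

Collecting into a list and joining once avoids repeated string concatenation on long generations.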

llama.cpp

Symmetric API with its own type family so the two backends never alias:

from blazen import (
    LlamaCppChatMessageInput,
    LlamaCppChatRole,
    LlamaCppInferenceResult,
    LlamaCppInferenceChunkStream,
)

messages = [
    LlamaCppChatMessageInput(role=LlamaCppChatRole.USER, content="Hi!"),
]

# `engine` is a llama.cpp engine built beforehand; its construction is not shown here.
result: LlamaCppInferenceResult = await engine.generate(messages)
print(result.text, result.usage)  # usage is LlamaCppInferenceUsage

stream: LlamaCppInferenceChunkStream = await engine.stream(messages)
async for chunk in stream:  # LlamaCppInferenceChunk
    if chunk.delta:
        print(chunk.delta, end="", flush=True)

candle

Candle returns a CandleInferenceResult from its non-streaming path:

from blazen import CandleInferenceResult

# `engine` is a candle engine built beforehand; its construction is not shown here.
result: CandleInferenceResult = await engine.generate(prompt="Once upon a time", max_tokens=128)
print(result.text)

Progress Reporting

Long downloads and model loads accept a progress callback. Subclass ProgressCallback and override on_progress. The default implementation is a no-op, so partial overrides are safe.

from blazen import ProgressCallback

class PrintProgress(ProgressCallback):
    """Prints a single-line progress readout to stdout."""

    def __init__(self):
        super().__init__()

    def on_progress(self, downloaded: int, total: int | None) -> None:
        # total is None when the upstream doesn't report Content-Length
        if total is not None:
            pct = 100.0 * downloaded / total
            print(f"\r{pct:5.1f}%  {downloaded}/{total} bytes", end="", flush=True)
        else:
            print(f"\r{downloaded} bytes", end="", flush=True)

# Pass instances to APIs that accept callbacks (model-cache download paths, etc.)
await cache.download(model_id="...", progress=PrintProgress())
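
The formatting logic inside `on_progress` is easier to test when it lives in a plain function. The sketch below is one way to factor it out. `render_progress` is a hypothetical helper, and the guard for a zero `total` is an added assumption: the text above only mentions `None`, but a reported size of 0 would otherwise divide by zero.

```python
from typing import Optional


def render_progress(downloaded: int, total: Optional[int]) -> str:
    """Format one progress line; tolerate a missing or zero total."""
    if total:  # skips both None and 0, avoiding a division by zero
        pct = min(100.0, 100.0 * downloaded / total)
        return f"{pct:5.1f}%  {downloaded}/{total} bytes"
    return f"{downloaded} bytes"


print(render_progress(512, 2048))  #  25.0%  512/2048 bytes
print(render_progress(512, None))  # 512 bytes
print(render_progress(0, 0))       # 0 bytes
```

An `on_progress` override can then reduce to a single `print("\r" + render_progress(downloaded, total), end="", flush=True)`.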

Telemetry

Blazen exposes opt-in telemetry initializers. Each one is gated behind a Cargo feature on the underlying wheel (langfuse, otlp, prometheus); calling an initializer for a feature that wasn't compiled in raises an UnsupportedError.

Langfuse

from blazen import LangfuseConfig, init_langfuse

init_langfuse(LangfuseConfig(public_key="pk-...", secret_key="sk-...", host="https://cloud.langfuse.com"))

OpenTelemetry (OTLP)

from blazen import OtlpConfig, init_otlp

init_otlp(OtlpConfig(endpoint="http://localhost:4317", service_name="my-app"))

Prometheus

from blazen import init_prometheus

init_prometheus(9090)  # exposes /metrics on the given port

Branching / Fan-Out

Return a list of events from a step to dispatch multiple events simultaneously. Each event is routed independently to steps that accept its type.

from blazen import Workflow, step, Event, StopEvent, Context

class TaskEvent(Event):
    task_id: int
    payload: str

@step
async def fan_out(ctx: Context, ev: Event):
    return [
        TaskEvent(task_id=1, payload="first"),
        TaskEvent(task_id=2, payload="second"),
        TaskEvent(task_id=3, payload="third"),
    ]

@step
async def process_task(ctx: Context, ev: TaskEvent):
    # Called once per TaskEvent
    return StopEvent(result={"task_id": ev.task_id, "done": True})
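
To make the routing rule concrete, here is a toy dispatcher in plain Python. It is a conceptual model only: the event classes, the `ROUTES` table, and `run_toy` are all hypothetical stand-ins, events are processed one at a time rather than simultaneously, and every StopEvent is collected for illustration (how the real engine treats multiple StopEvents is not covered here).

```python
import asyncio
from collections import deque


class Event:
    """Stand-in base event: keyword arguments become attributes."""

    def __init__(self, **fields):
        self.event_type = type(self).__name__
        self.__dict__.update(fields)


class StartEvent(Event):
    pass


class TaskEvent(Event):
    pass


class StopEvent(Event):
    pass


async def fan_out(ev):
    # Returning a list dispatches several events from one step.
    return [TaskEvent(task_id=i) for i in (1, 2, 3)]


async def process_task(ev):
    return StopEvent(result={"task_id": ev.task_id, "done": True})


# Hypothetical routing table: event type name -> handler.
ROUTES = {"StartEvent": fan_out, "TaskEvent": process_task}


async def run_toy(start):
    """Drain a FIFO queue, routing each event by type; collect StopEvents."""
    queue, stops = deque([start]), []
    while queue:
        ev = queue.popleft()
        if isinstance(ev, StopEvent):
            stops.append(ev.result)
            continue
        out = await ROUTES[ev.event_type](ev)
        # A list return is fanned out: every element is queued on its own.
        queue.extend(out if isinstance(out, list) else [out])
    return stops


results = asyncio.run(run_toy(StartEvent()))
print([r["task_id"] for r in results])  # [1, 2, 3]
```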

Side-Effect Steps

A step can return None and use ctx.send_event() to route events through the internal step graph without returning them. This is useful for steps that perform side effects (logging, saving state) before forwarding.

from blazen import Workflow, step, Event, StopEvent, Context

class ProcessedEvent(Event):
    data: str

@step
async def log_and_forward(ctx: Context, ev: Event):
    ctx.set("received_at", "2025-01-01T00:00:00Z")
    ctx.send_event(ProcessedEvent(data=ev.payload))
    return None  # no direct return -- event sent via ctx

@step
async def finish(ctx: Context, ev: ProcessedEvent):
    received = ctx.get("received_at")
    return StopEvent(result={"data": ev.data, "received_at": received})

ctx.send_event() routes the event through the internal step registry (to steps whose accepts matches the event type). This is different from ctx.write_event_to_stream() which publishes to the external broadcast stream.

Pause and Resume

Snapshot a running workflow and resume it later -- useful for long-running processes, human-in-the-loop patterns, or persisting state across restarts.

# Pause: signal pause, then capture workflow state as JSON
handler = await wf.run(prompt="Hello")
handler.pause()
snapshot_json = await handler.snapshot()
# Save snapshot_json to disk, database, etc.

# Resume: restore from snapshot with the same steps
handler = await Workflow.resume(snapshot_json, [step1, step2])
await handler.resume_in_place()
result = await handler.result()

Note on ctx.session and pause/resume: Values in ctx.session are live references and are not written to snapshots as-is. If you store live objects there and then call handler.snapshot(), the workflow's session_pause_policy decides what happens: the default (pickle_or_error) attempts to pickle each entry into the snapshot and raises a clear error if any entry can't be serialized. Use ctx.state for anything that must survive pause/resume, and keep ctx.session for values that only need to live within a single in-process run.
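
Because the default policy fails on entries that cannot be pickled, it can help to find those entries before calling `handler.snapshot()`. The sketch below is a standalone check using only `pickle`. `unpicklable_keys` is a hypothetical helper, and the plain dict is a stand-in for `ctx.session`; the function relies only on `keys()` and item access.

```python
import pickle
import threading


def unpicklable_keys(entries) -> list:
    """Return the keys whose values pickle.dumps cannot serialize."""
    bad = []
    for key in entries.keys():
        try:
            pickle.dumps(entries[key])
        except Exception:  # pickle raises several exception types
            bad.append(key)
    return bad


session_like = {
    "settings": {"retries": 3},  # plain data: picklable
    "lock": threading.Lock(),    # live OS-level object: not picklable
}
print(unpicklable_keys(session_like))  # ['lock']
```

Anything the check reports would need to be dropped or rebuilt on resume, or moved out of the session before snapshotting.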

Errors

All Blazen-raised exceptions inherit from BlazenError, so a single except BlazenError catches everything the SDK throws while leaving unrelated Exceptions to propagate. Catch narrower bases when you want to react differently per category.

Base classes

Class | Raised when
BlazenError | Root of the hierarchy.
AuthError | Missing/invalid credentials.
RateLimitError | Provider rate-limited the request. Inspect retry_after_ms.
TimeoutError | Request or workflow exceeded its deadline.
ValidationError | Bad input shape (events, options, snapshots).
ContentPolicyError | Provider refused the prompt or output for policy reasons.
ProviderError | Generic upstream/provider failure. Base for backend-specific errors.
UnsupportedError | Feature not compiled into this wheel (e.g. telemetry without the feature).
ComputeError | Local-inference compute failure (CUDA OOM, kernel error, etc.).
MediaError | Decode/encode failure for image/audio inputs.

Per-backend ProviderError subclasses

These are feature-gated at runtime — they exist on blazen but are only raised when the corresponding backend is bundled in the wheel.

Class | Backend
LlamaCppError | llama.cpp
CandleLlmError | candle (LLM)
CandleEmbedError | candle (embeddings)
MistralRsError | mistral.rs
WhisperError | whisper.cpp
PiperError | piper
DiffusionError | diffusion (image generation)
FastEmbedError | fastembed
TractError | tract

Structured attributes

ProviderError (and every per-backend subclass) carries structured fields you can read in except blocks:

  • provider -- short provider name ("openai", "llama_cpp", ...)
  • status -- HTTP status code if applicable, else None
  • endpoint -- request URL/route if applicable
  • request_id -- provider-supplied request id if available
  • detail -- human-readable detail string
  • raw_body -- raw response body as bytes if captured
  • retry_after_ms -- milliseconds the provider asked you to wait (mirrored on RateLimitError)

from blazen import (
    BlazenError, AuthError, RateLimitError, ProviderError,
    LlamaCppError, MistralRsError, ContentPolicyError,
)

try:
    response = await model.complete(messages)
except AuthError as e:
    print("bad key:", e)
except RateLimitError as e:
    print(f"slow down; retry in {e.retry_after_ms} ms")
except ContentPolicyError:
    print("provider refused the prompt")
except (LlamaCppError, MistralRsError) as e:
    # Local-inference specific handling
    print(f"local backend {e.provider} failed: {e.detail}")
except ProviderError as e:
    print(f"upstream {e.provider} returned {e.status}: {e.detail}")
except BlazenError:
    raise
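
The `retry_after_ms` field lends itself to a small retry wrapper. The sketch below is self-contained so it can run without the SDK: the `RateLimitError` class is a local stand-in for the Blazen exception of the same name, and `with_retries`, `max_attempts`, and `default_wait_ms` are hypothetical names chosen for illustration.

```python
import asyncio


class RateLimitError(Exception):
    """Stand-in for blazen.RateLimitError so the sketch runs on its own."""

    def __init__(self, retry_after_ms=None):
        super().__init__("rate limited")
        self.retry_after_ms = retry_after_ms


async def with_retries(call, max_attempts=3, default_wait_ms=500):
    """Await call(); on RateLimitError sleep for the hinted time and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except RateLimitError as err:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            hinted = err.retry_after_ms
            wait_ms = hinted if hinted is not None else default_wait_ms
            await asyncio.sleep(wait_ms / 1000)


attempts = []


async def flaky():
    """Fail twice with a 10 ms hint, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise RateLimitError(retry_after_ms=10)
    return "ok"


outcome = asyncio.run(with_retries(flaky))
print(outcome, len(attempts))  # ok 3
```

With the real SDK, `call` could be something like `lambda: model.complete(messages)`, with the stand-in class replaced by the imported one.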

Context API

Steps share state through the Context object. Every method on Context is synchronous -- no await needed.

Values are stored using a 4-tier dispatch:

  1. bytes / bytearray -- raw binary (survives snapshots)
  2. JSON-serializable (dict, list, str, int, float, bool, None) -- JSON (survives snapshots)
  3. Picklable objects (Pydantic models, dataclasses, etc.) -- pickled automatically (survives snapshots)
  4. Unpicklable objects (DB connections, file handles, sockets) -- live in-process reference (same-process only, excluded from snapshots)

ctx.get returns the original Python type for all four tiers.
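
The tier order above can be approximated with a short classifier. This is a sketch of the decision order only, not the engine's code: `classify` is a hypothetical function, and using `json.dumps` and `pickle.dumps` as the tests for tiers 2 and 3 is an assumption about how "JSON-serializable" and "picklable" are decided.

```python
import datetime
import json
import pickle
import threading


def classify(value) -> str:
    """Return the first tier that can hold `value`, checked in order."""
    if isinstance(value, (bytes, bytearray)):
        return "bytes"
    try:
        json.dumps(value)
        return "json"
    except (TypeError, ValueError):
        pass
    try:
        pickle.dumps(value)
        return "pickle"
    except Exception:  # pickle raises several exception types
        return "live"


print(classify(b"\x00\x01"))                # bytes
print(classify({"a": [1, 2.5, None]}))      # json
print(classify(datetime.date(2025, 1, 1)))  # pickle
print(classify(threading.Lock()))           # live
```

Only the last tier is tied to the current process, which is why those values cannot appear in a snapshot.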

Method | Description
ctx.set(key, value) | Store a value, routed through the 4-tier dispatch above.
ctx.get(key) | Retrieve a value (returns None if missing).
ctx.set_bytes(key, data) | Store raw binary data (bytes). No serialization requirement.
ctx.get_bytes(key) | Retrieve raw binary data (returns None if missing).
ctx.send_event(event) | Route an event through the internal step graph.
ctx.write_event_to_stream(event) | Publish an event to the external broadcast stream.
ctx.run_id() | Get the UUID string for the current workflow run.

# SomeEvent below stands for any Event subclass defined elsewhere.
@step
async def example(ctx: Context, ev: Event):
    ctx.set("counter", 42)              # synchronous
    val = ctx.get("counter")            # synchronous, returns 42
    run = ctx.run_id()                  # synchronous, returns UUID string
    ctx.send_event(SomeEvent(x=1))      # synchronous, routes internally
    ctx.write_event_to_stream(SomeEvent(x=1))  # synchronous, broadcasts externally
    return None

State vs Session namespaces

Alongside the smart-routing ctx.set / ctx.get shortcuts, Context exposes two explicit namespaces so you can make intent clear at the call site:

  • ctx.state -- persistable values (survives pause() / resume() and checkpoint stores). Routes through the same 4-tier dispatch as ctx.set.
  • ctx.session -- live in-process references. Identity is preserved within a single workflow run -- ctx.session["conn"] returns the same Python object across steps. Deliberately excluded from snapshots.

import sqlite3
from blazen import step, Context, StartEvent, StopEvent

@step
async def setup(ctx: Context, ev: StartEvent) -> StopEvent:
    # Persistable JSON state
    ctx.state["input_path"] = "data.csv"
    ctx.state["row_count"] = 0

    # Live in-process references -- identity preserved
    conn = sqlite3.connect(":memory:")
    ctx.session["db"] = conn

    # Same object on every access
    assert ctx.session["db"] is conn

    return StopEvent(result={"ok": True})

Both namespaces support the dict protocol (__setitem__, __getitem__, __contains__, keys).

Binary Storage

set_bytes / get_bytes let you store raw binary data with no serialization requirement. Any type can be stored by converting to bytes yourself (e.g., pickle, msgpack, protobuf). Binary data persists through pause/resume/checkpoint.

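import pickle
from blazen import step, Event, StopEvent, Context

class NextEvent(Event):
    """Marker event that hands control to load_model."""

@step
async def store_model(ctx: Context, ev: Event):
    # Store arbitrary data as bytes
    model_data = pickle.dumps({"weights": [1.0, 2.0, 3.0]})
    ctx.set_bytes("model", model_data)
    return NextEvent()

@step
async def load_model(ctx: Context, ev: NextEvent):
    raw = ctx.get_bytes("model")
    # Only unpickle bytes that this workflow wrote itself
    model = pickle.loads(raw)
    return StopEvent(result=model)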
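
Pickle is convenient, but `pickle.loads` can execute arbitrary code when fed untrusted bytes. For simple numeric payloads, a fixed binary layout is an alternative. The sketch below uses the standard `struct` module; `pack_weights` and `unpack_weights` are hypothetical helpers, and the layout (a 4-byte count followed by little-endian doubles) is an arbitrary choice for illustration.

```python
import struct


def pack_weights(weights) -> bytes:
    """Encode floats as a count header followed by little-endian doubles."""
    return struct.pack(f"<I{len(weights)}d", len(weights), *weights)


def unpack_weights(raw: bytes) -> list:
    """Inverse of pack_weights."""
    (count,) = struct.unpack_from("<I", raw, 0)
    return list(struct.unpack_from(f"<{count}d", raw, 4))


blob = pack_weights([1.0, 2.0, 3.0])
print(len(blob))             # 28
print(unpack_weights(blob))  # [1.0, 2.0, 3.0]
```

The resulting `blob` is plain `bytes`, so it could be passed to `ctx.set_bytes("model", blob)` and decoded again after `ctx.get_bytes("model")`.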

API Reference

Class / Function | Description
Event(event_type, **kwargs) | Base event class. Subclass it: class MyEvent(Event) auto-sets event_type to class name. Direct attribute access: ev.name. Also has ev.to_dict() and ev.event_type.
StartEvent(**kwargs) | Emitted by wf.run(**kwargs). Steps with ev: Event or no annotation accept this.
StopEvent(**kwargs) | Terminates the workflow. Access the result via result.result.
Context | Shared typed storage, event emission, and stream publishing. Use ctx.state for persistable values, ctx.session for live in-process references. Smart-routing ctx.set / ctx.get shortcuts still work. Methods: set, get, set_bytes, get_bytes, send_event, write_event_to_stream, run_id. All synchronous.
@step | Decorator for workflow steps. Infers accepts from the ev parameter type annotation. Supports async def and plain def. May also be called as @step(accepts=[...], emits=[...], max_concurrency=N).
Workflow(name, steps, timeout=None) | Validated workflow graph. timeout is in seconds (default: 300).
await wf.run(**kwargs) | Execute the workflow. Returns a WorkflowHandler. Kwargs become the StartEvent payload.
WorkflowHandler | Handle to a running workflow: await handler.result(), async for ev in handler.stream_events(), handler.pause(), await handler.snapshot(), await handler.resume_in_place(), await handler.respond_to_input(request_id, response), await handler.abort().
await Workflow.resume(snapshot_json, steps, timeout=None) | Resume a paused workflow from a JSON snapshot. Returns a WorkflowHandler.
CompletionModel.<provider>(options=ProviderOptions(...)) | LLM provider. Pass a typed options struct (ProviderOptions, AzureOptions, BedrockOptions, or FalOptions) via options=. Providers: openai, anthropic, gemini, azure, openrouter, groq, together, mistral, deepseek, fireworks, perplexity, xai, cohere, bedrock, fal.
await model.complete(messages, ...) | Chat completion. Returns a typed CompletionResponse.
ChatMessage(role=, content=, parts=) | Chat message. Constructor with keyword args (role defaults to "user"). Static factories: .system(), .user(), .assistant(), .tool(), .user_image_url(), .user_image_base64(), .user_parts().
Role | Role enum: Role.SYSTEM, Role.USER, Role.ASSISTANT, Role.TOOL.
CompletionResponse | Typed response: .content, .model, .finish_reason, .tool_calls, .usage. Also supports dict-style response["content"].
ToolCall | Tool call object: .id, .name, .arguments.
TokenUsage | Token usage: .prompt_tokens, .completion_tokens, .total_tokens.
ContentPart | Multimodal content part: .text(text=...), .image_url(url=..., media_type=...), .image_base64(data=..., media_type=...).

Documentation

Full docs: blazen.dev

Source: github.com/ZachHandley/Blazen

License

MPL-2.0 -- see LICENSE for details.

Author: Zach Handley

Download files

Source Distributions

No source distribution files available for this release.

Built Distribution

blazen-0.5.2-cp314-cp314-win_amd64.whl (27.9 MB)

Uploaded: CPython 3.14, Windows x86-64

File details

Details for the file blazen-0.5.2-cp314-cp314-win_amd64.whl.

File metadata

  • Download URL: blazen-0.5.2-cp314-cp314-win_amd64.whl
  • Upload date:
  • Size: 27.9 MB
  • Tags: CPython 3.14, Windows x86-64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.14

File hashes

Hashes for blazen-0.5.2-cp314-cp314-win_amd64.whl
Algorithm Hash digest
SHA256 484afd0a5d3f9f6b5ecc6d2fb70640556833c165c08a81bb03b41f6d6c332a5d
MD5 c50962c65156be9cb77814427f6ecfe0
BLAKE2b-256 2fae4c3c3138c63a2a7c60b475b47a0e0a1face4cde16d658532cbf76f493c5b
