Python SDK for FlowForge - AI workflow orchestration
FlowForge SDK
Python SDK for FlowForge - AI workflow orchestration with durable execution.
Installation
pip install flowforge-sdk
Quick Start
from flowforge import FlowForge, Context, step
flowforge = FlowForge(app_id="my-app")
@flowforge.function(
    id="my-workflow",
    trigger=flowforge.trigger.event("my/event"),
)
async def my_workflow(ctx: Context) -> dict:
    result = await step.run("process", lambda: "Hello, World!")
    return {"message": result}
Features
- Durable execution with automatic checkpointing
- AI agent support with tool calling
- Multi-agent networks with routing
- Human-in-the-loop approvals
Step Primitives
Inside a @flowforge.function, use the global step object:
from flowforge import step
# Run any function with memoization (won't re-run on replay)
result = await step.run("validate", validate_order, order)
# Pause until an event arrives
event = await step.wait_for_event("wait-payment", event="payment/received", match="data.order_id == 'abc'")
# Sleep for a duration
await step.sleep("wait-1h", "1h")
# LLM call with automatic retry
reply = await step.ai("summarize", model="gpt-4o", prompt="Summarize: ...")
# AI agent loop with tool calling
result = await step.agent(
    "research-agent",
    model="claude-sonnet-4-6",
    system="You are a research assistant.",
    messages=[{"role": "user", "content": "Research this topic..."}],
    tools=[...],
    max_tokens=4096,
    max_tool_calls=20,
)
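The "won't re-run on replay" behavior of step.run can be pictured with a small stand-in. The sketch below is not FlowForge's implementation: ToyStep and its in-memory dict are hypothetical, and a real engine would persist checkpoints durably. It only illustrates the idea that a step ID keys a stored result, so replaying the workflow body does not call the function again.

```python
import asyncio
import inspect


class ToyStep:
    """Hypothetical stand-in for a memoizing step runner (not the SDK's code)."""

    def __init__(self):
        self.results = {}  # step id -> stored result
        self.calls = []    # ids of steps whose function actually ran

    async def run(self, step_id, fn, *args):
        # A step that already completed returns its stored result.
        if step_id in self.results:
            return self.results[step_id]
        value = fn(*args)
        # Accept both plain functions and coroutine functions.
        if inspect.isawaitable(value):
            value = await value
        self.results[step_id] = value
        self.calls.append(step_id)
        return value


async def workflow(step, order):
    total = await step.run("total", lambda o: sum(o["prices"]), order)
    return await step.run("format", lambda t: f"Total: {t}", total)


async def demo():
    step = ToyStep()
    order = {"prices": [5, 7]}
    first = await workflow(step, order)
    replayed = await workflow(step, order)  # replay: no step function runs again
    return first, replayed, step.calls


first, replayed, calls = asyncio.run(demo())
print(first, replayed, calls)
```

Each step function ran once even though the workflow body executed twice, which is why step IDs should be stable and unique within a function.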
Sending Events
# Send a single event
event_id = await flowforge.send("order/created", data={"order_id": "123"})
# Send multiple events
event_ids = await flowforge.send_many([
    {"name": "user/signup", "data": {"user_id": "1"}},
    {"name": "user/signup", "data": {"user_id": "2"}},
])
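When a batch comes from application data, the list passed to send_many can be built programmatically. This is a minimal sketch that assumes only the event shape shown above (a dict with "name" and "data" keys); build_events is a hypothetical helper, not part of the SDK.

```python
def build_events(name, payloads):
    """Return one {"name", "data"} dict per payload, copying each payload."""
    return [{"name": name, "data": dict(payload)} for payload in payloads]


events = build_events("user/signup", [{"user_id": "1"}, {"user_id": "2"}])
print(events)
```

The resulting list has the same shape as the literal above, so it could be passed as the argument to flowforge.send_many.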
Run Management
# Get run details (status, steps, output)
run = await flowforge.get_run("761c0321-...")
# Retry a failed run in-place — keeps all memoized (completed) steps,
# only re-executes from the point of failure
result = await flowforge.retry_run("761c0321-...")
# Cancel a running or pending run
result = await flowforge.cancel_run("761c0321-...")
retry_run is different from replaying: it preserves the memoized results of
all completed steps so execution resumes from where it failed rather than
starting over from scratch.
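The difference between retrying in place and starting over can be shown with a toy model. Everything here is hypothetical (the execute function, the step names, the simulated failure) and says nothing about how the FlowForge server stores state. It only contrasts which steps execute when memoized results are kept versus discarded.

```python
def execute(steps, memo, executed):
    """Run steps in order, skipping any whose result is already in memo."""
    for step_id, fn in steps:
        if step_id in memo:
            continue  # completed earlier: reuse the stored result
        memo[step_id] = fn()  # may raise; earlier results stay in memo
        executed.append(step_id)
    return memo


attempts = {"charge": 0}


def charge():
    # Simulated flaky step: fails on the first attempt, succeeds afterwards.
    attempts["charge"] += 1
    if attempts["charge"] == 1:
        raise RuntimeError("payment gateway unavailable")
    return "charged"


steps = [("validate", lambda: "ok"), ("charge", charge), ("ship", lambda: "shipped")]

memo, executed = {}, []
try:
    execute(steps, memo, executed)
except RuntimeError:
    pass  # the run failed at "charge"; "validate" is already memoized

# Retry in place: keep memo, so only the failed step and later steps execute.
retry_executed = []
execute(steps, memo, retry_executed)

# Start from scratch: empty memo, so every step executes again.
fresh_executed = []
execute(steps, {}, fresh_executed)

print(retry_executed)
print(fresh_executed)
```

In this model the retry skips "validate" entirely, which matters when completed steps are expensive or have side effects that should not happen twice.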
Streaming Run Events (SSE)
Stream real-time events from a running workflow via Server-Sent Events:
from flowforge import FlowForge, RunEvent
flowforge = FlowForge(app_id="my-app", api_key="ff_live_...")
# Async iterator
async for event in flowforge.stream_run("run-uuid"):
    print(f"[{event.event_type.value}] {event.data}")
# With callback
async for event in flowforge.stream_run("run-uuid", on_event=lambda e: print(e)):
    pass
The stream automatically closes when the run completes or fails. Available event types:
- step_started, step_completed, step_failed
- thinking, thinking_chunk
- tool_call_started, tool_call_completed
- approval_required, approval_resolved
- run_started, run_paused, run_resumed, run_completed, run_failed
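Because the streaming example reads event.event_type.value, the event type appears to be an enum with string values. The sketch below mirrors the listed values in a stand-in enum for illustration. ToyEventType and its member names are hypothetical, and treating only run_completed and run_failed as terminal is an assumption drawn from the statement that the stream closes when the run completes or fails.

```python
from enum import Enum


class ToyEventType(Enum):
    """Hypothetical mirror of the event type values listed above."""

    STEP_STARTED = "step_started"
    STEP_COMPLETED = "step_completed"
    STEP_FAILED = "step_failed"
    THINKING = "thinking"
    THINKING_CHUNK = "thinking_chunk"
    TOOL_CALL_STARTED = "tool_call_started"
    TOOL_CALL_COMPLETED = "tool_call_completed"
    APPROVAL_REQUIRED = "approval_required"
    APPROVAL_RESOLVED = "approval_resolved"
    RUN_STARTED = "run_started"
    RUN_PAUSED = "run_paused"
    RUN_RESUMED = "run_resumed"
    RUN_COMPLETED = "run_completed"
    RUN_FAILED = "run_failed"


# Assumption: only these two event types end the stream.
TERMINAL = frozenset({ToyEventType.RUN_COMPLETED, ToyEventType.RUN_FAILED})


def is_terminal(event_type):
    """Return True if the event type ends the run in this toy model."""
    return event_type in TERMINAL


# Look up a member from its wire value, then classify it.
print(is_terminal(ToyEventType("run_failed")))
print(is_terminal(ToyEventType("run_paused")))
```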
Options:
async for event in flowforge.stream_run(
    "run-uuid",
    include_history=True,  # Include past events on connect (default: True)
    timeout=300.0,         # Server-side stream timeout in seconds (default: 300)
    on_event=my_callback,  # Optional callback for each event
):
    if event.is_terminal:
        print("Run finished:", event.data)
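A common pattern is to gather events until the terminal one arrives. The sketch below exercises that pattern against a fake stream so it runs without a server. FakeEvent, fake_stream, and collect_until_terminal are hypothetical names, and the only attributes assumed are data and is_terminal, as used in the snippet above.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeEvent:
    """Hypothetical stand-in exposing only the attributes used below."""

    kind: str
    data: dict = field(default_factory=dict)
    is_terminal: bool = False


async def fake_stream():
    # Stands in for an async iterator of run events.
    yield FakeEvent("step_started", {"step": "process"})
    yield FakeEvent("step_completed", {"step": "process"})
    yield FakeEvent("run_completed", {"message": "Hello, World!"}, is_terminal=True)
    yield FakeEvent("never_reached")


async def collect_until_terminal(stream):
    """Consume an async iterator of events, stopping at the first terminal one."""
    events = []
    async for event in stream:
        events.append(event)
        if event.is_terminal:
            break
    return events


events = asyncio.run(collect_until_terminal(fake_stream()))
print([e.kind for e in events])
```

Since the helper only needs an async iterator, the value returned by flowforge.stream_run("run-uuid") could presumably be passed in place of fake_stream().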
File details
Details for the file flowforge_sdk-0.4.1.tar.gz.
File metadata
- Download URL: flowforge_sdk-0.4.1.tar.gz
- Upload date:
- Size: 42.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 77ebda22b256ebe1fbf92600104bbaf1720df9e7208fe023ab51c0cf589e245a |
| MD5 | c58381073e5a20e7d0c80a1960e6419f |
| BLAKE2b-256 | 8400025eae806e62d64cddd592c63164188098eac13b2aa873c34f5dd2ac20fa |
Provenance
The following attestation bundles were made for flowforge_sdk-0.4.1.tar.gz:
Publisher: publish-sdks.yml on hoootan/flowforge
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: flowforge_sdk-0.4.1.tar.gz
- Subject digest: 77ebda22b256ebe1fbf92600104bbaf1720df9e7208fe023ab51c0cf589e245a
- Sigstore transparency entry: 1351444407
- Sigstore integration time:
- Permalink: hoootan/flowforge@2ac032f8420400dbc1bc5047294651d0f87a204b
- Branch / Tag: refs/heads/main
- Owner: https://github.com/hoootan
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish-sdks.yml@2ac032f8420400dbc1bc5047294651d0f87a204b
- Trigger Event: workflow_dispatch
File details
Details for the file flowforge_sdk-0.4.1-py3-none-any.whl.
File metadata
- Download URL: flowforge_sdk-0.4.1-py3-none-any.whl
- Upload date:
- Size: 52.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e6e5993eca8cd30ad39dc7139b18c814da75fecd1a3cf87fc6f2f5d3917e1d8e |
| MD5 | f07bf6d983b6dd377af12655c8812b83 |
| BLAKE2b-256 | 2c6d83f006045f9329bfea9ec097e0064c52df272c6c49333500a5c0020f69c9 |
Provenance
The following attestation bundles were made for flowforge_sdk-0.4.1-py3-none-any.whl:
Publisher: publish-sdks.yml on hoootan/flowforge
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: flowforge_sdk-0.4.1-py3-none-any.whl
- Subject digest: e6e5993eca8cd30ad39dc7139b18c814da75fecd1a3cf87fc6f2f5d3917e1d8e
- Sigstore transparency entry: 1351444509
- Sigstore integration time:
- Permalink: hoootan/flowforge@2ac032f8420400dbc1bc5047294651d0f87a204b
- Branch / Tag: refs/heads/main
- Owner: https://github.com/hoootan
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish-sdks.yml@2ac032f8420400dbc1bc5047294651d0f87a204b
- Trigger Event: workflow_dispatch