Python SDK for FlowForge - AI workflow orchestration

FlowForge SDK

Python SDK for FlowForge - AI workflow orchestration with durable execution.

Installation

pip install flowforge-sdk

Quick Start

from flowforge import FlowForge, Context, step

flowforge = FlowForge(app_id="my-app")

@flowforge.function(
    id="my-workflow",
    trigger=flowforge.trigger.event("my/event"),
)
async def my_workflow(ctx: Context) -> dict:
    result = await step.run("process", lambda: "Hello, World!")
    return {"message": result}

Features

  • Durable execution with automatic checkpointing
  • AI agent support with tool calling
  • Multi-agent networks with routing
  • Human-in-the-loop approvals

Step Primitives

Inside a function decorated with @flowforge.function, use the module-level step object:

from flowforge import step

# Run any function with memoization (won't re-run on replay)
result = await step.run("validate", validate_order, order)

# Pause until an event arrives
event = await step.wait_for_event(
    "wait-payment",
    event="payment/received",
    match="data.order_id == 'abc'",
)

# Sleep for a duration
await step.sleep("wait-1h", "1h")

# LLM call with automatic retry
reply = await step.ai("summarize", model="gpt-4o", prompt="Summarize: ...")

# AI agent loop with tool calling
result = await step.agent(
    "research-agent",
    model="claude-sonnet-4-6",
    system="You are a research assistant.",
    messages=[{"role": "user", "content": "Research this topic..."}],
    tools=[...],
    max_tokens=4096,
    max_tool_calls=20,
)

Sending Events

# Send a single event
event_id = await flowforge.send("order/created", data={"order_id": "123"})

# Send multiple events
event_ids = await flowforge.send_many([
    {"name": "user/signup", "data": {"user_id": "1"}},
    {"name": "user/signup", "data": {"user_id": "2"}},
])
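When a batch comes from application data, it can help to build the payload list separately before handing it to send_many. The following is a minimal sketch that assumes only the {"name": ..., "data": ...} shape shown above; build_signup_events is a hypothetical helper, not part of the SDK.

```python
from typing import Any


def build_signup_events(user_ids: list[str]) -> list[dict[str, Any]]:
    """Build one "user/signup" event payload per user id.

    Hypothetical helper: only the {"name", "data"} shape is taken from
    the send_many example above.
    """
    return [
        {"name": "user/signup", "data": {"user_id": user_id}}
        for user_id in user_ids
    ]


events = build_signup_events(["1", "2"])
```

The resulting list could then be passed as `await flowforge.send_many(events)` from inside an async function.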

Run Management

# Get run details (status, steps, output)
run = await flowforge.get_run("761c0321-...")

# Retry a failed run in place: completed steps keep their memoized results,
# and execution resumes from the step that failed
result = await flowforge.retry_run("761c0321-...")

# Cancel a running or pending run
result = await flowforge.cancel_run("761c0321-...")

retry_run is not the same as replaying a run from the beginning: it preserves the memoized results of every completed step, so execution resumes at the step that failed instead of starting over.
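The idea of resuming from the point of failure can be illustrated with a toy model. This is not the SDK's implementation; ToyRun, the step names, and the simulated gateway outage are all hypothetical, and the sketch only assumes that completed steps are cached by their step id.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ToyRun:
    """Toy stand-in for a durable run: caches each completed step by id."""

    def __init__(self) -> None:
        self.memo: dict[str, Any] = {}   # step id -> stored result
        self.executed: list[str] = []    # every real execution, in order

    async def step(self, step_id: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        if step_id in self.memo:         # completed earlier: reuse the result
            return self.memo[step_id]
        self.executed.append(step_id)
        result = await fn()              # may raise; nothing is stored then
        self.memo[step_id] = result
        return result


gateway = {"up": False}  # simulated outage, flipped before the retry


async def validate() -> str:
    return "valid"


async def charge() -> str:
    if not gateway["up"]:
        raise RuntimeError("gateway down")
    return "charged"


async def workflow(run: ToyRun) -> list[str]:
    first = await run.step("validate", validate)
    second = await run.step("charge", charge)
    return [first, second]


async def main() -> tuple[list[str], list[str]]:
    run = ToyRun()
    try:
        await workflow(run)              # first attempt fails at "charge"
    except RuntimeError:
        pass
    gateway["up"] = True
    output = await workflow(run)         # retry on the same run, memo kept
    return output, run.executed


output, executed = asyncio.run(main())
```

In this model "validate" executes once even though the workflow body runs twice, while "charge" executes on both attempts because its first attempt never stored a result.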

Streaming Run Events (SSE)

Stream real-time events from a running workflow via Server-Sent Events:

from flowforge import FlowForge, RunEvent

flowforge = FlowForge(app_id="my-app", api_key="ff_live_...")

# Async iterator
async for event in flowforge.stream_run("run-uuid"):
    print(f"[{event.event_type.value}] {event.data}")

# With callback
async for event in flowforge.stream_run("run-uuid", on_event=lambda e: print(e)):
    pass
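For background, Server-Sent Events is a plain-text wire format in which each event is a group of "field: value" lines terminated by a blank line. stream_run handles this for you; the sketch below only illustrates the format. The sample payload and the JSON-encoded data are assumptions, not the documented FlowForge wire format, and the parser is simplified relative to the full SSE rules.

```python
import json
from typing import Any, Iterator


def parse_sse(raw: str) -> Iterator[tuple[str, Any]]:
    """Yield (event name, decoded data) pairs from raw SSE text."""
    event_name = "message"           # SSE default when no "event:" field is sent
    data_lines: list[str] = []
    for line in raw.splitlines():
        if line == "":               # a blank line terminates one event
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
        elif line.startswith(":"):   # comment line, often used as keep-alive
            continue
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())


# Hypothetical payload; the real field contents are not documented here.
sample = (
    "event: step_started\n"
    'data: {"step": "process"}\n'
    "\n"
    ": keep-alive\n"
    "event: run_completed\n"
    'data: {"message": "Hello, World!"}\n'
    "\n"
)
parsed = list(parse_sse(sample))
```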

The stream automatically closes when the run completes or fails. Available event types:

  • step_started, step_completed, step_failed
  • thinking, thinking_chunk
  • tool_call_started, tool_call_completed
  • approval_required, approval_resolved
  • run_started, run_paused, run_resumed, run_completed, run_failed

Options:

async for event in flowforge.stream_run(
    "run-uuid",
    include_history=True,   # Include past events on connect (default: True)
    timeout=300.0,          # Server-side stream timeout in seconds (default: 300)
    on_event=my_callback,   # Optional callback for each event
):
    if event.is_terminal:
        print("Run finished:", event.data)
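A consumer loop of this shape can be tried without a live run by substituting a stand-in stream. In the sketch below, ToyEvent and fake_stream are hypothetical replacements for RunEvent and flowforge.stream_run, and treating run_completed and run_failed as the terminal types is an assumption based on the statement that the stream closes when the run completes or fails.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator

TERMINAL_TYPES = {"run_completed", "run_failed"}  # assumption, see lead-in


@dataclass
class ToyEvent:
    """Stand-in for RunEvent with only the fields this sketch needs."""
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)

    @property
    def is_terminal(self) -> bool:
        return self.event_type in TERMINAL_TYPES


async def fake_stream() -> AsyncIterator[ToyEvent]:
    """Stand-in for flowforge.stream_run(...): yields a fixed sequence."""
    for event_type, data in [
        ("run_started", {}),
        ("step_started", {"step": "process"}),
        ("step_completed", {"step": "process"}),
        ("run_completed", {"message": "Hello, World!"}),
    ]:
        await asyncio.sleep(0)       # hand control back to the event loop
        yield ToyEvent(event_type, data)


async def summarize(stream: AsyncIterator[ToyEvent]) -> dict[str, Any]:
    """Count completed steps and capture the terminal event's payload."""
    summary: dict[str, Any] = {"steps_completed": 0, "final": None}
    async for event in stream:
        if event.event_type == "step_completed":
            summary["steps_completed"] += 1
        if event.is_terminal:
            summary["final"] = (event.event_type, event.data)
            break
    return summary


summary = asyncio.run(summarize(fake_stream()))
```

With the real SDK, the same summarize loop would presumably read event.event_type.value rather than a plain string, as in the earlier streaming example.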

