Python runtime for bytedance/flowgram.ai workflows — execute visually-authored graph workflows with LLM, HTTP, condition, loop and code (Python sandbox) nodes.
flowgram-runtime
Python port of bytedance/flowgram.ai packages/runtime.
A workflow runtime engine that parses and executes graph-based workflows. It supports the same node set as the upstream TypeScript reference implementation:
| Node | Status | Notes |
|---|---|---|
| start / end | ✅ | full input/output declaration support |
| condition | ✅ | 14 operators × 9 types via rules table |
| loop / break / continue | ✅ | sub-context scope chain, locals (item, index) |
| block-start / block-end | ✅ | loop sub-graph boundaries |
| llm | ✅ | OpenAI-compatible chat completions (async) |
| http | ✅ | 6 body types (none / json / form-data / x-www-form-urlencoded / raw-text / binary), retry with exponential backoff |
| code | ✅ | Python sandbox via RestrictedPython (60s timeout, blocks imports / dunder access). Note: upstream uses QuickJS; this is the intentional language difference. |
Install
pip install -e ".[dev]" # development
# or after publish:
pip install flowgram-runtime
CLI
After installing the package (pip install flowgram-runtime), the flowgram command becomes
available:
flowgram --version
flowgram serve --host 0.0.0.0 --port 4000
- OpenAPI / Swagger UI: http://localhost:4000/docs
- OpenAPI JSON spec: http://localhost:4000/openapi.json
- tRPC compatible endpoint: http://localhost:4000/trpc/*
REST API
Matches upstream paths exactly so frontends configured for the TS runtime can swap the host.
| Method | Path | Body / Query | Response |
|---|---|---|---|
| GET | /info | (none) | server info |
| POST | /task/run | {schema: string, inputs: object} | {taskID} |
| GET | /task/report | ?taskID= | IReport |
| GET | /task/result | ?taskID= | workflow outputs |
| PUT | /task/cancel | {taskID} | {success: bool} |
| POST | /task/validate | {schema: string, inputs: object} | {valid, errors?} |
schema is a JSON string of the workflow definition (mirrors upstream contract).
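Because schema is sent as a JSON string rather than a nested object, a client serializes the workflow definition once on its own and then again as part of the request body. The following is a minimal sketch using only the standard library; `build_run_request` is a hypothetical helper (not part of flowgram-runtime), the base URL assumes the default `flowgram serve` port shown above, and the request is built but never sent.

```python
import json
import urllib.request


def build_run_request(base_url, schema, inputs):
    """Build (but do not send) a POST /task/run request.

    Hypothetical helper for illustration; not part of flowgram-runtime.
    """
    body = {
        # The workflow definition is itself serialized to a JSON string.
        "schema": json.dumps(schema),
        "inputs": inputs,
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/task/run",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_run_request(
    "http://localhost:4000",
    schema={"nodes": [], "edges": []},  # placeholder schema for the sketch
    inputs={"msg": "hello"},
)
payload = json.loads(request.data)
```

Passing `request` to `urllib.request.urlopen` against a running server should, per the table above, yield a body containing taskID.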
tRPC compatibility
The /trpc/<procedure> namespace accepts the same procedures by both slash and dot notation:
info, task.run, task.report, task.result, task.cancel, task.validate
Supported request formats:
- GET /trpc/<proc>?input=<url-encoded-json>
- POST /trpc/<proc> with body {"json": <payload>} (tRPC v10 wrapper) or bare <payload>
- Batch mode: ?batch=1 with comma-separated procedure list and indexed input map
Responses are wrapped as {"result": {"data": <output>}} per tRPC convention.
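To make the GET format and the response envelope concrete, here is a small sketch that builds a `/trpc/<proc>?input=...` URL and unwraps a `{"result": {"data": ...}}` body. The helper names, the task ID `abc`, and the sample response string are assumptions made for illustration; nothing is sent over the network.

```python
import json
from urllib.parse import parse_qs, quote, urlsplit


def trpc_get_url(base_url, procedure, payload):
    """Build a GET URL of the form /trpc/<proc>?input=<url-encoded-json>."""
    encoded = quote(json.dumps(payload), safe="")
    return f"{base_url.rstrip('/')}/trpc/{procedure}?input={encoded}"


def unwrap_trpc(response_body):
    """Extract <output> from a {"result": {"data": <output>}} envelope."""
    return json.loads(response_body)["result"]["data"]


# "abc" is a made-up task ID used only for this sketch.
url = trpc_get_url("http://localhost:4000", "task.report", {"taskID": "abc"})

# Round-trip the query string to confirm the payload survives encoding.
decoded = json.loads(parse_qs(urlsplit(url).query)["input"][0])

# Hand-written sample body in the documented envelope shape.
output = unwrap_trpc('{"result": {"data": {"ok": true}}}')
```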
Python SDK
import flowgram_runtime as fg
schema = {
    "nodes": [
        {"id": "start_0", "type": "start", "meta": {"position": {"x": 0, "y": 0}},
         "data": {"outputs": {"type": "object", "properties": {"msg": {"type": "string"}}}}},
        {"id": "end_0", "type": "end", "meta": {"position": {"x": 200, "y": 0}},
         "data": {
             "inputs": {"type": "object", "properties": {"echo": {"type": "string"}}},
             "inputsValues": {"echo": {"type": "ref", "content": ["start_0", "msg"]}},
         }},
    ],
    "edges": [{"sourceNodeID": "start_0", "targetNodeID": "end_0"}],
}
Fire-and-await
outputs = await fg.run(schema, inputs={"msg": "hello"})
# {'echo': 'hello'}
With a task handle (status / report / cancel)
task = fg.start(schema, inputs={"msg": "hello"})
outputs = await task.wait(timeout=10) # raises TimeoutError on timeout
print(task.status) # WorkflowStatus.Succeeded
print(task.report().reports.keys()) # per-node reports
task.cancel() # idempotent
From sync code
outputs = fg.run_sync(schema, inputs={"msg": "hi"})
fg.run_sync refuses to run inside a running event loop; use await fg.run(...) from async code.
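A guard of this kind is commonly written with `asyncio.get_running_loop()`, which raises `RuntimeError` when no loop is running in the current thread. The sketch below shows that general pattern under the assumption that the library does something similar; `run_blocking` is a hypothetical helper, not the actual implementation of the synchronous entry point.

```python
import asyncio


def run_blocking(coro_factory):
    """Run a coroutine to completion, refusing to nest inside a running loop.

    Hypothetical helper illustrating the guard; not the library's code.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        return asyncio.run(coro_factory())
    raise RuntimeError("already inside an event loop; await the coroutine instead")


async def answer():
    return 42


async def nested_call():
    # Calling the blocking helper from async code triggers the guard.
    try:
        run_blocking(answer)
    except RuntimeError as exc:
        return str(exc)


sync_result = run_blocking(answer)
nested_error = asyncio.run(nested_call())
```

Taking a coroutine factory rather than a coroutine object avoids creating a coroutine that is never awaited when the guard fires.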
Validate without running
result = fg.validate(schema, inputs={"msg": "x"})
if not result.valid:
    print(result.errors)
Custom node
class SentimentNode(fg.INodeExecutor):
    type = "sentiment"

    async def execute(self, ctx: fg.ExecutionContext) -> fg.ExecutionResult:
        text = ctx.inputs.get("text", "")
        return fg.ExecutionResult(outputs={"score": 0.87})

fg.register_executor(SentimentNode())
# Workflows with `"type": "sentiment"` nodes can now run.
Embed the HTTP server inside your FastAPI
your_app.mount("/_flowgram", fg.create_app())
fg.create_app is lazy — import flowgram_runtime doesn't pull in FastAPI/uvicorn
unless you actually call it.
Schema input forms
fg.run / fg.start / fg.validate accept the schema as a dict, a JSON string,
or a fg.WorkflowSchema instance — pick whatever fits your storage layer.
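Accepting several input forms usually comes down to normalizing them to one shape before validation. The following sketch shows one way that could look; `coerce_schema` is a hypothetical function, and the `model_dump` branch assumes the schema object is a Pydantic v2 model, which the library's actual code may handle differently.

```python
import json


def coerce_schema(schema):
    """Normalize a dict, JSON string, or model-like object to a plain dict.

    Hypothetical helper; not the library's own coercion logic.
    """
    if isinstance(schema, str):
        schema = json.loads(schema)
    elif hasattr(schema, "model_dump"):
        # Assumption: model instances expose Pydantic v2's model_dump().
        schema = schema.model_dump()
    if not isinstance(schema, dict):
        raise TypeError(f"unsupported schema type: {type(schema).__name__}")
    return schema


from_dict = coerce_schema({"nodes": [], "edges": []})
from_json = coerce_schema('{"nodes": [], "edges": []}')
```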
Lower-level handles (advanced)
The facade covers the 90% case. For full control reach for the underlying types:
from flowgram_runtime import WorkflowApplication, InvokeParams, WorkflowSchema
app = WorkflowApplication.instance()
task_id = app.run(InvokeParams(schema=WorkflowSchema.model_validate(schema), inputs={...}))
Code node — Python sandbox contract
The script must define a function main(p) where p["params"] is the parsed inputs dict.
It should return a dict (which becomes the node outputs); non-dict returns are wrapped as
{"result": <value>}.
# data.script.content
def main(p):
    return {"sum": p["params"]["a"] + p["params"]["b"]}
Restrictions enforced by RestrictedPython:
- No import statements
- No access to dunder attributes (__class__, __bases__, …)
- Restricted builtins (no eval, exec, open, __import__, …)
- 60-second hard timeout
Architecture
src/flowgram_runtime/
├── interface/ # Pydantic models + ABCs (mirrors @flowgram.ai/runtime-interface)
│ ├── schema/ # WorkflowSchema / NodeSchema / EdgeSchema / IFlowValue / IJsonSchema
│ ├── node/ # FlowGramNode enum + node-specific Pydantic models
│ ├── api/ # 6 API I/O models + FlowGramAPIs registry
│ └── runtime/ # IEngine / IExecutor / IContext / IState / IVariableStore / ...
├── core/ # implementation (mirrors @flowgram.ai/runtime-js)
│ ├── domain/ # engine, executor, context, document, state, variable, snapshot,
│ │ # status, message, report, io_center, cache, validation, task, container
│ ├── application/ # WorkflowApplication façade (singleton)
│ ├── nodes/ # 11 node executors
│ └── infrastructure/ # uuid, runtime_type, json_schema_validator, compare_node_groups
└── server/ # FastAPI app: REST + tRPC compat + Swagger UI
Testing
pytest -q
pytest --cov=flowgram_runtime --cov-report=term-missing
Differences vs upstream
- Code node language is Python (upstream is JavaScript via QuickJS WASM). The runtime contract, main(p) and p["params"], is preserved.
- No tRPC native protocol: we expose a tRPC-compatible HTTP shim on /trpc/* that accepts the same procedure names and produces tRPC-style responses, sufficient for @trpc/client to talk to.
- Loop scheduling is sequential within a single loop iteration (matches upstream); fan-out between sibling nodes is concurrent via asyncio.gather.
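The difference between sequential scheduling and `asyncio.gather` fan-out shows up in the order in which work starts and finishes. The sketch below uses a stand-in `run_node` coroutine (an assumption for illustration, not a real executor) and records that order in a log.

```python
import asyncio


async def run_node(name, log):
    """Stand-in for a node executor; records start and finish order."""
    log.append(f"start:{name}")
    await asyncio.sleep(0)  # yield control so siblings can interleave
    log.append(f"end:{name}")
    return name


async def sequential(names, log):
    # Each node finishes before the next one starts.
    return [await run_node(n, log) for n in names]


async def fan_out(names, log):
    # Sibling nodes are scheduled together and interleave at await points.
    return await asyncio.gather(*(run_node(n, log) for n in names))


seq_log, par_log = [], []
seq_result = asyncio.run(sequential(["a", "b"], seq_log))
par_result = asyncio.run(fan_out(["a", "b"], par_log))
```

In the sequential log each node's start and end are adjacent, while in the fan-out log both nodes start before either finishes. `asyncio.gather` still returns results in the order the awaitables were passed.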
License
MIT (matching upstream).