Python runtime for bytedance/flowgram.ai workflows — execute visually-authored graph workflows with LLM, HTTP, condition, loop and code (Python sandbox) nodes.

flowgram-runtime

Python port of bytedance/flowgram.ai packages/runtime.

A workflow runtime engine that parses and executes graph-based workflows containing the same node set as the upstream TS reference implementation:

Node                     Notes
start / end              full input/output declaration support
condition                14 operators × 9 types via rules table
loop / break / continue  sub-context scope chain, locals (item, index)
block-start / block-end  loop sub-graph boundaries
llm                      OpenAI-compatible chat completions (async)
http                     6 body types (none / json / form-data / x-www-form-urlencoded / raw-text / binary), retry with exponential backoff
code                     Python sandbox via RestrictedPython (60s timeout, blocks imports / dunder access). Note: upstream uses QuickJS; the language difference is intentional.
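
The http node notes mention retry with exponential backoff. As a minimal sketch of that idea, a delay schedule can be generated as follows. The function name, base delay, growth factor and cap are assumptions made for illustration; they are not the runtime's actual settings.

```python
from typing import Iterator


def backoff_delays(retries: int, base: float = 0.5, factor: float = 2.0,
                   cap: float = 30.0) -> Iterator[float]:
    """Yield one delay in seconds per retry, growing geometrically up to `cap`.

    All parameter names and defaults are illustrative assumptions.
    """
    delay = base
    for _ in range(retries):
        yield min(delay, cap)
        delay *= factor


print(list(backoff_delays(4)))  # [0.5, 1.0, 2.0, 4.0]
```

A caller would sleep for each yielded delay between attempts and stop once a request succeeds.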

Install

pip install -e ".[dev]"       # development install from a source checkout
pip install flowgram-runtime  # released package from PyPI

CLI

After installing the package (pip install flowgram-runtime), the flowgram command becomes available:

flowgram --version
flowgram serve --host 0.0.0.0 --port 4000

REST API

The paths match upstream exactly, so a frontend configured for the TS runtime only needs its host changed.

Method  Path            Body / Query                      Response
GET     /info           -                                 server info
POST    /task/run       {schema: string, inputs: object}  {taskID}
GET     /task/report    ?taskID=                          IReport
GET     /task/result    ?taskID=                          workflow outputs
PUT     /task/cancel    {taskID}                          {success: bool}
POST    /task/validate  {schema: string, inputs: object}  {valid, errors?}

schema is a JSON string of the workflow definition (mirrors upstream contract).
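
Because schema travels as a JSON string inside a JSON body, the workflow definition ends up serialized twice. The sketch below builds (but does not send) a POST /task/run request using only the standard library. The helper name and the http://localhost:4000 base URL are assumptions; the port is borrowed from the CLI example.

```python
import json
import urllib.request


def build_run_request(base_url: str, schema: dict, inputs: dict) -> urllib.request.Request:
    """Build a POST /task/run request without sending it (hypothetical helper).

    `schema` is serialized once into the string the endpoint expects,
    then again as part of the JSON request body.
    """
    body = {"schema": json.dumps(schema), "inputs": inputs}
    return urllib.request.Request(
        url=f"{base_url}/task/run",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Assumed host and port; adjust to wherever `flowgram serve` is listening.
req = build_run_request("http://localhost:4000", {"nodes": [], "edges": []}, {"msg": "hello"})
# Sending is left to the caller, e.g. urllib.request.urlopen(req)
```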

tRPC compatibility

The /trpc/<procedure> namespace accepts the same procedures by both slash and dot notation:

  • info, task.run, task.report, task.result, task.cancel, task.validate

Supported request formats:

  • GET /trpc/<proc>?input=<url-encoded-json>
  • POST /trpc/<proc> with body {"json": <payload>} (tRPC v10 wrapper) or bare <payload>
  • Batch mode: ?batch=1 with comma-separated procedure list and indexed input map

Responses are wrapped as {"result": {"data": <output>}} per tRPC convention.
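
The request and response shapes above can be sketched with the standard library. The helper names and the base URL are hypothetical, and the batch layout (procedures joined by commas in the path, inputs keyed by position) follows the description above and the usual tRPC convention rather than anything verified against this server.

```python
import json
from urllib.parse import parse_qs, urlencode, urlsplit


def trpc_get_url(base_url: str, procedure: str, payload: dict) -> str:
    """GET form: the payload is JSON-encoded, then URL-encoded into ?input=."""
    return f"{base_url}/trpc/{procedure}?{urlencode({'input': json.dumps(payload)})}"


def trpc_batch_url(base_url: str, calls: list) -> str:
    """Batch form (assumed layout): comma-separated procedures, indexed input map."""
    procedures = ",".join(name for name, _ in calls)
    indexed = {str(i): payload for i, (_, payload) in enumerate(calls)}
    query = urlencode({"batch": "1", "input": json.dumps(indexed)})
    return f"{base_url}/trpc/{procedures}?{query}"


def read_input(url: str):
    """Decode the ?input= parameter back into Python data."""
    return json.loads(parse_qs(urlsplit(url).query)["input"][0])


def unwrap(response: dict):
    """Extract the output from a {"result": {"data": ...}} envelope."""
    return response["result"]["data"]


url = trpc_get_url("http://localhost:4000", "task.report", {"taskID": "abc"})
print(read_input(url))                              # {'taskID': 'abc'}
print(unwrap({"result": {"data": {"ok": True}}}))   # {'ok': True}
```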

Python SDK

import flowgram_runtime as fg

schema = {
    "nodes": [
        {"id": "start_0", "type": "start", "meta": {"position": {"x": 0, "y": 0}},
         "data": {"outputs": {"type": "object", "properties": {"msg": {"type": "string"}}}}},
        {"id": "end_0", "type": "end", "meta": {"position": {"x": 200, "y": 0}},
         "data": {
             "inputs": {"type": "object", "properties": {"echo": {"type": "string"}}},
             "inputsValues": {"echo": {"type": "ref", "content": ["start_0", "msg"]}},
         }},
    ],
    "edges": [{"sourceNodeID": "start_0", "targetNodeID": "end_0"}],
}

Fire-and-await

outputs = await fg.run(schema, inputs={"msg": "hello"})
# {'echo': 'hello'}
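
The echo output comes from the end node's inputsValues entry, which is a ref pointing at the msg output of start_0. The toy resolver below illustrates how such a reference can be followed. It is a sketch, not the runtime's variable store: the function name is hypothetical and only the "ref" form shown in the example schema is handled.

```python
def resolve_ref(value: dict, node_outputs: dict):
    """Follow a {"type": "ref", "content": [node_id, key, ...]} value.

    Hypothetical helper for illustration; other value types are rejected.
    """
    if value.get("type") != "ref":
        raise ValueError(f"unsupported value type: {value.get('type')!r}")
    node_id, *path = value["content"]
    current = node_outputs[node_id]
    for key in path:  # walk nested keys, e.g. ["start_0", "msg"]
        current = current[key]
    return current


node_outputs = {"start_0": {"msg": "hello"}}
inputs_values = {"echo": {"type": "ref", "content": ["start_0", "msg"]}}
end_inputs = {name: resolve_ref(v, node_outputs) for name, v in inputs_values.items()}
print(end_inputs)  # {'echo': 'hello'}
```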

With a task handle (status / report / cancel)

task = fg.start(schema, inputs={"msg": "hello"})
outputs = await task.wait(timeout=10)        # raises TimeoutError on timeout
print(task.status)                            # WorkflowStatus.Succeeded
print(task.report().reports.keys())           # per-node reports
task.cancel()                                 # idempotent
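
The wait-with-timeout and idempotent-cancel behaviour can be pictured with plain asyncio. The class below is a hypothetical stand-in, not the object returned by fg.start; it assumes that a timed-out wait should leave the underlying work running, which is why asyncio.shield is used.

```python
import asyncio


class TaskHandle:
    """Hypothetical handle around an asyncio task (not the library's class)."""

    def __init__(self, coro):
        # Must be created while an event loop is running.
        self._task = asyncio.ensure_future(coro)

    async def wait(self, timeout=None):
        # shield() keeps the underlying task alive if this wait times out.
        return await asyncio.wait_for(asyncio.shield(self._task), timeout)

    def cancel(self) -> None:
        # Safe to call repeatedly; a finished task is left untouched.
        if not self._task.done():
            self._task.cancel()

    @property
    def done(self) -> bool:
        return self._task.done()


async def demo():
    async def slow():
        await asyncio.sleep(10)
        return {"echo": "hello"}

    handle = TaskHandle(slow())
    try:
        await handle.wait(timeout=0.01)
    except asyncio.TimeoutError:
        print("timed out; task still running:", not handle.done)
    handle.cancel()
    handle.cancel()  # second call is harmless


asyncio.run(demo())
```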

From sync code

outputs = fg.run_sync(schema, inputs={"msg": "hi"})

fg.run_sync refuses to run inside an already running event loop; from async code, use await fg.run(...) instead.
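
A sync wrapper can detect a running loop with asyncio.get_running_loop(). The following is a minimal sketch of that guard; the name run_blocking and the error message are assumptions, and the library's own check may differ.

```python
import asyncio


def run_blocking(coro_factory):
    """Run a coroutine to completion unless a loop is already running.

    Hypothetical helper. It takes a factory so that no coroutine object
    is created (and left un-awaited) when the call is refused.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        return asyncio.run(coro_factory())
    raise RuntimeError("already inside an event loop; use 'await' instead")


async def echo():
    await asyncio.sleep(0)
    return {"echo": "hi"}


print(run_blocking(echo))  # {'echo': 'hi'}
```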

Validate without running

result = fg.validate(schema, inputs={"msg": "x"})
if not result.valid:
    print(result.errors)

Custom node

class SentimentNode(fg.INodeExecutor):
    type = "sentiment"
    async def execute(self, ctx: fg.ExecutionContext) -> fg.ExecutionResult:
        text = ctx.inputs.get("text", "")
        # Placeholder: a real node would compute the score from `text`.
        return fg.ExecutionResult(outputs={"score": 0.87})

fg.register_executor(SentimentNode())
# Workflows with `"type": "sentiment"` nodes can now run.

Embed the HTTP server inside your FastAPI

your_app.mount("/_flowgram", fg.create_app())

fg.create_app is lazy: importing flowgram_runtime does not pull in FastAPI or uvicorn unless you actually call it.

Schema input forms

fg.run / fg.start / fg.validate accept the schema as a dict, a JSON string, or a fg.WorkflowSchema instance — pick whatever fits your storage layer.
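
Accepting several input forms usually comes down to normalizing them once at the boundary. The sketch below covers the dict and JSON-string forms with the standard library only; the function name is hypothetical, and the fg.WorkflowSchema form is left out because it depends on the library's own model class.

```python
import json
from typing import Any, Mapping, Union


def to_schema_dict(schema: Union[str, Mapping[str, Any]]) -> dict:
    """Normalize a workflow schema given as a dict or JSON string (illustrative)."""
    if isinstance(schema, str):
        parsed = json.loads(schema)
    elif isinstance(schema, Mapping):
        parsed = dict(schema)
    else:
        raise TypeError(f"unsupported schema type: {type(schema).__name__}")
    if not isinstance(parsed, dict):
        raise ValueError("schema JSON must decode to an object")
    return parsed


as_dict = {"nodes": [], "edges": []}
print(to_schema_dict(as_dict) == to_schema_dict(json.dumps(as_dict)))  # True
```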

Lower-level handles (advanced)

The facade covers most use cases. For full control, use the underlying types:

from flowgram_runtime import WorkflowApplication, InvokeParams, WorkflowSchema
app = WorkflowApplication.instance()
task_id = app.run(InvokeParams(schema=WorkflowSchema.model_validate(schema), inputs={...}))

Code node — Python sandbox contract

The script must define a function main(p) where p["params"] is the parsed inputs dict. It should return a dict (which becomes the node outputs); non-dict returns are wrapped as {"result": <value>}.

# data.script.content
def main(p):
    return {"sum": p["params"]["a"] + p["params"]["b"]}
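
The calling convention can be sketched without the sandbox. The helper below uses plain exec(), which is NOT a sandbox and must not be used on untrusted scripts; it only illustrates how main(p) is invoked and how a non-dict return gets wrapped. The name run_script is hypothetical.

```python
def run_script(source: str, params: dict) -> dict:
    """Call the script's main(p) and normalize its return value.

    WARNING: plain exec() offers no isolation. This shows the contract
    only, not the runtime's RestrictedPython setup.
    """
    namespace: dict = {}
    exec(source, namespace)
    main = namespace.get("main")
    if not callable(main):
        raise ValueError("script must define main(p)")
    result = main({"params": params})
    # Dict returns become the node outputs; anything else is wrapped.
    return result if isinstance(result, dict) else {"result": result}


script = "def main(p):\n    return {'sum': p['params']['a'] + p['params']['b']}"
print(run_script(script, {"a": 1, "b": 2}))                    # {'sum': 3}
print(run_script("def main(p):\n    return 42", {}))           # {'result': 42}
```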

Restrictions enforced by the RestrictedPython-based sandbox:

  • No import statements
  • No access to dunder attributes (__class__, __bases__, …)
  • Restricted builtins (no eval, exec, open, __import__, …)
  • 60-second hard timeout
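
To get a feel for the first three restrictions, here is a toy static check built on the standard ast module. It is not RestrictedPython and offers no real protection; the function name and the list of forbidden names are assumptions taken from the bullets above.

```python
import ast

# Names taken from the restriction list above; illustrative, not exhaustive.
FORBIDDEN_NAMES = {"eval", "exec", "open", "__import__"}


def find_violations(source: str) -> list:
    """Return human-readable findings for a script (toy check, not a sandbox)."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            violations.append(f"line {node.lineno}: import statement")
        elif isinstance(node, ast.Attribute) and node.attr.startswith("__"):
            violations.append(f"line {node.lineno}: dunder attribute {node.attr}")
        elif isinstance(node, ast.Name) and node.id in FORBIDDEN_NAMES:
            violations.append(f"line {node.lineno}: forbidden name {node.id}")
    return violations


print(find_violations("def main(p):\n    return {'ok': True}"))  # []
print(find_violations("import os"))  # ['line 1: import statement']
```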

Architecture

src/flowgram_runtime/
├── interface/          # Pydantic models + ABCs (mirrors @flowgram.ai/runtime-interface)
│   ├── schema/         # WorkflowSchema / NodeSchema / EdgeSchema / IFlowValue / IJsonSchema
│   ├── node/           # FlowGramNode enum + node-specific Pydantic models
│   ├── api/            # 6 API I/O models + FlowGramAPIs registry
│   └── runtime/        # IEngine / IExecutor / IContext / IState / IVariableStore / ...
├── core/               # implementation (mirrors @flowgram.ai/runtime-js)
│   ├── domain/         # engine, executor, context, document, state, variable, snapshot,
│   │                   # status, message, report, io_center, cache, validation, task, container
│   ├── application/    # WorkflowApplication façade (singleton)
│   ├── nodes/          # 11 node executors
│   └── infrastructure/ # uuid, runtime_type, json_schema_validator, compare_node_groups
└── server/             # FastAPI app: REST + tRPC compat + Swagger UI

Testing

pytest -q
pytest --cov=flowgram_runtime --cov-report=term-missing

Differences vs upstream

  1. Code node language is Python (upstream is JavaScript via QuickJS WASM). The runtime contract — main(p), p["params"] — is preserved.
  2. No native tRPC protocol: we expose a tRPC-compatible HTTP shim on /trpc/* that accepts the same procedure names and produces tRPC-style responses, which is sufficient for @trpc/client to talk to it.
  3. Loop scheduling is sequential within a single loop iteration (matches upstream); fan-out between sibling nodes is concurrent via asyncio.gather.
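
The scheduling note in item 3 can be pictured with a small asyncio sketch. It reflects one reading of that item, namely that loop iterations run one after another while sibling nodes within an iteration are awaited together. The function names are hypothetical and the node work is simulated.

```python
import asyncio


async def run_node(name: str, item) -> str:
    await asyncio.sleep(0)  # stand-in for real async node work
    return f"{name}({item})"


async def run_loop(items, siblings):
    results = []
    for item in items:  # iterations are processed one after another
        # Sibling nodes of one iteration are scheduled concurrently.
        batch = await asyncio.gather(*(run_node(n, item) for n in siblings))
        results.append(batch)
    return results


print(asyncio.run(run_loop([1, 2], ["a", "b"])))
# [['a(1)', 'b(1)'], ['a(2)', 'b(2)']]
```

asyncio.gather returns results in the order its awaitables were passed, so the output order is stable even though the siblings run concurrently.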

License

MIT (matching upstream).
