Async workflow orchestration framework with a lightweight DSL.
Project description
SynthFlow
Async workflow orchestration framework with a lightweight DSL.
Experimental project written with Codex. Do not use in production environments.
Why SynthFlow
- Lightweight workflow DSL for async orchestration
- Core control flow:
PARALLEL,IF,OR,SWITCH - Cross-node value passing via
ResultRef - Plugin pipeline for runtime policies (
Retry,Timeout) - Readable tree visualization via
flow.visualize()
Install
Requires Python 3.10+
From PyPI (stable)
pip install synthflow-py
From GitHub (latest development version)
pip install git+https://github.com/sszgr/synthflow.git
Run Tests
Use the standard library test runner:
python3 -m unittest discover -s tests -p 'test_*.py'
Core Concepts
Node: execution unit; implementasync def run(...)Flow: workflow runner and visualizerResultRef: reference another node's output in.input(...)PARALLEL: run branches concurrentlyIF/OR/SWITCH: basic control flow DSLRetry/Timeout: node plugins via.use(...)
Quick Start
import asyncio
from synthflow.core.flow import Flow
from synthflow.core.node import Node, ResultRef
class A(Node):
async def run(self, a, b, c):
return a + b + c
flow = Flow(
A(id="a1").input(1, 2, [3, 4, 5])
>> A(id="a2").input(
ResultRef("a1").item(2), # 5
3,
4,
)
>> A(id="a3").input(
ResultRef("a2").map(lambda x: x * 2),
1,
1,
)
)
flow.visualize()
asyncio.run(flow.run())
Get execution state and timeline:
context = asyncio.run(flow.run(return_context=True))
print(context.state) # ExecutionState.SUCCEEDED / FAILED / CANCELLED
print(context.events) # state transitions with timestamps
print(context.node_events) # per-node lifecycle events
print(context.store) # DataStore
Stream execution events as they happen:
async for event in flow.run_stream():
if event.event == "token":
print(event.data["text"], end="", flush=True)
DSL Example (Parallel + IF + OR)
from synthflow.core.dsl import IF, OR, PARALLEL
from synthflow.core.flow import Flow
from synthflow.core.node import Node, ResultRef
flow = Flow(
Seed(id="seed").input([2, 5, 8, 13, 21])
>> PARALLEL(
SumNode(id="sum_branch").input(ResultRef("seed")),
MaxNode(id="max_branch").input(ResultRef("seed")),
EvenCountNode(id="even_branch").input(ResultRef("seed")),
id="stats_parallel",
on_conflict="overwrite", # overwrite | keep | error
)
>> BuildSummary(id="summary").input(
ResultRef("sum_branch"),
ResultRef("max_branch"),
ResultRef("even_branch"),
)
>> IF(
condition=OR(
lambda store: (store.get_node_result("sum_branch") or 0) > 40,
lambda store: (store.get_node_result("max_branch") or 0) > 20,
),
then_node=Alert(id="alert").input(ResultRef("summary")),
else_node=Normal(id="normal").input(ResultRef("summary")),
id="risk_if",
)
)
Parallel Semantics
- Branches run concurrently.
- If any branch fails, remaining branches are cancelled and the flow raises an exception.
- Merge conflict policy is controlled by
on_conflict:overwrite(default): later branch value overwrites earlier valuekeep: preserve first valueerror: raise on conflicting values
Full runnable example: examples/general_pipeline.py
More Examples
- General pipeline:
PYTHONPATH=. python3 examples/general_pipeline.py - Order fulfillment (realistic e-commerce flow):
PYTHONPATH=. python3 examples/order_fulfillment_demo.py - API aggregation gateway flow:
PYTHONPATH=. python3 examples/api_aggregation_demo.py - DeepSeek streaming orchestration:
DEEPSEEK_API_KEY=... PYTHONPATH=. python3 examples/deepseek_streaming_demo.py - FastAPI chat SSE demo:
pip install fastapi uvicorn && DEEPSEEK_API_KEY=... PYTHONPATH=. uvicorn examples.fastapi_chat_sse_demo:app --reload - GitHub trending repos (recent, formatted table):
PYTHONPATH=. python3 examples/github_trending_demo.py - Type validation:
PYTHONPATH=. python3 examples/type_validation_demo.py - Graphviz export (writes
flow.dot):PYTHONPATH=. python3 examples/graphviz_export_demo.py - Execution context events:
PYTHONPATH=. python3 examples/execution_context_demo.py
DeepSeek streaming demo environment variables:
DEEPSEEK_API_KEY: requiredDEEPSEEK_BASE_URL: optional, defaults tohttps://api.deepseek.comDEEPSEEK_MODEL: optional, defaults todeepseek-chat
The FastAPI SSE demo serves a browser chat page at http://127.0.0.1:8000/ and streams assistant tokens from /chat/stream.
Plugins
Attach plugins on a node with .use(...):
from synthflow.plugins import Retry, Timeout
node = SomeNode().use(Retry(retries=2, delay=0.1)).use(Timeout(seconds=2.0))
Visualize
flow.visualize() prints a tree-style orchestration view, including branch labels:
Flow
└── Seed(seed)
└── Parallel(stats_parallel)
├── [parallel-1] SumNode(sum_branch)
├── [parallel-2] MaxNode(max_branch)
├── [parallel-3] EvenCountNode(even_branch)
└── BuildSummary(summary)
flow.to_graphviz() returns Graphviz DOT text for rendering diagrams in tooling/CI:
dot = flow.to_graphviz()
print(dot)
Type Validation
Optionally declare node input/output schemas:
from synthflow.types import Field
class MyNode(Node):
params_schema = {"x": Field(int)}
output_schema = Field(dict)
async def run(self, x):
return {"value": x}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file synthflow_py-0.1.2.tar.gz.
File metadata
- Download URL: synthflow_py-0.1.2.tar.gz
- Upload date:
- Size: 27.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b1ae5bfae60dfa884651d1325f566822f11e436143294e0c5e85e4fce8144213
|
|
| MD5 |
e8ec6533961132386e0ee6a114665b52
|
|
| BLAKE2b-256 |
de5b8c89814973514e30b62b3e006a68420e151ab2f4c77911083b6294caecbd
|
File details
Details for the file synthflow_py-0.1.2-py3-none-any.whl.
File metadata
- Download URL: synthflow_py-0.1.2-py3-none-any.whl
- Upload date:
- Size: 26.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0b49e3fae983aa57120e549dae7e3b3c8e28d3cca3df9548dc9b80679b6380bf
|
|
| MD5 |
b017c87bff3804647aa99e47c1bdea67
|
|
| BLAKE2b-256 |
b0a3ff38e7c821258afdd56741f9103291f70cb0db858de1a80450fd10671ecd
|