SynthFlow
Async workflow orchestration framework with a lightweight DSL.
Experimental project written with Codex. Do not use in production environments.
Why SynthFlow
- Lightweight workflow DSL for async orchestration
- Core control flow: PARALLEL, IF, OR, SWITCH
- Cross-node value passing via ResultRef
- Plugin pipeline for runtime policies (Retry, Timeout)
- Readable tree visualization via flow.visualize()
Install
Requires Python 3.10+
From PyPI (stable)
pip install synthflow-py
From GitHub (latest development version)
pip install git+https://github.com/sszgr/synthflow.git
Core Concepts
- Node: execution unit; implement async def run(...)
- Flow: workflow runner and visualizer
- ResultRef: reference another node's output in .input(...)
- PARALLEL: run branches concurrently
- IF / OR / SWITCH: basic control flow DSL
- Retry / Timeout: node plugins via .use(...)
Quick Start
import asyncio

from synthflow.core.flow import Flow
from synthflow.core.node import Node, ResultRef


class A(Node):
    async def run(self, a, b, c):
        return a + b + c


flow = Flow(
    # a1 concatenates three lists, so its result is [3, 4, 5]
    A(id="a1").input([3], [4], [5])
    >> A(id="a2").input(
        ResultRef("a1").item(2),  # 5
        3,
        4,
    )
    >> A(id="a3").input(
        ResultRef("a2").map(lambda x: x * 2),
        1,
        1,
    )
)
flow.visualize()
asyncio.run(flow.run())
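To make the value passing concrete, here is a minimal standalone sketch of how a reference with item(...) and map(...) steps could be resolved against stored node results. It does not import SynthFlow and is not its implementation: the Ref class, its resolve method, and the plain-dict store are hypothetical names chosen for illustration.

```python
from typing import Any, Callable


class Ref:
    """Hypothetical stand-in for a result reference (not SynthFlow's ResultRef)."""

    def __init__(self, node_id: str, steps: tuple = ()):
        self.node_id = node_id
        self.steps = steps  # transformations applied after lookup, in order

    def item(self, key: Any) -> "Ref":
        # Index into the referenced result, e.g. a list position or dict key.
        return Ref(self.node_id, self.steps + (lambda value: value[key],))

    def map(self, fn: Callable[[Any], Any]) -> "Ref":
        # Apply an arbitrary function to the referenced result.
        return Ref(self.node_id, self.steps + (fn,))

    def resolve(self, store: dict) -> Any:
        # Look up the node's result, then apply each recorded step in order.
        value = store[self.node_id]
        for step in self.steps:
            value = step(value)
        return value


# Assumed results of two earlier nodes, keyed by node id.
results = {"a1": [3, 4, 5], "a2": 12}
picked = Ref("a1").item(2).resolve(results)
doubled = Ref("a2").map(lambda x: x * 2).resolve(results)
print(picked, doubled)  # 5 24
```

Building each reference as an immutable chain of steps means it can be declared before the referenced node has run and resolved only once that node's result is in the store.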
DSL Example (Parallel + IF + OR)
from synthflow.core.dsl import IF, OR, PARALLEL
from synthflow.core.flow import Flow
from synthflow.core.node import Node, ResultRef

# Seed, SumNode, MaxNode, EvenCountNode, BuildSummary, Alert, and Normal
# are Node subclasses that are not shown in this excerpt.
flow = Flow(
    Seed(id="seed").input([2, 5, 8, 13, 21])
    >> PARALLEL(
        SumNode(id="sum_branch").input(ResultRef("seed")),
        MaxNode(id="max_branch").input(ResultRef("seed")),
        EvenCountNode(id="even_branch").input(ResultRef("seed")),
        id="stats_parallel",
    )
    >> BuildSummary(id="summary").input(
        ResultRef("sum_branch"),
        ResultRef("max_branch"),
        ResultRef("even_branch"),
    )
    >> IF(
        # OR passes if either predicate on the result store is true
        condition=OR(
            lambda store: (store.get_node_result("sum_branch") or 0) > 40,
            lambda store: (store.get_node_result("max_branch") or 0) > 20,
        ),
        then_node=Alert(id="alert").input(ResultRef("summary")),
        else_node=Normal(id="normal").input(ResultRef("summary")),
        id="risk_if",
    )
)
Full runnable example: examples/general_pipeline.py
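For readers who want to see the control flow without the DSL, the same shape can be sketched with plain asyncio. This is an illustration only, not how SynthFlow is implemented: the helper names (total, largest, even_count, any_of, run_pipeline) are hypothetical, and the branch behavior is assumed from the node names in the example above.

```python
import asyncio


async def total(values):
    return sum(values)


async def largest(values):
    return max(values)


async def even_count(values):
    return sum(1 for v in values if v % 2 == 0)


def any_of(*predicates):
    # OR-style combinator: true if at least one predicate accepts the store.
    return lambda store: any(p(store) for p in predicates)


async def run_pipeline(values):
    # Parallel step: the three branches run concurrently on the same input.
    s, m, e = await asyncio.gather(
        total(values), largest(values), even_count(values)
    )
    store = {"sum_branch": s, "max_branch": m, "even_branch": e}

    # Conditional step: choose an outcome based on the stored results.
    is_risky = any_of(
        lambda st: st["sum_branch"] > 40,
        lambda st: st["max_branch"] > 20,
    )
    store["outcome"] = "alert" if is_risky(store) else "normal"
    return store


result = asyncio.run(run_pipeline([2, 5, 8, 13, 21]))
print(result)
```

With the seed values from the example, the sum is 49 and the maximum is 21, so both predicates hold and the alert path is taken.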
Plugins
Attach plugins on a node with .use(...):
from synthflow.plugins import Retry, Timeout
node = SomeNode().use(Retry(retries=2, delay=0.1)).use(Timeout(seconds=2.0))
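As a rough picture of what retry and timeout policies do around an async call, the sketch below wraps a coroutine function using only asyncio. It is not SynthFlow's plugin code: with_retry, with_timeout, and flaky are hypothetical helpers, and both the reading of retries as "extra attempts after the first" and the choice to apply the timeout per attempt are assumptions.

```python
import asyncio


async def with_timeout(make_call, seconds):
    # Cancel the call and raise asyncio.TimeoutError if it runs too long.
    return await asyncio.wait_for(make_call(), timeout=seconds)


async def with_retry(make_call, retries, delay):
    # One initial attempt plus up to `retries` further attempts (assumed meaning).
    for attempt in range(retries + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(delay)


calls = {"count": 0}


async def flaky():
    # Fails twice, then succeeds, to exercise the retry path.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


async def main():
    # The timeout applies to each attempt; the retry loop wraps it.
    return await with_retry(
        lambda: with_timeout(flaky, seconds=2.0), retries=2, delay=0.1
    )


outcome = asyncio.run(main())
print(outcome, calls["count"])  # ok 3
```

The nesting order matters: wrapping the timeout inside the retry bounds each attempt separately, while the opposite order would bound the total time across all attempts.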
Visualize
flow.visualize() prints a tree-style orchestration view, including branch labels:
Flow
└── Seed(seed)
    └── Parallel(stats_parallel)
        ├── [parallel-1] SumNode(sum_branch)
        ├── [parallel-2] MaxNode(max_branch)
        ├── [parallel-3] EvenCountNode(even_branch)
        └── BuildSummary(summary)
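A tree view like this can be produced with a short recursive renderer. The sketch below is a generic illustration, not the code behind flow.visualize(): the render function and the (label, children) tuple layout are hypothetical.

```python
def render(label, children=(), prefix="", is_last=True, is_root=True):
    """Return the lines of a box-drawing tree for (label, children) nodes."""
    if is_root:
        lines = [label]
        child_prefix = ""
    else:
        # The last child gets a corner; earlier children get a tee.
        lines = [prefix + ("└── " if is_last else "├── ") + label]
        # Keep a vertical bar running only while more siblings follow.
        child_prefix = prefix + ("    " if is_last else "│   ")
    for index, (child_label, grandchildren) in enumerate(children):
        last = index == len(children) - 1
        lines += render(child_label, grandchildren, child_prefix, last, is_root=False)
    return lines


tree = (
    "Flow",
    [
        ("Seed(seed)", [
            ("Parallel(stats_parallel)", [
                ("[parallel-1] SumNode(sum_branch)", []),
                ("[parallel-2] MaxNode(max_branch)", []),
            ]),
        ]),
    ],
)
lines = render(*tree)
print("\n".join(lines))
```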