A lightweight, type-hint driven engine for executing Directed Acyclic Graphs (DAGs) and lockstep pipelines in Python.
SynaFlow 🌊🧠
SynaFlow is a lightweight, pure-Python pipeline engine that uses Type Hints to magically wire and execute Directed Acyclic Graphs (DAGs).
Why the name? It's a combination of Synapse + Flow. Just like synapses automatically wire neurons together to form a neural network, SynaFlow automatically wires your functions together based on their types. And "Flow" represents the lazy, streaming nature of how data moves through those connections.
It solves the "dependency hell" and boilerplate associated with building data pipelines by automatically inferring the flow of data based exclusively on Python's static type annotations.
📖 Read the Core Principles: To understand our decisions regarding Stream Processing, Clean Business Rules, and Cross-Orchestrator architecture, check out our Design Philosophy.
The Problem It Solves
Building data pipelines usually involves two headaches:
- Explicit Wiring: You have to manually define which function outputs go to which function inputs (e.g., `A >> B >> C`), creating verbose and fragile architectures.
- Memory Explosions vs. Lazy Evaluation: Passing large datasets around usually means holding them entirely in memory (lists) or dealing with complex generator management. If you have multiple consumers for a single generator, you usually have to write clunky `itertools.tee` boilerplate yourself.
The SynaFlow Solution
SynaFlow looks at the Type Hints of your functions and automatically wires everything together for you. If Step A outputs an int and Step B requires an int, SynaFlow connects them instantly.
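How SynaFlow resolves edges internally isn't documented here, but the Quickstart export hints at it: each parameter name matches an upstream step or pipeline param (for example `transformer(producer: ...)`), and its annotation is recorded next to it. Below is a minimal sketch of that kind of signature introspection using only the standard library. `infer_deps` is a hypothetical helper for illustration, not part of SynaFlow's API.

```python
import inspect
import typing
from collections.abc import Callable, Generator, Iterator


def infer_deps(fn: Callable[..., object]) -> dict[str, object]:
    """Map each parameter of `fn` to its type hint (None if unannotated).

    Hypothetical rule: the parameter name is read as the name of the
    upstream step or pipeline param that should supply the value.
    """
    hints = typing.get_type_hints(fn)
    return {name: hints.get(name) for name in inspect.signature(fn).parameters}


# Two steps shaped like the Quickstart, used only as sample input.
def producer(count: int) -> Generator[int, None, None]:
    yield from range(count)


def transformer(producer: Iterator[int]) -> Generator[int, None, None]:
    for val in producer:
        yield val * 10


print(infer_deps(producer))     # {'count': <class 'int'>}
print(infer_deps(transformer))  # {'producer': collections.abc.Iterator[int]}
```

An engine built this way would then look each name up among the declared steps and params and compare the recorded types before running anything.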
Furthermore, SynaFlow has a smart lockstep streaming engine:
- If a producer yields a `Generator` and a consumer expects an `Iterator`, SynaFlow streams the data lazily, item by item, without ever holding the whole dataset in memory.
- If multiple consumers want that same generator, SynaFlow automatically forks it (`tee`) and drives the consumers together, one item at a time (lockstep).
- If one consumer explicitly asks for a `list`, SynaFlow automatically materializes the data only for that specific branch.
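The reason lockstep matters for a forked generator can be seen with plain `itertools.tee`. This is a minimal sketch of the technique, not SynaFlow's implementation; `CountingSource` and `peak_backlog` are hypothetical names. It counts how many items `tee` must buffer for a second branch when the first branch is drained completely, versus when both branches advance one item at a time:

```python
from collections.abc import Iterator
from itertools import tee


class CountingSource:
    """Yields 0..n-1 and counts how many items have been produced so far."""

    def __init__(self, n: int) -> None:
        self.n = n
        self.produced = 0

    def __iter__(self) -> Iterator[int]:
        for i in range(self.n):
            self.produced += 1
            yield i


def peak_backlog(n: int, lockstep: bool) -> int:
    """Largest number of items tee had to buffer for the second branch."""
    src = CountingSource(n)
    a, b = tee(iter(src), 2)
    consumed_b = 0
    peak = 0
    for _ in a:
        # Produced but not yet seen by branch b, so held in tee's buffer.
        peak = max(peak, src.produced - consumed_b)
        if lockstep:
            next(b)
            consumed_b += 1
    for _ in b:  # drain whatever branch b has not seen yet
        consumed_b += 1
    return peak


print(peak_backlog(1_000, lockstep=False))  # 1000
print(peak_backlog(1_000, lockstep=True))   # 1
```

Draining one branch first forces `tee` to hold every item for the other branch; advancing them together keeps the backlog at a single item, which is what lets one generator feed several consumers without a memory spike.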
How is it different from other frameworks?
There are many amazing orchestration frameworks out there, but SynaFlow fills a very specific gap: In-process Streaming Micro-Orchestration.
vs. Hamilton
Hamilton is a fantastic tool that also uses Python function signatures to build DAGs. However, Hamilton is heavily geared towards DataFrames and feature engineering, generally expecting functions to return concrete values (columns/scalars). SynaFlow, on the other hand, is built from the ground up to support Native Generators and Lazy Streaming. While Hamilton maps functions to columns, SynaFlow maps functions to continuous data streams, automatically interleaving multiple consumers in lockstep without memory spikes.
vs. Airflow / Prefect / Dagster
These are Macro-Orchestrators. They are designed to orchestrate heavy, distributed tasks across clusters, Docker containers, and different machines, and they typically rely on a state database and add scheduling and I/O overhead between tasks. SynaFlow is a Micro-Orchestrator: it runs entirely within a single Python process. You would use Airflow to trigger a daily job, but you would use SynaFlow inside that job to route and stream millions of rows between your Python functions.
Quickstart
```python
import json
from typing import NamedTuple
from collections.abc import Generator, Iterator

from synaflow import pipeline, step, run


# Define the data required to start your pipeline
class MyParams(NamedTuple):
    count: int


# 1. Producer outputs a stream
def producer(count: int) -> Generator[int, None, None]:
    yield from range(count)


# 2. Transformer consumes the stream lazily
def transformer(producer: Iterator[int]) -> Generator[int, None, None]:
    for val in producer:
        yield val * 10


# 3. Consumer automatically gets the stream!
def consumer(transformer: Iterator[int]) -> None:
    for x in transformer:
        print(f"Consumed: {x}")


# SynaFlow reads the Type Hints and wires the DAG automatically!
my_pipeline = pipeline(
    name="example",
    params=MyParams,
    steps=[
        step("producer", fn=producer),
        step("transformer", fn=transformer),
        step("consumer", fn=consumer),
    ],
)

# Run it
run(my_pipeline, MyParams(count=5))

# Export the DAG as JSON (to_dict() gives a dict; json.dumps serializes it)
print(json.dumps(my_pipeline.to_dict(), indent=2))
```
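For comparison, the same three-step chain can be wired by hand in plain Python, with no SynaFlow involved; each nested call is an edge the engine would otherwise infer from the signatures. Since every step is a generator, nothing is computed until `consumer` starts pulling, and only one value is in flight at a time. Assuming `run()` streams as described, we'd expect it to print the same five lines:

```python
from collections.abc import Generator, Iterator


def producer(count: int) -> Generator[int, None, None]:
    yield from range(count)


def transformer(producer: Iterator[int]) -> Generator[int, None, None]:
    for val in producer:
        yield val * 10


def consumer(transformer: Iterator[int]) -> None:
    for x in transformer:
        print(f"Consumed: {x}")


# Manual wiring: the explicit plumbing SynaFlow is meant to remove.
consumer(transformer(producer(count=5)))
# Prints: Consumed: 0, Consumed: 10, Consumed: 20, Consumed: 30, Consumed: 40
# (one per line)
```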
When you export the pipeline using my_pipeline.to_dict(), you get a precise representation of the nodes, their inferred dependencies, and their return types:
```json
{
  "producer": {
    "deps": {
      "count": "int"
    },
    "output": "Generator[int, None, None]",
    "fn": "producer",
    "on_error": "stop",
    "needs_materialize": false
  },
  "transformer": {
    "deps": {
      "producer": "Iterator[int]"
    },
    "output": "Generator[int, None, None]",
    "fn": "transformer",
    "on_error": "stop",
    "needs_materialize": false
  },
  "consumer": {
    "deps": {
      "transformer": "Iterator[int]"
    },
    "output": "None",
    "fn": "consumer",
    "on_error": "stop",
    "needs_materialize": false
  },
  "count": {
    "deps": {},
    "output": "int",
    "fn": null,
    "on_error": null,
    "needs_materialize": false
  }
}
```
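The `deps` maps alone are enough to recover an execution order. Here is a minimal sketch with the standard library's `graphlib.TopologicalSorter` (Python 3.9+), assuming the export is a plain dict shaped like the JSON above; `execution_order` is a hypothetical helper, not SynaFlow's internal scheduler:

```python
from graphlib import TopologicalSorter

# Trimmed copy of the exported DAG above: only the "deps" fields matter here.
dag = {
    "producer": {"deps": {"count": "int"}},
    "transformer": {"deps": {"producer": "Iterator[int]"}},
    "consumer": {"deps": {"transformer": "Iterator[int]"}},
    "count": {"deps": {}},
}


def execution_order(dag: dict[str, dict]) -> list[str]:
    """Return node names so that every node comes after its dependencies."""
    # TopologicalSorter expects {node: iterable_of_predecessors}.
    graph = {name: set(node["deps"]) for name, node in dag.items()}
    return list(TopologicalSorter(graph).static_order())


print(execution_order(dag))  # ['count', 'producer', 'transformer', 'consumer']
```

If the dependencies ever formed a cycle, `static_order()` would raise `graphlib.CycleError`, which is one way a validator can reject a pipeline before anything runs.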
Execution Semantics & Custom Runners
The native run() function in SynaFlow is designed as an In-Process Lockstep Executor. Its semantics are carefully crafted for streaming:
- Topological Order: Steps are evaluated in topological order, guaranteeing dependencies are resolved before a step starts.
- Lockstep Execution: If multiple steps depend on the same `Generator`, SynaFlow forks it (using `itertools.tee`) and advances the branches together (lockstep). It pulls one item from the generator, passes it to the first consumer, then to the second consumer, and only then pulls the next item. Because no branch runs ahead, items do not pile up in the fork's buffer, which keeps memory usage low.
- Lazy Materialization: If a step explicitly requests a `list` or `set`, SynaFlow consumes the entire generator and holds it in memory, but only for that specific branch.
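The pull-one-item, hand-it-to-each-consumer loop described above can be sketched in a few lines. This is a simplified illustration under our own assumptions, not SynaFlow's runner: consumers here are per-item callbacks rather than `Iterator`-typed functions, and because each item is pushed straight to every branch, no `tee` buffer is needed at all. `drive_lockstep`, `RunningSum`, and `Materialize` are hypothetical names.

```python
from collections.abc import Callable, Iterable
from typing import TypeVar

T = TypeVar("T")


def drive_lockstep(source: Iterable[T], consumers: list[Callable[[T], None]]) -> None:
    """Pull one item, hand it to every consumer in order, then pull the next."""
    for item in source:
        for consume in consumers:
            consume(item)


class RunningSum:
    """Streaming branch: keeps a single accumulator, never the items."""

    def __init__(self) -> None:
        self.total = 0

    def __call__(self, item: int) -> None:
        self.total += item


class Materialize:
    """Branch that asked for a list: only this branch stores every item."""

    def __init__(self) -> None:
        self.items: list[int] = []

    def __call__(self, item: int) -> None:
        self.items.append(item)


trace: list[str] = []
summer, keeper = RunningSum(), Materialize()

drive_lockstep(
    range(3),
    [
        lambda x: trace.append(f"A:{x}"),
        lambda x: trace.append(f"B:{x}"),
        summer,
        keeper,
    ],
)

print(trace)         # ['A:0', 'B:0', 'A:1', 'B:1', 'A:2', 'B:2']
print(summer.total)  # 3
print(keeper.items)  # [0, 1, 2]
```

The trace shows the interleaving (every consumer sees item 0 before anyone sees item 1), and only the `Materialize` branch ends up holding the full dataset.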
Build Your Own Runner!
The pipeline(...) definition is simply a static description of the DAG. It produces a PipelineDef object. You are not locked into our native runner!
Because the DAG is fully decoupled from execution, you or the community can write custom runners to process the PipelineDef in different ways:
- An AsyncRunner that executes independent branches using `asyncio.gather`.
- A DistributedRunner that compiles the DAG into an Airflow or Ray graph.
- A VisualizerRunner that turns the DAG into an HTML diagram.
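As an example of a runner that never executes a step, here is a sketch of the VisualizerRunner idea that emits Graphviz DOT text instead of HTML. It assumes only that `to_dict()` returns a dict shaped like the JSON export in the Quickstart; `to_dot` is a hypothetical helper, not part of SynaFlow. Pipeline params (nodes with `"fn": null`) are drawn as ellipses, steps as boxes:

```python
# Trimmed copy of the exported DAG from the Quickstart (only the fields used here).
dag = {
    "producer": {"deps": {"count": "int"},
                 "output": "Generator[int, None, None]", "fn": "producer"},
    "transformer": {"deps": {"producer": "Iterator[int]"},
                    "output": "Generator[int, None, None]", "fn": "transformer"},
    "consumer": {"deps": {"transformer": "Iterator[int]"},
                 "output": "None", "fn": "consumer"},
    "count": {"deps": {}, "output": "int", "fn": None},
}


def to_dot(dag: dict[str, dict]) -> str:
    """Render a to_dict()-style export as Graphviz DOT source."""
    lines = ["digraph pipeline {"]
    for name, node in dag.items():
        # Pipeline params have no function attached; draw them differently.
        shape = "ellipse" if node["fn"] is None else "box"
        label = f"{name}\\n{node['output']}"  # "\n" inside a DOT label is a line break
        lines.append(f'    "{name}" [shape={shape}, label="{label}"];')
        for dep in node["deps"]:
            lines.append(f'    "{dep}" -> "{name}";')
    lines.append("}")
    return "\n".join(lines)


print(to_dot(dag))
```

Piping the output through Graphviz (`dot -Tsvg`) produces an SVG that could be embedded in an HTML page.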
Advanced Features
- Auto-DAG compilation and validation before execution.
- Strict type-checking: Pipeline refuses to run if type annotations are incompatible.
- Easily export DAG structures as JSON (`my_pipeline.to_dict()`) for snapshot testing or UI rendering.
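The strict type-checking rule can be approximated with `typing.get_origin` and `typing.get_args`. This is a rough sketch under our own rules, not SynaFlow's validator; `is_compatible` is a hypothetical name. A generator may feed an `Iterator` parameter directly or a `list`/`set` parameter through materialization, and element types must line up:

```python
from collections.abc import Generator, Iterable, Iterator
from typing import get_args, get_origin


def is_compatible(output: object, expected: object) -> bool:
    """Rough check that a step's return type can feed a parameter type.

    Hypothetical rules:
    - plain classes must be subclasses (bool -> int is fine, int -> str is not);
    - an iterable can feed a parameter whose container it subclasses
      (Generator -> Iterator), or a list/set parameter via materialization;
    - the first type argument (the element type) must be compatible too.
    """
    out_origin, exp_origin = get_origin(output), get_origin(expected)
    if out_origin is None and exp_origin is None:
        return (isinstance(output, type) and isinstance(expected, type)
                and issubclass(output, expected))
    if out_origin is None or exp_origin is None:
        return False
    if not (isinstance(out_origin, type) and issubclass(out_origin, Iterable)):
        return False
    streams_ok = isinstance(exp_origin, type) and issubclass(out_origin, exp_origin)
    materialize_ok = exp_origin in (list, set)
    if not (streams_ok or materialize_ok):
        return False
    out_args, exp_args = get_args(output), get_args(expected)
    if not out_args or not exp_args:
        return True
    return is_compatible(out_args[0], exp_args[0])


print(is_compatible(Generator[int, None, None], Iterator[int]))  # True
print(is_compatible(Generator[int, None, None], list[int]))      # True
print(is_compatible(Generator[int, None, None], Iterator[str]))  # False
```

Running a check like this over every edge before execution is what lets a pipeline refuse to start rather than fail halfway through a stream.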
License
MIT License
File details
Details for the file synaflow-0.12.0.tar.gz.
File metadata
- Download URL: synaflow-0.12.0.tar.gz
- Upload date:
- Size: 114.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3e4b437ab3f7735b5f3f091567fa8f06be79a19e4a47f0028e65feea92b8e117 |
| MD5 | 96ef6052ea0fa0212ffedcdfe2b905f6 |
| BLAKE2b-256 | dae8de3d5d6c999b8764c25ea5c5acf93aa049fb05e376ca91433c19c8655462 |
Provenance
The following attestation bundles were made for synaflow-0.12.0.tar.gz:
Publisher: release.yml on humansoftware/synaflow
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: synaflow-0.12.0.tar.gz
- Subject digest: 3e4b437ab3f7735b5f3f091567fa8f06be79a19e4a47f0028e65feea92b8e117
- Sigstore transparency entry: 1820454267
- Sigstore integration time:
- Permalink: humansoftware/synaflow@2f8302a19e18e3f2abc1c389052e6620f286f8c3
- Branch / Tag: refs/heads/main
- Owner: https://github.com/humansoftware
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@2f8302a19e18e3f2abc1c389052e6620f286f8c3
- Trigger Event: push
File details
Details for the file synaflow-0.12.0-py3-none-any.whl.
File metadata
- Download URL: synaflow-0.12.0-py3-none-any.whl
- Upload date:
- Size: 36.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f30fabab8fa994abe44c4de6435e7dd02e0549e4e2fe3b77cea72b6bff45f990 |
| MD5 | 1f19c2dfba0a68a5f95e9c93743e7769 |
| BLAKE2b-256 | 36c8ac4685bdb49348c56857edb0d930f3d432733c16bad95d5052171a6961ac |
Provenance
The following attestation bundles were made for synaflow-0.12.0-py3-none-any.whl:
Publisher: release.yml on humansoftware/synaflow
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: synaflow-0.12.0-py3-none-any.whl
- Subject digest: f30fabab8fa994abe44c4de6435e7dd02e0549e4e2fe3b77cea72b6bff45f990
- Sigstore transparency entry: 1820454291
- Sigstore integration time:
- Permalink: humansoftware/synaflow@2f8302a19e18e3f2abc1c389052e6620f286f8c3
- Branch / Tag: refs/heads/main
- Owner: https://github.com/humansoftware
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: release.yml@2f8302a19e18e3f2abc1c389052e6620f286f8c3
- Trigger Event: push