A distributed and durable workflow engine
Flowcore
A distributed and durable workflow engine for Python.
Built by Ezequiel Ranieri, Backend & Security Engineer specialized in Distributed Systems
What is Flowcore?
I built Flowcore to solve a real problem I kept encountering: complex business processes that needed to survive failures, resume from where they stopped, and scale across workers. Flowcore is a lightweight yet powerful workflow execution engine designed to be durable and distributed. It allows defining complex business processes using an elegant DSL based on Python decorators, ensuring reliable step execution with automatic retries and state persistence at every transition.
Quick Demo
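from flowcore.domain.dsl.primitives import task, workflow
from flowcore.domain.dsl.models import Step


# A task is a plain function registered under a name, with its own retry budget.
@task(name="validate_order", max_retries=3)
def validate_order(ctx: dict):
    return {"valid": True}


# A workflow wires steps to task names; next_steps defines the edges of the graph.
@workflow(name="order_process", version="1.0.0")
class OrderWorkflow:
    steps = [
        Step(name="validate", task_name="validate_order", next_steps=["pay"]),
        # "process_payment" is not defined in this snippet and must be registered as a task elsewhere.
        Step(name="pay", task_name="process_payment"),
    ]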
Flowcore allows you to define complex, multi-step workflows in Python using a declarative DSL and execute them in a distributed environment with guaranteed persistence and resilience.
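To make the decorator-based DSL less abstract, here is a minimal sketch of how a `@task` decorator can register functions by name. It is not Flowcore's implementation: the `TASKS` dictionary and the way `max_retries` is stored on the function are assumptions made for illustration only.

```python
from typing import Callable, Dict

# Hypothetical in-memory registry; Flowcore's real registry may look different.
TASKS: Dict[str, Callable[[dict], dict]] = {}


def task(name: str, max_retries: int = 0):
    """Register the decorated function under `name` and attach retry metadata."""
    def decorator(func: Callable[[dict], dict]) -> Callable[[dict], dict]:
        func.max_retries = max_retries  # metadata a worker could read later
        TASKS[name] = func
        return func
    return decorator


@task(name="validate_order", max_retries=3)
def validate_order(ctx: dict) -> dict:
    return {"valid": True}


# A worker can resolve the function from the task name stored in a step.
result = TASKS["validate_order"]({"order_id": 1})
print(result)  # {'valid': True}
```

Registering by name rather than by import path is what lets a step refer to `task_name="validate_order"` as a plain string.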
Quickstart
# Clone the repository
git clone https://github.com/ezequielranieri/flowcore.git
cd flowcore
# Start the MVP stack
make up
# Apply migrations
make migrate
To trigger a real workflow execution, send a request to the API:
# Using curl
curl -X POST http://localhost:8000/workflows/order_process -H "Content-Type: application/json" -d '{}'
# Or using PowerShell/Invoke-RestMethod
Invoke-RestMethod -Uri http://localhost:8000/workflows/order_process -Method Post -ContentType "application/json" -Body '{}'
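The same request can be built from Python with only the standard library. This is a sketch that mirrors the curl command above; the helper names `build_start_request` and `send` are illustrative, and the shape of the response body is not documented here, so it is printed as raw text.

```python
import json
import urllib.request


def build_start_request(base_url: str, workflow_name: str, payload: dict) -> urllib.request.Request:
    """Build the same POST request as the curl command, without sending it."""
    return urllib.request.Request(
        url=f"{base_url}/workflows/{workflow_name}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> str:
    """Send the request; requires the stack started by `make up` to be running."""
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")


request = build_start_request("http://localhost:8000", "order_process", {})
print(request.get_method(), request.full_url)
# To actually trigger the workflow: print(send(request))
```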
API / Quickstart UI
You can visualize and interact with the API endpoints using the Swagger UI.
Features
- Declarative DSL: Define workflows and tasks with simple decorators.
- Auto-discovery: Workers automatically discover and register workflow definitions on startup. Zero manual imports required.
- Real DAG Engine: Workflow completion uses networkx graph traversal, correctly handling fan-out and complex topologies.
- True Distribution: Each workflow step runs as an independent Celery task, enabling horizontal scaling across workers.
- Advanced Flow Control: Support for Fan-out, Branching (conditions), and Join/Barrier (`wait_for`).
- Resilience: Automatic retries with exponential backoff.
- Hexagonal Architecture: Decoupled, testable, and maintainable code.
- Full Persistence: Every execution state is stored in PostgreSQL.
- Distributed Tracing: Full OpenTelemetry instrumentation with Jaeger. Every workflow and step execution is traced end-to-end.
- Saga Pattern: Automatic compensating actions when a step fails. Completed steps are rolled back in reverse order.
- Workflow Versioning: Each execution persists the exact workflow version used. Workers resolve the correct definition by version, ensuring in-flight executions are never affected by workflow changes.
- Native CLI: Interact with Flowcore from the terminal using `flowcore run`, `flowcore status`, `flowcore list` and `flowcore workflows`. Rich-formatted tables with color-coded status.
- Multi-tenancy: Full tenant isolation via the `X-Tenant-ID` header. Each tenant sees only their own executions. Cross-tenant access is blocked at the API level.
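The Resilience and Saga Pattern items can be illustrated together. The sketch below is a generic, single-process version of both ideas, not Flowcore's code: `run_with_retries`, `run_saga`, the 1-second base delay, and the injectable `sleep` parameter are all assumptions chosen to keep the example small and testable.

```python
import time
from typing import Callable, List, Tuple

Action = Callable[[], None]


def run_with_retries(action: Action, max_retries: int, base_delay: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep) -> None:
    """Call `action`, retrying up to `max_retries` times.
    The delay before retry n is base_delay * 2**n (1s, 2s, 4s, ... by default)."""
    for attempt in range(max_retries + 1):
        try:
            action()
            return
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: let the caller compensate
            sleep(base_delay * 2 ** attempt)


def run_saga(steps: List[Tuple[Action, Action]], max_retries: int = 0,
             sleep: Callable[[float], None] = time.sleep) -> bool:
    """Run (action, compensation) pairs in order. If a step still fails after
    its retries, run the compensations of completed steps in reverse order."""
    completed: List[Action] = []
    for action, compensation in steps:
        try:
            run_with_retries(action, max_retries, sleep=sleep)
        except Exception:
            for undo in reversed(completed):
                undo()
            return False
        completed.append(compensation)
    return True


log: List[str] = []
delays: List[float] = []  # records the backoff delays instead of sleeping


def charge_card() -> None:
    raise RuntimeError("payment declined")


ok = run_saga(
    [
        (lambda: log.append("reserve"), lambda: log.append("release")),
        (lambda: log.append("create_invoice"), lambda: log.append("void_invoice")),
        (charge_card, lambda: log.append("refund")),
    ],
    max_retries=2,
    sleep=delays.append,
)
print(ok, log, delays)
# False ['reserve', 'create_invoice', 'void_invoice', 'release'] [1.0, 2.0]
```

The failing step never completes, so its own compensation (`refund`) is not run; only the two earlier steps are undone, last one first.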
Why Flowcore?
Flowcore bridges the gap between simple task queues and heavyweight orchestrators.
| Feature | Flowcore | Celery | Prefect | Airflow | Temporal |
|---|---|---|---|---|---|
| Durable Execution | โ | โ | โ ๏ธ | โ | โ |
| Low Overhead | โ | โ | โ | โ | โ |
| Native State | โ | โ | โ | โ | โ |
| Learning Curve | Low | Low | Medium | High | High |
Stack & Technical Decisions
Every technology choice in Flowcore was made deliberately:
- Python 3.11+: Leveraging static typing and performance improvements.
- uv: Ultra-fast dependency management.
- FastAPI: Modern, asynchronous API framework.
- Celery 5.5+: The de-facto standard for distributed tasks in Python.
- SQLAlchemy 2.0: Modern declarative style with async support.
- PostgreSQL: Reliable relational persistence.
Architecture & Concepts
graph TD
API[FastAPI] -->|Start Workflow| Service[WorkflowService]
Service -->|Create Record| DB[(PostgreSQL)]
Service -->|Enqueue| TaskInit[execute_workflow_task]
TaskInit -->|Enqueue Steps| TaskStep[execute_step_task]
TaskStep -->|Loop/Next| TaskStep
TaskStep -->|Update State| DB
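The "Loop/Next" edge in the diagram is where fan-out and join are decided: a step becomes runnable once every step it waits for has finished. Flowcore is described as using networkx for this; the sketch below swaps in the standard library's `graphlib` to show the same idea without extra dependencies. The step names and the `dependencies` mapping are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each key maps a step to the steps it waits for.
# "validate" fans out to "pay" and "reserve_stock"; "ship" joins both.
dependencies = {
    "validate": set(),
    "pay": {"validate"},
    "reserve_stock": {"validate"},
    "ship": {"pay", "reserve_stock"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()  # raises graphlib.CycleError if the graph is not a DAG

waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # steps whose dependencies are all done
    waves.append(ready)
    # A real engine would enqueue each ready step as its own task here and
    # call done() only when a worker reports that the step has completed.
    sorter.done(*ready)

print(waves)
# [['validate'], ['pay', 'reserve_stock'], ['ship']]
```

Each inner list is a group of steps that could run in parallel on different workers.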
The project follows a Hexagonal Architecture (Ports and Adapters):
- Domain: Pure business logic (DSL, Engine).
- Application: Use cases and orchestration.
- Infrastructure: Persistence implementations (SQLAlchemy).
- Adapters: Input/Output ports (FastAPI, Celery).
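A small sketch of what the ports-and-adapters split means in practice. The names `ExecutionRepository`, `InMemoryExecutionRepository`, and `start_workflow` are illustrative assumptions, not Flowcore's actual classes.

```python
from typing import Dict, Optional, Protocol


class ExecutionRepository(Protocol):
    """Port: what the application layer needs from persistence."""

    def save(self, execution_id: str, status: str) -> None: ...

    def get_status(self, execution_id: str) -> Optional[str]: ...


class InMemoryExecutionRepository:
    """Adapter for tests; a SQLAlchemy adapter would expose the same methods."""

    def __init__(self) -> None:
        self._rows: Dict[str, str] = {}

    def save(self, execution_id: str, status: str) -> None:
        self._rows[execution_id] = status

    def get_status(self, execution_id: str) -> Optional[str]:
        return self._rows.get(execution_id)


def start_workflow(repo: ExecutionRepository, execution_id: str) -> None:
    """Use case: depends only on the port, never on a concrete database."""
    repo.save(execution_id, "RUNNING")


repo = InMemoryExecutionRepository()
start_workflow(repo, "exec-1")
print(repo.get_status("exec-1"))  # RUNNING
```

Because the use case only sees the port, it can be unit-tested without PostgreSQL or Celery running.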
Key Concepts
- WorkflowDefinition: The "blueprint" of the process defined by the user.
- WorkflowExecution: A live instance of a workflow currently running.
- StepExecution: The individual state of each step, with its input/output data.
- Registry: The catalog where definitions reside.
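The four concepts can be sketched as plain data structures. The field names, status strings, and the `(name, version)` registry key below are assumptions for illustration; Flowcore's real models may differ.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class WorkflowDefinition:
    """The blueprint: immutable once registered."""
    name: str
    version: str
    step_names: Tuple[str, ...]


@dataclass
class StepExecution:
    """State of one step within a running workflow."""
    step_name: str
    status: str = "PENDING"
    input_data: dict = field(default_factory=dict)
    output_data: dict = field(default_factory=dict)


@dataclass
class WorkflowExecution:
    """A live instance; the version is pinned when the execution starts."""
    workflow_name: str
    workflow_version: str
    steps: List[StepExecution] = field(default_factory=list)


class Registry:
    """Catalog of definitions keyed by (name, version)."""

    def __init__(self) -> None:
        self._definitions: Dict[Tuple[str, str], WorkflowDefinition] = {}

    def register(self, definition: WorkflowDefinition) -> None:
        self._definitions[(definition.name, definition.version)] = definition

    def resolve(self, name: str, version: str) -> WorkflowDefinition:
        return self._definitions[(name, version)]


registry = Registry()
registry.register(WorkflowDefinition("order_process", "1.0.0", ("validate", "pay")))
registry.register(WorkflowDefinition("order_process", "2.0.0", ("validate", "fraud_check", "pay")))

# An execution started on 1.0.0 keeps resolving 1.0.0 after 2.0.0 is registered.
execution = WorkflowExecution("order_process", "1.0.0")
definition = registry.resolve(execution.workflow_name, execution.workflow_version)
execution.steps = [StepExecution(step_name=name) for name in definition.step_names]
print([step.step_name for step in execution.steps])  # ['validate', 'pay']
```

Keying the registry by version is one simple way to keep in-flight executions unaffected when a new version of the same workflow is deployed.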
Project Structure
flowcore/
├── src/
│   └── flowcore/
│       ├── domain/          # Pure logic (DSL, Engine)
│       ├── application/     # Use cases
│       ├── infrastructure/  # DB, Repositories
│       └── adapters/        # API, Worker
├── tests/                   # Test suite
├── migrations/              # Alembic migrations
├── pyproject.toml           # uv configuration
├── Dockerfile               # Optimized image
└── Makefile                 # Automation
Known Limitations
- None reported: All core durability and atomicity requirements are now implemented.
Roadmap
- Phase 1 (MVP): Basic orchestration, persistence, and initial DSL. ✅ Completed
- Phase 2: Real distributed step execution. ✅ Completed
- Phase 3: DAG engine with networkx + auto-discovery. ✅ Completed
- Phase 4: Observability with OpenTelemetry + Jaeger. ✅ Completed
- Phase 5: Sagas / Compensating Actions. ✅ Completed
- Phase 6: Native CLI. ✅ Completed
- Phase 7: Workflow versioning. ✅ Completed
- Phase 8: Multi-tenancy. ✅ Completed
Contributing
Contributions are welcome! Please read CONTRIBUTING.md for more details on how to get started.
Author
Ezequiel Ranieri
Backend & Security Engineer | Distributed Systems & Authentication
Email: ez.ranieri@gmail.com
GitHub
LinkedIn
License
MIT License.