🎲 Dotflow turns an idea into flow! Lightweight Python library for execution pipelines with retry, parallel, cron and async support.
# Dotflow

Dotflow is a lightweight Python library for execution pipelines. Define tasks with decorators, chain them together, and deploy to any cloud, with built-in retry, parallel execution, storage, and observability.
## Why Dotflow?

- **Simple**: the `@action` decorator plus `workflow.start()`. That's it.
- **Resilient**: retry, backoff, timeout, checkpoints, and error handling out of the box.
- **Observable**: OpenTelemetry traces, metrics, and logs; Sentry error tracking.
- **Deployable**: `dotflow deploy --platform lambda` ships your pipeline to AWS in one command.
- **Portable**: the same code runs on Lambda, ECS, Cloud Run, Alibaba FC, Kubernetes, Docker, or GitHub Actions.
## Install

```shell
pip install dotflow
```
## Quick Start

```python
from dotflow import DotFlow, action

@action
def extract():
    return {"users": 150}

@action
def transform(previous_context):
    total = previous_context.storage["users"]
    return {"users": total, "active": int(total * 0.8)}

@action
def load(previous_context):
    print(f"Loaded {previous_context.storage['active']} active users")

workflow = DotFlow()
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)
workflow.start()
```
## Deploy anywhere

Write your pipeline once. Deploy to any cloud with a single command.

```shell
dotflow init
dotflow deploy --platform lambda --project my_pipeline
```
### Supported platforms

| Platform | Deploy method |
|---|---|
| Docker (local) | `docker compose up` |
| Kubernetes | `kubectl apply` |
| AWS Lambda, ECS, Cloud Run, Alibaba FC, GitHub Actions | `dotflow deploy` (add `--schedule` for cron-scheduled runs) |
## Optional extras

```shell
pip install dotflow[aws]             # S3 storage
pip install dotflow[gcp]             # Google Cloud Storage
pip install dotflow[scheduler]       # Cron scheduler
pip install dotflow[otel]            # OpenTelemetry
pip install dotflow[sentry]          # Sentry error tracking
pip install dotflow[deploy-aws]      # AWS deploy (Lambda, ECS)
pip install dotflow[deploy-gcp]      # GCP deploy (Cloud Run)
pip install dotflow[deploy-alibaba]  # Alibaba Cloud deploy (FC)
pip install dotflow[deploy-github]   # GitHub Actions deploy
```
## Documentation
| Section | Description |
|---|---|
| Concepts | Workflows, tasks, context, providers, process modes |
| How-to Guides | Step-by-step tutorials for workflows, tasks, and CLI |
| Cloud Deployment | Deploy to AWS, GCP, Alibaba, Kubernetes, Docker, GitHub Actions |
| Integrations | OpenTelemetry, Sentry, Telegram, Discord, S3, GCS, Server |
| Examples | Real-world pipelines: ETL, health checks, async, scheduler |
| Reference | API reference for all classes and providers |
| Custom Providers | Build your own storage, notify, log, tracer, or metrics provider |
## Features

### Observability
OpenTelemetry docs | Sentry docs | Tracer docs | Metrics docs
Built-in support for OpenTelemetry and Sentry:

```python
from dotflow import Config
from dotflow.providers import LogOpenTelemetry, TracerOpenTelemetry, MetricsOpenTelemetry

# OpenTelemetry: traces, metrics, and structured logs
config = Config(
    log=LogOpenTelemetry(service_name="my-pipeline"),
    tracer=TracerOpenTelemetry(service_name="my-pipeline"),
    metrics=MetricsOpenTelemetry(service_name="my-pipeline"),
)
```

```python
from dotflow.providers import LogSentry, TracerSentry

# Sentry: error tracking + performance monitoring
config = Config(
    log=LogSentry(dsn="https://xxx@sentry.io/123"),
    tracer=TracerSentry(),
)
```
### Execution Modes

Dotflow supports four execution strategies out of the box:

#### Sequential (default)

Tasks run one after another. The context from each task flows to the next.

```python
workflow.task.add(step=task_a)
workflow.task.add(step=task_b)
workflow.start()  # or mode="sequential"
```

```mermaid
flowchart LR
    A[task_a] --> B[task_b] --> C[Finish]
```
#### Background

Same as sequential, but runs in a background thread, so the call does not block.

```python
workflow.start(mode="background")
```

#### Parallel

Every task runs simultaneously in its own process.

```python
workflow.task.add(step=task_a)
workflow.task.add(step=task_b)
workflow.task.add(step=task_c)
workflow.start(mode="parallel")
```

```mermaid
flowchart TD
    S[Start] --> A[task_a] & B[task_b] & C[task_c]
    A & B & C --> F[Finish]
```
#### Parallel Groups

Assign tasks to named groups. Groups run in parallel, but tasks within each group run sequentially.

```python
workflow.task.add(step=fetch_users, group_name="users")
workflow.task.add(step=save_users, group_name="users")
workflow.task.add(step=fetch_orders, group_name="orders")
workflow.task.add(step=save_orders, group_name="orders")
workflow.start()
```

```mermaid
flowchart TD
    S[Start] --> G1[Group: users] & G2[Group: orders]
    G1 --> A[fetch_users] --> B[save_users]
    G2 --> C[fetch_orders] --> D[save_orders]
    B & D --> F[Finish]
```
### Retry, Timeout & Backoff

The `@action` decorator supports built-in resilience options:

```python
import requests

from dotflow import action

@action(retry=3, timeout=10, retry_delay=2, backoff=True)
def unreliable_api_call():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `retry` | `int` | `1` | Number of attempts before failing |
| `timeout` | `int` | `0` | Max seconds per attempt (0 = no limit) |
| `retry_delay` | `int` | `1` | Seconds to wait between retries |
| `backoff` | `bool` | `False` | Exponential backoff (delay doubles each retry) |
### Context System

Tasks communicate through a context chain. Each task receives the previous task's output and can access its own initial context.

```python
from dotflow import DotFlow, action

@action
def step_one():
    return "Hello"

@action
def step_two(previous_context, initial_context):
    greeting = previous_context.storage  # "Hello"
    name = initial_context.storage       # "World"
    return f"{greeting}, {name}!"

workflow = DotFlow()
workflow.task.add(step=step_one)
workflow.task.add(step=step_two, initial_context="World")
workflow.start()
```
Each `Context` object contains:

- `storage`: the return value from the task
- `task_id`: the task identifier
- `workflow_id`: the workflow identifier
- `time`: timestamp of execution
### Checkpoint & Resume

Resume a workflow from where it left off. Requires a persistent storage provider and a fixed `workflow_id`.

```python
from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile

config = Config(storage=StorageFile())
workflow = DotFlow(config=config, workflow_id="my-pipeline-v1")

workflow.task.add(step=step_a)
workflow.task.add(step=step_b)
workflow.task.add(step=step_c)

# First run: executes all tasks and saves checkpoints
workflow.start()

# If step_c failed, fix and re-run; skips step_a and step_b
workflow.start(resume=True)
```
### Storage Providers

Choose where task results are persisted:

#### In-Memory (default)

```python
from dotflow import DotFlow

workflow = DotFlow()  # uses StorageDefault (in-memory)
```

#### File System

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageFile

config = Config(storage=StorageFile(path=".output"))
workflow = DotFlow(config=config)
```

#### AWS S3

```shell
pip install dotflow[aws]
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageS3

config = Config(storage=StorageS3(bucket="my-bucket", prefix="pipelines/", region="us-east-1"))
workflow = DotFlow(config=config)
```

#### Google Cloud Storage

```shell
pip install dotflow[gcp]
```

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageGCS

config = Config(storage=StorageGCS(bucket="my-bucket", prefix="pipelines/", project="my-project"))
workflow = DotFlow(config=config)
```
### Notifications

Get notified about task status changes via Telegram or Discord.

```python
from dotflow import Config
from dotflow.providers import NotifyTelegram

config = Config(notify=NotifyTelegram(
    token="YOUR_BOT_TOKEN",
    chat_id=123456789,
))
```

```python
from dotflow.providers import NotifyDiscord

config = Config(notify=NotifyDiscord(
    webhook_url="https://discord.com/api/webhooks/...",
))
```
### Class-Based Steps

Return a class instance from a task, and Dotflow will automatically discover and execute all `@action`-decorated methods in source order.

```python
from dotflow import DotFlow, action

class ETLPipeline:
    @action
    def extract(self):
        return {"raw": [1, 2, 3]}

    @action
    def transform(self, previous_context):
        data = previous_context.storage["raw"]
        return {"processed": [x * 2 for x in data]}

    @action
    def load(self, previous_context):
        print(f"Loaded: {previous_context.storage['processed']}")

@action
def run_pipeline():
    return ETLPipeline()

workflow = DotFlow()
workflow.task.add(step=run_pipeline)
workflow.start()
```
### Task Groups

Organize tasks into named groups for parallel group execution.

```python
workflow.task.add(step=scrape_site_a, group_name="scraping")
workflow.task.add(step=scrape_site_b, group_name="scraping")
workflow.task.add(step=process_data, group_name="processing")
workflow.task.add(step=save_results, group_name="processing")
workflow.start()  # groups run in parallel, tasks within each group run sequentially
```
### Callbacks

Execute a function after each task completes, useful for logging, alerting, or side effects.

```python
def on_task_done(task):
    print(f"Task {task.task_id} finished with status: {task.status}")

workflow.task.add(step=my_step, callback=on_task_done)
```

Workflow-level callbacks for success and failure:

```python
def on_success(*args, **kwargs):
    print("All tasks completed!")

def on_failure(*args, **kwargs):
    print("Something went wrong.")

workflow.start(on_success=on_success, on_failure=on_failure)
```
### Error Handling

Control whether the workflow stops or continues when a task fails:

```python
# Stop on first failure (default)
workflow.start(keep_going=False)

# Continue executing remaining tasks even if one fails
workflow.start(keep_going=True)
```

Each task tracks its errors in full detail:

- Attempt number
- Exception type and message
- Traceback

Access results after execution:

```python
for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status}")
    if task.errors:
        print(f"  Errors: {task.errors}")
```
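The `keep_going` semantics can be pictured with a plain loop; this is an illustrative sketch of the documented behavior, not dotflow's actual implementation:

```python
def run_tasks(tasks, keep_going=False):
    # Run callables in order; record each outcome. On failure, either
    # stop immediately (default) or continue with the remaining tasks.
    results = []
    for task in tasks:
        try:
            results.append(("ok", task()))
        except Exception as exc:
            results.append(("failed", exc))
            if not keep_going:
                break
    return results
```

With `keep_going=True`, a task after the failing one still runs and its result is still recorded.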
### Async Support

`@action` automatically detects and handles async functions:

```python
import httpx

from dotflow import DotFlow, action

@action(timeout=30)
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

workflow = DotFlow()
workflow.task.add(step=fetch_data)
workflow.start()
```
### Scheduler / Cron

Cron scheduler docs | Default scheduler | Cron overlap (concepts)

Schedule workflows to run automatically using cron expressions.

```shell
pip install dotflow[scheduler]
```

```python
from dotflow import DotFlow, Config, action
from dotflow.providers import SchedulerCron

@action
def sync_data():
    return {"synced": True}

config = Config(scheduler=SchedulerCron(cron="*/5 * * * *"))
workflow = DotFlow(config=config)
workflow.task.add(step=sync_data)
workflow.schedule()
```
#### Overlap Strategies

Control what happens when a new execution triggers while the previous one is still running:

| Strategy | Description |
|---|---|
| `skip` | Drops the new run if the previous one is still active (default) |
| `queue` | Buffers one pending run and executes it when the current run finishes |
| `parallel` | Runs up to 10 concurrent executions via a semaphore |

```python
from dotflow.providers import SchedulerCron

# Queue overlapping executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="queue")

# Allow parallel executions
scheduler = SchedulerCron(cron="*/5 * * * *", overlap="parallel")
```

The scheduler handles graceful shutdown via SIGINT/SIGTERM signals automatically.
### CLI

Run workflows directly from the command line:

```shell
# Simple execution
dotflow start --step my_module.my_task

# With initial context
dotflow start --step my_module.my_task --initial-context '{"key": "value"}'

# With callback
dotflow start --step my_module.my_task --callback my_module.on_done

# With execution mode
dotflow start --step my_module.my_task --mode parallel

# With file storage
dotflow start --step my_module.my_task --storage file --path .output

# With S3 storage
dotflow start --step my_module.my_task --storage s3

# With GCS storage
dotflow start --step my_module.my_task --storage gcs

# Schedule with cron
dotflow schedule --step my_module.my_task --cron "*/5 * * * *"

# Schedule with overlap strategy
dotflow schedule --step my_module.my_task --cron "0 * * * *" --overlap queue

# Schedule with resume
dotflow schedule --step my_module.my_task --cron "0 */6 * * *" --storage file --resume
```
Available CLI commands:

| Command | Description |
|---|---|
| `dotflow init` | Scaffold a new project with cloud support |
| `dotflow start` | Run a workflow |
| `dotflow schedule` | Run a workflow on a cron schedule |
| `dotflow logs` | View execution logs |
| `dotflow cloud list` | Show available cloud platforms |
| `dotflow cloud generate --platform <name>` | Generate deployment files |
| `dotflow deploy --platform <name> --project <name>` | Deploy to cloud |
### Server Provider

Send workflow and task execution data to a remote API (e.g. dotflow-api) in real time.

```python
from dotflow import DotFlow, Config, action
from dotflow.providers import ServerDefault

@action
def my_task():
    return {"result": "ok"}

config = Config(
    server=ServerDefault(
        base_url="http://localhost:8000/api/v1",
        user_token="your-api-token",
    )
)

workflow = DotFlow(config=config)
workflow.task.add(step=my_task)
workflow.start()
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `base_url` | `str` | `""` | API base URL |
| `user_token` | `str` | `""` | API token (`X-User-Token` header) |
| `timeout` | `float` | `5.0` | HTTP request timeout in seconds |
The server provider automatically:

- Creates the workflow on `DotFlow()` init
- Creates each task on `task.add()`
- Updates task status on each transition (In progress, Completed, Failed, Retry)
- Updates workflow status on completion (In progress → Completed)
### Dependency Injection via Config

The `Config` class lets you swap providers for storage, notifications, logging, scheduling, and server communication:

```python
from dotflow import DotFlow, Config
from dotflow.providers import StorageFile, NotifyTelegram, LogDefault, SchedulerCron, ServerDefault

config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(token="...", chat_id=123),
    log=LogDefault(),
    scheduler=SchedulerCron(cron="0 * * * *"),
    server=ServerDefault(base_url="...", user_token="..."),
)

workflow = DotFlow(config=config)
```
Extend Dotflow by implementing the abstract base classes:

| ABC | Methods | Purpose |
|---|---|---|
| `Storage` | `post`, `get`, `key` | Custom storage backends |
| `Notify` | `hook_status_task` | Custom notification channels |
| `Log` | `info`, `error`, `warning`, `debug` | Custom logging |
| `Scheduler` | `start`, `stop` | Custom scheduling strategies |
| `Tracer` | `start_workflow`, `end_workflow`, `start_task`, `end_task` | Distributed tracing |
| `Metrics` | `workflow_started`, `workflow_completed`, `workflow_failed`, `task_completed`, `task_failed`, `task_retried` | Counters and histograms |
| `Server` | `create_workflow`, `update_workflow`, `create_task`, `update_task` | Remote API communication |
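For example, a custom notification channel only needs `hook_status_task`. The sketch below is duck-typed and hypothetical: a real provider would subclass dotflow's `Notify` ABC, and the exact task attributes should be checked against the API reference.

```python
class NotifyMemory:
    """Hypothetical notify provider that records status changes in memory."""

    def __init__(self):
        self.events = []

    def hook_status_task(self, task):
        # A real channel would POST to a webhook here; this one just
        # records the event for inspection.
        self.events.append(f"Task {task.task_id}: {task.status}")
```

It would then be wired in the usual way, e.g. `Config(notify=NotifyMemory())`.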
### Results & Inspection

After execution, inspect results directly from the workflow object:

```python
workflow.start()

# List of Task objects
tasks = workflow.result_task()

# List of Context objects (one per task)
contexts = workflow.result_context()

# List of storage values (raw return values)
storages = workflow.result_storage()

# Serialized result (Pydantic model)
result = workflow.result()
```

Task builder utilities:

```python
workflow.task.count()    # Number of tasks
workflow.task.clear()    # Remove all tasks
workflow.task.reverse()  # Reverse execution order
workflow.task.schema()   # Pydantic schema of the workflow
```
### Dynamic Module Import

Reference tasks and callbacks by their module path string instead of importing them directly:

```python
workflow.task.add(step="my_package.tasks.process_data")
workflow.task.add(step="my_package.tasks.save_results", callback="my_package.callbacks.notify")
```
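String references like these are typically resolved with `importlib`; a minimal sketch of the idea (not dotflow's actual loader):

```python
import importlib

def import_by_path(path):
    # Split "package.module.attr" into module path and attribute name,
    # import the module, and return the attribute.
    module_name, _, attr = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

dumps = import_by_path("json.dumps")  # resolves the stdlib json.dumps
```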
## More Examples

All examples are available in the `docs_src/` directory.
## Commit Style
| Icon | Type | Description |
|---|---|---|
| ⚙️ | FEATURE | New feature |
| 📝 | PEP8 | Formatting fixes following PEP8 |
| 📌 | ISSUE | Reference to issue |
| 🪲 | BUG | Bug fix |
| 📘 | DOCS | Documentation changes |
| 📦 | PyPI | PyPI releases |
| ❤️️ | TEST | Automated tests |
| ⬆️ | CI/CD | Changes in continuous integration/delivery |
| ⚠️ | SECURITY | Security improvements |
## License
This project is licensed under the terms of the MIT License.