Manage Graph Execution Flow - A unified interface for task orchestration across different task managers
Project description
MageFlow
Manage Graph Execution Flow - A unified interface for task orchestration across different task managers.
Why MageFlow?
Instead of spreading workflow logic throughout your codebase, MageFlow centralizes task orchestration with a clean, unified API. Switch between task managers (Hatchet, Taskiq, etc.) without rewriting your orchestration code.
Key Features
🔗 Task Chaining - Sequential workflows where tasks depend on previous completions
🐝 Task Swarms - Parallel execution with intelligent coordination
📞 Callback System - Robust success/error handling
🎯 Task Signatures - Flexible task definition with validation
⏯️ Lifecycle Control - Pause, resume, and monitor task execution
💾 Persistent State - Redis-backed state management with recovery
Installation
pip install mageflow[hatchet] # For Hatchet backend
Quick Setup
import asyncio
import redis
from hatchet_sdk import Hatchet, ClientConfig
import mageflow
# Configure backend and Redis
config = ClientConfig(token="your-hatchet-token")
redis_client = redis.asyncio.from_url("redis://localhost", decode_responses=True)
hatchet_client = Hatchet(config=config)
# Create MageFlow instance
mf = mageflow.Mageflow(hatchet_client, redis_client=redis_client)
Example Usage
Define Tasks
from pydantic import BaseModel
class ProcessData(BaseModel):
data: str
@mf.task(name="process-data", input_validator=ProcessData)
async def process_data(msg: ProcessData):
return {"processed": msg.data}
@mf.task(name="send-notification")
async def send_notification(msg):
print(f"Notification sent: {msg}")
return {"status": "sent"}
Chain Tasks
# Sequential execution
workflow = await mageflow.chain([
process_data_task,
send_notification_task
], name="data-pipeline")
Parallel Swarms
# Parallel execution
swarm = await mageflow.swarm([
process_user_task,
update_cache_task,
send_email_task
], task_name="user-onboarding")
Task Signatures with Callbacks
task_signature = await mageflow.sign(
task_name="process-order",
task_identifiers={"order_id": "12345"},
success_callbacks=[send_confirmation_task],
error_callbacks=[handle_error_task]
)
Use Cases
- Data Pipelines - ETL operations with error handling
- Microservice Coordination - Orchestrate distributed service calls
- Batch Processing - Parallel processing of large datasets
- User Workflows - Multi-step onboarding and registration
- Content Processing - Media processing with multiple stages
Documentation
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mageflow-0.2.0.tar.gz.
File metadata
- Download URL: mageflow-0.2.0.tar.gz
- Upload date:
- Size: 24.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
53fbcec7722c1076e7a14ddc06d4250d6ed1f992c813b44bef93f5882f430c9c
|
|
| MD5 |
25e8eef23698b1270696f608c57eda9f
|
|
| BLAKE2b-256 |
d4b7547bbae97b8100a0e21680a6783cde58b4a99e74dab7218f08f54e54e7c3
|
Provenance
The following attestation bundles were made for mageflow-0.2.0.tar.gz:
Publisher:
publish.yml on imaginary-cherry/mageflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mageflow-0.2.0.tar.gz -
Subject digest:
53fbcec7722c1076e7a14ddc06d4250d6ed1f992c813b44bef93f5882f430c9c - Sigstore transparency entry: 946126367
- Sigstore integration time:
-
Permalink:
imaginary-cherry/mageflow@830bf977686e03ad6095735d5f1bc8fcb1f7580d -
Branch / Tag:
refs/heads/main - Owner: https://github.com/imaginary-cherry
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@830bf977686e03ad6095735d5f1bc8fcb1f7580d -
Trigger Event:
workflow_run
-
Statement type:
File details
Details for the file mageflow-0.2.0-py3-none-any.whl.
File metadata
- Download URL: mageflow-0.2.0-py3-none-any.whl
- Upload date:
- Size: 36.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0a323f21ac5179a62730de617b4ac66fccbfa5e533238af133b8419ca343caba
|
|
| MD5 |
e3e3ac537748c946cad938bdf5ae1fc4
|
|
| BLAKE2b-256 |
e475ff1867534c4f408bb899e969d52bfe48ba7f786dd22d85462f01ef5042bf
|
Provenance
The following attestation bundles were made for mageflow-0.2.0-py3-none-any.whl:
Publisher:
publish.yml on imaginary-cherry/mageflow
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
mageflow-0.2.0-py3-none-any.whl -
Subject digest:
0a323f21ac5179a62730de617b4ac66fccbfa5e533238af133b8419ca343caba - Sigstore transparency entry: 946126379
- Sigstore integration time:
-
Permalink:
imaginary-cherry/mageflow@830bf977686e03ad6095735d5f1bc8fcb1f7580d -
Branch / Tag:
refs/heads/main - Owner: https://github.com/imaginary-cherry
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@830bf977686e03ad6095735d5f1bc8fcb1f7580d -
Trigger Event:
workflow_run
-
Statement type: