A lightweight event-driven workflow system for building AI agents and processing pipelines
Project description
Microflow
A lightweight event-driven workflow system for building AI agents and processing pipelines in just 60 lines of Python.
Installation
pip install microflow
Quick Start
import asyncio
from microflow import WorkflowManager, WorkflowEvent
# Create a workflow manager
workflow = WorkflowManager()
# Define a handler
async def greeting_handler(event, ctx):
name = event.data.get("name", "World")
yield WorkflowEvent.progress("greeting", f"Processing greeting for {name}")
yield WorkflowEvent(name="completed", data={"message": f"Hello, {name}!"})
# Register the handler
workflow.register("greet", greeting_handler)
# Run the workflow
async def main():
initial_event = WorkflowEvent(name="greet", data={"name": "Alice"})
async for event in workflow.process(initial_event):
if event.name == "progress":
print(f"Progress: {event.data['step']} - {event.data['description']}")
elif event.name == "completed":
print(f"Result: {event.data['message']}")
if __name__ == "__main__":
asyncio.run(main())
Features
- Event-Driven Architecture: Chain operations through events
- Progress Reporting: Built-in support for progress updates
- Error Handling: Graceful error propagation
- Flexible Workflows: Easily modify and extend workflows
- Minimal Dependencies: No external dependencies required
Core Components
The entire framework is just 60 lines of Python code, consisting of two main components:
WorkflowEvent
The fundamental data structure that flows through the system:
@dataclass
class WorkflowEvent:
name: str # Event type identifier
data: Dict[str, Any] = None # Event payload
metadata: Dict[str, Any] = None # Context information
error: Optional[str] = None # Error information if applicable
Special events include:
progressevents: For reporting status updateserrorevents: For handling failures
WorkflowManager
Orchestrates the flow of events through registered handlers:
class WorkflowManager:
def register(self, event_name: str, handler: EventHandler) -> Self
async def process(self, initial_event: WorkflowEvent) -> AsyncGenerator[WorkflowEvent, None]
How It Works
Registering Handlers
Handlers are registered for specific event types:
workflow = WorkflowManager()
# Register a handler for the "start" event
workflow.register("start", start_handler)
# Register multiple handlers for the same event (they'll run in sequence)
workflow.register("process_data", validation_handler)
workflow.register("process_data", transformation_handler)
# Chain registration is supported
workflow.register("event1", handler1).register("event2", handler2)
Creating Event Handlers
An event handler is an async function that takes a WorkflowEvent and yields one or more events:
async def search_handler(event: WorkflowEvent, ctx: Dict[str, Any]) -> AsyncGenerator[WorkflowEvent, None]:
# Report progress
yield WorkflowEvent.progress("search", "Searching for information...")
# Perform work
search_results = await perform_search(event.data["query"])
# Return results with a new event type
yield WorkflowEvent(
name="search_completed",
data={"search_results": search_results},
metadata=event.metadata
)
Chaining Handlers
Handlers chain together by yielding events that trigger other handlers:
- Handler A processes an event and yields a new event with name="process_data"
- WorkflowManager sees this event and finds handlers registered for "process_data"
- Those handlers run in sequence, potentially yielding more events
Progress Reporting
Special handling for progress events allows for status updates without breaking the chain:
async def complex_handler(event: WorkflowEvent, ctx: Dict[str, Any]):
# Report progress without changing workflow direction
yield WorkflowEvent.progress("step1", "Starting processing...")
# Do some work...
yield WorkflowEvent.progress("step2", "Halfway done...")
# Continue the workflow with a new event
yield WorkflowEvent(name="next_step", data={"result": "success"})
Error Handling
Errors can be propagated through the workflow:
async def risky_handler(event: WorkflowEvent, ctx: Dict[str, Any]):
try:
result = await risky_operation()
yield WorkflowEvent(name="success", data={"result": result})
except Exception as e:
yield WorkflowEvent(name="error", error=str(e), metadata=event.metadata)
Example Workflow
For more complex examples, see the examples directory which includes:
- A simple agent workflow that processes queries, searches for information, and generates responses
- A weather assistant that analyzes queries, fetches weather data, and generates streaming responses
Benefits
- Decoupling: Components communicate through events without direct dependencies
- Extensibility: Easy to add new handlers or modify workflow without changing existing code
- Observability: Progress events provide visibility into the workflow state
- Error Handling: Centralized error management through error events
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file microflow-0.1.0.tar.gz.
File metadata
- Download URL: microflow-0.1.0.tar.gz
- Upload date:
- Size: 9.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.24
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4dbb077e7ee714b2d71b960e260a99cf2e99be245021655d0ab5dd56285fa233
|
|
| MD5 |
460472fa0ef8149b9278e0f269ed233e
|
|
| BLAKE2b-256 |
ed04ac238eb72d2a77fbe4c0de3a3577ad430a89098c264515f7dd7e3251ce87
|
File details
Details for the file microflow-0.1.0-py3-none-any.whl.
File metadata
- Download URL: microflow-0.1.0-py3-none-any.whl
- Upload date:
- Size: 11.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.5.24
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ca3710e65a8973c68fd4aa8c82c31a15ce3de40b1d512b1e3098e1a72dfb3628
|
|
| MD5 |
b1cc2bd011e83b49154f8f988dd1bffd
|
|
| BLAKE2b-256 |
da4a7da57f31d0008c8ed515ba655c7c7067e0e130633cf6530a4fed2e4c290c
|