Microflow

A lightweight event-driven workflow system for building AI agents and processing pipelines in just 60 lines of Python.

Installation

pip install microflow

Quick Start

import asyncio
from microflow import WorkflowManager, WorkflowEvent

# Create a workflow manager
workflow = WorkflowManager()

# Define a handler
async def greeting_handler(event, ctx):
    name = event.data.get("name", "World")
    yield WorkflowEvent.progress("greeting", f"Processing greeting for {name}")
    yield WorkflowEvent(name="completed", data={"message": f"Hello, {name}!"})

# Register the handler
workflow.register("greet", greeting_handler)

# Run the workflow
async def main():
    initial_event = WorkflowEvent(name="greet", data={"name": "Alice"})
    
    async for event in workflow.process(initial_event):
        if event.name == "progress":
            print(f"Progress: {event.data['step']} - {event.data['description']}")
        elif event.name == "completed":
            print(f"Result: {event.data['message']}")

if __name__ == "__main__":
    asyncio.run(main())

Features

  • Event-Driven Architecture: Chain operations through events
  • Progress Reporting: Built-in support for progress updates
  • Error Handling: Graceful error propagation
  • Flexible Workflows: Easily modify and extend workflows
  • Minimal Dependencies: No external dependencies required

Core Components

The entire framework is just 60 lines of Python code, consisting of two main components:

WorkflowEvent

The fundamental data structure that flows through the system:

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class WorkflowEvent:
    name: str                                  # Event type identifier
    data: Optional[Dict[str, Any]] = None      # Event payload
    metadata: Optional[Dict[str, Any]] = None  # Context information
    error: Optional[str] = None                # Error information if applicable

Special events include:

  • progress events: For reporting status updates
  • error events: For handling failures
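The Quick Start calls `WorkflowEvent.progress(step, description)` and later reads `event.data['step']` and `event.data['description']`. The following is a minimal sketch of how such a factory could be written, assuming that data shape. `SketchEvent` is a hypothetical stand-in used only for illustration and is not the library's source.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for WorkflowEvent; not the library's source."""
    name: str
    data: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    @classmethod
    def progress(cls, step: str, description: str) -> "SketchEvent":
        # Assumed shape: the Quick Start reads data["step"] and data["description"].
        return cls(name="progress", data={"step": step, "description": description})


# A progress event built through the factory.
progress_event = SketchEvent.progress("search", "Searching for information...")

# An error event built directly, mirroring the Error Handling example.
error_event = SketchEvent(name="error", error="connection timed out")

print(progress_event.name, progress_event.data)
print(error_event.name, error_event.error)
```

Under these assumptions, a consumer can tell the two special events apart by `name` alone and read the failure text from `error`.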

WorkflowManager

Orchestrates the flow of events through registered handlers:

# Interface summary (method bodies omitted)
class WorkflowManager:
    def register(self, event_name: str, handler: EventHandler) -> Self: ...
    async def process(self, initial_event: WorkflowEvent) -> AsyncGenerator[WorkflowEvent, None]: ...

How It Works

Registering Handlers

Handlers are registered for specific event types:

workflow = WorkflowManager()

# Register a handler for the "start" event
workflow.register("start", start_handler)

# Register multiple handlers for the same event (they'll run in sequence)
workflow.register("process_data", validation_handler)
workflow.register("process_data", transformation_handler)

# Chain registration is supported
workflow.register("event1", handler1).register("event2", handler2)
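One way to picture the registration behavior described above is a dictionary that maps each event name to a list of handlers, with `register` returning the manager itself so calls can be chained. The sketch below assumes that design. `SketchRegistry` and the empty handlers are hypothetical and do not come from the library.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class SketchRegistry:
    """Hypothetical registry; illustrates one way register() could behave."""

    def __init__(self) -> None:
        # Each event name maps to its handlers in registration order.
        self.handlers: DefaultDict[str, List[Callable]] = defaultdict(list)

    def register(self, event_name: str, handler: Callable) -> "SketchRegistry":
        self.handlers[event_name].append(handler)
        return self  # returning self is what makes chained calls possible


# Placeholder handlers; only their names matter for this illustration.
def start_handler(event, ctx): ...
def validation_handler(event, ctx): ...
def transformation_handler(event, ctx): ...


registry = (
    SketchRegistry()
    .register("start", start_handler)
    .register("process_data", validation_handler)
    .register("process_data", transformation_handler)
)

# Handlers for the same event are kept in the order they were registered.
order = [h.__name__ for h in registry.handlers["process_data"]]
print(order)
```

Appending to a list preserves registration order, which is what lets several handlers for the same event run in sequence.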

Creating Event Handlers

An event handler is an async generator function that takes a WorkflowEvent and a context dictionary (ctx) and yields one or more events:

async def search_handler(event: WorkflowEvent, ctx: Dict[str, Any]) -> AsyncGenerator[WorkflowEvent, None]:
    # Report progress
    yield WorkflowEvent.progress("search", "Searching for information...")
    
    # Perform work
    search_results = await perform_search(event.data["query"])
    
    # Return results with a new event type
    yield WorkflowEvent(
        name="search_completed",
        data={"search_results": search_results},
        metadata=event.metadata
    )

Chaining Handlers

Handlers chain together by yielding events that trigger other handlers:

  1. Handler A processes an event and yields a new event with name="process_data"
  2. WorkflowManager sees this event and finds handlers registered for "process_data"
  3. Those handlers run in sequence, potentially yielding more events
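The three steps above can be sketched as a small dispatch loop. This is an illustration under stated assumptions, not the library's implementation: `SketchEvent`, `SketchManager`, the queue-based loop, and the shared `ctx` dictionary are all hypothetical.

```python
import asyncio
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchEvent:
    name: str
    data: Dict[str, Any] = field(default_factory=dict)


class SketchManager:
    """Hypothetical dispatcher; the real WorkflowManager may differ."""

    def __init__(self):
        self.handlers = defaultdict(list)

    def register(self, event_name, handler):
        self.handlers[event_name].append(handler)
        return self

    async def process(self, initial_event):
        ctx = {}  # assumed: one shared context dict per run
        queue = deque([initial_event])
        while queue:
            event = queue.popleft()
            # Run every handler registered for this event name, in order.
            for handler in self.handlers[event.name]:
                async for produced in handler(event, ctx):
                    yield produced          # surface every event to the caller
                    queue.append(produced)  # and let it trigger its own handlers


async def handler_a(event, ctx):
    # Step 1: yield a new event named "process_data".
    yield SketchEvent("process_data", {"value": event.data["value"]})


async def handler_b(event, ctx):
    # Step 3: runs because it is registered for "process_data".
    yield SketchEvent("completed", {"value": event.data["value"] * 2})


async def run():
    manager = (
        SketchManager()
        .register("start", handler_a)
        .register("process_data", handler_b)
    )
    return [e async for e in manager.process(SketchEvent("start", {"value": 21}))]


events = asyncio.run(run())
print([(e.name, e.data) for e in events])
```

In this sketch an event with no registered handlers, such as `completed`, simply ends its branch of the chain.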

Progress Reporting

Special handling for progress events allows for status updates without breaking the chain:

async def complex_handler(event: WorkflowEvent, ctx: Dict[str, Any]):
    # Report progress without changing workflow direction
    yield WorkflowEvent.progress("step1", "Starting processing...")
    
    # Do some work...
    
    yield WorkflowEvent.progress("step2", "Halfway done...")
    
    # Continue the workflow with a new event
    yield WorkflowEvent(name="next_step", data={"result": "success"})
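On the consuming side, progress events can be separated from the events that drive the workflow by checking `event.name`, as the Quick Start does. The sketch below iterates a handler directly instead of going through a manager. `SketchEvent` is a hypothetical stand-in whose `progress` factory assumes the `step`/`description` data shape used in the Quick Start.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchEvent:
    """Hypothetical stand-in for WorkflowEvent."""
    name: str
    data: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def progress(cls, step, description):
        return cls("progress", {"step": step, "description": description})


async def complex_handler(event, ctx):
    yield SketchEvent.progress("step1", "Starting processing...")
    yield SketchEvent.progress("step2", "Halfway done...")
    yield SketchEvent("next_step", {"result": "success"})


async def run():
    progress_log, workflow_events = [], []
    async for event in complex_handler(SketchEvent("start"), {}):
        if event.name == "progress":
            progress_log.append(event.data["step"])  # status update only
        else:
            workflow_events.append(event.name)       # continues the workflow
    return progress_log, workflow_events


progress_log, workflow_events = asyncio.run(run())
print(progress_log, workflow_events)
```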

Error Handling

A handler can catch an exception and propagate it through the workflow as an error event:

async def risky_handler(event: WorkflowEvent, ctx: Dict[str, Any]):
    try:
        result = await risky_operation()
        yield WorkflowEvent(name="success", data={"result": result})
    except Exception as e:
        yield WorkflowEvent(name="error", error=str(e), metadata=event.metadata)
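A caller can then branch on the `error` field of each event it receives. The following sketch exercises the failure path end to end. `SketchEvent` is a hypothetical stand-in for WorkflowEvent, and `risky_operation` is an invented coroutine that always raises so that the error branch is taken.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for WorkflowEvent."""
    name: str
    data: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


async def risky_operation():
    # Hypothetical operation that always fails, to exercise the error path.
    raise ValueError("upstream service unavailable")


async def risky_handler(event, ctx):
    try:
        result = await risky_operation()
        yield SketchEvent(name="success", data={"result": result})
    except Exception as e:
        # Carry the original metadata along so the failure can be traced.
        yield SketchEvent(name="error", error=str(e), metadata=event.metadata)


async def run():
    start = SketchEvent(name="start", metadata={"request_id": "r-1"})
    outcomes = []
    async for event in risky_handler(start, {}):
        if event.error is not None:
            outcomes.append(f"failed ({event.metadata['request_id']}): {event.error}")
        else:
            outcomes.append(f"ok: {event.data['result']}")
    return outcomes


outcomes = asyncio.run(run())
print(outcomes)
```

Passing `metadata` through to the error event, as the original handler does, keeps request context available to whatever reports the failure.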

Example Workflow

For more complex examples, see the examples directory, which includes:

  1. A simple agent workflow that processes queries, searches for information, and generates responses
  2. A weather assistant that analyzes queries, fetches weather data, and generates streaming responses

Benefits

  • Decoupling: Components communicate through events without direct dependencies
  • Extensibility: Easy to add new handlers or modify workflow without changing existing code
  • Observability: Progress events provide visibility into the workflow state
  • Error Handling: Centralized error management through error events

License

MIT
