
Durable workflow orchestration engine for Python

Project description

Loom - Durable Workflow Orchestration


Python 3.12+ · License: MIT

A Python-based durable workflow orchestration engine inspired by Temporal and Durable Task Framework. Loom provides event-sourced, deterministic workflow execution with automatic recovery and replay capabilities.

Features

  • Event Sourcing: All workflow state changes persisted as immutable events
  • Deterministic Replay: Workflows reconstruct from event history for recovery
  • Type Safe: Full generic typing support with Workflow[InputT, StateT]
  • Async First: Built on asyncio for high-performance concurrent execution
  • Durable Execution: Workflows survive process crashes and auto-recover
  • Beautiful CLI: Rich console interface with progress tracking
  • Well Tested: Comprehensive test suite with pytest

Quick Start

Installation

pip install loom-core

Or install from source:

git clone https://github.com/satadeep3927/loom.git
cd loom
pip install -e .

Define a Workflow

import asyncio
from typing import TypedDict
import loom


# Define your data types
class OrderInput(TypedDict):
    order_id: str
    customer_email: str


class OrderState(TypedDict):
    payment_confirmed: bool
    email_sent: bool


# Define activities (side effects)
@loom.activity(name="process_payment", retry_count=3, timeout_seconds=30)
async def process_payment(order_id: str) -> bool:
    # Call payment API
    return True


@loom.activity(name="send_email", retry_count=2)
async def send_confirmation_email(email: str, order_id: str) -> None:
    # Send email via service
    pass


# Define workflow
@loom.workflow(name="OrderProcessing", version="1.0.0")
class OrderWorkflow(loom.Workflow[OrderInput, OrderState]):
    
    @loom.step(name="process_payment")
    async def payment_step(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
        success = await ctx.activity(process_payment, ctx.input["order_id"])
        await ctx.state.set("payment_confirmed", success)
        ctx.logger.info(f"Payment processed: {success}")
    
    @loom.step(name="send_confirmation")
    async def notification_step(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
        if ctx.state["payment_confirmed"]:
            await ctx.activity(
                send_confirmation_email,
                ctx.input["customer_email"],
                ctx.input["order_id"]
            )
            await ctx.state.set("email_sent", True)
            ctx.logger.info("Confirmation email sent")

Start a Workflow

The simplest way to start a workflow is with the start() class method on your workflow class:

import asyncio
import loom


async def main():
    # Start workflow using the class method (recommended)
    handle = await OrderWorkflow.start(
        OrderInput(
            order_id="ORD-12345",
            customer_email="customer@example.com",
        )
    )

    print(f"Workflow started: {handle.workflow_id}")

    # Wait for completion and get result (final state)
    result = await handle.result()
    print(f"Workflow completed with state: {result}")


if __name__ == "__main__":
    asyncio.run(main())

Run the Worker

# Initialize database
loom init

# Start a worker (defaults to 4 concurrent task processors)
loom worker

# Custom configuration
loom worker --workers 8 --poll-interval 1.0
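The worker flags suggest a pool of task processors that poll for work. As a rough mental model only (not Loom's actual implementation), the sketch below runs `workers` asyncio tasks that each claim pending tasks from a queue and sleep for `poll_interval` seconds when it is empty. The in-memory deque stands in for Loom's database-backed task queue, and `task_processor` and `run_worker` are hypothetical names.

```python
import asyncio
from collections import deque


async def task_processor(name: str, queue: deque, poll_interval: float,
                         done: list, stop: asyncio.Event) -> None:
    """Claim tasks while any are pending; otherwise sleep for poll_interval."""
    while not stop.is_set():
        if queue:
            task = queue.popleft()      # claim the next pending task
            await asyncio.sleep(0)      # stand-in for real work; yields to other processors
            done.append((name, task))
        else:
            await asyncio.sleep(poll_interval)  # idle: poll again later


async def run_worker(tasks: list, workers: int = 4, poll_interval: float = 0.5) -> list:
    queue: deque = deque(tasks)         # hypothetical stand-in for the task table
    done: list = []
    stop = asyncio.Event()
    processors = [
        asyncio.create_task(task_processor(f"p{i}", queue, poll_interval, done, stop))
        for i in range(workers)
    ]
    while queue:                        # wait until every task has been claimed
        await asyncio.sleep(poll_interval)
    stop.set()                          # processors finish their current task, then exit
    await asyncio.gather(*processors)
    return done


if __name__ == "__main__":
    finished = asyncio.run(run_worker(list(range(10)), workers=3, poll_interval=0.01))
    print(len(finished))  # 10
```

Polling keeps each processor independent of the others, which is why raising `--workers` or lowering `--poll-interval` trades database load for latency in this model.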

🌐 Web Dashboard

Start the interactive web dashboard to monitor and manage workflows in real-time:

# Start web server on default port (8000)
loom web

# Custom host and port
loom web --host 0.0.0.0 --port 3000

# Development mode with auto-reload
loom web --reload

Access the dashboard at http://localhost:8000 after starting the server.

The web dashboard provides:

  • 📊 Real-time workflow monitoring with Server-Sent Events (SSE)
  • 📈 Workflow definition graphs (similar to Airflow DAGs) showing workflow structure
  • 📋 Task queue visualization and execution tracking
  • 📜 Event history with comprehensive audit trails
  • 📊 Performance metrics and system statistics
  • 📚 Interactive API documentation at /docs

🎯 Complete Example

Here's a complete, multi-step workflow example that brings the core features together:

import random
from datetime import timedelta
import loom
from loom.core.context import WorkflowContext
from loom.core.workflow import Workflow
from loom.schemas.state import Input, State


class QuizInput(Input):
    lesson_id: str


class QuizState(State):
    quiz_id: str | None
    wait_time: int | None
    submissions: list | None
    result: dict | None


@loom.activity(name="GenerateQuiz")
async def generate_quiz_activity() -> str:
    quiz_id = f"Quiz-{random.randint(1000, 9999)}"
    print(f"Generated Quiz: {quiz_id}")
    return quiz_id


@loom.activity(name="SendQuizToLMS")
async def send_quiz_to_lms_activity(quiz_id: str) -> None:
    print(f"Sent {quiz_id} to LMS")


@loom.activity(name="FetchWaitTime")
async def fetch_wait_time_activity() -> int:
    return 120  # 2 minutes


@loom.activity(name="PullSubmissions")
async def pull_submissions_activity(quiz_id: str) -> list:
    print(f"Pulled submissions for {quiz_id}")
    return ["Submission 1", "Submission 2", "Submission 3"]


@loom.activity(name="AssessResult")
async def assess_result_activity(quiz_id: str) -> dict:
    score = random.randint(50, 100)
    return {"quiz_id": quiz_id, "score": score, "status": "Completed"}


@loom.activity(name="StoreResult")
async def store_result_activity(result: dict) -> None:
    print(f"Stored Result: {result}")


@loom.workflow(
    name="AssessmentWorkflow",
    version="1.0.0",
    description="A workflow for Quiz management."
)
class AssessmentWorkflow(Workflow[QuizInput, QuizState]):

    @loom.step(name="generate_quiz")
    async def generate_quiz(self, ctx: WorkflowContext[QuizInput, QuizState]):
        ctx.logger.info("Generating Quiz...")
        quiz_id = await ctx.activity(generate_quiz_activity)
        await ctx.state.set("quiz_id", quiz_id)

    @loom.step(name="send_to_lms")
    async def send_to_lms(self, ctx: WorkflowContext[QuizInput, QuizState]):
        quiz_id = ctx.state.get("quiz_id")
        ctx.logger.info(f"Sending Quiz {quiz_id} to LMS...")
        await ctx.activity(send_quiz_to_lms_activity, quiz_id)

    @loom.step(name="fetch_wait_time")
    async def fetch_wait_time(self, ctx: WorkflowContext[QuizInput, QuizState]):
        ctx.logger.info("Fetching wait time...")
        wait_time = await ctx.activity(fetch_wait_time_activity)
        await ctx.state.set("wait_time", wait_time)

    @loom.step(name="wait_step")
    async def wait_step(self, ctx: WorkflowContext[QuizInput, QuizState]):
        wait_time = ctx.state.get("wait_time")
        ctx.logger.info(f"Waiting for {wait_time} seconds...")
        await ctx.sleep(delta=timedelta(seconds=wait_time))

    @loom.step(name="pull_submissions")
    async def pull_submissions(self, ctx: WorkflowContext[QuizInput, QuizState]):
        quiz_id = ctx.state.get("quiz_id")
        submissions = await ctx.activity(pull_submissions_activity, quiz_id)
        await ctx.state.set("submissions", submissions)

    @loom.step(name="assess_result")
    async def assess_result(self, ctx: WorkflowContext[QuizInput, QuizState]):
        quiz_id = ctx.state.get("quiz_id")
        result = await ctx.activity(assess_result_activity, quiz_id)
        await ctx.state.set("result", result)

    @loom.step(name="store_result")
    async def store_result(self, ctx: WorkflowContext[QuizInput, QuizState]):
        result = ctx.state.get("result")
        await ctx.activity(store_result_activity, result)
        ctx.logger.info("Workflow completed!")


# Start the workflow
async def main():
    handle = await AssessmentWorkflow.start({"lesson_id": "lesson_123"})
    status = await handle.status()
    print(f"Workflow Status: {status}")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

This example demonstrates:

  • Multiple steps with sequential execution
  • Activity calls for side effects
  • State management across workflow execution
  • Timer/sleep operations for waiting
  • Logging with workflow context
  • Type safety with generic workflow types

📚 Core Concepts

State Management

Loom provides three ways to manage workflow state, all of which are durable and replay-safe:

1. Single Key Updates (set)

Use ctx.state.set() for individual state changes. Each call emits a STATE_SET event:

@loom.step()
async def process_order(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
    # Set individual keys
    await ctx.state.set("order_id", "ORD-123")
    await ctx.state.set("status", "processing")
    await ctx.state.set("timestamp", "2024-01-15T10:30:00")
    
    # Read state
    order_id = ctx.state["order_id"]  # Dictionary access
    status = ctx.state.get("status")  # Safe get
    items = ctx.state.get("items", [])  # With default

2. Batch Updates (update)

Use ctx.state.update() to replace the entire state atomically. This emits a single STATE_UPDATE event:

@loom.step()
async def update_order_state(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
    # Update entire state with a function that receives current state
    await ctx.state.update(lambda state: {
        **state,  # Preserve existing keys
        "order_id": "ORD-123",
        "status": "shipped",
        "shipped_at": "2024-01-15T14:00:00"
    })

Important: The update function receives the current state and must return the complete new state.

3. Batch Context Manager (batch)

Use async with ctx.state.batch() to collect multiple set() calls into a single STATE_UPDATE event:

@loom.step()
async def batch_update(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
    # Multiple updates batched into single STATE_UPDATE event
    async with ctx.state.batch():
        await ctx.state.set("order_id", "ORD-123")
        await ctx.state.set("status", "processing")
        await ctx.state.set("items", ["item1", "item2"])
        await ctx.state.set("total", 99.99)
    # Single STATE_UPDATE event emitted when context exits

When to use each:

  • set(): Simple, single updates
  • update(): Replace entire state based on current values
  • batch(): Multiple related updates that should be atomic
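To make the event semantics above concrete, here is a toy, in-memory model of set(), update(), and batch(), under stated assumptions: `ToyState` and `replay` are hypothetical, not Loom classes, and each STATE_UPDATE is assumed to carry a full state snapshot. It shows one event per set(), one per update(), one for a whole batch, and that folding the event history reproduces the live state, which is what makes recovery by replay possible.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, Callable


class ToyState:
    """Hypothetical stand-in for ctx.state: keeps values in memory and records events."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict[str, Any]]] = []  # append-only history
        self._data: dict[str, Any] = {}
        self._batching = False
        self._dirty = False

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def snapshot(self) -> dict[str, Any]:
        return dict(self._data)

    async def set(self, key: str, value: Any) -> None:
        self._data[key] = value
        if self._batching:
            self._dirty = True  # deferred: one event is emitted when batch() exits
        else:
            self.events.append(("STATE_SET", {key: value}))

    async def update(self, fn: Callable[[dict[str, Any]], dict[str, Any]]) -> None:
        new_state = dict(fn(self.snapshot()))  # fn must return the complete new state
        self._data = new_state
        self.events.append(("STATE_UPDATE", dict(new_state)))

    @asynccontextmanager
    async def batch(self):
        self._batching, self._dirty = True, False
        try:
            yield self
        finally:
            self._batching = False
        if self._dirty:  # a single STATE_UPDATE for every set() inside the block
            self.events.append(("STATE_UPDATE", self.snapshot()))


def replay(events: list[tuple[str, dict[str, Any]]]) -> dict[str, Any]:
    """Rebuild state from the history alone, as recovery after a crash would."""
    state: dict[str, Any] = {}
    for kind, payload in events:
        if kind == "STATE_SET":
            state.update(payload)  # merge one key
        elif kind == "STATE_UPDATE":
            state = dict(payload)  # replace the whole state
    return state


async def demo() -> ToyState:
    state = ToyState()
    await state.set("order_id", "ORD-123")                    # STATE_SET
    await state.update(lambda s: {**s, "status": "shipped"})  # STATE_UPDATE
    async with state.batch():                                 # one STATE_UPDATE
        await state.set("items", ["item1", "item2"])
        await state.set("total", 99.99)
    return state


if __name__ == "__main__":
    s = asyncio.run(demo())
    print([kind for kind, _ in s.events])    # ['STATE_SET', 'STATE_UPDATE', 'STATE_UPDATE']
    print(replay(s.events) == s.snapshot())  # True
```

Note the `**s` in the update() call: without it the lambda would return only `{"status": ...}` and the replace semantics would drop `order_id`.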

Workflow Handles

Workflow handles provide control and monitoring of running workflows:

# Start workflow and get handle
handle = await OrderWorkflow.start({"order_id": "ORD-123", "customer_email": "customer@example.com"})

# Get workflow ID
print(f"Workflow ID: {handle.workflow_id}")

# Check status (returns "RUNNING", "COMPLETED", "FAILED", etc.)
status = await handle.status()
print(f"Status: {status}")

# Wait for completion and get final state
try:
    result = await handle.result()
    print(f"Completed with state: {result}")
except loom.ActivityFailedError as e:  # Check the narrower error first
    print(f"Activity '{e.activity_name}' failed: {e.error_message}")
except loom.WorkflowExecutionError as e:
    print(f"Workflow failed: {e}")

# Signals and cancellation apply only while the workflow is still running.
# Send a signal to the running workflow
await handle.signal("approve", {"approved_by": "admin", "timestamp": "2024-01-15"})

# Cancel the workflow
await handle.cancel(reason="User requested cancellation")

Exception Handling

⚠️ CRITICAL: Never catch StopReplay in your workflow code!

StopReplay is an internal control flow exception used by Loom to pause workflow replay when waiting for activities, timers, or signals. Catching it will break workflow execution and recovery.

# ❌ WRONG - This will break workflow execution!
@loom.step()
async def bad_step(self, ctx):
    try:
        await ctx.activity(my_activity)
    except Exception:  # This catches StopReplay!
        ctx.logger.error("Error occurred")  # Workflow breaks here
        pass

# ❌ ALSO WRONG
@loom.step()
async def another_bad_step(self, ctx):
    try:
        await ctx.activity(my_activity)
    except:  # Never use bare except
        pass

# ✅ CORRECT - Catch specific exceptions only
@loom.step()
async def good_step(self, ctx):
    try:
        result = await ctx.activity(my_activity)
        await ctx.state.set("result", result)
    except loom.ActivityFailedError as e:
        # Handle activity failure
        ctx.logger.error(f"Activity failed: {e}")
        await ctx.state.set("error", str(e))
        # Workflow can continue or raise to fail

Available Exceptions:

These exceptions surface in application code that starts and awaits workflows, outside workflow steps. Inside a step, catch only loom.ActivityFailedError, as in the example above:

import loom

# Workflow execution
try:
    handle = await MyWorkflow.start(input_data)
    result = await handle.result()
except loom.WorkflowNotFoundError:
    print("Workflow doesn't exist")
except loom.WorkflowStillRunningError:
    print("Workflow hasn't completed yet")
except loom.ActivityFailedError as e:  # Narrower errors before WorkflowExecutionError
    print(f"Activity '{e.activity_name}' failed: {e.error_message}")
except loom.NonDeterministicWorkflowError:
    print("Workflow code changed in an incompatible way")
except loom.WorkflowExecutionError:
    print("Workflow failed during execution")

Why is StopReplay special?

StopReplay is raised internally when the workflow execution reaches a point where it needs to wait for external events:

  • An activity that hasn't completed yet
  • A timer that hasn't fired
  • A signal that hasn't been received

The engine catches this exception to save progress and pause execution. If your code catches it, the engine never receives it, and the workflow cannot properly pause and resume.
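The failure mode can be reproduced without Loom. In this sketch, `StopReplay`, `ActivityFailedError`, and the toy `engine` are local stand-ins, not Loom's classes; it assumes, as the README implies, that StopReplay derives from Exception. The broad handler swallows the pause signal, while the specific handler lets it reach the engine.

```python
import asyncio


class StopReplay(Exception):
    """Hypothetical stand-in for Loom's internal pause signal."""


class ActivityFailedError(Exception):
    """Hypothetical stand-in for loom.ActivityFailedError."""


async def activity_not_finished_yet() -> str:
    # An activity with no recorded result yet pauses the workflow.
    raise StopReplay()


async def broad_step() -> str:
    try:
        await activity_not_finished_yet()
        return "continued"
    except Exception:  # also matches StopReplay, so the pause never reaches the engine
        return "swallowed"


async def specific_step() -> str:
    try:
        await activity_not_finished_yet()
        return "continued"
    except ActivityFailedError:  # StopReplay passes straight through
        return "handled failure"


async def engine(step) -> str:
    """Toy engine: a StopReplay that reaches it means 'save progress and pause'."""
    try:
        return await step()
    except StopReplay:
        return "paused"


if __name__ == "__main__":
    print(asyncio.run(engine(broad_step)))     # swallowed
    print(asyncio.run(engine(specific_step)))  # paused
```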

Best Practices

✅ Do:

  • Use activities for side effects: All API calls, database writes, file I/O, etc.

    @loom.activity(name="send_email", retry_count=3)
    async def send_email(to: str, subject: str) -> bool:
        # API call, retry on failure
        await email_service.send(to, subject)
        return True
    
  • Make activities idempotent: Safe to retry multiple times

    @loom.activity(name="create_order")
    async def create_order(order_id: str) -> dict:
        # Check if order exists first (idempotent)
        existing = await db.get_order(order_id)
        if existing:
            return existing
        return await db.create_order(order_id)
    
  • Use batch for related updates: More efficient, single event

    async with ctx.state.batch():
        await ctx.state.set("step", 3)
        await ctx.state.set("progress", 75)
        await ctx.state.set("updated_at", timestamp)  # timestamp obtained from an activity
    
  • Use type hints: Better IDE support and type checking

    class MyWorkflow(loom.Workflow[MyInput, MyState]):
        @loom.step()
        async def my_step(self, ctx: loom.WorkflowContext[MyInput, MyState]):
            # ctx.input is MyInput, ctx.state is MyState
            pass
    
  • Log with ctx.logger: Respects replay mode, won't duplicate logs

    ctx.logger.info("Processing order")  # Only logs during actual execution
    ctx.logger.error("Failed to process")  # Not during replay
    
  • Catch specific exceptions: Only catch what you can handle

    try:
        await ctx.activity(risky_activity)
    except loom.ActivityFailedError:
        # Handle specific failure
        pass
    

❌ Don't:

  • Don't use random in workflows: Breaks determinism

    # ❌ WRONG
    @loom.step()
    async def bad_step(self, ctx):
        value = random.randint(1, 100)  # Different on replay!
        await ctx.state.set("value", value)
    
    # ✅ CORRECT - Use activity
    @loom.activity(name="generate_random")
    async def generate_random() -> int:
        return random.randint(1, 100)
    
    @loom.step()
    async def good_step(self, ctx):
        value = await ctx.activity(generate_random)
        await ctx.state.set("value", value)
    
  • Don't use datetime.now() in workflows: Non-deterministic

    # ❌ WRONG
    @loom.step()
    async def bad_step(self, ctx):
        now = datetime.now()  # Different on replay!
    
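    # ✅ CORRECT - Use an activity and call it from the step
    @loom.activity(name="get_timestamp")
    async def get_timestamp() -> str:
        return datetime.now().isoformat()
    
    @loom.step()
    async def good_step(self, ctx):
        now = await ctx.activity(get_timestamp)  # Same value on replay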
    
  • Don't perform I/O in workflows: Use activities instead

    # ❌ WRONG
    @loom.step()
    async def bad_step(self, ctx):
        data = await http_client.get("https://api.example.com")  # Don't!
    
    # ✅ CORRECT
    @loom.activity(name="fetch_data")
    async def fetch_data() -> dict:
        return await http_client.get("https://api.example.com")
    
  • Don't catch Exception or bare except: Catches StopReplay

    # ❌ WRONG
    try:
        await ctx.activity(my_activity)
    except Exception:  # Catches everything including StopReplay!
        pass
    
    # ✅ CORRECT
    try:
        await ctx.activity(my_activity)
    except loom.ActivityFailedError:  # Specific exception only
        pass
    
  • Don't modify state without ctx.state: Won't be persisted

    # ❌ WRONG
    @loom.step()
    async def bad_step(self, ctx):
        my_state = {"value": 123}
        # State not persisted!
    
    # ✅ CORRECT
    @loom.step()
    async def good_step(self, ctx):
        await ctx.state.set("value", 123)  # Persisted
    

AI-Native Progress Tracking

Loom supports real-time progress tracking with agent attribution, making it easy to show users what AI agents are working on across multiple simultaneous workflows.

Defining Agent Personas

Use the agent parameter on @workflow to give workflows a named persona:

@loom.workflow(name="QuizAssessment", agent="Robin")
class QuizWorkflow(loom.Workflow[QuizInput, QuizState]):
    @loom.step(name="generate_quiz", description="generating your quiz")
    async def generate_quiz(self, ctx):
        ...

    @loom.step(name="grade_quiz", description="grading your answers")
    async def grade_quiz(self, ctx):
        ...

Starting Workflows with an Owner

Pass an owner when starting a workflow to associate it with a specific user:

handle = await QuizWorkflow.start(
    {"lesson_id": "math_101"},
    owner="user_abc"
)

Querying Progress

Get structured progress for all of a user's active workflows in a single call:

progress = await loom.get_progress(owner="user_abc")

for wf in progress["workflows"]:
    print(
        f"{wf['agent']}: {wf['message']} "
        f"(step {wf['current_step']} of {wf['total_steps']}, status: {wf['status']})"
    )
    # Output: "Robin: Robin is generating your quiz (step 1 of 2, status: RUNNING)"

Response structure:

{
    "owner": "user_abc",
    "workflows": [
        {
            "workflow_id": "abc123",
            "workflow_name": "QuizAssessment",
            "agent": "Robin",
            "message": "Robin is generating your quiz",
            "current_step": 1,
            "total_steps": 2,
            "status": "RUNNING"
        }
    ]
}

Live Progress Streaming

Use stream_progress to get real-time updates via an async generator:

async for update in loom.stream_progress(owner="user_abc"):
    for wf in update:
        print(f"{wf['agent']}: {wf['message']}")

This yields a new list whenever any workflow's progress changes.

SSE Endpoint (Web Dashboard)

The web dashboard provides a Server-Sent Events endpoint for browser-based progress:

GET /api/progress/stream?owner=user_abc
const es = new EventSource('/api/progress/stream?owner=user_abc');
es.onmessage = (e) => {
    const progress = JSON.parse(e.data);
    progress.forEach(wf => console.log(wf.message));
};

A REST snapshot is also available:

GET /api/progress/user_abc
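For a non-browser client, the stream can be consumed with the standard library. This is a minimal sketch of Server-Sent Events parsing that handles only `data:` lines and comments; it assumes, as the JavaScript example does, that each event's data is a JSON list. `parse_sse` is a hypothetical helper.

```python
import json
from collections.abc import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[object]:
    """Yield the JSON payload of each SSE event; a blank line ends an event."""
    data: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":                  # blank line dispatches the buffered event
            if data:
                yield json.loads("\n".join(data))
                data = []
        elif line.startswith(":"):      # comment / keep-alive line
            continue
        elif line.startswith("data:"):
            value = line[5:]
            if value.startswith(" "):   # a single leading space is not part of the value
                value = value[1:]
            data.append(value)
        # other fields (event:, id:, retry:) are ignored in this sketch


if __name__ == "__main__":
    sample = [": keep-alive\n", 'data: [{"message": "Robin is generating your quiz"}]\n', "\n"]
    for progress in parse_sse(sample):
        print([wf["message"] for wf in progress])  # ['Robin is generating your quiz']
```

Against a live server (assuming the default port 8000), the same function can consume the lines from `urllib.request.urlopen(url)` after decoding each one from UTF-8.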

How It Works

  • agent is set at workflow definition time via @workflow(agent="...")
  • owner is set at start time via Workflow.start(input, owner="...")
  • Progress is derived from existing STEP_START events; no additional tables are required
  • Step descriptions come from @step(description="...")
  • Only RUNNING and PENDING workflows are included in progress results
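The rules above can be sketched as a pure function over workflow records. The record and event shapes here (`owner`, `status`, `events`, a `step` and `description` on each STEP_START event) are assumptions for illustration, as is the fallback wording; only the output mirrors the response structure shown earlier.

```python
ACTIVE = ("RUNNING", "PENDING")


def build_progress(owner: str, workflows: list[dict]) -> dict:
    """Derive a progress snapshot from workflow records and their STEP_START events."""
    items = []
    for wf in workflows:
        if wf["owner"] != owner or wf["status"] not in ACTIVE:
            continue  # only this owner's active workflows are reported
        starts = [e for e in wf["events"] if e["type"] == "STEP_START"]
        current_step = len({e["step"] for e in starts})  # distinct steps started so far
        # Hypothetical fallback wording for a workflow with no started step yet
        description = starts[-1]["description"] if starts else "getting started"
        items.append({
            "workflow_id": wf["workflow_id"],
            "workflow_name": wf["workflow_name"],
            "agent": wf["agent"],
            "message": f"{wf['agent']} is {description}",
            "current_step": current_step,
            "total_steps": wf["total_steps"],
            "status": wf["status"],
        })
    return {"owner": owner, "workflows": items}
```

Counting distinct step names rather than raw events keeps the step number stable if a step is started more than once.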

CLI Commands

# Initialize database
loom init

# Start distributed worker
loom worker [--workers 4] [--poll-interval 0.5]

# List workflows
loom list [--limit 50] [--status RUNNING]

# Inspect workflow details
loom inspect <workflow-id> [--events]

# Show database statistics
loom stats

๐Ÿ—๏ธ Architecture

Core Components

[Architecture diagram]

Event Types

  • WORKFLOW_STARTED - Workflow initialization
  • WORKFLOW_COMPLETED - Successful completion
  • WORKFLOW_FAILED - Fatal error occurred
  • STATE_SET - Single state key updated
  • STATE_UPDATE - Batch state update
  • ACTIVITY_SCHEDULED - Activity queued for execution
  • ACTIVITY_COMPLETED - Activity finished successfully
  • ACTIVITY_FAILED - Activity permanently failed
  • TIMER_FIRED - Sleep/delay completed
  • SIGNAL_RECEIVED - External signal received
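Because every change is an event, a workflow's status can be recomputed by folding its history. A minimal sketch, assuming each event is a dict with a `type` key and that activity events carry a hypothetical `activity_id`; treating a workflow with no WORKFLOW_STARTED event as PENDING is also an assumption.

```python
TERMINAL = {"WORKFLOW_COMPLETED": "COMPLETED", "WORKFLOW_FAILED": "FAILED"}


def derive_status(events: list[dict]) -> dict:
    """Fold a workflow's event history into its status and outstanding activities."""
    status = "PENDING"
    pending_activities: set[str] = set()
    for e in events:
        kind = e["type"]
        if kind == "WORKFLOW_STARTED":
            status = "RUNNING"
        elif kind in TERMINAL:
            status = TERMINAL[kind]
        elif kind == "ACTIVITY_SCHEDULED":
            pending_activities.add(e["activity_id"])
        elif kind in ("ACTIVITY_COMPLETED", "ACTIVITY_FAILED"):
            pending_activities.discard(e["activity_id"])
        # other event types (state, timers, signals) do not affect status in this toy
    return {"status": status, "pending_activities": sorted(pending_activities)}


if __name__ == "__main__":
    history = [
        {"type": "WORKFLOW_STARTED"},
        {"type": "ACTIVITY_SCHEDULED", "activity_id": "a1"},
        {"type": "ACTIVITY_COMPLETED", "activity_id": "a1"},
        {"type": "ACTIVITY_SCHEDULED", "activity_id": "a2"},
    ]
    print(derive_status(history))  # {'status': 'RUNNING', 'pending_activities': ['a2']}
```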

Project Structure

loom/
├── src/
│   ├── common/         # Shared utilities
│   ├── core/           # Core engine (context, engine, runner, worker)
│   ├── database/       # Database layer
│   ├── decorators/     # @workflow, @step, @activity
│   ├── lib/            # Utilities and progress tracking
│   ├── migrations/     # Database migrations
│   └── schemas/        # Type definitions
├── tests/              # Test suite
├── examples/           # Example workflows
├── loom.py             # Main package interface
└── pyproject.toml      # Package configuration

Configuration

Loom uses SQLite by default for simplicity. For production:

  • Consider PostgreSQL/MySQL for scalability
  • Implement connection pooling
  • Add monitoring and alerting
  • Deploy multiple workers for high availability

Contributing

Contributions welcome! Please ensure:

  1. Tests pass: pytest
  2. Code formatted: black .
  3. Type checking: mypy .
  4. Linting: ruff check .

📝 License

MIT License - see LICENSE file for details

🙏 Acknowledgments

Inspired by:

  • Temporal
  • Durable Task Framework

📧 Contact

For questions and support, please open an issue on GitHub.


Built with ❤️ using Python 3.12+



Download files


Source Distribution

loom_core-1.28.1.tar.gz (381.9 kB)

Uploaded Source

Built Distribution


loom_core-1.28.1-py3-none-any.whl (398.9 kB)

Uploaded Python 3

File details

Details for the file loom_core-1.28.1.tar.gz.

File metadata

  • Download URL: loom_core-1.28.1.tar.gz
  • Upload date:
  • Size: 381.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for loom_core-1.28.1.tar.gz
Algorithm Hash digest
SHA256 b35ac4d3204473f317e65d3e211616b157334191fdc1470020201a1527137a3b
MD5 8469fed11a8d67d6e1cd1ded2ffc7ad2
BLAKE2b-256 60887b729b96c889765cbcc91a0d57c9dc89aba629756c76be83a4aace435748


File details

Details for the file loom_core-1.28.1-py3-none-any.whl.

File metadata

  • Download URL: loom_core-1.28.1-py3-none-any.whl
  • Upload date:
  • Size: 398.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for loom_core-1.28.1-py3-none-any.whl
Algorithm Hash digest
SHA256 008d49368145e559b968c6d6dba39b25b2d23518a6cde788abbe5b41cd96c3ee
MD5 13e2e0dca48ac00ecb3aa8b0d87d76de
BLAKE2b-256 6a28c5900dae8f4aa994a658f8fd907b48aeabb2eacf5cf1e3bd6818c707fcc5

