Durable workflow orchestration engine for Python

Loom - Durable Workflow Orchestration

Python 3.12+ License: MIT PyPI

Loom is a Python-based durable workflow orchestration engine inspired by Temporal and the Durable Task Framework. It provides event-sourced, deterministic workflow execution with automatic recovery and replay.

Features

  • Event Sourcing: All workflow state changes persisted as immutable events
  • Deterministic Replay: Workflows reconstruct from event history for recovery
  • Type Safe: Full generic typing support with Workflow[InputT, StateT]
  • Async First: Built on asyncio for high-performance concurrent execution
  • Durable Execution: Workflows survive process crashes and auto-recover
  • Beautiful CLI: Rich console interface with progress tracking
  • Well Tested: Comprehensive test suite with pytest
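
To make the event sourcing and deterministic replay ideas above concrete, here is a minimal, self-contained sketch. It does not use Loom and is not Loom's implementation; `ReplayContext`, the event dictionary layout, and the `charge` activity are all hypothetical names chosen for illustration. The point it shows is that an activity's result is recorded once and then served from history when the workflow is replayed after a crash.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical in-memory event log; a real engine would persist these events.
Event = dict[str, Any]


class ReplayContext:
    """Runs each activity once, then serves its result from history on replay."""

    def __init__(self, history: list[Event]) -> None:
        self.history = history  # append-only list of recorded events
        self._cursor = 0        # index of the next event to replay

    async def activity(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        if self._cursor < len(self.history):
            # Replaying: reuse the recorded result and skip the side effect.
            event = self.history[self._cursor]
            if event["name"] != fn.__name__:
                raise RuntimeError("workflow code diverged from recorded history")
            self._cursor += 1
            return event["result"]
        # First execution: run the side effect and record its result.
        result = await fn(*args)
        self.history.append(
            {"type": "ACTIVITY_COMPLETED", "name": fn.__name__, "result": result}
        )
        self._cursor += 1
        return result


calls: list[str] = []


async def charge(order_id: str) -> str:
    calls.append(order_id)  # stands in for a real side effect
    return f"receipt-{order_id}"


async def workflow(ctx: ReplayContext) -> str:
    return await ctx.activity(charge, "ORD-1")


history: list[Event] = []
first = asyncio.run(workflow(ReplayContext(history)))
# Simulate crash recovery: a fresh context replays the same history.
second = asyncio.run(workflow(ReplayContext(history)))
```

Both runs return the same value, but `charge` is only called during the first one. This is why workflow code has to be deterministic: replay relies on the code asking for the same activities in the same order.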

Quick Start

Installation

pip install loom-core

Or install from source:

git clone https://github.com/satadeep3927/loom.git
cd loom
pip install -e .

Define a Workflow

import asyncio
from typing import TypedDict
import loom


# Define your data types
class OrderInput(TypedDict):
    order_id: str
    customer_email: str


class OrderState(TypedDict):
    payment_confirmed: bool
    email_sent: bool


# Define activities (side effects)
@loom.activity(name="process_payment", retry_count=3, timeout_seconds=30)
async def process_payment(order_id: str) -> bool:
    # Call payment API
    return True


@loom.activity(name="send_email", retry_count=2)
async def send_confirmation_email(email: str, order_id: str) -> None:
    # Send email via service
    pass


# Define workflow
@loom.workflow(name="OrderProcessing", version="1.0.0")
class OrderWorkflow(loom.Workflow[OrderInput, OrderState]):
    
    @loom.step(name="process_payment")
    async def payment_step(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
        success = await ctx.activity(process_payment, ctx.input["order_id"])
        await ctx.state.set("payment_confirmed", success)
        ctx.logger.info(f"Payment processed: {success}")
    
    @loom.step(name="send_confirmation")
    async def notification_step(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
        if ctx.state["payment_confirmed"]:
            await ctx.activity(
                send_confirmation_email,
                ctx.input["customer_email"],
                ctx.input["order_id"]
            )
            await ctx.state.set("email_sent", True)
            ctx.logger.info("Confirmation email sent")

Note: For state updates, use:

  • await ctx.state.set("key", value) for single values
  • await ctx.state.update(key=lambda _: asyncio.sleep(0, value)) for batch updates (requires awaitable)
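
The `asyncio.sleep(0, value)` idiom in the batch form works because `asyncio.sleep` accepts an optional `result` argument and returns it when awaited, so it is a cheap way to wrap a plain value in an awaitable. The sketch below shows this with a plain dictionary. The `update` helper is a hypothetical stand-in, not Loom's `ctx.state.update`, and it assumes each updater receives the previous value of its key.

```python
import asyncio
from typing import Any, Awaitable, Callable

Updater = Callable[[Any], Awaitable[Any]]


async def update(state: dict[str, Any], **updaters: Updater) -> None:
    """Hypothetical batch update: await every updater, then apply all results."""
    new_values = {key: await fn(state.get(key)) for key, fn in updaters.items()}
    state.update(new_values)


async def demo() -> dict[str, Any]:
    state: dict[str, Any] = {"payment_confirmed": False, "email_sent": False, "attempts": 1}
    await update(
        state,
        # asyncio.sleep(0, x) yields to the event loop once and resolves to x.
        payment_confirmed=lambda _: asyncio.sleep(0, True),
        attempts=lambda old: asyncio.sleep(0, old + 1),
    )
    return state


final_state = asyncio.run(demo())
```

Keys that are not named in the call, such as `email_sent` here, are left unchanged.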

See STATE_MANAGEMENT.md for detailed examples.
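
The `retry_count` and `timeout_seconds` arguments passed to `@loom.activity` above suggest a retry loop with a per-attempt timeout. The following is a rough sketch of that pattern using only `asyncio`; it is not Loom's code. `run_with_retries` and `flaky` are hypothetical names, and the sketch assumes `retry_count=3` means one initial attempt plus up to three retries, which the README does not specify.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_with_retries(
    fn: Callable[..., Awaitable[Any]],
    *args: Any,
    retry_count: int = 3,
    timeout_seconds: float = 30.0,
) -> Any:
    """Try fn up to retry_count + 1 times, bounding each attempt by a timeout."""
    last_error: Optional[Exception] = None
    for _attempt in range(retry_count + 1):
        try:
            return await asyncio.wait_for(fn(*args), timeout=timeout_seconds)
        except Exception as exc:  # also catches the timeout raised by wait_for
            last_error = exc
    assert last_error is not None
    raise last_error  # every attempt failed: surface the last error


attempts = 0


async def flaky(order_id: str) -> bool:
    """Hypothetical activity that fails twice before succeeding."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("payment API unavailable")
    return True


result = asyncio.run(
    run_with_retries(flaky, "ORD-12345", retry_count=3, timeout_seconds=1.0)
)
```

A production engine would typically add a backoff delay between attempts and record each failure as an event; both are omitted here to keep the sketch short.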

Start a Workflow

async def main():
    db = loom.Database()
    async with db:
        # Initialize database
        await db.migrate_up()
        
        # Start workflow
        handle = await db.start_workflow(
            OrderWorkflow,
            workflow_input=OrderInput(
                order_id="ORD-12345",
                customer_email="customer@example.com"
            ),
            initial_state=OrderState(
                payment_confirmed=False,
                email_sent=False
            ),
        )
        
        print(f"Workflow started: {handle.workflow_id}")
        
        # Execute workflow tasks
        while True:
            task_executed = await loom.run_once()
            if not task_executed:
                break


if __name__ == "__main__":
    asyncio.run(main())

Run the Worker

# Initialize database
loom init

# Start worker with 4 concurrent task processors
loom worker

# Custom configuration
loom worker --workers 8 --poll-interval 1.0
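
As a mental model for what `--workers` and `--poll-interval` control, here is a small sketch of a polling worker pool built on `asyncio`. It is an illustration under assumptions, not the `loom worker` implementation: the in-memory queue, `worker`, and `run_pool` are hypothetical, and a real worker would poll the database for pending tasks instead.

```python
import asyncio


async def worker(
    name: str,
    queue: asyncio.Queue,
    done: list,
    poll_interval: float,
    stop: asyncio.Event,
) -> None:
    """Poll for a task; sleep for poll_interval when none is available."""
    while not stop.is_set():
        try:
            task = queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(poll_interval)
            continue
        await asyncio.sleep(0)  # stand-in for executing the task
        done.append((name, task))
        queue.task_done()


async def run_pool(tasks: list, workers: int = 4, poll_interval: float = 0.5) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for task in tasks:
        queue.put_nowait(task)
    done: list = []
    stop = asyncio.Event()
    pool = [
        asyncio.create_task(worker(f"worker-{i}", queue, done, poll_interval, stop))
        for i in range(workers)
    ]
    await queue.join()  # wait until every queued task has been processed
    stop.set()          # ask idle workers to exit after their current sleep
    await asyncio.gather(*pool)
    return done


completed = asyncio.run(
    run_pool([f"task-{i}" for i in range(10)], workers=4, poll_interval=0.01)
)
```

A shorter poll interval lowers the delay before new work is picked up, at the cost of more frequent polling when the queue is empty.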

🌐 Web Dashboard

Start the interactive web dashboard to monitor and manage workflows in real-time:

# Start web server on default port (8000)
loom web

# Custom host and port
loom web --host 0.0.0.0 --port 3000

# Development mode with auto-reload
loom web --reload

Access the dashboard at http://localhost:8000 after starting the server.

The web dashboard provides:

  • 📊 Real-time workflow monitoring with Server-Sent Events (SSE)
  • 📈 Workflow definition graphs (similar to Airflow DAGs) showing workflow structure
  • 📋 Task queue visualization and execution tracking
  • 📜 Event history with comprehensive audit trails
  • 📊 Performance metrics and system statistics
  • 📚 Interactive API documentation at /docs
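
For readers unfamiliar with Server-Sent Events, the sketch below shows the wire format an SSE stream uses: optional `event:` lines, one or more `data:` lines, and a blank line that ends each message. The `format_sse` helper, the `workflow_update` event name, and the payload fields are hypothetical and are not taken from Loom's dashboard API.

```python
import json
from typing import Any, Optional


def format_sse(data: dict[str, Any], event: Optional[str] = None) -> str:
    """Serialize one message in the text/event-stream format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"


message = format_sse(
    {"workflow_id": "wf-1", "status": "RUNNING"},
    event="workflow_update",  # hypothetical event name
)
```

A browser client would consume such a stream with the standard `EventSource` API and parse each `data` payload as JSON.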

🎯 Complete Example

Here's a complete workflow example that combines steps, activities, state management, and timers:

import random
from datetime import timedelta
import loom
from loom.core.context import WorkflowContext
from loom.core.workflow import Workflow
from loom.schemas.state import Input, State


class QuizInput(Input):
    lesson_id: str


class QuizState(State):
    quiz_id: str | None
    wait_time: int | None
    submissions: list | None
    result: dict | None


@loom.activity(name="GenerateQuiz")
async def generate_quiz_activity() -> str:
    quiz_id = f"Quiz-{random.randint(1000, 9999)}"
    print(f"Generated Quiz: {quiz_id}")
    return quiz_id


@loom.activity(name="SendQuizToLMS")
async def send_quiz_to_lms_activity(quiz_id: str) -> None:
    print(f"Sent {quiz_id} to LMS")


@loom.activity(name="FetchWaitTime")
async def fetch_wait_time_activity() -> int:
    return 120  # 2 minutes


@loom.activity(name="PullSubmissions")
async def pull_submissions_activity(quiz_id: str) -> list:
    print(f"Pulled submissions for {quiz_id}")
    return ["Submission 1", "Submission 2", "Submission 3"]


@loom.activity(name="AssessResult")
async def assess_result_activity(quiz_id: str) -> dict:
    score = random.randint(50, 100)
    return {"quiz_id": quiz_id, "score": score, "status": "Completed"}


@loom.activity(name="StoreResult")
async def store_result_activity(result: dict) -> None:
    print(f"Stored Result: {result}")


@loom.workflow(
    name="AssessmentWorkflow",
    version="1.0.0",
    description="A workflow for Quiz management."
)
class AssessmentWorkflow(Workflow[QuizInput, QuizState]):

    @loom.step(name="generate_quiz")
    async def generate_quiz(self, ctx: WorkflowContext[QuizInput, QuizState]):
        ctx.logger.info("Generating Quiz...")
        quiz_id = await ctx.activity(generate_quiz_activity)
        await ctx.state.set("quiz_id", quiz_id)

    @loom.step(name="send_to_lms")
    async def send_to_lms(self, ctx: WorkflowContext[QuizInput, QuizState]):
        quiz_id = ctx.state.get("quiz_id")
        ctx.logger.info(f"Sending Quiz {quiz_id} to LMS...")
        await ctx.activity(send_quiz_to_lms_activity, quiz_id)

    @loom.step(name="fetch_wait_time")
    async def fetch_wait_time(self, ctx: WorkflowContext[QuizInput, QuizState]):
        ctx.logger.info("Fetching wait time...")
        wait_time = await ctx.activity(fetch_wait_time_activity)
        await ctx.state.set("wait_time", wait_time)

    @loom.step(name="wait_step")
    async def wait_step(self, ctx: WorkflowContext[QuizInput, QuizState]):
        wait_time = ctx.state.get("wait_time")
        ctx.logger.info(f"Waiting for {wait_time} seconds...")
        await ctx.sleep(delta=timedelta(seconds=wait_time))

    @loom.step(name="pull_submissions")
    async def pull_submissions(self, ctx: WorkflowContext[QuizInput, QuizState]):
        quiz_id = ctx.state.get("quiz_id")
        submissions = await ctx.activity(pull_submissions_activity, quiz_id)
        await ctx.state.set("submissions", submissions)

    @loom.step(name="assess_result")
    async def assess_result(self, ctx: WorkflowContext[QuizInput, QuizState]):
        quiz_id = ctx.state.get("quiz_id")
        result = await ctx.activity(assess_result_activity, quiz_id)
        await ctx.state.set("result", result)

    @loom.step(name="store_result")
    async def store_result(self, ctx: WorkflowContext[QuizInput, QuizState]):
        result = ctx.state.get("result")
        await ctx.activity(store_result_activity, result)
        ctx.logger.info("Workflow completed!")


# Start the workflow
async def main():
    handle = await AssessmentWorkflow.start({"lesson_id": "lesson_123"})
    result = await handle.status()
    print(f"Workflow Status: {result}")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

This example demonstrates:

  • Multiple steps with sequential execution
  • Activity calls for side effects
  • State management across workflow execution
  • Timer/sleep operations for waiting
  • Logging with workflow context
  • Type safety with generic workflow types
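
The timer point deserves a closer look, because a durable sleep cannot simply hold a coroutine open for two minutes if the process may crash in between. One common approach, sketched here as an assumption and not as Loom's actual mechanism, is to record the absolute wake-up time and recompute the remaining wait after a restart. The `TIMER_SCHEDULED` event name, `schedule_timer`, and `remaining` are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def schedule_timer(now: datetime, delta: timedelta) -> dict[str, Any]:
    """Record the absolute wake-up time so it survives a restart."""
    return {"type": "TIMER_SCHEDULED", "fire_at": (now + delta).isoformat()}


def remaining(event: dict[str, Any], now: datetime) -> timedelta:
    """Time left until the timer fires; zero if it is already overdue."""
    fire_at = datetime.fromisoformat(event["fire_at"])
    return max(fire_at - now, timedelta(0))


start = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
timer = schedule_timer(start, timedelta(seconds=120))

# The process restarts 45 seconds in: only 75 seconds are left to wait.
left_after_restart = remaining(timer, start + timedelta(seconds=45))

# The process restarts 5 minutes later: the timer should fire immediately.
left_when_overdue = remaining(timer, start + timedelta(minutes=5))
```

Storing a deadline instead of a duration means the wait is not restarted from zero each time a worker recovers.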

CLI Commands

# Initialize database
loom init

# Start distributed worker
loom worker [--workers 4] [--poll-interval 0.5]

# List workflows
loom list [--limit 50] [--status RUNNING]

# Inspect workflow details
loom inspect <workflow-id> [--events]

# Show database statistics
loom stats

🏗️ Architecture

Core Components

[Architecture diagram]

Event Types

  • WORKFLOW_STARTED - Workflow initialization
  • WORKFLOW_COMPLETED - Successful completion
  • WORKFLOW_FAILED - Fatal error occurred
  • STATE_SET - Single state key updated
  • STATE_UPDATE - Batch state update
  • ACTIVITY_SCHEDULED - Activity queued for execution
  • ACTIVITY_COMPLETED - Activity finished successfully
  • ACTIVITY_FAILED - Activity permanently failed
  • TIMER_FIRED - Sleep/delay completed
  • SIGNAL_RECEIVED - External signal received
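
To illustrate how the state events in this list could drive recovery, here is a short sketch that rebuilds workflow state by folding over an event history. Only the event type names come from the list above. The payload fields (`key`, `value`, `values`) and the `rebuild_state` function are assumptions made for this example and may not match Loom's stored event schema.

```python
from typing import Any


def rebuild_state(initial: dict[str, Any], events: list[dict[str, Any]]) -> dict[str, Any]:
    """Replay state events in order on top of the initial state."""
    state = dict(initial)  # copy so the caller's dict is not mutated
    for event in events:
        if event["type"] == "STATE_SET":
            state[event["key"]] = event["value"]
        elif event["type"] == "STATE_UPDATE":
            state.update(event["values"])
        # Other event types do not change state in this sketch.
    return state


events = [
    {"type": "WORKFLOW_STARTED"},
    {"type": "ACTIVITY_COMPLETED", "name": "process_payment", "result": True},
    {"type": "STATE_SET", "key": "payment_confirmed", "value": True},
    {"type": "STATE_UPDATE", "values": {"email_sent": True}},
]

state = rebuild_state({"payment_confirmed": False, "email_sent": False}, events)
```

Because events are immutable and ordered, replaying the same history always produces the same state, which is what makes recovery after a crash predictable.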

Project Structure

loom/
├── src/
│   ├── common/         # Shared utilities
│   ├── core/           # Core engine (context, engine, runner, worker)
│   ├── database/       # Database layer
│   ├── decorators/     # @workflow, @step, @activity
│   ├── lib/            # Utilities and progress tracking
│   ├── migrations/     # Database migrations
│   └── schemas/        # Type definitions
├── tests/              # Test suite
├── examples/           # Example workflows
├── loom.py             # Main package interface
└── pyproject.toml      # Package configuration

Configuration

Loom uses SQLite by default for simplicity. For production:

  • Consider PostgreSQL/MySQL for scalability
  • Implement connection pooling
  • Add monitoring and alerting
  • Deploy multiple workers for high availability

Contributing

Contributions welcome! Please ensure:

  1. Tests pass: pytest
  2. Code formatted: black .
  3. Type checking: mypy .
  4. Linting: ruff check .

📝 License

MIT License - see LICENSE file for details

🙏 Acknowledgments

Inspired by Temporal and the Durable Task Framework.

📧 Contact

For questions and support, please open an issue on GitHub.


Built with ❤️ using Python 3.12+
