Durable workflow orchestration engine for Python
Loom - Durable Workflow Orchestration
A Python-based durable workflow orchestration engine inspired by Temporal and Durable Task Framework. Loom provides event-sourced, deterministic workflow execution with automatic recovery and replay capabilities.
Features
- Event Sourcing: All workflow state changes persisted as immutable events
- Deterministic Replay: Workflows reconstruct from event history for recovery
- Type Safe: Full generic typing support with Workflow[InputT, StateT]
- Async First: Built on asyncio for high-performance concurrent execution
- Durable Execution: Workflows survive process crashes and auto-recover
- Beautiful CLI: Rich console interface with progress tracking
- Well Tested: Comprehensive test suite with pytest
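As a rough illustration of the event sourcing and deterministic replay features, here is a minimal sketch of rebuilding state by folding over an event history. The `Event` class, the payload layout, and the `replay` function are hypothetical and are not Loom's internals; only the event type names are taken from this README.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Event:
    """An immutable record of one change (hypothetical shape)."""
    type: str
    payload: dict[str, Any]


def replay(events: list[Event]) -> dict[str, Any]:
    """Rebuild workflow state by folding over the event history in order."""
    state: dict[str, Any] = {}
    for event in events:
        if event.type == "STATE_SET":
            state[event.payload["key"]] = event.payload["value"]
        elif event.type == "STATE_UPDATE":
            state.update(event.payload["values"])
        # Other event types do not touch state in this sketch.
    return state


history = [
    Event("WORKFLOW_STARTED", {}),
    Event("STATE_SET", {"key": "payment_confirmed", "value": True}),
    Event("STATE_UPDATE", {"values": {"email_sent": True}}),
]

# Replaying the same history always produces the same state.
recovered = replay(history)
print(recovered)
```

Because the state is derived only from the stored events, a process that crashes can rebuild it by reading the history again.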
Quick Start
Installation
pip install loom-core
Or install from source:
git clone https://github.com/yourusername/loom.git
cd loom
pip install -e .
Define a Workflow
import asyncio
from typing import TypedDict
import loom
# Define your data types
class OrderInput(TypedDict):
    order_id: str
    customer_email: str

class OrderState(TypedDict):
    payment_confirmed: bool
    email_sent: bool
# Define activities (side effects)
@loom.activity(name="process_payment", retry_count=3, timeout_seconds=30)
async def process_payment(order_id: str) -> bool:
    # Call payment API
    return True

@loom.activity(name="send_email", retry_count=2)
async def send_confirmation_email(email: str, order_id: str) -> None:
    # Send email via service
    pass
# Define workflow
@loom.workflow(name="OrderProcessing", version="1.0.0")
class OrderWorkflow(loom.Workflow[OrderInput, OrderState]):
    @loom.step(name="process_payment")
    async def payment_step(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
        success = await ctx.activity(process_payment, ctx.input["order_id"])
        await ctx.state.set("payment_confirmed", success)
        ctx.logger.info(f"Payment processed: {success}")

    @loom.step(name="send_confirmation")
    async def notification_step(self, ctx: loom.WorkflowContext[OrderInput, OrderState]):
        if ctx.state["payment_confirmed"]:
            await ctx.activity(
                send_confirmation_email,
                ctx.input["customer_email"],
                ctx.input["order_id"]
            )
            await ctx.state.set("email_sent", True)
            ctx.logger.info("Confirmation email sent")
Note: For state updates, use:
- await ctx.state.set("key", value) for single values
- await ctx.state.update(key=lambda _: asyncio.sleep(0, value)) for batch updates (requires an awaitable)
See STATE_MANAGEMENT.md for detailed examples.
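The asyncio.sleep(0, value) idiom in the note relies on the standard library signature asyncio.sleep(delay, result=None), which returns result once the delay has passed. The sketch below shows only that standard library behaviour and does not call Loom.

```python
import asyncio


async def main() -> int:
    # asyncio.sleep(delay, result) yields to the event loop, then returns `result`.
    awaitable = asyncio.sleep(0, 42)
    return await awaitable


value = asyncio.run(main())
print(value)
```

With a delay of 0 the coroutine suspends only long enough to let other tasks run, so it is a compact way to wrap a plain value in an awaitable.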
Start a Workflow
async def main():
    db = loom.Database()
    async with db:
        # Initialize database
        await db.migrate_up()

        # Start workflow
        handle = await db.start_workflow(
            OrderWorkflow,
            workflow_input=OrderInput(
                order_id="ORD-12345",
                customer_email="customer@example.com"
            ),
            initial_state=OrderState(
                payment_confirmed=False,
                email_sent=False
            ),
        )
        print(f"Workflow started: {handle.workflow_id}")

        # Execute workflow tasks
        while True:
            task_executed = await loom.run_once()
            if not task_executed:
                break

if __name__ == "__main__":
    asyncio.run(main())
Run the Worker
# Initialize database
loom init
# Start worker with 4 concurrent task processors
loom worker
# Custom configuration
loom worker --workers 8 --poll-interval 1.0
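To give a feel for what the --workers and --poll-interval options control, here is a minimal asyncio sketch of a polling worker pool. It is an assumption about the general pattern, not Loom's worker implementation: the in-memory `pending` queue, `worker`, and `run_pool` are hypothetical stand-ins for a database-backed task queue.

```python
import asyncio
from collections import deque

# Hypothetical in-memory task source; a real worker would poll a database.
pending: deque[int] = deque(range(10))
done: list[int] = []


async def worker(poll_interval: float, stop: asyncio.Event) -> None:
    """Take a task if one is pending; otherwise sleep for the poll interval."""
    while not stop.is_set():
        if pending:
            task_id = pending.popleft()
            await asyncio.sleep(0)  # stand-in for executing the task
            done.append(task_id)
        else:
            await asyncio.sleep(poll_interval)


async def run_pool(workers: int, poll_interval: float) -> None:
    stop = asyncio.Event()
    tasks = [asyncio.create_task(worker(poll_interval, stop)) for _ in range(workers)]
    # Wait until the queue drains, then ask the workers to exit.
    while pending:
        await asyncio.sleep(poll_interval)
    stop.set()
    await asyncio.gather(*tasks)


asyncio.run(run_pool(workers=4, poll_interval=0.01))
print(sorted(done))
```

In this pattern, more workers raise concurrency and a shorter poll interval lowers the delay before an idle worker notices new work, at the cost of more frequent polling.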
CLI Commands
# Initialize database
loom init
# Start distributed worker
loom worker [--workers 4] [--poll-interval 0.5]
# List workflows
loom list [--limit 50] [--status RUNNING]
# Inspect workflow details
loom inspect <workflow-id> [--events]
# Show database statistics
loom stats
🏗️ Architecture
Core Components
Event Types
- WORKFLOW_STARTED - Workflow initialization
- WORKFLOW_COMPLETED - Successful completion
- WORKFLOW_FAILED - Fatal error occurred
- STATE_SET - Single state key updated
- STATE_UPDATE - Batch state update
- ACTIVITY_SCHEDULED - Activity queued for execution
- ACTIVITY_COMPLETED - Activity finished successfully
- ACTIVITY_FAILED - Activity permanently failed
- TIMER_FIRED - Sleep/delay completed
- SIGNAL_RECEIVED - External signal received
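One way an ACTIVITY_COMPLETED event can support recovery is by letting a restarted workflow reuse the recorded result instead of repeating the side effect. The sketch below illustrates that idea under stated assumptions: `ReplayContext`, its event dictionaries, and the `charge` activity are hypothetical and do not reflect Loom's actual context or storage format.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ReplayContext:
    """Hypothetical context: runs an activity once, then replays its recorded result."""

    def __init__(self, history: list[dict[str, Any]]) -> None:
        self.history = history  # shared, append-only event log
        self._cursor = 0        # index of the next activity call during replay

    async def activity(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        completed = [e for e in self.history if e["type"] == "ACTIVITY_COMPLETED"]
        if self._cursor < len(completed):
            # Replay: reuse the recorded result instead of repeating the side effect.
            result = completed[self._cursor]["result"]
        else:
            result = await fn(*args)
            self.history.append(
                {"type": "ACTIVITY_COMPLETED", "name": fn.__name__, "result": result}
            )
        self._cursor += 1
        return result


calls = 0


async def charge(order_id: str) -> str:
    global calls
    calls += 1  # counts real executions of the side effect
    return f"charged:{order_id}"


async def run(history: list[dict[str, Any]]) -> str:
    return await ReplayContext(history).activity(charge, "ORD-12345")


log: list[dict[str, Any]] = []
first = asyncio.run(run(log))   # executes the activity and records the result
second = asyncio.run(run(log))  # simulated restart: replays from the log
print(first, second, calls)
```

The second run returns the same value without calling `charge` again, which is why workflow code must issue its activity calls in the same order on every run.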
📚 Documentation
See .copilot-instructions.md for comprehensive development guidelines including:
- Event sourcing patterns
- Deterministic execution rules
- Activity best practices
- Testing strategies
- Common pitfalls to avoid
🧪 Testing
# Run all tests
pytest
# Run with coverage
pytest --cov=src --cov-report=html
# Run specific test file
pytest tests/test_workflow.py
# Verbose output
pytest -v
Project Structure
loom/
├── src/
│ ├── common/ # Shared utilities
│ ├── core/ # Core engine (context, engine, runner, worker)
│ ├── database/ # Database layer
│ ├── decorators/ # @workflow, @step, @activity
│ ├── lib/ # Utilities and progress tracking
│ ├── migrations/ # Database migrations
│ └── schemas/ # Type definitions
├── tests/ # Test suite
├── examples/ # Example workflows
├── loom.py # Main package interface
└── pyproject.toml # Package configuration
Configuration
Loom uses SQLite by default for simplicity. For production:
- Consider PostgreSQL/MySQL for scalability
- Implement connection pooling
- Add monitoring and alerting
- Deploy multiple workers for high availability
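To make the storage discussion concrete, here is a minimal sketch of an append-only event table using Python's built-in sqlite3 module. The table name, columns, and helper functions are assumptions for illustration only and are not Loom's schema; the same layout would carry over to PostgreSQL or MySQL with a different driver.

```python
import json
import sqlite3

# Hypothetical schema; table and column names are illustrative only.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        workflow_id TEXT NOT NULL,
        type TEXT NOT NULL,
        payload TEXT NOT NULL
    )
    """
)


def append_event(workflow_id: str, event_type: str, payload: dict) -> None:
    """Append one event; this sketch never updates or deletes rows."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO events (workflow_id, type, payload) VALUES (?, ?, ?)",
            (workflow_id, event_type, json.dumps(payload)),
        )


def load_history(workflow_id: str) -> list[tuple[str, dict]]:
    """Return a workflow's events in insertion order."""
    rows = conn.execute(
        "SELECT type, payload FROM events WHERE workflow_id = ? ORDER BY id",
        (workflow_id,),
    )
    return [(event_type, json.loads(payload)) for event_type, payload in rows]


append_event("wf-1", "WORKFLOW_STARTED", {})
append_event("wf-1", "STATE_SET", {"key": "email_sent", "value": True})
history = load_history("wf-1")
print(history)
```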
Contributing
Contributions welcome! Please ensure:
- Tests pass: pytest
- Code formatted: black .
- Type checking: mypy .
- Linting: ruff check .
📝 License
MIT License - see LICENSE file for details
🙏 Acknowledgments
Inspired by:
- Temporal - The workflow orchestration platform
- Durable Task Framework - Microsoft's durable task library
- Cadence - Uber's workflow platform
📧 Contact
For questions and support, please open an issue on GitHub.
Built with ❤️ using Python 3.12+