Skip to main content

Event-driven workflow orchestration framework with flexible connection, state management, and workflow orchestration capabilities

Project description

Routilux โšก

PyPI version Python 3.7+ License Documentation CI codecov

Routilux is a powerful, event-driven workflow orchestration framework that makes building complex data pipelines and workflows effortless. With its intuitive API and flexible architecture, you can create sophisticated workflows in minutes, not hours.

โœจ Why Routilux?

  • ๐Ÿš€ Event Queue Architecture: Non-blocking event emission with unified execution model for both sequential and concurrent modes
  • ๐Ÿ”— Flexible Connections: Many-to-many relationships between routines with intelligent data routing
  • ๐Ÿ“Š Built-in State Management: Track execution state, performance metrics, and history out of the box
  • ๐Ÿ›ก๏ธ Robust Error Handling: Multiple strategies (STOP, CONTINUE, RETRY, SKIP) with automatic recovery
  • โšก Concurrent Execution: Automatic parallelization for I/O-bound operations via unified event queue
  • ๐Ÿ’พ Persistence & Recovery: Save and resume workflows from any point with pending task serialization
  • ๐ŸŽฏ Production Ready: Comprehensive error handling, execution tracking, and monitoring
  • ๐ŸŽจ Simplified API: Automatic flow detection - no need to pass flow parameter in most cases

๐ŸŽฏ Perfect For

  • Data Pipelines: ETL processes, data transformation workflows
  • API Orchestration: Coordinating multiple API calls with complex dependencies
  • Event Processing: Real-time event streams and reactive systems
  • Workflow Automation: Business process automation and task scheduling
  • Microservices Coordination: Managing interactions between services
  • LLM Agent Workflows: Complex AI agent orchestration and chaining

๐Ÿ“ฆ Installation

Quick Install (Recommended)

pip install routilux

That's it! You're ready to go.

Development Setup with uv (Recommended)

This project uses uv for fast dependency management. Install uv first:

curl -LsSf https://astral.sh/uv/install.sh | sh

Then set up the development environment:

Recommended: For active development

# Install package with all development dependencies (recommended)
make dev-install

# Or manually with uv (dev group is installed by default)
uv sync --group docs --all-extras

Alternative: Dependencies only (for CI/CD or code review)

# Create virtual environment and install dependencies only (without installing the package)
# Useful for: CI/CD pipelines, code review, or when you only need development tools
make setup-venv

# Later, if you need to install the package:
make install

Understanding dependency groups vs extras:

  • Dependency groups (dev, docs): Development dependencies that are not published to PyPI. The dev group is installed by default with uv sync.
  • Extras: Currently none, but may be added in the future.

All make commands will automatically use uv if available, otherwise fall back to pip.

Development Install (Legacy - using pip)

For development with all dependencies using pip:

pip install -e ".[dev]"
# Or using Makefile
make dev-install

๐Ÿ–ฅ๏ธ CLI

Routilux includes a command-line interface for workflow management:

# Install with CLI support
pip install routilux[cli]

# Run a workflow
routilux run --workflow flow.yaml

# Start server
routilux server start

# See all commands
routilux --help

CLI Commands

  • routilux init - Initialize a new project with example files
  • routilux run - Execute a workflow from a DSL file
  • routilux server - Start the HTTP server for API access
  • routilux list - List available routines or flows
  • routilux validate - Validate a workflow DSL file

See CLI Documentation for details.

๐Ÿš€ Quick Start

For development with all dependencies using pip:

pip install -e ".[dev]"
# Or using Makefile
make dev-install

๐Ÿš€ Quick Start

Create Your First Workflow in 3 Steps

Step 1: Define a Routine

from routilux import Routine

class DataProcessor(Routine):
    def __init__(self):
        super().__init__()
        # Define input slot
        self.input_slot = self.define_slot("input", handler=self.process_data)
        # Define output event
        self.output_event = self.define_event("output", ["result"])
    
    def process_data(self, data=None, **kwargs):
        # Flow is automatically detected from routine context
        result = f"Processed: {data}"
        self._stats["processed_count"] = self._stats.get("processed_count", 0) + 1
        self.emit("output", result=result)  # No need to pass flow!

Step 2: Create and Connect a Flow

from routilux import Flow

flow = Flow(flow_id="my_workflow")

processor1 = DataProcessor()
processor2 = DataProcessor()

id1 = flow.add_routine(processor1, "processor1")
id2 = flow.add_routine(processor2, "processor2")

# Connect: processor1's output โ†’ processor2's input
flow.connect(id1, "output", id2, "input")

Step 3: Execute

job_state = flow.execute(id1, entry_params={"data": "Hello, Routilux!"})
print(job_state.status)  # "completed"
print(processor1.stats())  # {"processed_count": 1}

๐ŸŽ‰ Done! You've created your first workflow.

๐Ÿ’ก Key Features

๐Ÿ”„ Event Queue Architecture

Routines communicate through events and slots using a unified event queue pattern:

# Multiple routines can listen to the same event
flow.connect(processor1, "output", processor2, "input")
flow.connect(processor1, "output", processor3, "input")  # Fan-out

# Multiple events can feed into the same slot
flow.connect(processor1, "output", aggregator, "input")
flow.connect(processor2, "output", aggregator, "input")  # Fan-in

# emit() is non-blocking - returns immediately after enqueuing tasks
# Flow is automatically detected from routine context
self.emit("output", data="value")  # No flow parameter needed!

๐ŸŽ›๏ธ Flexible State Management

Track everything automatically:

# Access routine state
stats = routine.stats()  # {"processed_count": 42, "errors": 0}

# Track execution history
history = job_state.get_execution_history()

# Performance metrics
perf = flow.execution_tracker.get_routine_performance("processor1")

๐Ÿ›ก๏ธ Built-in Error Handling

Choose the right strategy for your use case:

from routilux import ErrorHandler, ErrorStrategy

# Stop on error (default)
flow.set_error_handler(ErrorHandler(ErrorStrategy.STOP))

# Continue and log errors
flow.set_error_handler(ErrorHandler(ErrorStrategy.CONTINUE))

# Retry with exponential backoff
flow.set_error_handler(ErrorHandler(
    ErrorStrategy.RETRY,
    max_retries=3,
    retry_delay=1.0,
    backoff_multiplier=2.0
))

โšก Unified Execution Model

Both sequential and concurrent modes use the same event queue mechanism:

# Sequential mode (default): max_workers=1
flow = Flow()  # Sequential by default

# Concurrent mode: max_workers>1
flow.set_execution_strategy("concurrent", max_workers=4)

# Tasks are processed fairly in queue order
# Long chains don't block shorter ones
job_state = flow.execute(entry_routine_id)
flow.wait_for_completion()  # Wait for async tasks

๐Ÿ’พ Persistence & Recovery

Save and resume workflows:

# Save workflow state
job_state.save("workflow_state.json")

# Later, resume from saved state
saved_state = JobState.load("workflow_state.json")
flow.resume(saved_state)

๐Ÿ“š Documentation

๐Ÿ“– Full documentation available at: routilux.readthedocs.io

Documentation Highlights

  • ๐Ÿ“˜ User Guide: Comprehensive guide covering all features
  • ๐Ÿ”ง API Reference: Complete API documentation
  • ๐Ÿ’ป Examples: Real-world code examples
  • ๐Ÿ—๏ธ Design: Architecture and design principles

Build Documentation Locally

pip install -e ".[docs]"
cd docs && make html

๐ŸŽ“ Examples

Check out the examples/ directory for practical examples:

Core Examples

  • basic_example.py - Your first workflow
  • data_processing.py - Multi-stage data pipeline
  • concurrent_flow_demo.py - Parallel execution
  • error_handling_example.py - Error handling strategies
  • state_management_example.py - State tracking and recovery
  • builtin_routines_demo.py - Using built-in routines

Real-World Usage Patterns

  • data_pipeline.py - Multi-stage data processing with validation, transformation, and quality checks
  • async_orchestration.py - Concurrent task execution with result aggregation
  • long_running_workflow.py - Pause/resume execution with state persistence and recovery
  • error_handling.py - Retry patterns and fallback mechanisms

Run examples:

python examples/basic_example.py
python examples/data_pipeline.py
python examples/async_orchestration.py
python examples/long_running_workflow.py
python examples/error_handling.py

๐Ÿงฉ Built-in Routines

Routilux comes with a rich set of built-in routines ready to use:

  • Text Processing: TextClipper, TextRenderer, ResultExtractor
  • Data Processing: DataTransformer, DataValidator, DataFlattener
  • Control Flow: ConditionalRouter for dynamic routing
  • Utilities: TimeProvider for timestamps
from routilux.builtin_routines import ConditionalRouter, DataTransformer

# Use built-in routines directly
router = ConditionalRouter()
transformer = DataTransformer()

๐Ÿ—๏ธ Project Structure

routilux/
โ”œโ”€โ”€ routilux/              # Main package
โ”‚   โ”œโ”€โ”€ routine.py         # Routine base class
โ”‚   โ”œโ”€โ”€ flow.py            # Flow manager
โ”‚   โ”œโ”€โ”€ job_state.py       # State management
โ”‚   โ”œโ”€โ”€ connection.py      # Connection management
โ”‚   โ”œโ”€โ”€ event.py           # Event class
โ”‚   โ”œโ”€โ”€ slot.py            # Slot class
โ”‚   โ”œโ”€โ”€ error_handler.py   # Error handling
โ”‚   โ””โ”€โ”€ execution_tracker.py # Performance tracking
โ”œโ”€โ”€ tests/                 # Comprehensive test suite
โ”œโ”€โ”€ examples/              # Usage examples
โ””โ”€โ”€ docs/                  # Sphinx documentation

๐Ÿงช Testing

Routilux comes with comprehensive tests:

# Run all tests
make test-all

# Run with coverage
make test-cov

# Run specific test suite
pytest tests/                    # Core tests
pytest routilux/builtin_routines/  # Built-in routines tests

๐Ÿค Contributing

We welcome contributions! Here's how you can help:

  1. Star the project โญ - Show your support
  2. Report bugs ๐Ÿ› - Help us improve
  3. Suggest features ๐Ÿ’ก - Share your ideas
  4. Submit PRs ๐Ÿ”ง - Contribute code

๐Ÿข About Agentsmith

Routilux is part of the Agentsmith open-source ecosystem. Agentsmith is a ToB AI agent and algorithm development platform, currently deployed in multiple highway management companies, securities firms, and regulatory agencies in China. The Agentsmith team is gradually open-sourcing the platform by removing proprietary code and algorithm modules, as well as enterprise-specific customizations, while decoupling the system for modular use by the open-source community.

๐ŸŒŸ Agentsmith Open-Source Projects

  • Varlord โš™๏ธ - Configuration management library with multi-source support
  • Routilux โšก - Event-driven workflow orchestration framework
  • Serilux ๐Ÿ“ฆ - Flexible serialization framework for Python objects
  • Lexilux ๐Ÿš€ - Unified LLM API client library

These projects are modular components extracted from the Agentsmith platform, designed to be used independently or together to build powerful applications.

๐Ÿ“„ License

Routilux is licensed under the Apache License 2.0. See LICENSE for details.

๐Ÿ”— Links

โญ Show Your Support

If Routilux helps you build amazing workflows, consider giving it a star on GitHub!


Built with โค๏ธ by the Routilux Team

Making workflow orchestration simple, powerful, and fun.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

routilux-0.13.0.tar.gz (596.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

routilux-0.13.0-py3-none-any.whl (368.5 kB view details)

Uploaded Python 3

File details

Details for the file routilux-0.13.0.tar.gz.

File metadata

  • Download URL: routilux-0.13.0.tar.gz
  • Upload date:
  • Size: 596.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for routilux-0.13.0.tar.gz
Algorithm Hash digest
SHA256 9682ab0b5882f84f50fc3e0931f298cd48c12716047efb518fd6a355306977f0
MD5 dff9b9ab24ed171f031494e604607cfa
BLAKE2b-256 0f87dad18cd8c206d84afa32132e64e4d1a60fe56decbc05d8ac6d695061c344

See more details on using hashes here.

File details

Details for the file routilux-0.13.0-py3-none-any.whl.

File metadata

  • Download URL: routilux-0.13.0-py3-none-any.whl
  • Upload date:
  • Size: 368.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for routilux-0.13.0-py3-none-any.whl
Algorithm Hash digest
SHA256 95241f8b62d8aa479af7256d9dfc9e0274447d14861a16d54c043a5bf76285d6
MD5 8b019f3f685077bf29de57e56278935d
BLAKE2b-256 e530b8839a73b5437b457d58dac59cb27f8d4b02950c62523681968a063a39d1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page