
A state-machine based orchestrator for long-running AI and other jobs.


Avtomatika Orchestrator

License: MIT | Python 3.11+ | Code Style: Ruff

Avtomatika is a powerful, state-driven engine for managing complex asynchronous workflows in Python. It provides a robust framework for building scalable and resilient applications by separating process logic from execution logic.

This document serves as a comprehensive guide for developers looking to build pipelines (blueprints) and embed the Orchestrator into their applications.


Core Concept: Orchestrator, Blueprints, and Workers

The project is based on a simple yet powerful architectural pattern that separates process logic from execution logic.

  • Orchestrator (OrchestratorEngine) — The Director. It manages the entire process from start to finish, tracks state, handles errors, and decides what should happen next. It does not perform business tasks itself.
  • Blueprints (Blueprint) — The Script. Each blueprint is a detailed plan (a state machine) for a specific business process. It describes the steps (states) and the rules for transitioning between them.
  • Workers (Worker) — The Team of Specialists. These are independent, specialized executors. Each worker knows how to perform a specific set of tasks (e.g., "process video," "send email") and reports back to the Orchestrator.

Ecosystem

Avtomatika is part of a larger ecosystem:

  • Avtomatika Protocol: Shared package containing protocol definitions, data models, and utilities ensuring consistency across all components.
  • Avtomatika Worker SDK: The official Python SDK for building workers that connect to this engine.
  • HLN Protocol: The architectural specification and manifesto behind the system (Hierarchical Logic Network).
  • Full Example: A complete reference project demonstrating the engine and workers in action.

Installation

  • Install the core engine only:

    pip install avtomatika
    
  • Install with Redis support (recommended for production):

    pip install "avtomatika[redis]"
    
  • Install with history storage support (SQLite, PostgreSQL):

    pip install "avtomatika[history]"
    
  • Install with telemetry support (Prometheus, OpenTelemetry):

    pip install "avtomatika[telemetry]"
    
  • Install with S3 support (Payload Offloading):

    pip install "avtomatika[s3]"
    
  • Install all dependencies, including for testing:

    pip install "avtomatika[all,test]"
    

Quick Start: Usage as a Library

You can easily integrate and run the orchestrator engine within your own application.

# my_app.py
import asyncio
from avtomatika import OrchestratorEngine, StateMachineBlueprint
from avtomatika.context import ActionFactory
from avtomatika.storage import MemoryStorage
from avtomatika.config import Config

# 1. General Configuration
storage = MemoryStorage()
config = Config() # Loads configuration from environment variables

# Explicitly set tokens for this example
# Client token must be sent in the 'X-Client-Token' header.
config.CLIENT_TOKEN = "my-secret-client-token"
# Worker token must be sent in the 'X-Worker-Token' header.
config.GLOBAL_WORKER_TOKEN = "my-secret-worker-token"

# 2. Define the Workflow Blueprint
my_blueprint = StateMachineBlueprint(
    name="my_first_blueprint",
    api_version="v1",
    api_endpoint="/jobs/my_flow"
)

# Use dependency injection to get only the data you need.
@my_blueprint.handler_for("start", is_start=True)
async def start_handler(job_id: str, initial_data: dict, actions: ActionFactory):
    """The initial state for each new job."""
    print(f"Job {job_id} | Start: {initial_data}")
    actions.transition_to("end")

# You can still request the full context object if you prefer.
@my_blueprint.handler_for("end", is_end=True)
async def end_handler(context):
    """The final state. The pipeline ends here."""
    print(f"Job {context.job_id} | Complete.")

# 3. Initialize the Orchestrator Engine
engine = OrchestratorEngine(storage, config)
engine.register_blueprint(my_blueprint)

# 4. Accessing Components (Optional)
# You can access the internal aiohttp app and core components using AppKeys
# from avtomatika.app_keys import ENGINE_KEY, DISPATCHER_KEY
# app = engine.app
# dispatcher = app[DISPATCHER_KEY]

# 5. Define the main entrypoint to run the server
async def main():
    await engine.start()
    
    try:
        await asyncio.Event().wait()
    finally:
        await engine.stop()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nStopping server.")

Engine Lifecycle: run() vs. start()

The OrchestratorEngine offers two ways to start the server:

  • engine.run(): This is a simple, blocking method, useful for dedicated scripts where the orchestrator is the only major component. It handles starting and stopping the server for you. Do not call it from inside an async def function that is part of a larger application: it manages the event loop itself and can conflict with a loop that is already running.

  • await engine.start() and await engine.stop(): These are the non-blocking methods for integrating the engine into a larger asyncio application.

    • start() sets up and starts the web server in the background.
    • stop() gracefully shuts down the server and cleans up resources.

The "Quick Start" example above demonstrates the correct way to use these methods.
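The difference between the two styles comes down to who owns the event loop. The sketch below uses a hypothetical ToyEngine (not the real OrchestratorEngine) whose run() wraps asyncio.run(), a common way to build a blocking entry point. It shows why such a method fails inside an already-running loop while start()/stop() compose cleanly with host code.

```python
import asyncio


class ToyEngine:
    """Hypothetical stand-in for OrchestratorEngine; not the real class."""

    def __init__(self) -> None:
        self.running = False

    async def start(self) -> None:
        # Non-blocking: set up resources, then return control to the caller.
        self.running = True

    async def stop(self) -> None:
        # Graceful shutdown: release resources.
        self.running = False

    async def _serve(self) -> None:
        await self.start()
        try:
            await asyncio.sleep(0)  # a real server would wait here until shutdown
        finally:
            await self.stop()

    def run(self) -> None:
        # Blocking: asyncio.run() creates and owns a fresh event loop, and it
        # raises RuntimeError if a loop is already running in this thread.
        asyncio.run(self._serve())


async def embedded(engine: ToyEngine) -> bool:
    # The Quick Start pattern: start, do other work, always stop.
    await engine.start()
    try:
        return engine.running  # the host application's own work goes here
    finally:
        await engine.stop()
```

Calling engine.run() from a plain script works. Calling it from inside a coroutine raises RuntimeError, which is the conflict described above.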

Handler Arguments & Dependency Injection

State handlers are the core of your workflow logic. Avtomatika provides a powerful dependency injection system to make writing handlers clean and efficient.

Instead of receiving a single, large context object, your handler can ask for exactly what it needs as function arguments. The engine will automatically provide them.

The following arguments can be injected by name:

  • From the core job context:
    • job_id (str): The ID of the current job.
    • initial_data (dict): The data the job was created with.
    • state_history (dict): A dictionary for storing and passing data between steps. Data returned by workers is automatically merged into this dictionary.
    • actions (ActionFactory): The object used to tell the orchestrator what to do next (e.g., actions.transition_to(...)).
    • client (ClientConfig): Information about the API client that started the job.
    • data_stores (SimpleNamespace): Access to shared resources like database connections or caches.
  • From worker results:
    • Any key from a dictionary returned by a previous worker can be injected by name.
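Name-based injection of this kind is typically built on inspect.signature. The sketch below is a hypothetical resolver, not Avtomatika's internals, and it covers only a subset of the names listed above. It assumes core context names take precedence over state_history keys and that an unresolvable name is an error. The real engine's precedence and missing-key behavior may differ.

```python
import asyncio
import inspect
from types import SimpleNamespace
from typing import Any, Awaitable, Callable


def build_handler_kwargs(
    handler: Callable[..., Awaitable[Any]], context: SimpleNamespace
) -> dict[str, Any]:
    """Resolve a handler's arguments from its parameter names (illustrative)."""
    core = {
        "job_id": context.job_id,
        "initial_data": context.initial_data,
        "state_history": context.state_history,
        "actions": context.actions,
    }
    kwargs: dict[str, Any] = {}
    for name in inspect.signature(handler).parameters:
        if name == "context":
            kwargs[name] = context  # old style: the whole context object
        elif name in core:
            kwargs[name] = core[name]  # core job context fields are checked first
        elif name in context.state_history:
            kwargs[name] = context.state_history[name]  # merged worker results
        else:
            raise TypeError(f"cannot inject argument {name!r}")
    return kwargs


async def publish_handler(job_id: str, output_path: str, duration: int) -> str:
    return f"{job_id}: {output_path} ({duration}s)"


ctx = SimpleNamespace(
    job_id="job-1",
    initial_data={},
    state_history={"output_path": "/videos/123.mp4", "duration": 95},
    actions=None,
)
message = asyncio.run(publish_handler(**build_handler_kwargs(publish_handler, ctx)))
```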

Example: Dependency Injection

This is the recommended way to write handlers.

# A worker for this task returned: {"output_path": "/videos/123.mp4", "duration": 95}
# This dictionary was automatically merged into `state_history`.

@my_blueprint.handler_for("publish_video")
async def publish_handler(
    job_id: str,
    output_path: str, # Injected from state_history
    duration: int,    # Injected from state_history
    actions: ActionFactory
):
    print(f"Job {job_id}: Publishing video at {output_path} ({duration}s).")
    actions.transition_to("complete")

The actions Object

This is the most important injected argument. It tells the orchestrator what to do next. Only one actions method can be called in a single handler.

  • actions.transition_to("next_state"): Moves the job to a new state.
  • actions.dispatch_task(...): Delegates work to a Worker.
  • actions.dispatch_parallel(...): Runs multiple tasks at once.
  • actions.await_human_approval(...): Pauses the workflow for external input.
  • actions.run_blueprint(...): Starts a child workflow.
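The one-action-per-handler rule can be pictured as the factory recording a single decision for the engine to act on after the handler returns. The sketch below is a hypothetical ToyActionFactory with only two of the methods above. It chooses to raise on a second call; whether the real ActionFactory raises, warns, or overwrites is not stated here, so treat that as an assumption.

```python
from typing import Any


class ToyActionFactory:
    """Hypothetical stand-in for ActionFactory that records a single decision."""

    def __init__(self) -> None:
        self.decision: tuple[str, dict[str, Any]] | None = None

    def _record(self, kind: str, **payload: Any) -> None:
        # Refuse a second action instead of silently overwriting the first.
        if self.decision is not None:
            raise RuntimeError(
                f"{kind}() called after {self.decision[0]}(); "
                "only one action is allowed per handler"
            )
        self.decision = (kind, payload)

    def transition_to(self, state: str) -> None:
        self._record("transition_to", state=state)

    def dispatch_task(
        self, task_type: str, params: dict[str, Any], transitions: dict[str, str]
    ) -> None:
        self._record(
            "dispatch_task", task_type=task_type, params=params, transitions=transitions
        )
```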

Backward Compatibility: The context Object

For backward compatibility or if you prefer to have a single object, you can still ask for context.

# This handler is equivalent to the one above.
@my_blueprint.handler_for("publish_video")
async def publish_handler_old_style(context):
    output_path = context.state_history.get("output_path")
    duration = context.state_history.get("duration")

    print(f"Job {context.job_id}: Publishing video at {output_path} ({duration}s).")
    context.actions.transition_to("complete")


High Performance Architecture

Avtomatika is engineered for high-load environments with thousands of concurrent workers.

  • Smart Dispatching: High-performance routing using Redis Set intersections.
    • Hot Cache & Skill Awareness: Prioritizes workers that already have the required AI model loaded in memory (based on the model_name parameter) or are otherwise fully prepared for a specific skill.
    • Load Balancing: Optimistically increments a worker's load as soon as a task is assigned, so the worker is not overloaded before its next heartbeat reports its actual load.
  • Bi-directional Heartbeats: A robust feedback loop where the orchestrator sends urgent commands (like task cancellations) directly in response to worker heartbeats, ensuring reliability even without persistent connections.
  • Zero Trust Security:
    • mTLS (Mutual TLS): Mutual authentication between Orchestrator and Workers using certificates.
    • STS (Security Token Service): Token rotation mechanism with short-lived access tokens.
    • Identity Extraction: Automatically maps Certificate Common Name (CN) to Worker ID.
  • Data Integrity:
    • End-to-End Validation: Automatic verification of file size and ETag (hash) during S3 transfers.
    • Audit Trail: File metadata is logged in history for full traceability.
  • Protocol Layer: Built on top of rxon, a strict contract defining interactions, ensuring forward compatibility and allowing transport evolution (e.g., to gRPC).
  • Non-Blocking I/O:
    • Webhooks: Sent via a bounded background queue.
    • S3 Streaming: Constant memory usage regardless of file size.

Blueprint Cookbook: Key Features

1. Conditional Transitions (.when())

Use .when() to create conditional branches. Because the condition string is evaluated by the engine before the handler is called, it must still reference data through the context. prefix (e.g., context.initial_data.type). The handler itself, however, can use dependency injection.

# The `.when()` condition still refers to `context`.
@my_blueprint.handler_for("decision_step").when("context.initial_data.type == 'urgent'")
async def handle_urgent(actions):
    actions.transition_to("urgent_processing")

# The default handler if no `.when()` condition matches.
@my_blueprint.handler_for("decision_step")
async def handle_normal(actions):
    actions.transition_to("normal_processing")

Note on Limitations: The current version of .when() uses a simple parser with the following limitations:

  • No Nested Attributes: You can only access direct fields of context.initial_data or context.state_history (e.g., context.initial_data.field). Nested objects (e.g., context.initial_data.area.field) are not supported.
  • Simple Comparisons Only: Only the following operators are supported: ==, !=, >, <, >=, <=. Complex logical expressions with AND, OR, or NOT are not allowed.
  • Limited Value Types: The parser only recognizes strings (in quotes), integers, and floats. Boolean values (True, False) and None are not correctly parsed and will be treated as strings.
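The limitations above describe a deliberately small grammar: one direct field, one comparison operator, one literal. The following is a hypothetical evaluator written to match that description, not the engine's parser. It assumes a missing field simply evaluates to False.

```python
import operator
import re
from types import SimpleNamespace
from typing import Any

_OPS = {
    "==": operator.eq, "!=": operator.ne,
    ">=": operator.ge, "<=": operator.le,
    ">": operator.gt, "<": operator.lt,
}

# One direct field, one operator, one literal. In the alternation, ">=" and
# "<=" are listed before ">" and "<" so the two-character forms win.
_CONDITION = re.compile(
    r"^\s*context\.(initial_data|state_history)\.(\w+)\s*"
    r"(==|!=|>=|<=|>|<)\s*(.+?)\s*$"
)


def _parse_literal(raw: str) -> Any:
    if len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in "'\"":
        return raw[1:-1]  # quoted string
    for cast in (int, float):
        try:
            return cast(raw)
        except ValueError:
            pass
    return raw  # True, False, None etc. stay plain strings, as noted above


def evaluate_when(condition: str, context: SimpleNamespace) -> bool:
    match = _CONDITION.match(condition)
    if match is None:  # nested attributes, and/or/not, etc.
        raise ValueError(f"unsupported condition: {condition!r}")
    source, field_name, op, raw_value = match.groups()
    data = getattr(context, source)
    if field_name not in data:
        return False  # assumption: a missing field never matches
    return bool(_OPS[op](data[field_name], _parse_literal(raw_value)))
```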

2. Delegating Tasks to Workers (dispatch_task)

This is the primary function for delegating work. The orchestrator will queue the task and wait for a worker to pick it up and return a result.

@my_blueprint.handler_for("transcode_video")
async def transcode_handler(initial_data, actions):
    actions.dispatch_task(
        task_type="video_transcoding",
        params={"input_path": initial_data.get("path")},
        # Define the next step based on the worker's response status
        transitions={
            "success": "publish_video",
            "failure": "transcoding_failed",
            "needs_review": "manual_review" # Example of a custom status
        }
    )

If the worker returns a status not listed in transitions, the job will automatically transition to a failed state.
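The routing rule amounts to a dictionary lookup with a failure fallback. In this sketch, "failed" is a hypothetical name for the engine's failure state.

```python
FAILED_STATE = "failed"  # hypothetical name for the engine's failure state


def resolve_next_state(status: str, transitions: dict[str, str]) -> str:
    """Pick the next state for a worker result status; unknown statuses fail."""
    return transitions.get(status, FAILED_STATE)


transitions = {
    "success": "publish_video",
    "failure": "transcoding_failed",
    "needs_review": "manual_review",
}
```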

3. Parallel Execution and Aggregation (Fan-out/Fan-in)

Run multiple tasks simultaneously and gather their results.

# 1. Fan-out: Dispatch multiple tasks to be aggregated into a single state
@my_blueprint.handler_for("process_files")
async def fan_out_handler(initial_data, actions):
    tasks_to_dispatch = [
        {"task_type": "file_analysis", "params": {"file": file}}
        for file in initial_data.get("files", [])
    ]
    # Use dispatch_parallel to send all tasks at once.
    # All successful tasks will implicitly lead to the 'aggregate_into' state.
    actions.dispatch_parallel(
        tasks=tasks_to_dispatch,
        aggregate_into="aggregate_results"
    )

# 2. Fan-in: Collect results using the @aggregator_for decorator
@my_blueprint.aggregator_for("aggregate_results")
async def aggregator_handler(aggregation_results, state_history, actions):
    # This handler will only execute AFTER ALL tasks
    # dispatched by dispatch_parallel are complete.

    # aggregation_results is a dictionary of {task_id: result_dict}
    summary = [res.get("data") for res in aggregation_results.values()]
    state_history["summary"] = summary
    actions.transition_to("processing_complete")
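The fan-out/fan-in shape can be reproduced in-process with asyncio.gather. In Avtomatika, the tasks run on remote workers and the orchestrator triggers the aggregator. Here, a hypothetical local coroutine stands in for the "file_analysis" worker, so the sketch only illustrates the shape of aggregation_results and the "run only after all tasks finish" rule.

```python
import asyncio
import uuid
from typing import Any


async def analyse_file(file: str) -> dict[str, Any]:
    # Hypothetical stand-in for a "file_analysis" worker's result dict.
    await asyncio.sleep(0)
    return {"status": "success", "data": {"file": file, "length": len(file)}}


async def fan_out_fan_in(files: list[str]) -> list[Any]:
    # Fan-out: give each task an ID and start them all concurrently.
    task_ids = [str(uuid.uuid4()) for _ in files]
    results = await asyncio.gather(*(analyse_file(f) for f in files))
    # Fan-in: only once *all* tasks are done, build {task_id: result_dict},
    # the same shape the aggregator above receives as aggregation_results.
    aggregation_results = dict(zip(task_ids, results))
    return [res.get("data") for res in aggregation_results.values()]
```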

4. Dependency Injection (DataStore)

Provide handlers with access to external resources (like a cache or DB client).

import redis.asyncio as redis
from avtomatika import StateMachineBlueprint

# 1. Initialize and register your DataStore
redis_client = redis.Redis(decode_responses=True)
bp = StateMachineBlueprint(
    "blueprint_with_datastore",
    data_stores={"cache": redis_client}
)

# 2. Use it in a handler via dependency injection
@bp.handler_for("get_from_cache")
async def cache_handler(data_stores):
    # Access the redis_client by the name "cache"
    user_data = await data_stores.cache.get("user:123")
    print(f"User from cache: {user_data}")

5. Native Scheduler

Avtomatika includes a built-in distributed scheduler. It allows you to trigger blueprints periodically (interval, daily, weekly, monthly) without external tools like cron.

  • Configuration: Defined in schedules.toml.
  • Timezone Aware: Supports global timezone configuration (e.g., TZ="Europe/Moscow").
  • Expiration Support: Supports dispatch_timeout and result_timeout to ensure tasks don't run or complete too late.
  • Distributed Locking: Safe to run with multiple orchestrator instances; jobs are guaranteed to run only once per interval using distributed locks (Redis/Memory).

# schedules.toml example
[nightly_backup]
blueprint = "backup_flow"
daily_at = "02:00"
dispatch_timeout = 60 # Fail if no worker picks it up within 1 minute

6. Webhook Notifications

The orchestrator can send asynchronous notifications to an external system when a job completes, fails, or is quarantined. This eliminates the need for clients to constantly poll the API for status updates.

7. S3 Payload Offloading

The orchestrator provides first-class support for handling large files via S3-compatible storage, powered by the high-performance obstore library (Rust bindings).

  • Memory Safe (Streaming): Uses streaming for uploads and downloads, allowing processing of files larger than available RAM without OOM errors.
  • Managed Mode: The Orchestrator manages file lifecycle (automatic cleanup of S3 objects and local temporary files on job completion).
  • Dependency Injection: Use the task_files argument in your handlers to easily read/write data.
  • Directory Support: Supports recursive download and upload of entire directories.

@bp.handler_for("process_data")
async def process_data(task_files, actions):
    # Streaming download of a large file
    local_path = await task_files.download("large_dataset.csv")
    
    # ... process data ...
    
    # Upload results
    await task_files.write_json("results.json", {"status": "done"})
    
    actions.transition_to("finished")
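Streaming and end-to-end validation fit together naturally: copy in fixed-size chunks and update a running size and hash as each chunk passes through, so memory stays bounded by the chunk size. The function below is a hypothetical illustration over generic file objects, not the engine's obstore-based code. It relies on the fact that, for plain single-part S3 uploads, the ETag is the quoted hex MD5 of the object. Multipart uploads use a different ETag format.

```python
import hashlib
from typing import BinaryIO

CHUNK_SIZE = 1024 * 1024  # 1 MiB: memory use is bounded by this, not by file size


def stream_and_verify(
    src: BinaryIO, dst: BinaryIO, expected_size: int, expected_etag: str
) -> None:
    """Copy src to dst chunk by chunk, then check size and single-part ETag."""
    md5 = hashlib.md5(usedforsecurity=False)  # integrity check, not security
    size = 0
    while chunk := src.read(CHUNK_SIZE):
        dst.write(chunk)
        md5.update(chunk)
        size += len(chunk)
    if size != expected_size:
        raise ValueError(f"size mismatch: got {size}, expected {expected_size}")
    if md5.hexdigest() != expected_etag.strip('"'):
        raise ValueError("ETag mismatch")
```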

Webhook Events and Payloads

  • Events:
    • job_finished: The job reached a final success state.
    • job_failed: The job failed (e.g., due to an error or invalid input).
    • job_quarantined: The job was moved to quarantine after repeated failures.

Example Request:

POST /api/v1/jobs/my_flow
{
    "initial_data": {
        "video_url": "..."
    },
    "webhook_url": "https://my-app.com/webhooks/avtomatika",
    "dispatch_timeout": 30,
    "result_timeout": 120
}
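The same request can be built from Python with only the standard library. In this sketch, BASE_URL and the video URL are placeholders (the server address is an assumption). The X-Client-Token header and token value come from the Quick Start configuration. The request is only constructed here; passing it to urllib.request.urlopen() would send it.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: wherever your orchestrator listens


def build_create_job_request(client_token: str) -> urllib.request.Request:
    body = {
        "initial_data": {"video_url": "https://example.com/video.mp4"},  # placeholder
        "webhook_url": "https://my-app.com/webhooks/avtomatika",
        "dispatch_timeout": 30,
        "result_timeout": 120,
    }
    return urllib.request.Request(
        f"{BASE_URL}/api/v1/jobs/my_flow",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Client-Token": client_token,  # header name from the Quick Start
        },
        method="POST",
    )
```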

Example Webhook Payload:

{
    "event": "job_finished",
    "job_id": "123e4567-e89b-12d3-a456-426614174000",
    "status": "finished",
    "result": {
        "output_path": "/videos/result.mp4"
    },
    "error": null
}
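On the receiving side, a webhook endpoint mostly needs to parse the JSON body and branch on event. The sketch below is framework-agnostic and hypothetical: it takes the raw request body and returns a log line, handling the three events listed above and ignoring anything else.

```python
import json
from typing import Any


def handle_webhook(body: bytes) -> str:
    """Turn an incoming webhook payload into a short log line (illustrative)."""
    payload: dict[str, Any] = json.loads(body)
    event, job_id = payload["event"], payload["job_id"]
    if event == "job_finished":
        return f"{job_id} finished: {payload.get('result')}"
    if event in ("job_failed", "job_quarantined"):
        return f"{job_id} {event.removeprefix('job_')}: {payload.get('error')}"
    return f"{job_id}: ignoring unknown event {event!r}"
```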

Production Configuration

The orchestrator's behavior can be configured through environment variables. Additionally, any configuration parameter loaded from environment variables can be programmatically overridden in your application code after the Config object has been initialized. This provides flexibility for different deployment and testing scenarios.

Important: The system employs strict validation for configuration files (clients.toml, workers.toml) at startup. If a configuration file is invalid (e.g., malformed TOML, missing required fields), the application will fail fast and exit with an error, rather than starting in a partially broken state. This ensures the security and integrity of the deployment.

Configuration Files

To manage access and worker settings securely, Avtomatika uses TOML configuration files.

  • clients.toml: Defines API clients, their tokens, plans, and quotas.
    [client_premium]
    token = "secret-token-123"
    plan = "premium"
    
  • workers.toml: Defines individual tokens for workers to enhance security.
    [gpu-worker-01]
    token = "worker-secret-456"
    
  • schedules.toml: Defines periodic tasks (CRON-like) for the native scheduler.
    [nightly_backup]
    blueprint = "backup_flow"
    daily_at = "02:00"
    

For detailed specifications and examples, please refer to the Configuration Guide.

Fault Tolerance

The orchestrator has built-in mechanisms for handling failures based on the error.code field in a worker's response.

  • TRANSIENT_ERROR: A temporary error (e.g., network failure). The orchestrator will automatically retry the task several times.
  • RESOURCE_EXHAUSTED_ERROR / TIMEOUT_ERROR / INTERNAL_ERROR: Treated as transient errors and retried.
  • PERMANENT_ERROR: A permanent error. The task will be immediately sent to quarantine.
  • SECURITY_ERROR / DEPENDENCY_ERROR: Treated as permanent errors (e.g., security violation or missing model). Immediate quarantine.
  • INVALID_INPUT_ERROR: An error in the input data. The entire pipeline (Job) will be immediately moved to the failed state.
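The rules above form a small decision table. The sketch below encodes it and makes two assumptions the text does not state. MAX_RETRIES = 3 is hypothetical, since the text only says "several times". Once retries run out, the task is quarantined, which is consistent with "quarantine after repeated failures" in the webhook events. Unknown codes are treated as permanent.

```python
from enum import Enum


class Outcome(Enum):
    RETRY = "retry"
    QUARANTINE_TASK = "quarantine_task"
    FAIL_JOB = "fail_job"


RETRYABLE = {"TRANSIENT_ERROR", "RESOURCE_EXHAUSTED_ERROR", "TIMEOUT_ERROR", "INTERNAL_ERROR"}
PERMANENT = {"PERMANENT_ERROR", "SECURITY_ERROR", "DEPENDENCY_ERROR"}
MAX_RETRIES = 3  # hypothetical limit


def classify(error_code: str, attempt: int) -> Outcome:
    """Decide what happens after a failed attempt (attempt counts from 0)."""
    if error_code == "INVALID_INPUT_ERROR":
        return Outcome.FAIL_JOB  # bad input: the whole job fails
    if error_code in PERMANENT:
        return Outcome.QUARANTINE_TASK
    if error_code in RETRYABLE:
        return Outcome.RETRY if attempt < MAX_RETRIES else Outcome.QUARANTINE_TASK
    return Outcome.QUARANTINE_TASK  # assumption: unknown codes count as permanent
```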

Progress Tracking

Workers can report real-time execution progress (0-100%) and status messages. This information is automatically persisted by the Orchestrator and exposed via the Job Status API (GET /api/v1/jobs/{job_id}).

Concurrency & Performance

To prevent system overload during high traffic, the Orchestrator implements a backpressure mechanism for its internal job processing logic.

  • EXECUTOR_MAX_CONCURRENT_JOBS: Limits the number of job handlers running simultaneously within the Orchestrator process (default: 100). If this limit is reached, new jobs remain in the Redis queue until a slot becomes available. This ensures the event loop remains responsive even with a massive backlog of pending jobs.
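An asyncio.Semaphore is the standard way to cap concurrent coroutines. In the engine, pending jobs wait in the Redis queue until a slot frees up. In this simplified sketch, they wait on the semaphore in-process instead. MAX_CONCURRENT_JOBS is set to 3 only to keep the example small.

```python
import asyncio

MAX_CONCURRENT_JOBS = 3  # stands in for EXECUTOR_MAX_CONCURRENT_JOBS (default 100)


async def run_with_backpressure(job_ids: list[str]) -> int:
    """Process jobs with at most MAX_CONCURRENT_JOBS in flight; return the peak."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
    running = 0
    peak = 0

    async def handle(job_id: str) -> None:
        nonlocal running, peak
        async with semaphore:  # waits here while all slots are taken
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for the job handler
            running -= 1

    await asyncio.gather(*(handle(j) for j in job_ids))
    return peak
```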

High Availability & Distributed Locking

The architecture supports horizontal scaling. Multiple Orchestrator instances can run behind a load balancer.

  • Stateless API: The API is stateless; all state is persisted in Redis.
  • Instance Identity: Each instance should have a unique INSTANCE_ID (defaults to hostname) for correct handling of Redis Streams consumer groups.
  • Distributed Locking: Background processes (Watcher, ReputationCalculator) use distributed locks (via Redis SET NX) to coordinate and prevent race conditions when multiple instances are active.
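The SET NX pattern works like this: the first instance to set a key with an expiry wins, and the others back off until the key expires or is released. With redis-py, acquiring looks like r.set(key, instance_id, nx=True, px=ttl_ms). The in-memory class below only imitates those semantics for illustration. The lock key name is hypothetical, and in real Redis the owner-checked release must be atomic (typically a Lua script).

```python
import time


class ToyLockStore:
    """In-memory imitation of Redis `SET key value NX PX ttl` semantics."""

    def __init__(self) -> None:
        self._locks: dict[str, tuple[str, float]] = {}

    def acquire(self, key: str, owner: str, ttl_seconds: float) -> bool:
        now = time.monotonic()
        current = self._locks.get(key)
        if current is not None and current[1] > now:
            return False  # NX: another owner holds an unexpired lock
        self._locks[key] = (owner, now + ttl_seconds)
        return True

    def release(self, key: str, owner: str) -> None:
        # Only the owner may release, so a slow instance cannot free a lock
        # that has already expired and been taken by another instance.
        if self._locks.get(key, ("", 0.0))[0] == owner:
            del self._locks[key]
```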

Logging & Observability

Avtomatika is designed for modern observability stacks (ELK, Loki, Prometheus).

  • Structured Logging: By default, logs are output in JSON format, making them easy to parse and index. Set LOG_FORMAT="text" to switch to plain-text output.
  • Timezone Awareness: All log timestamps respect the globally configured TZ environment variable.
  • Traceability: Logs include job_id, worker_id, and task_id for full end-to-end tracing.
  • Metrics: Prometheus metrics are available at /_public/metrics, including a specific counter orchestrator_ratelimit_blocked_total to track blocked requests.
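With the standard logging module, one common way to get JSON lines that carry trace IDs is a custom Formatter plus the extra= argument. The formatter below is a hypothetical sketch of that technique, not Avtomatika's own formatter.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Minimal JSON formatter that includes job/worker/task IDs when present."""

    CONTEXT_FIELDS = ("job_id", "worker_id", "task_id")

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key in self.CONTEXT_FIELDS:
            value = getattr(record, key, None)  # attached via logging's extra=
            if value is not None:
                entry[key] = value
        return json.dumps(entry)
```

Usage: attach it to a handler with handler.setFormatter(JsonFormatter()), then log with logger.info("task dispatched", extra={"job_id": job_id, "task_id": task_id}).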

Rate Limiting

The Orchestrator includes a built-in, granular rate limiter based on Redis to protect against abuse and DDoS.

  • Granular Protection: Limits are applied per Client Token (for API clients) or per Worker ID (for workers).
  • Context Aware: Different limits apply to different operations:
    • Heartbeats: Higher limit (default 120/min) to allow frequent status updates.
    • Polling: Moderate limit (default 60/min) for task fetching.
    • General API: Default limit (default 100/min) for other operations.
  • Global Enforcement: The middleware is applied globally, protecting all entry points including Worker API and Client API.

Storage Backend

By default, the engine uses in-memory storage. For production, you must configure persistent storage via environment variables.

  • Redis (StorageBackend): For storing current job states (serialized with msgpack) and managing task queues (using Redis Streams with consumer groups).

    • Install:
      pip install "avtomatika[redis]"
      
    • Configure:
      export REDIS_HOST=your_redis_host
      
  • PostgreSQL/SQLite (HistoryStorage): For archiving completed job history.

    • Install:
      pip install "avtomatika[history]"
      
    • Configure:
      export HISTORY_DATABASE_URI=...
      
      • SQLite: sqlite:///path/to/history.db
      • PostgreSQL: postgresql://user:pass@host/db

Dynamic Blueprint Loading

Avtomatika supports automatic loading of blueprints from a directory. This allows you to deploy and update your workflow logic by simply copying Python files without changing the orchestrator's core code.

  • Configure: Set the BLUEPRINTS_DIR environment variable to the path containing your blueprint files.
  • How it works: At startup, the engine scans the directory for all .py files, imports them, and automatically registers any found StateMachineBlueprint instances.
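Scanning a directory for blueprints can be done with importlib's file-based loading. The discover() helper below is hypothetical. In real use, the predicate would be something like lambda o: isinstance(o, StateMachineBlueprint), and the directory would come from BLUEPRINTS_DIR. Because every file is executed on import, point it only at trusted code.

```python
import importlib.util
from pathlib import Path
from typing import Any, Callable


def discover(directory: str, is_blueprint: Callable[[Any], bool]) -> list[Any]:
    """Import every .py file in `directory` and collect matching module-level objects."""
    found: list[Any] = []
    for path in sorted(Path(directory).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the file's top-level code
        found.extend(obj for obj in vars(module).values() if is_blueprint(obj))
    return found
```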

Security

The orchestrator uses tokens to authenticate API requests.

  • Client Authentication: All API clients must provide a token in the X-Client-Token header. The orchestrator validates this token against client configurations.
  • Worker Authentication: Workers must provide a token in the X-Worker-Token header.
    • GLOBAL_WORKER_TOKEN: You can set a global token for all workers using this environment variable. For development and testing, it defaults to "secure-worker-token".
    • Individual Tokens: For production, it is recommended to define individual tokens for each worker in a separate configuration file and provide its path via the WORKERS_CONFIG_PATH environment variable. Tokens from this file are stored in a hashed format for security.

Note on Dynamic Reloading: The worker configuration file can be reloaded without restarting the orchestrator by sending an authenticated POST request to the /api/v1/admin/reload-workers endpoint. This allows for dynamic updates of worker tokens.
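Storing token hashes instead of raw tokens means a leaked copy of the in-memory table does not reveal usable credentials. The sketch below shows the general technique: hash at load time, then hash the presented X-Worker-Token value and compare in constant time. SHA-256 and the lookup by worker ID are assumptions; the engine's actual hashing scheme is not described here.

```python
import hashlib
import hmac


def hash_token(token: str) -> str:
    # SHA-256 is an assumption for this sketch.
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


# Built once when workers.toml is loaded: worker ID -> hashed token.
WORKER_TOKEN_HASHES = {"gpu-worker-01": hash_token("worker-secret-456")}


def authenticate_worker(worker_id: str, presented_token: str) -> bool:
    expected = WORKER_TOKEN_HASHES.get(worker_id)
    if expected is None:
        return False
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, hash_token(presented_token))
```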

Pure Holon Mode

For high-security environments or when operating as a Compound Holon within an HLN, you can disable the public client API.

  • Enable/Disable: Set ENABLE_CLIENT_API="false" (default: true).
  • Effect: The Orchestrator will stop listening on /api/v1/jobs/.... It will only accept tasks via the Worker Protocol (RXON) from its parent.

Observability

When installed with the telemetry dependency, the system automatically provides:

  • Prometheus Metrics: Available at the /_public/metrics endpoint.
  • Distributed Tracing: Compatible with OpenTelemetry and systems like Jaeger or Zipkin.

Contributor Guide

Setup Environment

  • Clone the repository.
  • For local development, install the protocol package first:
    pip install -e ../rxon
    
  • Then install the engine in editable mode with all dependencies:
    pip install -e ".[all,test]"
    
  • Ensure you have system dependencies installed, such as graphviz.
    • Debian/Ubuntu:
      sudo apt-get install graphviz
      
    • macOS (Homebrew):
      brew install graphviz
      

Running Tests

To run the avtomatika test suite:

pytest tests/

Interactive API Documentation

Avtomatika provides a built-in interactive API documentation page (similar to Swagger UI) that is automatically generated based on your registered blueprints.

  • Endpoint: /_public/docs
  • Features:
    • List of all system endpoints: Detailed documentation for Public, Protected, and Worker API groups.
    • Dynamic Blueprint Documentation: Automatically generates and lists documentation for all blueprints registered in the engine, including their specific API endpoints.
    • Interactive Testing: Allows you to test API calls directly from the browser. You can provide authentication tokens, parameters, and request bodies to see real server responses.

Detailed Documentation

For a deeper dive into the system, please refer to the following documents:

  • Architecture Guide: A detailed overview of the system components and their interactions.
  • API Reference: Full specification of the HTTP API.
  • Deployment Guide: Instructions for deploying with Gunicorn/Uvicorn and NGINX.
  • Cookbook: Examples and best practices for creating blueprints.



Download files

Download the file for your platform.

Source Distribution

avtomatika-1.0b13.tar.gz (139.9 kB)

Uploaded Source

Built Distribution


avtomatika-1.0b13-py3-none-any.whl (98.4 kB)

Uploaded Python 3

File details

Details for the file avtomatika-1.0b13.tar.gz.

File metadata

  • Download URL: avtomatika-1.0b13.tar.gz
  • Upload date:
  • Size: 139.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for avtomatika-1.0b13.tar.gz
Algorithm Hash digest
SHA256 24c1e9184be833ffad969ddf93755b05f77158f8cc97fa6fbd274cc711c84ad0
MD5 6578140baf58a4887931f67f0b072b40
BLAKE2b-256 104ce3a755d249cc1a91b1b1d792cfd1fa8baced7e53f2b5a4d7d58829dc2048


File details

Details for the file avtomatika-1.0b13-py3-none-any.whl.

File metadata

  • Download URL: avtomatika-1.0b13-py3-none-any.whl
  • Upload date:
  • Size: 98.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.11

File hashes

Hashes for avtomatika-1.0b13-py3-none-any.whl
Algorithm Hash digest
SHA256 3a9cdc70352b2849debedb494b2a2e903bdd74aae721c3207c3c2c0a31ad3433
MD5 a0700bf1749490869c864900896a4e29
BLAKE2b-256 202b5970c336e7f4ae2ce08a070c9169482c876fd020ca46125348e82ac4a474

