Skip to main content

Worker SDK for the Avtomatika orchestrator.

Project description

Avtomatika Worker SDK

Official SDK for building workers compatible with the Avtomatika orchestrator. It automates low-level tasks: polling, heartbeats, S3 payload management, and graceful shutdown.

🚀 Key Features

  • Language: Python 3.11+
  • Protocol: Based on RXON (Reverse Axon Protocol) for Hierarchical Logic Networks (Holarchy).
  • Communication Model:
    • PULL: Workers poll tasks from orchestrators (works behind NAT/Firewall).
    • WebSocket: Real-time command channel (cancellation, custom commands).
  • Zero Trust Security:
    • Mandatory HMAC SHA256 signing for all messages using WORKER_TOKEN.
    • Identity Chain and Origin Worker ID support for provenance tracking.
    • Replay protection with timestamp validation.
  • Traffic Optimization:
    • 3-Tier Skills: Supported (catalog), Available (dynamic limits), and Hot (cached).
    • Stable Hashing: Sends full skill catalog only when changed, using skills_hash for light heartbeats.
  • S3 Streaming: High-performance data transfer using obstore. No OOM on large files.
  • Hardware Awareness: Built-in monitoring for CPU, RAM, and NVIDIA GPUs (via psutil and GPUtil).

🛡 Resilience & Connectivity

  • Independent Managers: Connection to each orchestrator is managed by a separate background task. One server failure or rate limit doesn't affect others.
  • Smart Backoff: Unified exponential backoff for registration, polling, and heartbeats.
  • Rate Limit Protection: Full support for Retry-After (seconds or HTTP-date). Implements a mandatory 30s safety floor for 429 errors without Retry-After to prevent Retry Storms.
  • Heartbeat Debouncing: Throttles heartbeats to once every 2 seconds. Events are not lost but consolidated and sent after the cooldown period.
  • Infinite Retries: Workers never stop trying to register with an exponential delay.
  • Graceful Shutdown: Handles SIGTERM and SIGINT properly, waiting for active tasks to finish.

🛠 Installation

pip install avtomatika-worker[s3,pydantic]

For development:

pip install -e .[test,dev]

💻 Quick Start

from avtomatika_worker import Worker, TaskFiles

worker = Worker()

@worker.skill("hello_world")
async def my_skill(params: dict, files: TaskFiles):
    """Simple skill that says hello."""
    return {"message": f"Hello, {params.get('name', 'World')}!"}

@worker.on_command("reboot")
async def handle_reboot(command):
    print("Rebooting worker...")

if __name__ == "__main__":
    worker.run()

⚙️ Configuration

Controlled via environment variables:

  • ORCHESTRATORS_CONFIG: JSON list of orchestrator configs (URLs, priorities, weights).
  • ORCHESTRATOR_URL: Simple fallback if only one orchestrator is used (default: http://localhost:8080).
  • WORKER_TOKEN: Secret for HMAC signing (Zero Trust).
  • S3_ENDPOINT_URL, S3_ACCESS_KEY, S3_SECRET_KEY, S3_DEFAULT_BUCKET: Storage settings for large payloads.
  • STRICT_EVENT_VALIDATION: (Default: True) Validates events against schemas before emitting.
  • LOG_LEVEL: Logging verbosity (DEBUG, INFO, WARNING, ERROR).
  • POLL_BACKOFF_INITIAL: Initial delay (seconds) after a 429 error or network failure (default: 1.0). Honors Retry-After header.
  • POLL_BACKOFF_MAX: Maximum backoff delay (seconds) (default: 60.0).
  • POLL_BACKOFF_FACTOR: Multiplier for exponential backoff (default: 2.0).
  • MAX_CONCURRENT_TASKS: Global limit for concurrent task execution.

📜 License

Mozilla Public License v. 2.0.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

avtomatika_worker-1.0b16.tar.gz (65.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

avtomatika_worker-1.0b16-py3-none-any.whl (38.5 kB view details)

Uploaded Python 3

File details

Details for the file avtomatika_worker-1.0b16.tar.gz.

File metadata

  • Download URL: avtomatika_worker-1.0b16.tar.gz
  • Upload date:
  • Size: 65.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for avtomatika_worker-1.0b16.tar.gz
Algorithm Hash digest
SHA256 f6eac067fe7534c9182a734e72a9120414dc469989b71d4c0916e7bb7cf100fd
MD5 b130173375e3a03cc24904e4559459bc
BLAKE2b-256 939ac479293bcd30ac990ed83b6cf34f419947d0d0e87447097214ad8f5f33b7

See more details on using hashes here.

File details

Details for the file avtomatika_worker-1.0b16-py3-none-any.whl.

File metadata

File hashes

Hashes for avtomatika_worker-1.0b16-py3-none-any.whl
Algorithm Hash digest
SHA256 203da4504e5d0a31616dac392fd663814cd95c857e765f8bdde00a719b50dfa7
MD5 0948539ec7096b10ddc93a816257833c
BLAKE2b-256 c79cb30d5b1d65e0e365fc527e69cc2524ee5960269a5e909ac1d2dcaecdba7d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page