Worker SDK for the Avtomatika orchestrator.

Avtomatika Worker SDK

Official SDK for building workers compatible with the Avtomatika orchestrator. It automates low-level tasks: polling, heartbeats, S3 payload management, and graceful shutdown.

🚀 Key Features

  • Language: Python 3.11+
  • Protocol: Based on RXON (Reverse Axon Protocol) for Hierarchical Logic Networks (Holarchy).
  • Communication Model:
    • PULL: Workers poll orchestrators for tasks, so they work behind NAT or a firewall.
    • WebSocket: Real-time command channel (cancellation, custom commands).
  • Zero Trust Security:
    • Mandatory HMAC-SHA256 signing of all messages, keyed with WORKER_TOKEN.
    • Identity Chain and Origin Worker ID support for provenance tracking.
    • Replay protection with timestamp validation.
  • Traffic Optimization:
    • 3-Tier Skills: Supported (catalog), Available (dynamic limits), and Hot (cached).
    • Stable Hashing: Sends the full skill catalog only when it changes; otherwise heartbeats stay light by carrying just skills_hash.
  • S3 Streaming: High-performance data transfer using obstore. Large files are streamed, which avoids out-of-memory failures.
  • Hardware Awareness: Built-in monitoring for CPU, RAM, and NVIDIA GPUs (via psutil and GPUtil).

🛡 Resilience & Connectivity

  • Independent Managers: The connection to each orchestrator is managed by a separate background task, so the failure of one orchestrator doesn't affect the others.
  • Infinite Retries: Exponential backoff for registration if the orchestrator is unavailable.
  • Non-blocking Startup: The worker starts polling as soon as it registers successfully with at least one orchestrator.
  • Graceful Shutdown: Handles SIGTERM and SIGINT properly, waiting for active tasks to finish.
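
The combination of independent managers, infinite retries, and non-blocking startup can be sketched with plain asyncio. This is a simplified model under stated assumptions, not the SDK's internals: manage_connection, start, and the register callback are hypothetical names, and ConnectionError stands in for whatever failure the real registration call raises.

```python
import asyncio


async def manage_connection(url, register, ready, initial=1.0, maximum=60.0, factor=2.0):
    """Retry registration forever with exponential backoff, then signal readiness."""
    delay = initial
    while True:
        try:
            await register(url)  # hypothetical coroutine that registers with one orchestrator
            break
        except ConnectionError:
            await asyncio.sleep(delay)
            delay = min(delay * factor, maximum)  # grow the delay, capped at the maximum
    ready.set()  # the first successful registration unblocks startup


async def start(urls, register, **backoff):
    """Launch one manager task per orchestrator and return once any of them is registered."""
    ready = asyncio.Event()
    tasks = [
        asyncio.create_task(manage_connection(url, register, ready, **backoff))
        for url in urls
    ]
    await ready.wait()  # do not wait for the slow or unreachable orchestrators
    return tasks
```

Because each orchestrator has its own task, an unreachable server keeps retrying in the background while the worker already polls the ones that answered.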

🛠 Installation

pip install "avtomatika-worker[s3,pydantic]"

For development:

pip install -e ".[test,dev]"

💻 Quick Start

from avtomatika_worker import Worker, TaskFiles

worker = Worker()

@worker.skill("hello_world")
async def my_skill(params: dict, files: TaskFiles):
    """Simple skill that says hello."""
    return {"message": f"Hello, {params.get('name', 'World')}!"}

@worker.on_command("reboot")
async def handle_reboot(command):
    print("Rebooting worker...")

if __name__ == "__main__":
    worker.run()

⚙️ Configuration

Controlled via environment variables:

  • ORCHESTRATORS_CONFIG: JSON list of orchestrator configs (URLs, priorities, weights).
  • ORCHESTRATOR_URL: Simple fallback when only one orchestrator is used (default: http://localhost:8080).
  • WORKER_TOKEN: Secret for HMAC signing (Zero Trust).
  • S3_ENDPOINT_URL, S3_ACCESS_KEY, S3_SECRET_KEY, S3_DEFAULT_BUCKET: Storage settings for large payloads.
  • STRICT_EVENT_VALIDATION: Validates events against schemas before emitting (default: True).
  • LOG_LEVEL: Logging verbosity (DEBUG, INFO, WARNING, ERROR).
  • POLL_BACKOFF_INITIAL: Initial delay (seconds) after a 429 response or network failure (default: 1.0). Honors the Retry-After header.
  • POLL_BACKOFF_MAX: Maximum backoff delay (seconds) (default: 60.0).
  • POLL_BACKOFF_FACTOR: Multiplier for exponential backoff (default: 2.0).
  • MAX_CONCURRENT_TASKS: Global limit for concurrent task execution.
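
To make the interplay of these variables concrete, here is a minimal sketch of how the orchestrator list and the poll backoff could be derived from the environment. The helper names load_orchestrators and next_delay are hypothetical, the "url" key inside each JSON entry is an assumed schema, and letting a Retry-After value replace the computed delay is one possible reading of "honors", not confirmed SDK behavior.

```python
import json
import os


def load_orchestrators(env=os.environ):
    """Prefer ORCHESTRATORS_CONFIG (a JSON list); fall back to the single ORCHESTRATOR_URL."""
    raw = env.get("ORCHESTRATORS_CONFIG")
    if raw:
        return json.loads(raw)
    # The "url" key is an assumed field name for illustration.
    return [{"url": env.get("ORCHESTRATOR_URL", "http://localhost:8080")}]


def next_delay(attempt, env=os.environ, retry_after=None):
    """Delay in seconds before poll retry number `attempt` (0-based)."""
    initial = float(env.get("POLL_BACKOFF_INITIAL", "1.0"))
    maximum = float(env.get("POLL_BACKOFF_MAX", "60.0"))
    factor = float(env.get("POLL_BACKOFF_FACTOR", "2.0"))
    if retry_after is not None:
        # Assumption: a server-provided Retry-After replaces the computed delay.
        return float(retry_after)
    return min(initial * factor ** attempt, maximum)
```

With the documented defaults, successive failures would wait 1, 2, 4, 8, 16, 32, and then 60 seconds, staying at 60 from there on.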

📜 License

Mozilla Public License 2.0 (MPL-2.0).
