Worker SDK for the Avtomatika orchestrator.
Avtomatika Worker SDK
The official SDK for creating workers compatible with the Avtomatika Orchestrator. It handles polling, heartbeats, S3 payload offloading, and graceful shutdown so you can focus on your business logic.
Installation
pip install avtomatika-worker
Recommended for full features:
pip install "avtomatika-worker[s3,pydantic,metrics]"
Extras:
- [s3]: for S3 payload offloading (requires obstore).
- [pydantic]: for Pydantic-based parameter validation.
- [metrics]: for OpenTelemetry tracing and metrics.
- [dev]: for development features like CLI --reload.
Quick Start
Option 1: CLI Usage (Recommended)
Define your worker in a Python module (e.g., app/main.py). The SDK automatically infers skill names and schemas from your code!
from avtomatika_worker import Worker
from pydantic import BaseModel

worker = Worker(worker_type="image-processor")

class ResizeParams(BaseModel):
    width: int
    height: int
    url: str

# Automatic: name="resize", schema from ResizeParams
@worker.skill()
async def resize(params: ResizeParams):
    print(f"Resizing to {params.width}px")
    return {"status": "success", "data": {"result": "ok"}}
Option 2: Dynamic Skill Loading
Place your skill handlers in the skills/ directory (e.g., skills/my_skills.py):
from avtomatika_worker import SkillBlueprint

bp = SkillBlueprint()

# Add metadata for the Marketplace (optional)
@bp.skill(price=0.5, category="AI")
async def generate_preview(params: dict):
    return {"status": "success"}
Run the worker, and it will automatically load all skills from the directory:
# It will look into ./skills by default
worker run --app app.main:worker
Key Features
1. Zero-Trust Security (HLN Identity Chain)
- Cryptographic Signatures: When WORKER_TOKEN is configured, the SDK automatically signs all registrations, heartbeats, task results, and events using HMAC-SHA256. This ensures payload integrity and prevents unauthorized worker spoofing.
- Robust Hash Consistency: None values are recursively removed from the payload before signing, so the signature hash is identical across different systems and programming languages. Dynamic elements (like bubbling_chain) are automatically excluded from the signature calculation, which allows HLN network traversal without breaking cryptographic integrity.
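The signing behaviour described above can be sketched with the standard library alone. This is an illustration, not the SDK's actual code: the canonical form (sorted keys, compact JSON), the hex digest, the treatment of None only as dictionary values, and the helper names `strip_none`, `sign_payload`, and `verify_payload` are all assumptions. Only HMAC-SHA256, the None cleaning, and the exclusion of bubbling_chain come from the text.

```python
import hashlib
import hmac
import json
from typing import Any

# Assumption: dynamic keys are skipped at the top level of the payload.
UNSIGNED_KEYS = frozenset({"bubbling_chain"})


def strip_none(value: Any) -> Any:
    """Recursively drop dictionary entries whose value is None."""
    if isinstance(value, dict):
        return {k: strip_none(v) for k, v in value.items() if v is not None}
    if isinstance(value, list):
        return [strip_none(v) for v in value]
    return value


def sign_payload(payload: dict, token: str) -> str:
    """Return a hex HMAC-SHA256 digest over a canonical form of the payload."""
    signable = {k: v for k, v in payload.items() if k not in UNSIGNED_KEYS}
    # Sorted keys and fixed separators make the bytes independent of key order.
    canonical = json.dumps(
        strip_none(signable), sort_keys=True, separators=(",", ":")
    ).encode("utf-8")
    return hmac.new(token.encode("utf-8"), canonical, hashlib.sha256).hexdigest()


def verify_payload(payload: dict, token: str, signature: str) -> bool:
    """Constant-time comparison of the expected and received signatures."""
    return hmac.compare_digest(sign_payload(payload, token), signature)
```

With this sketch, two payloads that differ only in key order, in None-valued fields, or in the contents of bubbling_chain produce the same signature, which is the property the section describes.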
2. Smart Skill Registration
- Zero Configuration: Names and schemas are inferred from function names and type hints.
- Auto-Contracts: Both input_schema and output_schema are automatically generated from Pydantic models or standard dataclasses.
- Generic Events: Declare custom events via @worker.skill(events={"alert": Schema}) and emit them using the send_event helper. Progress is also a system event.
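To make the inference idea concrete, here is a rough standard-library sketch of how a name and input/output schemas could be derived from a function's name and type hints. It is not the SDK's implementation: `infer_skill`, `dataclass_schema`, the type mapping, and the JSON-Schema-like output shape are all hypothetical, and only dataclasses are handled.

```python
import dataclasses
import inspect
from typing import get_type_hints

# Hypothetical mapping from Python types to JSON Schema type names.
_JSON_TYPES = {int: "integer", float: "number", str: "string", bool: "boolean"}


def dataclass_schema(cls) -> dict:
    """Build a small JSON-Schema-like dict from a dataclass definition."""
    hints = get_type_hints(cls)
    properties, required = {}, []
    for f in dataclasses.fields(cls):
        properties[f.name] = {"type": _JSON_TYPES.get(hints[f.name], "object")}
        has_default = (
            f.default is not dataclasses.MISSING
            or f.default_factory is not dataclasses.MISSING
        )
        if not has_default:
            required.append(f.name)
    return {"type": "object", "properties": properties, "required": required}


def infer_skill(func) -> dict:
    """Derive a skill name and schemas from a handler's signature."""
    hints = get_type_hints(func)
    params = list(inspect.signature(func).parameters)
    input_type = hints.get(params[0]) if params else None
    output_type = hints.get("return")

    def schema(tp):
        return dataclass_schema(tp) if dataclasses.is_dataclass(tp) else None

    return {
        "name": func.__name__,
        "input_schema": schema(input_type),
        "output_schema": schema(output_type),
    }


@dataclasses.dataclass
class ResizeParams:
    width: int
    height: int
    url: str
    keep_ratio: bool = True


@dataclasses.dataclass
class ResizeResult:
    path: str


async def resize(params: ResizeParams) -> ResizeResult:
    return ResizeResult(path=f"/tmp/{params.width}x{params.height}.png")


contract = infer_skill(resize)
```

Here `contract["name"]` is taken from the function name, and fields with defaults (such as keep_ratio) are left out of the required list.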
3. Optimized Network Traffic & Performance
- Skills Hashing: Workers only send the full skill list when it actually changes. Periodic heartbeats use a lightweight skills_hash.
- Thundering Herd Prevention: The SDK automatically applies the next_heartbeat_jitter_ms value provided by the Orchestrator to prevent network spikes when thousands of workers restart simultaneously.
- Self-Healing Sync: If the orchestrator loses worker metadata, it can trigger a Full Sync via the heartbeat response, ensuring seamless recovery.
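The three mechanisms above fit together roughly as follows. This is a sketch under stated assumptions: the hash algorithm (SHA-256 over sorted, compact JSON), the payload keys worker_id and skills, the `full_sync` flag, and the choice to add the jitter to the base interval are all hypothetical. Only skills_hash and next_heartbeat_jitter_ms are names taken from the text.

```python
import hashlib
import json
from typing import Optional


def skills_hash(skills: list) -> str:
    """Hash the skill list in a form that ignores list and key ordering."""
    ordered = sorted(skills, key=lambda s: s["name"])
    canonical = json.dumps(ordered, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def build_heartbeat(
    worker_id: str,
    skills: list,
    last_sent_hash: Optional[str],
    full_sync: bool = False,
) -> dict:
    """Attach the full skill list only when it changed or a sync was requested."""
    current = skills_hash(skills)
    body = {"worker_id": worker_id, "skills_hash": current}
    if full_sync or current != last_sent_hash:
        body["skills"] = skills
    return body


def next_delay_seconds(base_interval: float, response: dict) -> float:
    """Assumption: the orchestrator-provided jitter is added to the interval."""
    jitter_ms = response.get("next_heartbeat_jitter_ms") or 0
    return base_interval + jitter_ms / 1000.0
```

A worker loop would remember the hash it last sent, call `build_heartbeat` on every tick, and sleep for `next_delay_seconds(...)` before the next one.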
4. Multi-Orchestrator Support (Waterfall Priority)
- Waterfall Strategy: By default, the worker polls orchestrators in order of their priority. It always returns to the highest-priority orchestrator after completing any task, ensuring VIP tasks are handled first.
- Failover & Round Robin: Alternative strategies for load balancing and high availability.
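The waterfall strategy can be illustrated with a small simulation. The `FakeOrchestrator` class, its in-memory queue, and the convention that a lower number means higher priority are all assumptions made for this example; a real worker would poll over the network.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeOrchestrator:
    """Stand-in for an orchestrator client, backed by an in-memory queue."""
    name: str
    priority: int  # assumption: lower number means higher priority
    queue: list = field(default_factory=list)

    async def poll(self) -> Optional[str]:
        return self.queue.pop(0) if self.queue else None


async def next_task_waterfall(orchestrators):
    """Poll in priority order and return the first task found, if any."""
    for orch in sorted(orchestrators, key=lambda o: o.priority):
        task = await orch.poll()
        if task is not None:
            return orch.name, task
    return None


async def drain(orchestrators):
    """Restart from the highest priority after every task until all are empty."""
    handled = []
    while (item := await next_task_waterfall(orchestrators)) is not None:
        handled.append(item)
    return handled


vip = FakeOrchestrator("vip", priority=0, queue=["v1", "v2"])
bulk = FakeOrchestrator("bulk", priority=1, queue=["b1"])
order = asyncio.run(drain([bulk, vip]))
```

Because every iteration starts again from the top of the priority list, the lower-priority queue is only served once the higher-priority one is empty.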
5. Observability (OpenTelemetry)
- Distributed Tracing: Every task execution creates an OpenTelemetry span. S3 operations are tracked as child spans, providing full visibility in Jaeger or Tempo.
- Metrics: Built-in Prometheus-compatible metrics for task count, duration, and S3 performance, available at http://localhost:8083/metrics (if the [metrics] extra is installed).
6. Fail-Fast Validation
- Local Enforcement: The SDK validates task results and events against their declared schemas locally. Errors are logged immediately, so malformed data is never transmitted.
7. Structured Logging
The SDK supports both human-readable and JSON logging.
- LOG_FORMAT=json: for production (ELK, Grafana Loki).
- LOG_FORMAT=text: for development (default).
- All logs automatically include worker_id, task_id, and job_id context.
8. File System & S3 Reliability
- TaskFiles: Async helper for isolated task workspaces.
- S3 SDK: High-performance async uploads/downloads with automatic retries and Graceful Shutdown (waits for pending uploads before exit).
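The "wait for pending uploads before exit" behaviour can be approximated with plain asyncio. This sketch does not use the SDK's S3 API: `wait_for_pending` and `fake_upload` are hypothetical names, and bounding the wait with a timeout before cancelling leftovers is an assumption about how such a shutdown might work.

```python
import asyncio


async def wait_for_pending(pending: set, timeout: float) -> tuple:
    """Wait up to `timeout` seconds, then cancel whatever is still running.

    Returns (number finished, number cancelled).
    """
    if not pending:
        return 0, 0
    done, still_running = await asyncio.wait(pending, timeout=timeout)
    for task in still_running:
        task.cancel()
    # Let the cancelled tasks unwind before the event loop closes.
    await asyncio.gather(*still_running, return_exceptions=True)
    return len(done), len(still_running)


async def fake_upload(seconds: float) -> None:
    """Placeholder for an upload; it only sleeps."""
    await asyncio.sleep(seconds)


async def main() -> tuple:
    pending = {
        asyncio.create_task(fake_upload(0.01)),  # finishes in time
        asyncio.create_task(fake_upload(5.0)),   # exceeds the timeout
    }
    return await wait_for_pending(pending, timeout=0.2)


finished, cancelled = asyncio.run(main())
```

The short upload completes within the window while the long one is cancelled, so the process can exit without hanging indefinitely.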
Configuration Reference
| Variable | Description | Default |
|---|---|---|
| WORKER_ID | Unique identifier for the worker instance. | UUID |
| ORCHESTRATOR_URL | Address of the orchestrator. | http://localhost:8080 |
| LOG_FORMAT | Log format: text or json. | text |
| LOG_LEVEL | Minimum log level (DEBUG, INFO, etc). | INFO |
| WORKER_PORT | Port for health-check server. | 8083 |
| WORKER_SHUTDOWN_TIMEOUT | Max seconds to wait for tasks during shutdown. | 30.0 |
| WORKER_ENABLE_WEBSOCKETS | Enable real-time commands (e.g., cancellation). | false |
| MULTI_ORCHESTRATOR_MODE | Polling strategy: WATERFALL, ROUND_ROBIN, FAILOVER. | WATERFALL |
| WORKER_ENABLE_METRICS | Enable OpenTelemetry metrics and tracing. | false |
| REGISTRATION_RETRY_INITIAL_DELAY | Initial delay for registration retries (sec). | 1.0 |
| REGISTRATION_RETRY_MAX_DELAY | Maximum delay for registration retries (sec). | 60.0 |
| TASK_FILES_DIR | Local directory for temporary S3 payloads. | /tmp/payloads |
| WORKER_SKILLS_DIR | Directory to dynamically load skills from. | skills |
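As an illustration of how a few of these variables and their defaults might be consumed, the sketch below reads them from the environment and picks a log formatter based on LOG_FORMAT. It is not the SDK's configuration code: the `Settings` class, `load_settings`, `JsonFormatter`, `build_handler`, the accepted boolean spellings, the use of a random UUID for the WORKER_ID default, and the JSON field names are all assumptions.

```python
import json
import logging
import os
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    worker_id: str
    orchestrator_url: str
    log_format: str
    log_level: str
    worker_port: int
    shutdown_timeout: float
    enable_websockets: bool


def _as_bool(raw: str) -> bool:
    # Assumption: the table does not say which strings count as true.
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def load_settings(env=None) -> Settings:
    """Read a subset of the variables above, falling back to the listed defaults."""
    env = os.environ if env is None else env
    return Settings(
        worker_id=env.get("WORKER_ID") or str(uuid.uuid4()),
        orchestrator_url=env.get("ORCHESTRATOR_URL", "http://localhost:8080"),
        log_format=env.get("LOG_FORMAT", "text").lower(),
        log_level=env.get("LOG_LEVEL", "INFO").upper(),
        worker_port=int(env.get("WORKER_PORT", "8083")),
        shutdown_timeout=float(env.get("WORKER_SHUTDOWN_TIMEOUT", "30.0")),
        enable_websockets=_as_bool(env.get("WORKER_ENABLE_WEBSOCKETS", "false")),
    )


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per record, including context fields if present."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {"level": record.levelname, "message": record.getMessage()}
        for key in ("worker_id", "task_id", "job_id"):
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry)


def build_handler(settings: Settings) -> logging.Handler:
    """Create a stream handler formatted as JSON or plain text."""
    handler = logging.StreamHandler()
    if settings.log_format == "json":
        handler.setFormatter(JsonFormatter())
    else:
        handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    handler.setLevel(settings.log_level)
    return handler
```

Context such as task_id can be attached per call with the standard `extra` argument, for example `logger.info("done", extra={"task_id": "t-9"})`.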
Documentation
- Development Guide — Detailed instructions on how to create custom workers, use middlewares, and handle S3 offloading.
Docker Usage
Use the provided Dockerfile for easy deployment:
docker build -t my-worker .
docker run -e ORCHESTRATOR_URL=... my-worker worker run --app app:worker
Development
Install development dependencies and run the tests:
pip install -e ".[dev]"
pytest
File details
Details for the file avtomatika_worker-1.0b13.tar.gz.
File metadata
- Download URL: avtomatika_worker-1.0b13.tar.gz
- Upload date:
- Size: 61.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b1e17398189544d1c79cf2314bd3883d39cb56fe4eb9cd24a33a4704f72ec4cc |
| MD5 | aa12b0fcecf462508e7bb1a9eff7f751 |
| BLAKE2b-256 | 93a2b6ec3d6d63764e6c0da05c54a10bc4c3aeca0efb0b1b80a8a08f7d695b0c |
File details
Details for the file avtomatika_worker-1.0b13-py3-none-any.whl.
File metadata
- Download URL: avtomatika_worker-1.0b13-py3-none-any.whl
- Upload date:
- Size: 38.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d70fcdede20d0a144314fd1ea1fc5fe74cbf6781c25a83eb93dcbfa520c61401 |
| MD5 | 34b8b00e4dd4b177071d087cb07a0d74 |
| BLAKE2b-256 | fe9f1c6a6a7291cb81a29a68550fb1d6af278a7ca8bf08655aab855066a02510 |