Worker SDK for the Avtomatika orchestrator.
Avtomatika Worker SDK
The official SDK for creating workers compatible with the Avtomatika Orchestrator. It handles polling, heartbeats, S3 payload offloading, and graceful shutdown so you can focus on your business logic.
Installation
pip install avtomatika-worker
Extras:
- pip install "avtomatika-worker[s3]": for S3 payload offloading (requires obstore).
- pip install "avtomatika-worker[pydantic]": for Pydantic-based parameter validation.
- pip install "avtomatika-worker[dev]": for development features like CLI --reload.
Quick Start
Option 1: CLI Usage (Recommended)
Define your worker in a Python module (e.g., app/main.py):
from avtomatika_worker import Worker

worker = Worker(worker_type="image-processor")

@worker.task("resize")
async def resize_image(params: dict, **kwargs):
    return {"status": "success", "data": {"result": "ok"}}
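Because a handler is an ordinary coroutine that takes a params dict and returns a result dict, its logic can be exercised without a running orchestrator. The following is a minimal sketch under stated assumptions: the `width` and `height` parameter names and the `"failure"` status value are hypothetical (the README only shows `"success"`), and the `@worker.task("resize")` decorator is left out so the snippet has no SDK dependency.

```python
import asyncio


async def resize_image(params: dict, **kwargs) -> dict:
    """Handler body only; in a real worker it would sit under @worker.task("resize")."""
    # "width" and "height" are hypothetical parameter names used for illustration.
    width = params.get("width")
    height = params.get("height")
    valid = (
        isinstance(width, int) and isinstance(height, int)
        and width > 0 and height > 0
    )
    if not valid:
        # "failure" is an assumed status value, not one documented by the SDK.
        return {
            "status": "failure",
            "data": {"error": "width and height must be positive integers"},
        }
    return {"status": "success", "data": {"result": "ok", "size": [width, height]}}


# The handler can be awaited directly, which keeps unit tests simple.
result = asyncio.run(resize_image({"width": 640, "height": 480}))
```

Keeping validation inside the handler is only one option; the `[pydantic]` extra mentioned under Installation may be a better fit for larger parameter sets.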
Option 2: Dynamic Skill Loading (No code changes)
Place your task handlers in the skills/ directory (e.g., skills/my_tasks.py):
from avtomatika_worker import SkillBlueprint

bp = SkillBlueprint()

@bp.task("generate_preview")
async def generate_preview(params: dict, **kwargs):
    return {"status": "success"}
Run the worker, and it will automatically load all skills from the directory. By default it looks in ./skills. To use a different location, set the WORKER_SKILLS_DIR environment variable or pass Worker(skills_dir=...); the constructor parameter takes precedence:
# It will look into ./skills by default
worker run --app app.main:worker
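The precedence rule described above (constructor parameter, then WORKER_SKILLS_DIR, then the ./skills default) can be sketched with the standard library alone. This is an illustration, not the SDK's own loader: `resolve_skills_dir` and `list_skill_modules` are hypothetical helper names, and skipping files that start with an underscore is an assumption.

```python
import os
import tempfile
from pathlib import Path
from typing import Mapping, Optional

DEFAULT_SKILLS_DIR = "skills"


def resolve_skills_dir(
    constructor_arg: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> Path:
    """Constructor argument wins, then WORKER_SKILLS_DIR, then the default."""
    if constructor_arg is not None:
        return Path(constructor_arg)
    return Path(env.get("WORKER_SKILLS_DIR", DEFAULT_SKILLS_DIR))


def list_skill_modules(skills_dir: Path) -> list:
    """Return module names for the .py files found in skills_dir."""
    if not skills_dir.is_dir():
        return []
    # Skipping underscore-prefixed files is an assumption of this sketch.
    return sorted(
        p.stem for p in skills_dir.glob("*.py") if not p.name.startswith("_")
    )


# Small demonstration against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "my_tasks.py").write_text("# skill module\n")
    Path(tmp, "_private.py").write_text("# skipped by this sketch\n")
    found = list_skill_modules(resolve_skills_dir(tmp))
```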
Key Features
1. Structured Logging
The SDK supports both human-readable and JSON logging.
- LOG_FORMAT=json: for production (ELK, Grafana Loki).
- LOG_FORMAT=text: for development (default).
- All logs automatically include worker_id, task_id, and job_id context.
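To make the two formats concrete, here is a rough sketch of how a LOG_FORMAT switch and the three context fields could be expressed with Python's standard `logging` module. It is not the SDK's implementation: `JsonFormatter` and `make_formatter` are hypothetical names, and the exact JSON keys the SDK emits are not documented here.

```python
import json
import logging
import os


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    CONTEXT_FIELDS = ("worker_id", "task_id", "job_id")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Context values are read from the record; missing ones become null.
        for name in self.CONTEXT_FIELDS:
            payload[name] = getattr(record, name, None)
        return json.dumps(payload)


def make_formatter(log_format: str) -> logging.Formatter:
    """Pick a formatter from a LOG_FORMAT-style value ("json" or "text")."""
    if log_format == "json":
        return JsonFormatter()
    return logging.Formatter("%(asctime)s %(levelname)s %(message)s")


formatter = make_formatter(os.environ.get("LOG_FORMAT", "text"))

# Build a record by hand to show what the JSON output looks like.
record = logging.makeLogRecord({
    "msg": "task started",
    "levelname": "INFO",
    "worker_id": "w-1",
    "task_id": "t-1",
    "job_id": "j-1",
})
line = JsonFormatter().format(record)
```

One JSON object per line is the shape that log shippers for ELK or Grafana Loki generally expect, which is why the JSON format suits production.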
2. Graceful Shutdown
Built-in handling of SIGTERM and SIGINT. When a signal is received, the worker:
- Enters "Drain Mode" (stops taking new tasks).
- Waits for active tasks to complete (configurable via WORKER_SHUTDOWN_TIMEOUT).
- Sends final heartbeats and closes connections.
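The drain sequence above can be approximated in plain asyncio. The sketch below is an assumption-laden illustration rather than the SDK's code: `drain` is a hypothetical helper, the signal is simulated by setting an event (a real worker would typically set it from a handler registered via `loop.add_signal_handler`), and cancelling tasks that outlive the timeout is a design choice of this sketch.

```python
import asyncio


async def drain(active: set, timeout: float) -> tuple:
    """Wait up to `timeout` seconds for active tasks, then cancel stragglers.

    Returns (number finished in time, number cancelled).
    """
    if not active:
        return (0, 0)
    done, pending = await asyncio.wait(active, timeout=timeout)
    for task in pending:
        task.cancel()
    # Give cancelled tasks a chance to unwind before connections are closed.
    await asyncio.gather(*pending, return_exceptions=True)
    return (len(done), len(pending))


async def demo() -> tuple:
    stop = asyncio.Event()  # would be set by a SIGTERM/SIGINT handler
    active = {
        asyncio.create_task(asyncio.sleep(0.01)),  # finishes within the timeout
        asyncio.create_task(asyncio.sleep(5)),     # exceeds the timeout
    }
    stop.set()         # simulate the signal: the worker stops taking new tasks
    await stop.wait()
    # The timeout here plays the role of WORKER_SHUTDOWN_TIMEOUT.
    return await drain(active, timeout=0.2)


finished, cancelled = asyncio.run(demo())
```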
3. File System & S3 Offloading
- TaskFiles: Async helper for isolated task workspaces.
- S3 Payload Offloading: Automatic download/upload of large files via S3 URIs in task parameters (requires [s3] extra).
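How the SDK recognizes S3 URIs in task parameters is not described here, so the following is only a sketch of one plausible approach: scan top-level string values and split any `s3://` URI into bucket and key. `find_s3_uris` and the sample parameter names are hypothetical.

```python
from urllib.parse import urlparse


def find_s3_uris(params: dict) -> dict:
    """Map parameter names to (bucket, key) for values that look like s3:// URIs."""
    found = {}
    for name, value in params.items():
        if not isinstance(value, str):
            continue
        parsed = urlparse(value)
        if parsed.scheme == "s3" and parsed.netloc:
            found[name] = (parsed.netloc, parsed.path.lstrip("/"))
    return found


uris = find_s3_uris({
    "source": "s3://media-bucket/raw/cat.png",  # hypothetical bucket and key
    "width": 640,
    "note": "no uri here",
})
```

A worker following this pattern would download each referenced object into the task's workspace before calling the handler and upload results afterwards; nested parameters are ignored in this sketch.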
Configuration Reference
| Variable | Description | Default |
|---|---|---|
| WORKER_ID | Unique identifier for the worker instance. | UUID |
| ORCHESTRATOR_URL | Address of the orchestrator. | http://localhost:8080 |
| LOG_FORMAT | Log format: text or json. | text |
| LOG_LEVEL | Minimum log level (DEBUG, INFO, etc). | INFO |
| WORKER_SHUTDOWN_TIMEOUT | Max seconds to wait for tasks during shutdown. | 30.0 |
| WORKER_ENABLE_WEBSOCKETS | Enable real-time commands (e.g., cancellation). | false |
| TASK_FILES_DIR | Local directory for temporary S3 payloads. | /tmp/payloads |
| WORKER_SKILLS_DIR | Directory to dynamically load skills from. | skills |
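For reference, the variables and defaults in the table can be mirrored in a small settings object, which is handy for checking a deployment's environment before starting a worker. This is a sketch, not the SDK's configuration class: `WorkerSettings` and `load_settings` are hypothetical names, and the set of strings accepted as "true" for WORKER_ENABLE_WEBSOCKETS is an assumption.

```python
import os
import uuid
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerSettings:
    worker_id: str
    orchestrator_url: str
    log_format: str
    log_level: str
    shutdown_timeout: float
    enable_websockets: bool
    task_files_dir: str
    skills_dir: str


def load_settings(env: Mapping[str, str] = os.environ) -> WorkerSettings:
    """Read the documented variables, falling back to the documented defaults."""
    return WorkerSettings(
        # A fresh UUID is generated when WORKER_ID is not set.
        worker_id=env.get("WORKER_ID") or str(uuid.uuid4()),
        orchestrator_url=env.get("ORCHESTRATOR_URL", "http://localhost:8080"),
        log_format=env.get("LOG_FORMAT", "text"),
        log_level=env.get("LOG_LEVEL", "INFO"),
        shutdown_timeout=float(env.get("WORKER_SHUTDOWN_TIMEOUT", "30.0")),
        # Accepted truthy spellings are an assumption of this sketch.
        enable_websockets=env.get("WORKER_ENABLE_WEBSOCKETS", "false").lower()
        in ("1", "true", "yes"),
        task_files_dir=env.get("TASK_FILES_DIR", "/tmp/payloads"),
        skills_dir=env.get("WORKER_SKILLS_DIR", "skills"),
    )


defaults = load_settings({})
custom = load_settings({"WORKER_SHUTDOWN_TIMEOUT": "5", "WORKER_ENABLE_WEBSOCKETS": "true"})
```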
Documentation
- Development Guide — Detailed instructions on how to create custom workers, use middlewares, and handle S3 offloading.
Docker Usage
Use the provided Dockerfile for easy deployment:
docker build -t my-worker .
docker run -e ORCHESTRATOR_URL=... my-worker worker run --app app:worker
Development
Install development dependencies and run the tests:
pip install -e ".[test,dev]"
pytest