Worker SDK for the Avtomatika orchestrator.

Avtomatika Worker SDK

License: MIT | Python 3.11+ | Code Style: Ruff

The official SDK for creating workers compatible with the Avtomatika Orchestrator. It handles polling, heartbeats, S3 payload offloading, and graceful shutdown so you can focus on your business logic.

Installation

pip install avtomatika-worker

Extras:

  • pip install "avtomatika-worker[s3]" — for S3 payload offloading (requires obstore).
  • pip install "avtomatika-worker[pydantic]" — for Pydantic-based parameter validation.
  • pip install "avtomatika-worker[dev]" — for development features like CLI --reload.

Quick Start

Option 1: CLI Usage (Recommended)

Define your worker in a Python module (e.g., app/main.py), then start it with the worker run command shown at the end of this section:

from avtomatika_worker import Worker

worker = Worker(worker_type="image-processor")

@worker.task("resize")
async def resize_image(params: dict, **kwargs):
    return {"status": "success", "data": {"result": "ok"}}

Option 2: Dynamic Skill Loading (No code changes)

Place your task handlers in the skills/ directory (e.g., skills/my_tasks.py):

from avtomatika_worker import SkillBlueprint

bp = SkillBlueprint()

@bp.task("generate_preview")
async def generate_preview(params: dict, **kwargs):
    return {"status": "success"}

Run the worker, and it automatically loads every skill found in the skills directory. By default this is ./skills. To use a different path, set the WORKER_SKILLS_DIR environment variable or pass Worker(skills_dir=...); the constructor parameter takes precedence:

# Looks for skills in ./skills by default
worker run --app app.main:worker

Key Features

1. Structured Logging

The SDK supports both human-readable and JSON logging.

  • LOG_FORMAT=json — for production (ELK, Grafana Loki).
  • LOG_FORMAT=text — for development (default).
  • All logs automatically include worker_id, task_id, and job_id context.

2. Graceful Shutdown

Built-in handling of SIGTERM and SIGINT. When a signal is received, the worker:

  1. Enters "Drain Mode" (stops taking new tasks).
  2. Waits for active tasks to complete (configurable via WORKER_SHUTDOWN_TIMEOUT).
  3. Sends final heartbeats and closes connections.

3. File System & S3 Offloading

  • TaskFiles: Async helper for isolated task workspaces.
  • S3 Payload Offloading: Automatic download/upload of large files via S3 URIs in task parameters (requires [s3] extra).
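
As a rough picture of what "S3 URIs in task parameters" involves, the sketch below finds such URIs and maps each one to a per-task local path. Both helpers are hypothetical and only illustrate the idea: the per-task directory layout and the top-level-only scan are assumptions, and the actual transfer (done by the SDK through the [s3] extra) is not shown.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def find_s3_uris(params: dict) -> dict[str, tuple[str, str]]:
    """Return {param_name: (bucket, key)} for top-level s3:// string values."""
    found: dict[str, tuple[str, str]] = {}
    for name, value in params.items():
        if isinstance(value, str) and value.startswith("s3://"):
            parsed = urlparse(value)
            # netloc is the bucket; the path (minus its leading slash) is the key.
            found[name] = (parsed.netloc, parsed.path.lstrip("/"))
    return found


def local_path_for(
    task_id: str, key: str, base_dir: str = "/tmp/payloads"
) -> PurePosixPath:
    """Assumed layout: <TASK_FILES_DIR>/<task_id>/<file name of the key>."""
    return PurePosixPath(base_dir) / task_id / PurePosixPath(key).name
```

Keeping each task's files under its own directory is one simple way to get the isolation that TaskFiles is described as providing.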

Configuration Reference

Variable                 | Description                                     | Default
WORKER_ID                | Unique identifier for the worker instance.      | UUID
ORCHESTRATOR_URL         | Address of the orchestrator.                    | http://localhost:8080
LOG_FORMAT               | Log format: text or json.                       | text
LOG_LEVEL                | Minimum log level (DEBUG, INFO, etc).           | INFO
WORKER_SHUTDOWN_TIMEOUT  | Max seconds to wait for tasks during shutdown.  | 30.0
WORKER_ENABLE_WEBSOCKETS | Enable real-time commands (e.g., cancellation). | false
TASK_FILES_DIR           | Local directory for temporary S3 payloads.      | /tmp/payloads
WORKER_SKILLS_DIR        | Directory to dynamically load skills from.      | skills

Documentation

  • Development Guide — Detailed instructions on how to create custom workers, use middlewares, and handle S3 offloading.

Docker Usage

Use the provided Dockerfile for easy deployment:

docker build -t my-worker .
docker run -e ORCHESTRATOR_URL=... my-worker worker run --app app:worker

Development

Install the development dependencies and run the test suite:

pip install -e ".[test,dev]"
pytest
