External task library and client for Operaton
operaton-tasks
External task library and worker for https://operaton.org/
Use this package when you want to implement Operaton external service task workers in Python.
Installation
Install as a Python dependency:
pip install operaton-tasks
Install with CLI support:
pip install "operaton-tasks[cli]"
What this package provides
- A decorator API for registering handlers by topic.
- A long-polling worker that fetches, locks, executes, and completes/fails external tasks.
- Optional CLI command to run the worker app.
- Built-in health endpoint at /healthz.
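The fetch-and-lock step of the polling loop can be pictured as a single REST request. The sketch below builds a body for POST /external-task/fetchAndLock, assuming Operaton keeps the request shape of the Camunda 7 REST API it derives from. The helper name build_fetch_and_lock_body is hypothetical, and mapping the poll and lock TTL settings to asyncResponseTimeout and lockDuration is an assumption for illustration, not a description of the package's internals.

```python
import json


def build_fetch_and_lock_body(
    worker_id: str,
    topics: list[str],
    poll_ttl_seconds: int = 10,
    lock_ttl_seconds: int = 30,
    max_tasks: int = 1,
) -> dict:
    """Build a JSON-serializable body for POST /external-task/fetchAndLock.

    Hypothetical helper: the real worker may build its request differently.
    """
    return {
        "workerId": worker_id,
        "maxTasks": max_tasks,
        # Long polling: the engine may hold the request open this many ms.
        "asyncResponseTimeout": poll_ttl_seconds * 1000,
        "topics": [
            # Each fetched task stays locked for this worker for lockDuration ms.
            {"topicName": topic, "lockDuration": lock_ttl_seconds * 1000}
            for topic in topics
        ],
    }


body = build_fetch_and_lock_body("operaton-tasks-client", ["hello-world"])
print(json.dumps(body, indent=2))
```

The package performs this step for you; the sketch only shows what "fetches and locks" plausibly means on the wire.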
Authentication
Requests to the Operaton REST API can be authenticated in two ways.
Option 1: Static Authorization header
Set ENGINE_REST_AUTHORIZATION to the full header value.
Example:
export ENGINE_REST_AUTHORIZATION="Basic base64-user-pass"
# or
export ENGINE_REST_AUTHORIZATION="Bearer my-static-token"
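If you need to produce the Basic value yourself, it is the word Basic followed by the base64 encoding of user:password. A minimal sketch using only the standard library; the helper name basic_authorization and the demo credentials are illustrative, not part of this package.

```python
import base64


def basic_authorization(username: str, password: str) -> str:
    """Return a full Authorization header value for HTTP Basic auth."""
    credentials = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")


# Example credentials only; substitute your own engine user.
header_value = basic_authorization("demo", "demo")
print(header_value)  # Basic ZGVtbzpkZW1v
```

The printed string is what you would export as ENGINE_REST_AUTHORIZATION.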
Option 2: OAuth2 client credentials
Set the OAuth2 settings:
export OAUTH2_CLIENT_ID="my-client-id"
export OAUTH2_CLIENT_SECRET="my-client-secret"
export OAUTH2_TOKEN_URL="https://idp.example.com/realms/operaton/protocol/openid-connect/token"
export OAUTH2_SCOPES="scope-a scope-b" # optional, space-separated
When OAuth2 is configured, the worker automatically fetches and refreshes bearer tokens.
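To make "fetches and refreshes" concrete, here is a rough sketch of a client credentials request body and a small token cache that refetches shortly before expiry. Everything in it (client_credentials_form, TokenCache, the 30-second leeway, sending the client secret in the form body) is an assumption for illustration; the package's actual implementation may differ.

```python
import time
from typing import Callable, Optional, Tuple
from urllib.parse import urlencode


def client_credentials_form(client_id: str, client_secret: str, scopes: str = "") -> str:
    """Encode the form body of an OAuth2 client credentials token request."""
    fields = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if scopes:
        fields["scope"] = scopes  # space-separated, as in OAUTH2_SCOPES
    return urlencode(fields)


class TokenCache:
    """Cache a bearer token and refetch it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # returns (token, expires_in_seconds)
        clock: Callable[[], float] = time.monotonic,
        leeway_seconds: float = 30.0,  # assumed safety margin
    ) -> None:
        self._fetch = fetch
        self._clock = clock
        self._leeway = leeway_seconds
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Refetch when there is no token or it is about to expire.
        if self._token is None or self._clock() >= self._expires_at - self._leeway:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = self._clock() + expires_in
        return self._token

    def invalidate(self) -> None:
        """Drop the cached token so the next get() fetches a new one."""
        self._token = None


# Demo with a fake identity provider and a fake clock (no network access).
now = [0.0]
issued = []


def fake_fetch() -> Tuple[str, float]:
    issued.append(f"token-{len(issued) + 1}")
    return issued[-1], 300.0


cache = TokenCache(fake_fetch, clock=lambda: now[0])
first = cache.get()   # fetched
second = cache.get()  # served from the cache
now[0] = 280.0        # inside the leeway window before expiry at 300
third = cache.get()   # fetched again
```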
Authentication precedence
Authorization is resolved in this order:
- Explicit authorization argument passed to session/request helpers.
- OAuth2 client credentials token (if fully configured).
- ENGINE_REST_AUTHORIZATION.
With OAuth2 enabled, a 401 response triggers one token invalidation and one retry.
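The precedence and the single retry on 401 can be sketched as two small functions. The names resolve_authorization and send_with_retry are hypothetical, and the send callable stands in for an HTTP request; this illustrates the documented behavior and is not the package's code.

```python
from typing import Callable, Optional


def resolve_authorization(
    explicit: Optional[str],
    oauth2_token: Optional[str],
    static: Optional[str],
) -> Optional[str]:
    """Pick the Authorization header value using the documented precedence."""
    if explicit:
        return explicit
    if oauth2_token:
        return f"Bearer {oauth2_token}"
    return static  # ENGINE_REST_AUTHORIZATION, possibly unset


def send_with_retry(
    send: Callable[[str], int],  # takes a header value, returns an HTTP status
    get_token: Callable[[], str],
    invalidate_token: Callable[[], None],
) -> int:
    """Send once; on a 401, invalidate the token and retry exactly once."""
    status = send(f"Bearer {get_token()}")
    if status == 401:
        invalidate_token()
        status = send(f"Bearer {get_token()}")
    return status


# Demo: the first token is stale, the second one is accepted.
tokens = ["stale", "fresh"]
state = {"index": 0}
seen = []


def get_token() -> str:
    return tokens[state["index"]]


def invalidate_token() -> None:
    state["index"] = 1


def send(authorization: str) -> int:
    seen.append(authorization)
    return 401 if authorization == "Bearer stale" else 200


final_status = send_with_retry(send, get_token, invalidate_token)
```

If the retry also returns 401, the sketch hands that status back to the caller rather than looping.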
Configuration
Main environment variables:
- ENGINE_REST_BASE_URL (default: http://localhost:8080/engine-rest)
- ENGINE_REST_AUTHORIZATION (optional)
- OAUTH2_CLIENT_ID (optional)
- OAUTH2_CLIENT_SECRET (optional)
- OAUTH2_TOKEN_URL (optional)
- OAUTH2_SCOPES (optional)
- ENGINE_REST_TIMEOUT_SECONDS (default: 20)
- ENGINE_REST_POLL_TTL_SECONDS (default: 10)
- ENGINE_REST_LOCK_TTL_SECONDS (default: 30)
- TASKS_WORKER_ID (default: operaton-tasks-client)
- TASKS_HEARTBEAT_TOPIC (default: operaton.tasks.heartbeat)
- TASKS_MODULE (used by CLI/app startup to load your handlers module)
- TASKS_LIMIT (default: 0; when set to a positive number, stop after processing this many tasks)
- TASKS_RUN_TIMEOUT_SECONDS (default: 0; when set to a positive number, exit after this many seconds)
- LOG_LEVEL (default: DEBUG)
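As an illustration of how these variables and their defaults fit together, the sketch below reads a subset of them into a dataclass. WorkerSettings and load_settings are hypothetical names; the package exposes its own settings object (operaton.tasks.settings), whose attributes are not shown here.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class WorkerSettings:
    """Hypothetical container for a subset of the documented variables."""

    base_url: str
    authorization: Optional[str]
    timeout_seconds: int
    poll_ttl_seconds: int
    lock_ttl_seconds: int
    worker_id: str
    log_level: str


def load_settings(env: Mapping[str, str] = os.environ) -> WorkerSettings:
    """Read settings from a mapping, falling back to the documented defaults."""
    return WorkerSettings(
        base_url=env.get("ENGINE_REST_BASE_URL", "http://localhost:8080/engine-rest"),
        authorization=env.get("ENGINE_REST_AUTHORIZATION"),
        timeout_seconds=int(env.get("ENGINE_REST_TIMEOUT_SECONDS", "20")),
        poll_ttl_seconds=int(env.get("ENGINE_REST_POLL_TTL_SECONDS", "10")),
        lock_ttl_seconds=int(env.get("ENGINE_REST_LOCK_TTL_SECONDS", "30")),
        worker_id=env.get("TASKS_WORKER_ID", "operaton-tasks-client"),
        log_level=env.get("LOG_LEVEL", "DEBUG"),
    )


# Passing an explicit mapping keeps the example independent of your shell.
defaults = load_settings({})
custom = load_settings({"ENGINE_REST_LOCK_TTL_SECONDS": "60", "LOG_LEVEL": "INFO"})
print(defaults.base_url, custom.lock_ttl_seconds, custom.log_level)
```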
Usage as standalone worker (CLI)
This is the quickest way to run your handlers as a process.
- Create a handlers module, for example my_tasks.py.
- Run operaton-tasks and pass your module path.
Example handler module:
from operaton.tasks.types import CompleteExternalTaskDto
from operaton.tasks.types import ExternalTaskComplete
from operaton.tasks.types import LockedExternalTaskDto
from operaton.tasks.types import VariableValueDto
from operaton.tasks.types import VariableValueType
import operaton.tasks


@operaton.tasks.register("hello-world", localVariables=True)
async def hello_world(task: LockedExternalTaskDto) -> ExternalTaskComplete:
    return ExternalTaskComplete(
        task=task,
        response=CompleteExternalTaskDto(
            workerId=task.workerId,
            localVariables={
                "message": VariableValueDto(
                    value="Hello World",
                    type=VariableValueType.String,
                ),
            },
        ),
    )
Run it:
operaton-tasks serve ./my_tasks.py -- --host 0.0.0.0 --port 8000
Pass auth options directly on the CLI if preferred:
operaton-tasks serve ./my_tasks.py \
--base-url http://localhost:8080/engine-rest \
--authorization "Basic base64-user-pass"
Or with OAuth2:
operaton-tasks serve ./my_tasks.py \
--base-url http://localhost:8080/engine-rest \
--oauth2-client-id my-client-id \
--oauth2-client-secret my-client-secret \
--oauth2-token-url https://idp.example.com/realms/operaton/protocol/openid-connect/token \
--oauth2-scopes "scope-a scope-b"
Note: arguments after -- are passed through to uvicorn.
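The pass-through convention can be pictured as splitting the argument list at the first "--". The function below is a generic sketch of that convention with a hypothetical name; it is not how operaton-tasks necessarily parses its arguments.

```python
from typing import List, Sequence, Tuple


def split_passthrough(argv: Sequence[str]) -> Tuple[List[str], List[str]]:
    """Split argv at the first "--" into (own_args, passthrough_args)."""
    args = list(argv)
    if "--" in args:
        index = args.index("--")
        return args[:index], args[index + 1:]
    return args, []


own, passthrough = split_passthrough(
    ["serve", "./my_tasks.py", "--", "--host", "0.0.0.0", "--port", "8000"]
)
print(own)          # ['serve', './my_tasks.py']
print(passthrough)  # ['--host', '0.0.0.0', '--port', '8000']
```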
If you want the CLI to process a bounded number of tasks and then exit, pass the worker controls directly:
operaton-tasks serve ./my_tasks.py --limit 10 --run-timeout 60
When either --limit or --run-timeout is set, the CLI loads the handler module,
runs the worker in one-shot mode, and exits when the limit or timeout is reached.
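One-shot mode amounts to a loop with two exit conditions. The sketch below shows that shape with a stand-in poll function. run_bounded and poll_once are hypothetical names, and treating 0 as "no bound" is an assumption based on the documented defaults.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def run_bounded(
    poll_once: Callable[[], Awaitable[int]],  # returns tasks handled (0 or 1)
    limit: int = 0,
    run_timeout: float = 0.0,
) -> int:
    """Poll until `limit` tasks are processed or `run_timeout` seconds pass.

    A value of 0 disables the corresponding bound (assumption). With both
    bounds disabled this loop never returns on its own.
    """
    deadline = time.monotonic() + run_timeout if run_timeout else None
    processed = 0
    while True:
        if limit and processed >= limit:
            break
        if deadline is not None and time.monotonic() >= deadline:
            break
        processed += await poll_once()
    return processed


async def busy_poll() -> int:
    await asyncio.sleep(0)  # pretend one task was fetched and completed
    return 1


async def idle_poll() -> int:
    await asyncio.sleep(0.01)  # pretend the long poll returned nothing
    return 0


processed_count = asyncio.run(run_bounded(busy_poll, limit=3))
idle_count = asyncio.run(run_bounded(idle_poll, run_timeout=0.05))
print(processed_count, idle_count)  # 3 0
```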
Usage as a library/dependency
Use this mode when you already have your own Python app/runtime and want to embed the worker.
A minimal worker process without CLI
import asyncio
import operaton.tasks
from operaton.tasks.api import external_task_worker
from operaton.tasks.api import handlers
from operaton.tasks.types import CompleteExternalTaskDto
from operaton.tasks.types import ExternalTaskComplete
from operaton.tasks.types import LockedExternalTaskDto


@operaton.tasks.task("hello-world")
async def hello_world(task: LockedExternalTaskDto) -> ExternalTaskComplete:
    return ExternalTaskComplete(
        task=task,
        response=CompleteExternalTaskDto(workerId=task.workerId),
    )


if __name__ == "__main__":
    asyncio.run(external_task_worker(handlers))
Configure auth and connection through environment variables, or set values programmatically through operaton.tasks.settings before starting the worker.
Embed into an existing FastAPI app
If you want to reuse built-in routes such as /healthz and run the worker in your own FastAPI service:
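import asyncio
from contextlib import asynccontextmanager
from contextlib import suppress
from fastapi import FastAPI
from operaton.tasks.api import external_task_worker
from operaton.tasks.api import handlers
from operaton.tasks.api import router


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Keep a reference so the worker task is not garbage-collected mid-run.
    worker = asyncio.create_task(external_task_worker(handlers))
    yield
    # Stop the worker when the app shuts down.
    worker.cancel()
    with suppress(asyncio.CancelledError):
        await worker


app = FastAPI(lifespan=lifespan)
app.include_router(router)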
Handler contract
- Handlers are async functions registered with @task(topic) or @register(topic).
- Input is LockedExternalTaskDto.
- Return ExternalTaskComplete with a CompleteExternalTaskDto response to complete the task, or with an ExternalTaskBpmnError response to raise a BPMN error.
- If your handler raises an exception, it is converted to ExternalTaskFailure automatically.
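The contract above can be illustrated with a toy registry that uses plain dictionaries in place of the package's DTO types. The names sketch_task, HANDLERS, and dispatch are stand-ins invented for this example and do not exist in operaton.tasks; the point is only to show registration by topic and the conversion of exceptions into a failure result.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]
HANDLERS: Dict[str, Handler] = {}  # topic -> async handler (toy registry)


def sketch_task(topic: str) -> Callable[[Handler], Handler]:
    """Register an async handler under a topic name."""

    def decorator(func: Handler) -> Handler:
        HANDLERS[topic] = func
        return func

    return decorator


async def dispatch(topic: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run the handler for `topic`; turn any exception into a failure result."""
    try:
        result = await HANDLERS[topic](payload)
    except Exception as error:
        return {"outcome": "failure", "errorMessage": str(error)}
    return {"outcome": "complete", "result": result}


@sketch_task("hello-world")
async def hello_world(payload: Dict[str, Any]) -> Dict[str, Any]:
    return {"message": f"Hello {payload['name']}"}


@sketch_task("always-fails")
async def always_fails(payload: Dict[str, Any]) -> Dict[str, Any]:
    raise ValueError("boom")


ok = asyncio.run(dispatch("hello-world", {"name": "World"}))
failed = asyncio.run(dispatch("always-fails", {}))
print(ok)
print(failed)
```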
Health endpoint
GET /healthz is included on the built-in router.
- Without heartbeat activity, it checks Operaton engine reachability.
- With heartbeat activity, it verifies recent heartbeat timestamps.
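The two branches can be read as one decision function. This is a sketch under stated assumptions: is_healthy is a hypothetical name, the 60-second freshness threshold is invented for the example, and engine_reachable stands in for an actual request to the engine.

```python
import time
from typing import Callable, Optional


def is_healthy(
    last_heartbeat: Optional[float],        # monotonic timestamp, or None if never seen
    engine_reachable: Callable[[], bool],   # stand-in for a real engine check
    now: Optional[float] = None,
    max_age_seconds: float = 60.0,          # assumed threshold
) -> bool:
    """Use heartbeat freshness if a heartbeat was seen, else engine reachability."""
    if last_heartbeat is None:
        return engine_reachable()
    current = time.monotonic() if now is None else now
    return (current - last_heartbeat) <= max_age_seconds


print(is_healthy(None, lambda: True))                # no heartbeat: engine check decides
print(is_healthy(100.0, lambda: False, now=130.0))   # heartbeat 30 s old: fresh
print(is_healthy(100.0, lambda: True, now=200.0))    # heartbeat 100 s old: stale
```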
Development
This project uses devenv.sh for reproducible development environments with Nix.
Local Development
Enter the development shell:
devenv shell
# or
make shell
Build and link the virtual environment:
make env
Running Tests Locally
Run all checks (linting, type checking, unit tests):
make test
Run individual checks:
make check # treefmt, flake8, mypy
make test-pytest # Unit tests only
make devenv-test # Integration tests with Operaton + Keycloak services
Start background services (Operaton + Keycloak) for manual testing:
make devenv-up # Start services
make devenv-down # Stop services
Watch mode for development:
make watch # Run app in reload mode
make watch-tests # Continuously run mypy + pytest
CI/CD Pipelines
This project includes GitHub Actions and GitLab CI pipelines configured with devenv.sh best practices.
GitHub Actions (.github/workflows/):
- ci.yml: Runs on push, pull requests, and manual dispatch. Executes linting, type checking, and integration tests.
- release.yml: Publishes to PyPI on version tags (v*).
GitLab CI (.gitlab-ci.yml):
- Lint stage: treefmt, flake8, mypy
- Test stage: Unit and integration tests with live Operaton + Keycloak services
- Build & Publish stages: Triggered on tags
Both pipelines use:
- Nix for reproducible environments (via devenv)
- Magic Nix Cache (GitHub) or Nix cache directories (GitLab) for faster builds
- devenv shell -- make <target> to ensure identical environments between local development and CI
For PyPI publishing, configure:
- GitHub Actions: Use trusted publishers (OIDC) or set PYPI_API_TOKEN secret
- GitLab CI: Set PYPI_TOKEN CI/CD variable with your PyPI token
File details
Details for the file operaton_tasks-1.0b1.tar.gz.
File metadata
- Download URL: operaton_tasks-1.0b1.tar.gz
- Upload date:
- Size: 203.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6be00a8b64fabeee4d8734c6477dfb78c2397465238e34220388389514d1415f |
| MD5 | eda60db5a7e0c8f9c04dc12605324609 |
| BLAKE2b-256 | fa50946374abe6505139b560582d220eae502c9ec3bcfb8a8bc77c0cb724c89b |
File details
Details for the file operaton_tasks-1.0b1-py3-none-any.whl.
File metadata
- Download URL: operaton_tasks-1.0b1-py3-none-any.whl
- Upload date:
- Size: 66.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.13
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e11f7010f2729b7188dab662f45fbafa83c5c9a21434dbd895c00a7c95607803 |
| MD5 | 5be517fc13664cc9af399cc73e87d5c8 |
| BLAKE2b-256 | 53dae365155188788eb32fe69ae8f5eac4085ecc16c30911bc5b1e1234f4b634 |