Python SDK for the Rochallor Workflow engine
Project description
workflow-sdk-python
Python SDK for the Rochallor Workflow engine. Provides a polling worker, REST client, and Prometheus metrics — all with zero framework dependencies.
Python 3.10+ required.
Installation
# From the repository root:
pip install -e "workflow-sdk-python"
# With dev dependencies (pytest, mypy, etc.):
pip install -e "workflow-sdk-python[dev]"
Quick Start: Worker
import signal
import threading
from workflow_sdk.client.rest import RestEngineClient
from workflow_sdk.handler.registry import HandlerRegistry
from workflow_sdk.runner.runner import Runner
# 1. Create the REST client pointing at your engine
client = RestEngineClient("http://localhost:8080")
# 2. Register handlers by job type
registry = HandlerRegistry()
@registry.register("process-order")
def handle_process_order(ctx: dict) -> dict:
order_id = ctx["variables"].get("orderId")
# ... do work ...
return {"status": "processed", "orderId": order_id}
@registry.register("send-notification")
def handle_notification(ctx: dict) -> dict:
# Raise NonRetryableError for permanent failures
from workflow_sdk.errors import NonRetryableError
if not ctx["variables"].get("email"):
raise NonRetryableError("email address is required")
return {}
# 3. Start the runner (blocks until stop_event is set)
stop = threading.Event()
signal.signal(signal.SIGINT, lambda *_: stop.set())
signal.signal(signal.SIGTERM, lambda *_: stop.set())
runner = Runner(
client=client,
registry=registry,
worker_id="py-worker-1",
parallelism=16, # concurrent job handlers (default 64)
poll_interval=0.5, # seconds between polls (default 0.5)
)
runner.run(stop_event=stop)
API Reference
RestEngineClient
from workflow_sdk.client.rest import RestEngineClient
client = RestEngineClient(base_url, timeout=30.0)
| Method | Description |
|---|---|
upload_definition(definition) |
Upload a workflow definition JSON; returns definition summary |
get_definition(definition_id) |
Fetch a definition by ID |
list_definitions(keyword="", page=0, page_size=20) |
Paginated list of definitions |
start_instance(definition_id, variables=None, ...) |
Start a workflow instance; returns instance summary |
get_instance(instance_id) |
Fetch instance state |
get_instance_history(instance_id) |
List step executions for an instance |
cancel_instance(instance_id, reason="") |
Cancel a running instance |
poll_jobs(worker_id, job_types, max_jobs=1) |
Claim jobs (used by Runner automatically) |
complete_job(job_id, worker_id, variables=None) |
Mark job complete with output variables |
fail_job(job_id, worker_id, error_message, retryable=True) |
Record job failure |
complete_user_task(task_id, completed_by="", result=None) |
Complete a user task |
close() |
Release the underlying HTTP connection pool |
RestEngineClient is a context manager:
with RestEngineClient("http://localhost:8080") as client:
instance = client.start_instance("my-workflow")
HandlerRegistry
from workflow_sdk.handler.registry import HandlerRegistry
registry = HandlerRegistry()
# Register via decorator
@registry.register("my-job-type")
def handler(ctx: dict) -> dict:
return {"result": "ok"}
# Or register directly
registry.register("other-type", lambda ctx: {"x": 1})
# Inspect registered types
registry.job_types() # -> ["my-job-type", "other-type"]
Handler signature: (ctx: dict) -> dict | None
The ctx dict contains:
| Key | Type | Description |
|---|---|---|
id |
str |
Job ID |
jobType |
str |
Handler key |
instanceId |
str |
Workflow instance ID |
stepId |
str |
Step ID in the definition |
stepExecutionId |
str |
Unique execution ID for this attempt |
retriesRemaining |
int |
Retries left before permanent failure |
variables |
dict |
Input variables from the workflow |
lockExpiresAt |
str |
ISO-8601 timestamp when the job lock expires |
Runner
from workflow_sdk.runner.runner import Runner
runner = Runner(
client=client, # RestEngineClient (or any EngineClient implementor)
registry=registry, # HandlerRegistry with at least one handler
worker_id="my-worker", # Unique worker ID (shown in engine logs)
parallelism=64, # Max concurrent handler threads (default 64)
poll_interval=0.5, # Poll interval in seconds (default 0.5)
metrics=None, # Optional Metrics instance for Prometheus
)
stop = threading.Event()
runner.run(stop_event=stop) # Blocks until stop_event is set
The runner drains all in-flight jobs before returning after stop_event is set.
Errors
from workflow_sdk.errors import NonRetryableError, EngineClientError, WorkflowSDKError
| Exception | When to use |
|---|---|
NonRetryableError |
Raise inside a handler to mark the job failed permanently (no retry) |
EngineClientError |
Raised by RestEngineClient on HTTP 4xx/5xx responses; has .status_code and .message |
WorkflowSDKError |
Base class; raised on connection errors |
Any other exception raised by a handler causes the job to fail with retryable=True.
Metrics
from prometheus_client import CollectorRegistry
from workflow_sdk.metrics.metrics import Metrics
# Use an isolated registry (recommended in tests / multi-worker setups)
reg = CollectorRegistry()
m = Metrics(registry=reg)
runner = Runner(client=client, registry=registry, worker_id="w1", metrics=m)
| Metric | Type | Labels | Description |
|---|---|---|---|
workflow_sdk_poll_latency_seconds |
Histogram | — | Time spent in each poll_jobs call |
workflow_sdk_lock_conflicts_total |
Counter | — | Empty poll rounds (no jobs claimed) |
workflow_sdk_handler_latency_seconds |
Histogram | job_type |
Handler execution time |
workflow_sdk_retries_total |
Counter | job_type |
Jobs retried after transient failure |
workflow_sdk_jobs_completed_total |
Counter | job_type, outcome |
Completed jobs; outcome is success or failure |
Expose metrics via prometheus_client.start_http_server(port) in your worker process.
Management Operations Example
The following script uploads a definition, starts an instance, polls until it completes, then prints the final variables:
import time
from workflow_sdk.client.rest import RestEngineClient
client = RestEngineClient("http://localhost:8080")
# 1. Upload a simple one-step workflow
definition = {
"id": "echo-workflow",
"name": "Echo Workflow",
"steps": [
{
"id": "echo",
"type": "SERVICE_TASK",
"jobType": "echo",
"next": "end"
},
{"id": "end", "type": "END"}
]
}
uploaded = client.upload_definition(definition)
print(f"Definition: {uploaded['id']} v{uploaded['version']}")
# 2. Start an instance
instance = client.start_instance("echo-workflow", variables={"message": "hello"})
instance_id = instance["id"]
print(f"Instance started: {instance_id}")
# 3. Poll instance status until completed or failed
for _ in range(30):
state = client.get_instance(instance_id)
status = state["status"]
print(f" Status: {status}")
if status in ("COMPLETED", "FAILED", "CANCELLED"):
break
time.sleep(1)
# 4. Print execution history
history = client.get_instance_history(instance_id)
for step in history:
print(f" Step {step['stepId']}: {step['status']}")
# 5. List all definitions
page = client.list_definitions()
print(f"Total definitions: {page['total']}")
Running Tests
cd workflow-sdk-python
pytest tests/ -v
Expected: 52 tests pass in < 1 second. No running engine required — all HTTP interactions are mocked via pytest-httpx.
Type Checking
mypy src/
The package ships a py.typed marker (PEP 561). All public APIs have complete type annotations.
Backoff Configuration
The SDK uses exponential backoff when retrying failed jobs (constants in src/workflow_sdk/retry/backoff.py):
| Constant | Value | Description |
|---|---|---|
BASE_DELAY |
0.1 s | Initial delay before first retry |
FACTOR |
2.0 | Exponential growth factor |
JITTER_FRAC |
0.20 | ±20% random jitter per step |
MAX_DELAY |
30.0 s | Maximum delay cap |
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rochallor_sdk-1.0.0.tar.gz.
File metadata
- Download URL: rochallor_sdk-1.0.0.tar.gz
- Upload date:
- Size: 26.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
88ab10f72513d2987b35d4b0c3908f1a5486ebfa18e695b8e677a2e3299a3199
|
|
| MD5 |
9fe8898eecf11eb305a958b594af4635
|
|
| BLAKE2b-256 |
2e52b3b9277b2155aeab5f4306293630b8cab8752a0d013973af824329c34c23
|
Provenance
The following attestation bundles were made for rochallor_sdk-1.0.0.tar.gz:
Publisher:
publish.yml on batnam/rochallor-engine
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rochallor_sdk-1.0.0.tar.gz -
Subject digest:
88ab10f72513d2987b35d4b0c3908f1a5486ebfa18e695b8e677a2e3299a3199 - Sigstore transparency entry: 1552085267
- Sigstore integration time:
-
Permalink:
batnam/rochallor-engine@864c29ea25884c38766d19619522d1a64f4b4b86 -
Branch / Tag:
refs/tags/v1.0.0 - Owner: https://github.com/batnam
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@864c29ea25884c38766d19619522d1a64f4b4b86 -
Trigger Event:
push
-
Statement type:
File details
Details for the file rochallor_sdk-1.0.0-py3-none-any.whl.
File metadata
- Download URL: rochallor_sdk-1.0.0-py3-none-any.whl
- Upload date:
- Size: 25.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6ad0276a73e12e860d86117d4c11d68455e9d067557a59708a86f40d6fed415f
|
|
| MD5 |
dea108cc62e2f3e15cc619260f7f2ac5
|
|
| BLAKE2b-256 |
83b5f07d16d7cd391e53a6ea36789340ebe53f8103f04c9e7af420f22b310bab
|
Provenance
The following attestation bundles were made for rochallor_sdk-1.0.0-py3-none-any.whl:
Publisher:
publish.yml on batnam/rochallor-engine
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rochallor_sdk-1.0.0-py3-none-any.whl -
Subject digest:
6ad0276a73e12e860d86117d4c11d68455e9d067557a59708a86f40d6fed415f - Sigstore transparency entry: 1552085363
- Sigstore integration time:
-
Permalink:
batnam/rochallor-engine@864c29ea25884c38766d19619522d1a64f4b4b86 -
Branch / Tag:
refs/tags/v1.0.0 - Owner: https://github.com/batnam
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@864c29ea25884c38766d19619522d1a64f4b4b86 -
Trigger Event:
push
-
Statement type: