Async Python runner SDK for Croniq — distributed job scheduling that just works.
Croniq Runner SDK for Python
Build job execution runners for Croniq in async Python. The SDK polls a Croniq server for work, dispatches your handlers, streams structured logs back, and reports completion — idiomatic asyncio + httpx + Pydantic v2.
Install
pip install croniq-runner
# Optional: OpenTelemetry tracing
pip install "croniq-runner[otel]"
Python 3.11+ required (asyncio.TaskGroup, tomllib).
Quick start
import asyncio

from croniq_runner import Runner, RunnerOptions


async def hello(ctx):
    ctx.logger.info("hello from %s (attempt %d)", ctx.job_key, ctx.attempt)
    await ctx.log("emitting a structured event", fields={"customer": "acme"})


async def main():
    runner = Runner(RunnerOptions(
        server_url="http://localhost:4000",
        api_key="croniq_...",
        capabilities=["billing"],
        tags=["lang=python", "env=dev"],
        max_inflight=5,
    ))
    runner.add_handler("hello:world", hello)
    await runner.run()


asyncio.run(main())
Stop the runner with Ctrl-C or by calling runner.request_drain() from
another coroutine — in-flight handlers get up to drain_timeout_ms to finish
before the loop returns.
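As a rough sketch of the second option, the helper below requests a drain after a fixed delay. The names drain_after and delay_s are hypothetical and not part of the SDK; only request_drain() comes from the text above. Whether request_drain() is a plain method or a coroutine is not stated here, so the sketch handles both.

```python
import asyncio
import inspect


async def drain_after(runner, delay_s: float) -> None:
    """Ask the runner to drain once delay_s seconds have passed.

    Hypothetical helper: only runner.request_drain() is taken from the README.
    """
    await asyncio.sleep(delay_s)
    result = runner.request_drain()
    # Assumption: request_drain() may be sync or async, so await only if needed.
    if inspect.isawaitable(result):
        await result
```

It could run next to the runner, for example with asyncio.gather(runner.run(), drain_after(runner, 60)).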
Features
- asyncio-first: every public coroutine is awaitable; no sync surface.
- Pydantic v2 DTOs mirroring the openapi.yaml snake_case wire format.
- Two-tier logging:
  - ctx.logger: standard logging.Logger scoped with execution_id, job_key, runner_id, attempt.
  - ctx.log_writer: streaming channel backed by a bounded asyncio.Queue with batching (32 events / 200 ms / max 100 per POST), drained before the ack.
- Server-side cancellation: PollResponse.cancel is honoured via ctx.cancellation (an asyncio.Event). Handlers should await ctx.cancellation.wait() between checkpoints, or just use await asyncio.sleep(...); the runner cancels the underlying task when the event fires.
- Lease renewal: periodic POST /v1/work/renew heartbeat for each in-flight execution.
- Self-registration: pass schedule="5m" to add_handler and the runner POSTs to /v1/jobs/register on startup.
- OpenTelemetry: opt-in via the [otel] extra; spans wrap each handler invocation when opentelemetry-api is importable. Zero dependency otherwise.
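One way to act on cancellation between checkpoints is a non-blocking check of the event before each unit of work. The sketch below assumes ctx.cancellation is the asyncio.Event described above and uses its standard is_set() method rather than wait(); process_batches, batches, and handle_batch are hypothetical names used only for illustration.

```python
import asyncio


async def process_batches(ctx, batches, handle_batch) -> int:
    """Process batches in order, stopping early once cancellation is requested.

    Returns the number of batches that completed.
    """
    done = 0
    for batch in batches:
        # Checkpoint: asyncio.Event.is_set() is a non-blocking check.
        if ctx.cancellation.is_set():
            ctx.logger.warning("cancelled after %d batches", done)
            break
        await handle_batch(batch)
        done += 1
    return done
```

Checking at batch boundaries keeps each batch atomic from the handler's point of view, at the cost of reacting only once the current batch has finished.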
Capabilities vs Tags
A common pitfall: don't put implementation details into capabilities. Capabilities drive server-side job routing (require/prefer in the Croniqfile). Tags are filter-only — they show up in the UI and operational views but don't influence routing.
| Good capability | Bad capability |
|---|---|
| billing, reporting, gpu, sandboxed | python, linux-x64, dotnet |
If your runner is Python-based, that belongs in tags (lang=python, platform=linux-x64), not capabilities — so a future Rust- or .NET-runner with the same business capabilities can take over without rewriting Croniqfile entries.
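A small lint-style check can catch this pitfall before a runner starts. The sketch below is not part of the SDK: misplaced_capabilities and the IMPLEMENTATION_LABELS set are hypothetical, and the set is seeded only from the examples in this section.

```python
# Hypothetical denylist: labels that describe the implementation, not the business capability.
IMPLEMENTATION_LABELS = {"python", "dotnet", "rust", "linux-x64"}


def misplaced_capabilities(capabilities: list[str]) -> list[str]:
    """Return the capabilities that look like implementation details."""
    return [c for c in capabilities if c.lower() in IMPLEMENTATION_LABELS]


print(misplaced_capabilities(["billing", "python", "gpu"]))  # ['python']
```

Anything the check reports would normally move to tags, for example lang=python.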
Handler API
A handler is any async def fn(ctx: ExecutionContext) -> None. The ctx
exposes:
| Attribute | Meaning |
|---|---|
| execution_id | Server-assigned execution identifier |
| job_key | E.g. "billing:invoice" |
| attempt | 1-based attempt counter (incremented on retry) |
| metadata | Raw dict from the server (job-specific schema) |
| timeout | datetime.timedelta declared by the server |
| runner_id, runner_tags | This runner's identity |
| cancellation | asyncio.Event that fires on host shutdown or server-initiated cancel |
| logger | logging.Logger pre-scoped with execution identifiers |
| log_writer | Streaming LogWriter (created lazily on first access) |
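A minimal sketch of a handler that reads several of these attributes. It assumes metadata carries a "customer" key (a made-up job-specific field) and uses a stand-in coroutine, render_invoice, in place of real work. Whether the runner enforces ctx.timeout on its own is not stated here, so the sketch applies it locally with asyncio.wait_for.

```python
import asyncio


async def render_invoice(customer: str) -> str:
    """Hypothetical stand-in for the real work."""
    await asyncio.sleep(0)
    return f"invoice for {customer}"


async def invoice_handler(ctx) -> None:
    # metadata is a raw dict; the "customer" key is an assumed job-specific field.
    customer = ctx.metadata.get("customer", "unknown")
    ctx.logger.info("job %s attempt %d for %s", ctx.job_key, ctx.attempt, customer)
    # ctx.timeout is a timedelta; wait_for expects seconds.
    result = await asyncio.wait_for(
        render_invoice(customer), timeout=ctx.timeout.total_seconds()
    )
    ctx.logger.info("finished: %s", result)
```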
Two ways to control the ack failure message:
from croniq_runner import HandlerError


async def my_handler(ctx):
    if not data_available():
        raise HandlerError("upstream feed unavailable")  # ack.error = "upstream feed unavailable"
Any other exception's str(exc) is forwarded as the error message.
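Because str(exc) is what gets forwarded, the message depends on how each exception type formats itself. The snippet below only mirrors that documented rule in plain Python; ack_error_message is a hypothetical name, not the runner's code.

```python
def ack_error_message(exc: BaseException) -> str:
    """Mirror the documented rule: the ack error is str(exc)."""
    return str(exc)


print(repr(ack_error_message(ValueError("bad input"))))  # 'bad input'
print(repr(ack_error_message(KeyError("customer_id"))))  # "'customer_id'"
print(repr(ack_error_message(RuntimeError())))           # ''
```

A KeyError keeps the quotes around its key, and an exception raised without arguments yields an empty string, so raising with an explicit message gives a more useful ack.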
Configuration
| Option | Default | Meaning |
|---|---|---|
| server_url | http://localhost:4000 | Croniq server base URL |
| runner_id | resolved at start | Stable runner identifier (see resolution order in _identity.py) |
| api_key / bearer_token | None | Auth header (ApiKey preferred when both set) |
| capabilities | [] | Capabilities advertised to the server |
| tags | [] | Free-form key=value tags |
| max_inflight | 5 | Concurrent in-flight executions |
| poll_timeout_ms | 35_000 | Per-request long-poll timeout |
| renew_interval_ms | 15_000 | Lease-renewal heartbeat interval |
| drain_timeout_ms | 30_000 | Wait budget for handlers on shutdown |
| poll_retry_delay_ms | 5_000 | Back-off after a failed poll |
| capacity_backoff_ms | 500 | Idle delay at max_inflight |
| log_writer | LogWriterOptions() | Streaming-log tunables |
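These options are often supplied per environment. The sketch below builds keyword arguments for RunnerOptions from environment variables; the CRONIQ_* variable names and the options_from_env helper are assumptions made for illustration, not something the SDK reads or provides.

```python
import os

# Hypothetical environment variable names mapped to option names from the table.
_ENV_TO_OPTION = {
    "CRONIQ_SERVER_URL": ("server_url", str),
    "CRONIQ_API_KEY": ("api_key", str),
    "CRONIQ_MAX_INFLIGHT": ("max_inflight", int),
    "CRONIQ_DRAIN_TIMEOUT_MS": ("drain_timeout_ms", int),
}


def options_from_env(env=None) -> dict:
    """Build keyword arguments for RunnerOptions from environment variables.

    Unset variables are omitted so the SDK defaults apply.
    """
    env = os.environ if env is None else env
    kwargs = {}
    for var, (option, convert) in _ENV_TO_OPTION.items():
        if var in env:
            kwargs[option] = convert(env[var])
    return kwargs
```

The result would be passed as RunnerOptions(**options_from_env()), alongside any options set in code.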
Streaming logs example
from croniq_runner import LogLevel


async def long_job(ctx):
    async with ctx.log_writer as writer:  # type: ignore[reportInvalidUsage]
        async for line in slow_generator():
            await writer.write(line, level=LogLevel.INFO)
You don't actually need async with — the runner drains the writer before
the ack regardless. Calling aclose() yourself just lets you control when
the drain happens (e.g. before a downstream API call that should see the
events first).
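The ordering described above can be sketched as a tiny helper that drains first and calls downstream second. flush_logs_then and its downstream parameter are hypothetical; only ctx.log_writer and aclose() come from this README. Whether the writer accepts further writes after aclose() is not stated here, so the sketch assumes the drain is the last log operation.

```python
async def flush_logs_then(ctx, downstream):
    """Drain the streaming log writer, then await the downstream call."""
    # aclose() is the drain call named in the README; downstream is any
    # zero-argument coroutine function supplied by the caller (hypothetical).
    await ctx.log_writer.aclose()
    return await downstream()
```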
Conformance suite
The Python binding for the language-agnostic conformance
suite lives under tests/conformance/. Every
case is one pytest parameter; run them with:
pip install -e ".[dev]"
pytest tests/conformance
The cases live at sdks/conformance/cases/*.yaml and are loaded by file —
adding a new YAML automatically adds a new test.
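File-driven discovery of this kind can be sketched with pathlib alone. The suite's actual loader is not shown in this README, so discover_cases and case_ids below are hypothetical helpers that only illustrate how one YAML file can map to one test parameter.

```python
from pathlib import Path


def discover_cases(case_dir) -> list[Path]:
    """Return every *.yaml case file in case_dir, sorted for a stable test order."""
    return sorted(Path(case_dir).glob("*.yaml"))


def case_ids(paths) -> list[str]:
    """Use each file's stem as a readable test id."""
    return [p.stem for p in paths]


# With pytest installed, the helpers would plug into parametrization like:
#   CASES = discover_cases("sdks/conformance/cases")
#   @pytest.mark.parametrize("case_path", CASES, ids=case_ids(CASES))
#   def test_case(case_path): ...
```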
Development
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev,otel]"
ruff check .
mypy
pytest
Releasing
Releases run via .github/workflows/python-sdk-release.yml and upload to PyPI through Trusted Publishing (OIDC — no API token in the repo).
- Bump version = "X.Y.Z" in pyproject.toml.
- Add a ## [X.Y.Z] section to CHANGELOG.md.
- Commit, push to main.
- Tag and push:
  git tag python-sdk-vX.Y.Z
  git push origin python-sdk-vX.Y.Z
- The workflow builds, verifies the tag matches pyproject.toml, publishes to PyPI, then attaches the wheel + sdist to a GitHub Release.
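The tag check can be reproduced locally before pushing. The workflow's real implementation is not shown in this README, so the sketch below is an assumption: tag_matches_pyproject is a hypothetical helper, and the regex is a simplification that expects a top-level version = "..." line.

```python
import re

TAG_PREFIX = "python-sdk-v"  # tag format taken from the release steps above


def tag_matches_pyproject(tag: str, pyproject_text: str) -> bool:
    """Return True when the tag's version equals the version in pyproject.toml."""
    if not tag.startswith(TAG_PREFIX):
        return False
    match = re.search(r'^version\s*=\s*"([^"]+)"', pyproject_text, flags=re.MULTILINE)
    return match is not None and match.group(1) == tag[len(TAG_PREFIX):]
```

A stricter variant could parse the file with tomllib instead of a regex.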
One-time PyPI setup (project owner): add a Pending Publisher on pypi.org with project croniq-runner, owner nuetzliches, repository croniq, workflow python-sdk-release.yml, environment pypi.
License
Dual-licensed under MIT or Apache-2.0, matching the rest of the Croniq repository.