Backend-agnostic job processing toolkit
Pyjobkit
Pyjobkit is a backend-agnostic toolkit for building reliable asynchronous job processing systems. It provides an Engine facade for enqueueing work, a cooperative asyncio Worker, a set of executor contracts, and pluggable queue backends so you can adapt the runtime to your infrastructure with minimal glue code.
Features
- Backends - SQL (SQLBackend, Postgres / MySQL / SQLite via SQLAlchemy) for production, MemoryBackend for tests, optional RedisBackend (preview). All implement the same QueueBackend ABC.
- Async worker - Worker built on asyncio.TaskGroup with concurrency limits, batch polling, lease extension, optimistic locking, configurable watchdog, optional heartbeat callback, and SIGTERM / SIGINT graceful shutdown.
- Executors - SubprocessExecutor (with optional command allowlist), HttpExecutor, and an optional DockerExecutor. Custom executors implement Executor; third-party packages can register via the pyjobkit.executors entry-point group.
- Retry policies - FixedDelay / ExponentialBackoff / JitteredExponentialBackoff, configurable per worker or per job, with an optional wall-clock give_up_after_age_s cap.
- Scheduling - Engine.enqueue_at(when) / enqueue_in(delay) for delayed jobs, pyjobkit.scheduler.Scheduler for periodic enqueues, plus job-level tags, shadow (dry-run), and per-job retry_policy overrides.
- Workflows - Engine.chain(step_a, step_b, ...) runs steps sequentially and threads the previous result through payload["previous_result"]; Engine.enqueue_many(...) bulk-inserts a batch in one round trip.
- Routing - Engine.set_router(callable) (sync or async) rewrites kinds based on payload; Worker(kinds=..., tags=...) filters claimed jobs.
- Rate limiting - per-kind token bucket configured via the worker, the CLI, or TOML.
- Observability - JSON log formatter (pyjobkit.logging.JsonFormatter), ctx.profile_phase(...), Prometheus /metrics exporter, optional OpenTelemetry spans on enqueue / execute (with W3C trace context propagation through the payload), and webhook notifications on terminal states (HMAC-signed when PYJOBKIT_WEBHOOK_SECRET is set).
- CLI - pyjobkit (worker with --once, --kind, --config, --log-format, etc.), pyjobkit-simulate (run a JSON job file against the in-memory backend), pyjobkit-migrate (Alembic migrations), and pyjobkit-prune (retention).
- REST + dashboard - optional FastAPI router (pyjobkit.integrations.fastapi.make_router with a dependencies=[Depends(...)] auth hook) plus a bundled HTML dashboard and a TypeScript client (ts/).
- Typed API - py.typed marker, public TypedDicts (JobRecord, JobResult, FailureReason), JobStatus / LogStream literals.
- Deployment assets - Dockerfile, Helm chart (deploy/helm/pyjobkit), Grafana dashboard (deploy/grafana/).
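The retry policies above are listed by name only. As a rough illustration of what a jittered exponential backoff typically computes, here is a standalone sketch. It is not Pyjobkit's implementation: the function name and the parameters base_s, factor, max_delay_s, and jitter are assumptions chosen for this example.

```python
import random
from typing import Optional


def jittered_backoff_delay(
    attempt: int,
    base_s: float = 1.0,
    factor: float = 2.0,
    max_delay_s: float = 30.0,
    jitter: float = 0.1,
    rng: Optional[random.Random] = None,
) -> float:
    """Return the delay in seconds before retry number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    source = rng if rng is not None else random
    # Exponential growth, capped so late retries do not wait unboundedly.
    delay = min(base_s * factor ** (attempt - 1), max_delay_s)
    # Spread retries by +/- `jitter`, expressed as a fraction of the delay.
    spread = delay * jitter
    return max(0.0, delay + source.uniform(-spread, spread))


# With jitter disabled the schedule is the plain capped exponential.
print([jittered_backoff_delay(n, jitter=0.0) for n in range(1, 7)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

The jitter term matters when many jobs fail at once: without it, they would all retry at the same instant and hit the failing dependency together again.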
Installation
pip install pyjobkit
Optional extras:
pip install "pyjobkit[pg]" # asyncpg (PostgreSQL)
pip install "pyjobkit[mysql]" # aiomysql
pip install "pyjobkit[sqlite]" # aiosqlite
pip install "pyjobkit[redis]" # RedisBackend (preview)
pip install "pyjobkit[docker]" # DockerExecutor
pip install "pyjobkit[fastapi]" # REST router + dashboard
pip install "pyjobkit[metrics]" # Prometheus /metrics exporter
pip install "pyjobkit[otel]" # OpenTelemetry spans
Getting started
The core building blocks are the Engine, a QueueBackend, at least one executor, and the Worker loop.
import asyncio

from pyjobkit import Engine, Worker
from pyjobkit.backends.memory import MemoryBackend
from pyjobkit.contracts import ExecContext, Executor


class HelloExecutor:
    kind = "hello"

    async def run(self, *, job_id, payload, ctx: ExecContext):
        await ctx.log(f"processing {job_id}")
        name = payload.get("name", "world")
        return {"message": f"Hello, {name}!"}


async def main():
    backend = MemoryBackend()
    engine = Engine(backend=backend, executors=[HelloExecutor()])
    worker = Worker(engine, max_concurrency=2)

    # enqueue a job
    job_id = await engine.enqueue(kind="hello", payload={"name": "Ada"})
    print("enqueued", job_id)

    # run the worker loop (typically done in a dedicated process)
    await worker.run()


asyncio.run(main())
The memory backend keeps jobs in-process, making it ideal for unit tests and experimentation. For production you can switch to the SQL backend without changing the worker or executors:
from sqlalchemy.ext.asyncio import create_async_engine
from pyjobkit.backends.sql import SQLBackend
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
backend = SQLBackend(engine, prefer_pg_skip_locked=True, lease_ttl_s=60)
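Because executors are plain classes, the HelloExecutor from the first example can also be exercised without an engine, worker, or backend. The sketch below assumes that the only context method the executor touches is an awaitable log; RecordingContext is a hypothetical test double written for this example, not part of Pyjobkit.

```python
import asyncio


class HelloExecutor:
    """Same shape as the executor in the Getting started example."""

    kind = "hello"

    async def run(self, *, job_id, payload, ctx):
        await ctx.log(f"processing {job_id}")
        name = payload.get("name", "world")
        return {"message": f"Hello, {name}!"}


class RecordingContext:
    """Hypothetical test double: implements only the `log` coroutine."""

    def __init__(self):
        self.lines = []

    async def log(self, message):
        self.lines.append(message)


async def run_once(payload):
    ctx = RecordingContext()
    result = await HelloExecutor().run(job_id="job-1", payload=payload, ctx=ctx)
    return result, ctx.lines


result, lines = asyncio.run(run_once({"name": "Ada"}))
print(result)  # {'message': 'Hello, Ada!'}
print(lines)   # ['processing job-1']
```

Testing at this level keeps executor logic independent of whichever backend is configured.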
SQL schema and migrations
The SQL backend uses the job_tasks table defined in pyjobkit.backends.sql.schema. The bundled pyjobkit-migrate console script runs Alembic against the migrations shipped with the package:
pyjobkit-migrate --dsn postgresql+asyncpg://user:pass@host/db up
pyjobkit-migrate --dsn postgresql+asyncpg://user:pass@host/db current
It accepts the async DSN forms (postgresql+asyncpg, mysql+aiomysql, sqlite+aiosqlite) and rewrites them to the matching sync drivers internally. The DSN may also come from PYJOBKIT_DSN or .pyjobkit.toml. Re-run pyjobkit-migrate up on every deploy; each minor release that touches the schema ships an Alembic revision.
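The DSN rewriting described above can be pictured as a scheme swap. The following is only a sketch: the sync drivers in the mapping (psycopg2, pymysql, and SQLite's default driver) are assumptions, since the drivers actually chosen by pyjobkit-migrate are not stated here, and to_sync_dsn is a hypothetical helper name.

```python
# Assumed async -> sync driver pairs; the real tool may choose different ones.
SYNC_DRIVERS = {
    "postgresql+asyncpg": "postgresql+psycopg2",
    "mysql+aiomysql": "mysql+pymysql",
    "sqlite+aiosqlite": "sqlite",
}


def to_sync_dsn(dsn: str) -> str:
    """Swap an async SQLAlchemy driver prefix for a sync one, if known."""
    scheme, sep, rest = dsn.partition("://")
    if not sep:
        raise ValueError(f"not a URL-style DSN: {dsn!r}")
    # Unknown schemes pass through unchanged.
    return f"{SYNC_DRIVERS.get(scheme, scheme)}://{rest}"


print(to_sync_dsn("postgresql+asyncpg://user:pass@host/db"))
# postgresql+psycopg2://user:pass@host/db
print(to_sync_dsn("sqlite+aiosqlite:///jobs.db"))
# sqlite:///jobs.db
```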
For quick prototyping you can still bootstrap the table via SQLAlchemy directly:
import asyncio

from sqlalchemy.ext.asyncio import create_async_engine
from pyjobkit.backends.sql.schema import metadata


async def create_tables():
    engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
    async with engine.begin() as conn:
        await conn.run_sync(metadata.create_all)


asyncio.run(create_tables())
Running the bundled worker CLI
Once the schema exists, you can run the provided worker process:
pyjobkit --dsn postgresql+asyncpg://user:pass@host/db \
    --concurrency 8 \
    --batch 4 \
    --lease-ttl 30 \
    --poll-interval 0.5
Use --disable-skip-locked when targeting databases that do not support the Postgres-specific optimization. The CLI wires in the SQL backend plus the HTTP and subprocess executors, so jobs with kind="http" or kind="subprocess" will run out of the box.
The worker installs SIGTERM / SIGINT handlers that call worker.request_stop(), so container runtimes (Kubernetes, Docker, systemd) get a clean drain on shutdown. Pass --once for a one-shot drain (useful for cron-style invocations) or --kind to restrict the worker to specific job kinds.
Retention
The SQL backend keeps finished jobs forever unless you remove them. Run pyjobkit-prune on a schedule:
pyjobkit-prune --older-than 30d --statuses success,cancelled
pyjobkit-prune --older-than 90d --statuses failed,timeout
--older-than accepts compact durations (30d, 24h, 90m, 60s). Without it every terminal job in the chosen statuses is deleted.
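To make the duration format concrete, here is a small parser for the four forms listed above. It is an illustrative sketch, not the parser pyjobkit-prune uses; parse_duration is a hypothetical name, and rejecting anything other than a whole number followed by one unit letter is an assumption.

```python
import re
from datetime import datetime, timedelta, timezone

_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}
_PATTERN = re.compile(r"(\d+)([dhms])")


def parse_duration(text: str) -> timedelta:
    """Parse compact durations such as '30d', '24h', '90m', or '60s'."""
    match = _PATTERN.fullmatch(text.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


# Jobs that finished before this instant would be candidates for pruning.
cutoff = datetime.now(timezone.utc) - parse_duration("30d")
print(parse_duration("90m"))  # 1:30:00
```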
Configuration via TOML / environment
CLI flags can be replaced (or supplemented) by a .pyjobkit.toml file in the working directory or by PYJOBKIT_* environment variables. Resolution order is CLI flags -> environment -> TOML file -> defaults.
# .pyjobkit.toml
[pyjobkit]
dsn = "postgresql+asyncpg://user:pass@host/db" # alias: db_url
poll_interval = 0.5
max_attempts = 3
default_executor = "myapp.executors:make_redis"
concurrency = 8
batch = 1
lease_ttl = 30
log_level = "INFO"
disable_skip_locked = false
extra_executors = ["myapp.executors:make_celery"]
log_format = "json" # 'text' (default) or 'json' for structured logs
retry_policy = "exponential_jitter:1:2:30:0.1" # or "fixed:5", "exponential:1:2"
export PYJOBKIT_DSN=postgresql+asyncpg://user:pass@host/db
export PYJOBKIT_POLL_INTERVAL=1.0
pyjobkit # uses env + ./.pyjobkit.toml
pyjobkit --config /etc/pyjobkit.toml
The same loader is exposed programmatically as pyjobkit.load_config() / pyjobkit.Config.
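The resolution order (CLI flags -> environment -> TOML file -> defaults) behaves like a layered lookup in which the first layer that defines a key wins. The sketch below models that with collections.ChainMap. It does not use pyjobkit.load_config(); the resolve helper is hypothetical, and the values in DEFAULTS are placeholders, not Pyjobkit's real defaults.

```python
from collections import ChainMap

# Placeholder defaults for illustration only.
DEFAULTS = {"poll_interval": 0.5, "concurrency": 1, "log_level": "INFO"}


def resolve(cli: dict, env: dict, toml: dict) -> ChainMap:
    """Earlier mappings win: CLI flags -> environment -> TOML -> defaults."""
    # Drop unset (None) values so they fall through to the next layer.
    layers = [
        {key: value for key, value in layer.items() if value is not None}
        for layer in (cli, env, toml)
    ]
    return ChainMap(*layers, DEFAULTS)


settings = resolve(
    cli={"concurrency": 8, "poll_interval": None},
    env={"poll_interval": 1.0},
    toml={"poll_interval": 0.5, "log_level": "DEBUG"},
)
print(settings["concurrency"], settings["poll_interval"], settings["log_level"])
# 8 1.0 DEBUG
```

Here concurrency comes from the CLI layer, poll_interval from the environment (the CLI flag was not given), and log_level from the TOML file.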
In-memory backend (testing / debug)
For unit tests, prototyping, and tutorials the library ships with a
fully-featured in-memory implementation of QueueBackend:
import asyncio

from pyjobkit import Engine, MemoryBackend
from pyjobkit.executors import SubprocessExecutor


async def main():
    backend = MemoryBackend()
    engine = Engine(backend=backend, executors=[SubprocessExecutor()])
    job_id = await engine.enqueue(kind="subprocess", payload={"cmd": "echo hi"})

    # debug helpers
    print(await backend.count())                 # 1
    print(await backend.count(status="queued"))  # 1
    await backend.clear()


asyncio.run(main())
All state lives in process memory and is dropped when the process exits - do not use this backend for durable workloads.
Docker and Kubernetes
A minimal worker image is provided in Dockerfile:
docker build -t pyjobkit/worker:dev .
docker run --rm \
    -e PYJOBKIT_DSN=postgresql+asyncpg://user:pass@host/db \
    pyjobkit/worker:dev
The image installs pyjobkit with the asyncpg and aiosqlite
drivers; configuration is read from PYJOBKIT_* environment variables
or a TOML file mounted into the container.
For Kubernetes a Helm chart is shipped in deploy/helm/pyjobkit. It renders the Deployment, the DSN Secret, a pre-install migration Job (pyjobkit-migrate up), a metrics Service, and an optional ServiceMonitor for kube-prometheus-stack. See deploy/README.md for installation instructions and deploy/grafana/pyjobkit-dashboard.json for a ready-to-import Grafana dashboard.
Production deployment
See docs/production.md for guidance on schema migrations, graceful shutdown, retention, observability, rate limits, and Postgres tuning. docs/cancellation.md documents the cooperative cancellation contract. The full stability policy is in docs/stability.md. Vulnerabilities should be reported per SECURITY.md.
Comparison with other Python job libraries
See docs/comparison.md for a head-to-head positioning against Celery, RQ, and Dramatiq.
Extending Pyjobkit
- Custom executors - Implement the Executor protocol, register instances when constructing the Engine, and leverage the ExecContext helpers (log, set_progress, is_cancelled).
- Alternate backends - Implement the QueueBackend protocol to target message brokers or proprietary queues while reusing the worker and executor layers.
- Logging & events - Swap the memory log sink or event bus with your own implementations (e.g., stream to Loki or publish over Redis) by passing them to the Engine constructor.
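As an illustration of the custom-executor point, the sketch below shows a long-running executor that reports progress and stops early when cancellation is requested. It runs against a stub context so that it stays self-contained. StubContext and ChunkedExecutor are hypothetical, and treating log, set_progress, and is_cancelled as coroutines with these signatures is an assumption; docs/cancellation.md describes the actual contract.

```python
import asyncio


class StubContext:
    """Hypothetical stand-in for ExecContext; async signatures are assumed."""

    def __init__(self, cancel_after: int):
        self._cancel_after = cancel_after
        self.progress = []
        self.logs = []

    async def log(self, message):
        self.logs.append(message)

    async def set_progress(self, value):
        self.progress.append(value)

    async def is_cancelled(self):
        # Report cancellation once `cancel_after` progress updates were made.
        return len(self.progress) >= self._cancel_after


class ChunkedExecutor:
    kind = "chunked"

    async def run(self, *, job_id, payload, ctx):
        items = payload["items"]
        done = 0
        for item in items:
            # Check between units of work so a cancel request is honored
            # without interrupting an item halfway through.
            if await ctx.is_cancelled():
                await ctx.log(f"{job_id} cancelled after {done} items")
                break
            await asyncio.sleep(0)  # placeholder for real work on `item`
            done += 1
            await ctx.set_progress(done / len(items))
        return {"processed": done}


ctx = StubContext(cancel_after=2)
outcome = asyncio.run(
    ChunkedExecutor().run(job_id="job-1", payload={"items": [1, 2, 3, 4]}, ctx=ctx)
)
print(outcome)       # {'processed': 2}
print(ctx.progress)  # [0.25, 0.5]
```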
Examples
- examples/taskboard - A single-page FastAPI dashboard that enqueues demo jobs which sleep for a random duration using the in-memory backend. Includes a Dockerfile for quick demos.
Running the Taskboard demo with Docker Compose
The repository ships with a minimal docker-compose.yml that runs the taskboard FastAPI app using the official Python image and a runtime install script:
docker compose up taskboard
# open http://localhost:8000 to view the UI
Troubleshooting BuildKit errors
Some environments (including Portainer-managed hosts) route Docker daemon traffic through an HTTP proxy. When that proxy does not understand HTTP/2, docker compose can fail during the build phase with an error similar to:

Failed to deploy a stack: compose build operation failed: listing workers for Build: failed to list workers: Unavailable: connection error: desc = "error reading server preface: http2: failed reading the frame payload: http2: frame too large, note that the frame header looked like an HTTP/1.1 header"

BuildKit (used by docker compose by default) communicates with the daemon over HTTP/2. To bypass the proxy limitation, temporarily disable BuildKit for the build command:

DOCKER_BUILDKIT=0 docker compose up --build taskboard

The legacy builder falls back to HTTP/1.1 and succeeds in environments where BuildKit cannot establish its HTTP/2 connection.

Note: the Environment variables form in Portainer only passes values to the Compose file itself, so setting DOCKER_BUILDKIT there will not disable BuildKit. For Portainer, use one of the approaches below.
Running the Taskboard demo in Portainer
Below is a proven scenario for running the examples/taskboard demo stack on a host managed by Portainer.
- Make sure your Docker host (where Portainer agent or standalone daemon runs) has internet access for git and pip.
- In Portainer, go to Stacks -> Add stack -> Web editor.
- Copy the contents of your updated docker-compose.yml (from this repository) into the editor. Make sure only the taskboard service is present in the YAML.
- Click Deploy the stack. Portainer will pull the official python:3.13-slim image, install dependencies, clone the repository, and start the FastAPI app.
All jobs and data are stored in memory. Restarting the container will reset the job history. For production, consider building and publishing your own image to a registry.
Local run (alternative)
docker compose up taskboard
Notes
- All jobs and demo data are stored in memory; restarting the container clears the history.
- For production, build and push your own image to a registry.
- If you need to access a private repository, use environment variables to pass a token or SSH key.
Requirements
- Python 3.13+
- An asyncio-compatible event loop (the worker uses asyncio.TaskGroup)
- SQL backend users need SQLAlchemy 2.x plus an async driver for their database
License
Pyjobkit is distributed under the MIT License. See LICENSE for the full text.