Postgres-backed job queue. Durable, visible, transactional.
Project description
PGWerk
A Postgres-backed job queue. Durable, visible, transactional.
Jobs are rows. Workers poll with SELECT … FOR UPDATE SKIP LOCKED. No external broker, no sidecar, just your existing Postgres instance. The schema is created automatically on first connect.
pgwerk does not maintain an internal connection pool. Each operation uses a short-lived connection, making it compatible with external poolers like PgBouncer in transaction pooling mode.
Python
Install
pip install pgwerk
# Cron support (optional)
pip install "pgwerk[cron]"
Requires Python 3.11+ and a Postgres 14+ database.
Quickstart
Define your app and handlers:
from pgwerk import Werk, Context
app = Werk("postgresql://user:pass@localhost/mydb")
# Call connect() once at startup; disconnect() at shutdown.
# async with app: is shorthand for the same pair.
await app.connect()
async def send_email(to: str):
... # plain handler — ctx is optional
async def send_email_with_context(ctx: Context, to: str):
... # opt in by naming/typing the first parameter as ctx
Enqueue jobs from anywhere in your application:
await app.enqueue(send_email, to="user@example.com")
Run a worker in a separate process:
import asyncio
from pgwerk import AsyncWorker
async def main():
worker = AsyncWorker(app=app, queues=["default"], concurrency=10)
await worker.run()
asyncio.run(main())
Handlers are identified by their dotted import path (myapp.tasks.send_email). The function itself is passed to enqueue; werk records its path and imports it on the worker side. ctx is injected when the first parameter is named ctx or annotated as Context.
Enqueueing
from pgwerk import Retry, Repeat, Dependency
from datetime import datetime, timezone, timedelta
# Basic
await app.enqueue(my_func, x=1)
# With options
await app.enqueue(
my_func,
x=1,
_queue="high",
_priority=10,
_delay=30, # seconds from now
_at=datetime(2025, 1, 1, tzinfo=timezone.utc),
_retry=Retry(max=3, intervals=[10, 60, 300]), # total attempts, including the first
_timeout=120, # fail after 2 minutes
_heartbeat=30, # worker auto-renews while the job is running
_key="unique-key", # idempotency key — duplicate enqueues are silently dropped
_group="user:42", # at most one active job per group
_meta={"source": "api"},
_on_success=notify_done,
_on_failure=alert_team,
)
# Repeat a job N more times after the first run
await app.enqueue(cleanup, _repeat=Repeat(times=5, interval=3600))
# Depend on another job finishing first
job_a = await app.enqueue(step_one)
await app.enqueue(step_two, _depends_on=job_a) # waits for job_a
await app.enqueue(step_two, _depends_on=Dependency(job_a, allow_failure=True))
# Bulk enqueue in one round-trip
from pgwerk import EnqueueParams
await app.enqueue_many([
EnqueueParams(func=my_func, kwargs={"n": i}, queue="bulk") for i in range(100)
])
Workers
from pgwerk import AsyncWorker, ThreadWorker, ProcessWorker, ForkWorker
# Asyncio (default — best for I/O-bound work)
worker = AsyncWorker(app=app, queues=["default", "high"], concurrency=20)
# Thread pool (CPU-light work, blocking libraries)
worker = ThreadWorker(app=app, concurrency=8)
# Process pool (CPU-bound work, true parallelism)
worker = ProcessWorker(app=app, concurrency=4)
# Fork per job (maximum isolation)
worker = ForkWorker(app=app, concurrency=4)
await worker.run()
Workers register themselves in the database, send periodic heartbeats, auto-touch jobs that opt into _heartbeat, and use LISTEN/NOTIFY for instant wake-up when jobs are enqueued.
Cron
from pgwerk import CronScheduler, CronJob
scheduler = CronScheduler(app)
scheduler.register(CronJob(func=my_func, cron="*/15 * * * *")) # every 15 min
scheduler.register(CronJob(func=other_func, interval=3600)) # every hour
async with app:
await scheduler.run()
CronScheduler uses a Postgres advisory lock so only one process runs the scheduler at a time. Requires croniter for cron expressions.
Serializers
from pgwerk import Werk, PickleSerializer
app = Werk(dsn, serializer=PickleSerializer()) # default is JSONSerializer
The configured serializer is used for job payloads, job results, and execution results.
Job inspection
job = await app.get_job(job_id)
executions = await app.get_executions(job_id)
await app.cancel_job(job_id)
CLI
APP is a module:attribute path to a Werk instance.
Worker
werk worker myapp.tasks:app --queues default,high --concurrency 10 --worker-type async
Observability API
werk api --dsn postgresql://localhost/mydb
| Flag | Env var | Default | Description |
|---|---|---|---|
--dsn |
PGWERK_DSN |
— | Postgres connection string (required) |
--host / -h |
PGWERK_HOST |
127.0.0.1 |
Host to bind |
--port / -p |
PGWERK_PORT |
8000 |
Port to bind |
--schema |
PGWERK_SCHEMA |
— | Postgres schema for wrk tables |
--prefix |
PGWERK_PREFIX |
— | Table-name prefix |
--metrics |
PGWERK_METRICS |
off | Enable Prometheus metrics at GET /metrics |
--metrics-interval |
PGWERK_METRICS_INTERVAL |
15.0 |
Metrics scrape interval in seconds |
--no-ui |
PGWERK_NO_UI |
off | Disable the SPA dashboard |
--ui-auth |
PGWERK_UI_AUTH |
— | Basic Auth for the SPA as user:password |
--api-token |
PGWERK_API_TOKEN |
— | Bearer token for all /api/* routes |
--log-level / -l |
PGWERK_LOG_LEVEL |
INFO |
debug, info, warning, error |
--log-format |
PGWERK_LOG_FORMAT |
text |
text or json |
--no-color |
PGWERK_NO_COLOR |
off | Disable colored log output |
--reload |
PGWERK_RELOAD |
off | Auto-reload (development) |
Inspection
# Show queue statistics and active workers
werk info myapp.tasks:app
# Live terminal dashboard
werk dashboard myapp.tasks:app
# List recent jobs
werk jobs myapp.tasks:app
# Show slowest functions by average execution time
werk slowest myapp.tasks:app
# Delete finished jobs
werk purge myapp.tasks:app --status complete,failed
Schema
All tables are prefixed (default _pgwerk_) and optionally placed in a named schema. Migrations run automatically on connect using an advisory lock to prevent races across multiple processes starting simultaneously.
| Table | Purpose |
|---|---|
_pgwerk_worker |
Registered workers + heartbeat tracking |
_pgwerk_jobs |
Job queue — all state lives here |
_pgwerk_worker_jobs |
Active claim tracking |
_pgwerk_jobs_executions |
Per-attempt execution history |
_pgwerk_job_deps |
Job dependency graph |
Job lifecycle
queued → active → complete
↘ failed (retries exhausted)
→ waiting (blocked on dependencies, Python only)
→ aborted (cancelled before execution)
Production security
By default, pgwerk tables land in whatever schema your connection's search_path resolves to (usually public). For production we recommend a dedicated schema and a role scoped to it, so pgwerk credentials can never touch the rest of your database.
CREATE SCHEMA pgwerk;
CREATE USER pgwerk_app WITH PASSWORD 'strong-password';
GRANT USAGE, CREATE ON SCHEMA pgwerk TO pgwerk_app;
Then point your app at that schema and user:
app = Werk("postgresql://pgwerk_app:strong-password@localhost/mydb", schema="pgwerk")
The CREATE privilege is needed because pgwerk auto-migrates on first connect. If you prefer to run migrations separately — e.g. as a CI/CD step with elevated credentials — use werk migrate:
# Run once during deployment with a role that has CREATE
werk migrate --dsn postgresql://pgwerk_admin:...@localhost/mydb --schema pgwerk
# Application runtime needs only DML
app = Werk("postgresql://pgwerk_app:...@localhost/mydb", schema="pgwerk", auto_migrate=False)
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pgwerk-0.1.16-py3-none-any.whl.
File metadata
- Download URL: pgwerk-0.1.16-py3-none-any.whl
- Upload date:
- Size: 373.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2b8e8c51a2c463ac1e16da0f6d5aac89dc0b1088e59b6b5fdac66aa8d4b90efa
|
|
| MD5 |
235c8e309130eeddf6f242c93e9f94ab
|
|
| BLAKE2b-256 |
ae1ec44fd5a4b4a7b5ec3bf3dcfac053d9330ddd2a8b48829450a6c98b0825eb
|