Postgres-backed job queue. Durable, visible, transactional.

PGWerk

Jobs are rows. Workers poll with SELECT … FOR UPDATE SKIP LOCKED. No external broker, no sidecar, just your existing Postgres instance. The schema is created automatically on first connect.

pgwerk does not maintain an internal connection pool. Each operation uses a short-lived connection, making it compatible with external poolers like PgBouncer in transaction pooling mode.

[Screenshot: PGWerk dashboard overview]

Python

Install

pip install pgwerk

# Cron support (optional)
pip install "pgwerk[cron]"

Requires Python 3.11+ and a Postgres 14+ database.

Quickstart

Define your app and handlers:

from pgwerk import Werk, Context

app = Werk("postgresql://user:pass@localhost/mydb")

# Call connect() once at startup and disconnect() at shutdown, from async code
# (async with app: is shorthand for the same pair).
await app.connect()

async def send_email(to: str):
    ...  # plain handler; ctx is optional

async def send_email_with_context(ctx: Context, to: str):
    ...  # opt in by naming the first parameter ctx or annotating it as Context

Enqueue jobs from anywhere in your application:

await app.enqueue(send_email, to="user@example.com")

Run a worker in a separate process:

import asyncio
from pgwerk import AsyncWorker
from myapp.tasks import app  # the Werk instance defined above

async def main():
    worker = AsyncWorker(app=app, queues=["default"], concurrency=10)
    await worker.run()

asyncio.run(main())

Handlers are identified by their dotted import path (for example myapp.tasks.send_email). You pass the function itself to enqueue; pgwerk records its path and the worker imports it from there. ctx is injected when the first parameter is named ctx or annotated as Context.
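Both rules can be illustrated with the standard library. This is a sketch, not pgwerk's implementation: handler_path derives a dotted path from a function, and wants_ctx applies the "named ctx or annotated as Context" rule. The Context class here is a local stand-in so the sketch runs without pgwerk, and the helper names are hypothetical. One practical consequence of path-based lookup is that lambdas and functions defined inside other functions have no importable path, so handlers should be module-level functions.

```python
import inspect
from typing import Callable


class Context:
    """Stand-in for pgwerk.Context so this sketch runs without pgwerk."""


def handler_path(func: Callable) -> str:
    """Dotted path a worker could import the handler from."""
    qualname = func.__qualname__
    # Lambdas and nested functions cannot be re-imported by path.
    if "<lambda>" in qualname or "<locals>" in qualname:
        raise ValueError(f"{qualname!r} is not importable; use a module-level function")
    return f"{func.__module__}.{qualname}"


def wants_ctx(func: Callable) -> bool:
    """First parameter named ctx, or annotated as Context."""
    params = list(inspect.signature(func).parameters.values())
    if not params:
        return False
    first = params[0]
    return first.name == "ctx" or first.annotation is Context


async def send_email(to: str): ...
async def send_email_with_context(ctx: Context, to: str): ...
async def annotated_only(context: Context, to: str): ...


print(wants_ctx(send_email), wants_ctx(send_email_with_context), wants_ctx(annotated_only))
# False True True
print(handler_path(send_email))  # "__main__.send_email" when run as a script
```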

Enqueueing

from pgwerk import Retry, Repeat, Dependency
from datetime import datetime, timezone, timedelta

# Basic
await app.enqueue(my_func, x=1)

# With options
await app.enqueue(
    my_func,
    x=1,
    _queue="high",
    _priority=10,
    _delay=30,                         # seconds from now
    _at=datetime(2025, 1, 1, tzinfo=timezone.utc),
    _retry=Retry(max=3, intervals=[10, 60, 300]),  # total attempts, including the first
    _timeout=120,                      # fail after 2 minutes
    _heartbeat=30,                     # worker auto-renews while the job is running
    _key="unique-key",                 # idempotency key — duplicate enqueues are silently dropped
    _group="user:42",                  # at most one active job per group
    _meta={"source": "api"},
    _on_success=notify_done,
    _on_failure=alert_team,
)

# Repeat a job N more times after the first run
await app.enqueue(cleanup, _repeat=Repeat(times=5, interval=3600))

# Depend on another job finishing first
job_a = await app.enqueue(step_one)
await app.enqueue(step_two, _depends_on=job_a)          # waits for job_a
await app.enqueue(step_two, _depends_on=Dependency(job_a, allow_failure=True))  # also runs if job_a fails

# Bulk enqueue in one round-trip
from pgwerk import EnqueueParams
await app.enqueue_many([
    EnqueueParams(func=my_func, kwargs={"n": i}, queue="bulk") for i in range(100)
])

Workers

from pgwerk import AsyncWorker, ThreadWorker, ProcessWorker, ForkWorker

# Asyncio (default — best for I/O-bound work)
worker = AsyncWorker(app=app, queues=["default", "high"], concurrency=20)

# Thread pool (CPU-light work, blocking libraries)
worker = ThreadWorker(app=app, concurrency=8)

# Process pool (CPU-bound work, true parallelism)
worker = ProcessWorker(app=app, concurrency=4)

# Fork per job (maximum isolation)
worker = ForkWorker(app=app, concurrency=4)

await worker.run()

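Workers register themselves in the database and send periodic heartbeats. While a job enqueued with _heartbeat is running, its worker renews that job's heartbeat automatically. Workers also use LISTEN/NOTIFY to wake up as soon as a job is enqueued, rather than waiting for the next poll.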
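The wake-up behaviour combines two mechanisms: a notification that ends the worker's wait early, and a periodic poll as a fallback in case a notification is missed. The asyncio sketch below shows that shape with an asyncio.Event standing in for the Postgres notification channel; the in-memory queue, poll_interval and the helper names are hypothetical and not pgwerk internals.

```python
import asyncio
import time


def enqueue(queue: list[str], wakeup: asyncio.Event, job: str) -> None:
    queue.append(job)  # stands in for INSERT INTO jobs ...
    wakeup.set()       # stands in for NOTIFY on the workers' channel


async def worker_loop(queue: list[str], wakeup: asyncio.Event, done: list[str],
                      poll_interval: float, stop_after: int) -> None:
    while True:
        wakeup.clear()  # clear before checking, so a notification is never lost
        while queue:
            done.append(queue.pop(0))  # "claim and run" the job
        if len(done) >= stop_after:
            return
        try:
            # Sleep until notified, but never longer than poll_interval (fallback poll).
            await asyncio.wait_for(wakeup.wait(), timeout=poll_interval)
        except asyncio.TimeoutError:
            pass


async def main() -> float:
    queue: list[str] = []
    done: list[str] = []
    wakeup = asyncio.Event()
    worker = asyncio.create_task(
        worker_loop(queue, wakeup, done, poll_interval=5.0, stop_after=1)
    )
    await asyncio.sleep(0.1)  # worker is now idle, waiting on the event
    start = time.monotonic()
    enqueue(queue, wakeup, "job-1")
    await worker
    return time.monotonic() - start


elapsed = asyncio.run(main())
print(f"picked up after {elapsed:.3f}s (poll interval is 5s)")
```

Without the notification the job would sit for up to the full poll interval; with it, the worker reacts almost immediately.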

Cron

from pgwerk import CronScheduler, CronJob

scheduler = CronScheduler(app)
scheduler.register(CronJob(func=my_func, cron="*/15 * * * *"))  # every 15 min
scheduler.register(CronJob(func=other_func, interval=3600))      # every hour

async with app:
    await scheduler.run()

CronScheduler uses a Postgres advisory lock so only one process runs the scheduler at a time. Requires croniter for cron expressions.
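For readers less familiar with cron syntax: each of the five fields in an expression like */15 * * * * (minute, hour, day of month, month, day of week) expands to a set of allowed values. pgwerk relies on croniter for the real parsing; the toy expander below is only an illustration, handles just the forms listed in its docstring, and expand_field is a hypothetical name.

```python
def expand_field(field: str, low: int, high: int) -> list[int]:
    """Expand one cron field into the sorted values it matches (toy parser).

    Supports '*', '*/n', 'a', 'a-b', 'a-b/n' and comma-separated lists of those.
    """
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            first, last = base.split("-", 1)
            start, end = int(first), int(last)
        elif step_text:
            raise ValueError(f"'{part}' is not supported by this toy parser")
        else:
            start = end = int(base)
        if step < 1 or not low <= start <= end <= high:
            raise ValueError(f"invalid cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_field("*/15", 0, 59))    # minute field of "*/15 * * * *": [0, 15, 30, 45]
print(expand_field("9-17/4", 0, 23))  # [9, 13, 17]
print(expand_field("1,15", 1, 31))    # [1, 15]
```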

Serializers

from pgwerk import Werk, PickleSerializer

app = Werk(dsn, serializer=PickleSerializer())  # default is JSONSerializer

The configured serializer is used for job payloads, job results, and execution results.
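The serializer decides which argument types can cross the queue. The comparison below uses the standard library's json and pickle modules directly, not pgwerk's serializer classes (which may handle extra types): plain JSON rejects a datetime, while pickle round-trips it. Because unpickling can execute arbitrary code, pickle is only appropriate when everyone who can write to the jobs table is trusted.

```python
import json
import pickle
from datetime import datetime, timezone

payload = {"to": "user@example.com", "send_at": datetime(2025, 1, 1, tzinfo=timezone.utc)}

try:
    json.dumps(payload)
except TypeError as exc:
    print("json:", exc)  # datetime is not JSON serializable

restored = pickle.loads(pickle.dumps(payload))
print("pickle round-trip ok:", restored == payload)  # True

# A common JSON-side workaround: pass ISO strings and parse them in the handler.
as_json = json.dumps({**payload, "send_at": payload["send_at"].isoformat()})
print(datetime.fromisoformat(json.loads(as_json)["send_at"]) == payload["send_at"])  # True
```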

Job inspection

job = await app.get_job(job_id)
executions = await app.get_executions(job_id)
await app.cancel_job(job_id)

CLI

In the commands below, APP is a module:attribute path to a Werk instance, for example myapp.tasks:app.
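A module:attribute path is conventionally resolved by importing the module part and then looking up the attribute, the same convention uvicorn and gunicorn use for app paths. The sketch below shows that convention with importlib; it is not the werk CLI's code, and load_app is a hypothetical name.

```python
import importlib
from typing import Any


def load_app(path: str) -> Any:
    """Resolve 'package.module:attribute' (e.g. 'myapp.tasks:app') to an object."""
    module_name, sep, attr_path = path.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    obj: Any = importlib.import_module(module_name)
    for attr in attr_path.split("."):  # also allows nested attributes like 'mod:obj.app'
        obj = getattr(obj, attr)
    return obj


# With the example above this would import myapp.tasks and return its app.
# A stdlib path that runs anywhere:
join = load_app("os.path:join")
print(join("a", "b"))
```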

Worker

werk worker myapp.tasks:app --queues default,high --concurrency 10 --worker-type async

Observability API

werk api --dsn postgresql://localhost/mydb

| Flag | Env var | Default | Description |
|---|---|---|---|
| --dsn | PGWERK_DSN | | Postgres connection string (required) |
| --host / -h | PGWERK_HOST | 127.0.0.1 | Host to bind |
| --port / -p | PGWERK_PORT | 8000 | Port to bind |
| --schema | PGWERK_SCHEMA | | Postgres schema for the pgwerk tables |
| --prefix | PGWERK_PREFIX | | Table-name prefix |
| --metrics | PGWERK_METRICS | off | Enable Prometheus metrics at GET /metrics |
| --metrics-interval | PGWERK_METRICS_INTERVAL | 15.0 | Metrics scrape interval, in seconds |
| --no-ui | PGWERK_NO_UI | off | Disable the SPA dashboard |
| --ui-auth | PGWERK_UI_AUTH | | Basic Auth for the SPA, as user:password |
| --api-token | PGWERK_API_TOKEN | | Bearer token for all /api/* routes |
| --log-level / -l | PGWERK_LOG_LEVEL | INFO | debug, info, warning, error |
| --log-format | PGWERK_LOG_FORMAT | text | text or json |
| --no-color | PGWERK_NO_COLOR | off | Disable colored log output |
| --reload | PGWERK_RELOAD | off | Auto-reload (development) |
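--ui-auth and --api-token correspond to the two standard HTTP schemes: Basic auth sends base64(user:password) and bearer auth sends the token as-is, each in an Authorization header. The sketch below only builds requests with urllib and does not send them. The credentials, token and the /api/example route are placeholders, since the table names no API routes beyond the /api/* prefix and /metrics.

```python
import base64
import urllib.request

BASE = "http://127.0.0.1:8000"  # default --host and --port


def basic_auth_header(user_password: str) -> str:
    """Authorization value for --ui-auth style credentials given as 'user:password'."""
    token = base64.b64encode(user_password.encode("utf-8")).decode("ascii")
    return f"Basic {token}"


def api_request(path: str, api_token: str) -> urllib.request.Request:
    """Request for an /api/* route protected by --api-token."""
    return urllib.request.Request(BASE + path, headers={"Authorization": f"Bearer {api_token}"})


ui_header = basic_auth_header("admin:secret")        # placeholder credentials
req = api_request("/api/example", "my-token")        # hypothetical route and token
metrics = urllib.request.Request(BASE + "/metrics")  # served when --metrics is on

print(ui_header)                        # Basic YWRtaW46c2VjcmV0
print(req.get_header("Authorization"))  # Bearer my-token
```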

Inspection

# Show queue statistics and active workers
werk info myapp.tasks:app

# Live terminal dashboard
werk dashboard myapp.tasks:app

# List recent jobs
werk jobs myapp.tasks:app

# Show slowest functions by average execution time
werk slowest myapp.tasks:app

# Delete finished jobs
werk purge myapp.tasks:app --status complete,failed

Schema

All tables are prefixed (default _pgwerk_) and optionally placed in a named schema. Migrations run automatically on connect using an advisory lock to prevent races across multiple processes starting simultaneously.

| Table | Purpose |
|---|---|
| _pgwerk_worker | Registered workers + heartbeat tracking |
| _pgwerk_jobs | Job queue; all state lives here |
| _pgwerk_worker_jobs | Active claim tracking |
| _pgwerk_jobs_executions | Per-attempt execution history |
| _pgwerk_job_deps | Job dependency graph |
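The race the migration lock prevents is several processes reading the same old schema version and applying the same migration twice. The sketch below reproduces the idea with threads: one blocking lock stands in for a Postgres advisory lock (for example pg_advisory_lock), and each "process" reads the recorded version only after acquiring it, so every migration runs exactly once. The migration names and the version bookkeeping are hypothetical, not pgwerk's.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

MIGRATIONS = ["0001_initial", "0002_add_index", "0003_add_column"]  # hypothetical names

schema_lock = threading.Lock()  # stands in for a Postgres advisory lock
schema_version = 0              # stands in for a version row stored in the database
applied: list[str] = []         # every migration that actually ran, in order


def connect() -> None:
    """What each process does on connect: migrate under the lock, then carry on."""
    global schema_version
    with schema_lock:  # blocking acquire: later processes wait for the first one
        # Read the current version only after taking the lock; reading it
        # before locking would let two processes run the same migration.
        for name in MIGRATIONS[schema_version:]:
            applied.append(name)  # "run" the migration
            schema_version += 1


with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(connect) for _ in range(5)]
for f in futures:
    f.result()  # surface any exception raised inside a worker thread

print(applied)  # ['0001_initial', '0002_add_index', '0003_add_column']
```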

Job lifecycle

queued → active → complete
                ↘ failed   (retries exhausted)
       → waiting           (blocked on dependencies, Python only)
       → aborted           (cancelled before execution)
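Read as a state machine, the diagram allows only a handful of transitions. The sketch below encodes exactly those arrows and rejects anything else. It is illustrative only: Status, TRANSITIONS and advance are hypothetical names rather than pgwerk API, and since the diagram does not show how a retried or waiting job returns to queued, neither does the sketch.

```python
from enum import Enum


class Status(str, Enum):
    QUEUED = "queued"
    ACTIVE = "active"
    COMPLETE = "complete"
    FAILED = "failed"
    WAITING = "waiting"
    ABORTED = "aborted"


# Exactly the arrows in the diagram above; nothing more.
TRANSITIONS: dict[Status, frozenset[Status]] = {
    Status.QUEUED: frozenset({Status.ACTIVE, Status.WAITING, Status.ABORTED}),
    Status.ACTIVE: frozenset({Status.COMPLETE, Status.FAILED}),
}


def advance(current: Status, new: Status) -> Status:
    """Return the new status, or raise if the diagram has no such arrow."""
    if new not in TRANSITIONS.get(current, frozenset()):
        raise ValueError(f"no transition {current.value} -> {new.value} in the diagram")
    return new


status = advance(Status.QUEUED, Status.ACTIVE)
status = advance(status, Status.COMPLETE)
print(status.value)  # complete
```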

License

MIT

Download files

Source Distributions

No source distribution files available for this release.

Built Distribution

pgwerk-0.1.8-py3-none-any.whl (351.2 kB view details)

Uploaded Python 3

File details

Details for the file pgwerk-0.1.8-py3-none-any.whl.

File metadata

  • Download URL: pgwerk-0.1.8-py3-none-any.whl
  • Upload date:
  • Size: 351.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pgwerk-0.1.8-py3-none-any.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | a826c1eb920c9ccbb67aa9b213d7b3572b59f946ef0e4fbd9d22d695a016a6bc |
| MD5 | 3cc48773572c9df2ac26193dbab59ff0 |
| BLAKE2b-256 | 1eb89b4a11093222f5f869660555025fd99beb4b93d1266b782c96168ef4e0f8 |
