Skip to main content

Postgres-backed job queue. Durable, visible, transactional.

Project description

PGWerk

CI PyPI Python License: MIT Docs

A Postgres-backed job queue. Durable, visible, transactional.

Jobs are rows. Workers poll with SELECT … FOR UPDATE SKIP LOCKED. No external broker, no sidecar, just your existing Postgres instance. The schema is created automatically on first connect.

PGWerk dashboard overview


Python

Install

pip install pgwerk

# Cron support (optional)
pip install "pgwerk[cron]"

Requires Python 3.11+ and a Postgres 14+ database.

Quickstart

Define your app and handlers:

from pgwerk import Werk, Context

app = Werk("postgresql://user:pass@localhost/mydb")

# Call connect() once at startup; disconnect() at shutdown.
# async with app: is shorthand for the same pair.
await app.connect()

async def send_email(to: str):
    ...  # plain handler — ctx is optional

async def send_email_with_context(ctx: Context, to: str):
    ...  # opt in by naming/typing the first parameter as ctx

Enqueue jobs from anywhere in your application:

await app.enqueue(send_email, to="user@example.com")

Run a worker in a separate process:

import asyncio
from pgwerk import AsyncWorker

async def main():
    worker = AsyncWorker(app=app, queues=["default"], concurrency=10)
    await worker.run()

asyncio.run(main())

Handlers are identified by their dotted import path (myapp.tasks.send_email). The function itself is passed to enqueue; werk records its path and imports it on the worker side. ctx is injected when the first parameter is named ctx or annotated as Context.

Enqueueing

from pgwerk import Retry, Repeat, Dependency
from datetime import datetime, timezone, timedelta

# Basic
await app.enqueue(my_func, x=1)

# With options
await app.enqueue(
    my_func,
    x=1,
    _queue="high",
    _priority=10,
    _delay=30,                         # seconds from now
    _at=datetime(2025, 1, 1, tzinfo=timezone.utc),
    _retry=Retry(max=3, intervals=[10, 60, 300]),  # total attempts, including the first
    _timeout=120,                      # fail after 2 minutes
    _heartbeat=30,                     # worker auto-renews while the job is running
    _key="unique-key",                 # idempotency key — duplicate enqueues are silently dropped
    _group="user:42",                  # at most one active job per group
    _meta={"source": "api"},
    _on_success=notify_done,
    _on_failure=alert_team,
)

# Repeat a job N more times after the first run
await app.enqueue(cleanup, _repeat=Repeat(times=5, interval=3600))

# Depend on another job finishing first
job_a = await app.enqueue(step_one)
await app.enqueue(step_two, _depends_on=job_a)          # waits for job_a
await app.enqueue(step_two, _depends_on=Dependency(job_a, allow_failure=True))

# Bulk enqueue in one round-trip
from pgwerk import EnqueueParams
await app.enqueue_many([
    EnqueueParams(func=my_func, kwargs={"n": i}, queue="bulk") for i in range(100)
])

Workers

from pgwerk import AsyncWorker, ThreadWorker, ProcessWorker, ForkWorker

# Asyncio (default — best for I/O-bound work)
worker = AsyncWorker(app=app, queues=["default", "high"], concurrency=20)

# Thread pool (CPU-light work, blocking libraries)
worker = ThreadWorker(app=app, concurrency=8)

# Process pool (CPU-bound work, true parallelism)
worker = ProcessWorker(app=app, concurrency=4)

# Fork per job (maximum isolation)
worker = ForkWorker(app=app, concurrency=4)

await worker.run()

Workers register themselves in the database, send periodic heartbeats, auto-touch jobs that opt into _heartbeat, and use LISTEN/NOTIFY for instant wake-up when jobs are enqueued.

Cron

from pgwerk import CronScheduler, CronJob

scheduler = CronScheduler(app)
scheduler.register(CronJob(func=my_func, cron="*/15 * * * *"))  # every 15 min
scheduler.register(CronJob(func=other_func, interval=3600))      # every hour

async with app:
    await scheduler.run()

CronScheduler uses a Postgres advisory lock so only one process runs the scheduler at a time. Requires croniter for cron expressions.

Serializers

from pgwerk import Werk, PickleSerializer

app = Werk(dsn, serializer=PickleSerializer())  # default is JSONSerializer

The configured serializer is used for job payloads, job results, and execution results.

Job inspection

job = await app.get_job(job_id)
executions = await app.get_executions(job_id)
await app.cancel_job(job_id)

CLI

# Start a worker
werk worker myapp.tasks:app --queues default,high --concurrency 10 --worker-type async

# Show queue statistics and active workers
werk info myapp.tasks:app

# Delete finished jobs
werk purge myapp.tasks:app --status complete,failed

APP is a module:attribute path to a Werk instance.


Schema

All tables are prefixed (default _pgwerk_) and optionally placed in a named schema. Migrations run automatically on connect using an advisory lock to prevent races across multiple processes starting simultaneously.

Table Purpose
_pgwerk_worker Registered workers + heartbeat tracking
_pgwerk_jobs Job queue — all state lives here
_pgwerk_worker_jobs Active claim tracking
_pgwerk_jobs_executions Per-attempt execution history
_pgwerk_job_deps Job dependency graph

Job lifecycle

queued → active → complete
                ↘ failed   (retries exhausted)
       → waiting           (blocked on dependencies, Python only)
       → aborted           (cancelled before execution)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgwerk-0.1.4.tar.gz (1.0 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pgwerk-0.1.4-py3-none-any.whl (81.9 kB view details)

Uploaded Python 3

File details

Details for the file pgwerk-0.1.4.tar.gz.

File metadata

  • Download URL: pgwerk-0.1.4.tar.gz
  • Upload date:
  • Size: 1.0 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pgwerk-0.1.4.tar.gz
Algorithm Hash digest
SHA256 85e1c43fdf409fee5c67e93c5b18c0351a6c27905a038b1a007e838862c0bdaf
MD5 26a9c07f0dc6f68c2775d7023fd2ba0a
BLAKE2b-256 ef239ce9dc6a6766b29ec57bb5faa13383e44dba87e20ac1a6ba50b6f9475dd6

See more details on using hashes here.

File details

Details for the file pgwerk-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: pgwerk-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 81.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pgwerk-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 8ab1b5a1341b1b1c1d401fd56d66585677eab8820e6c6c9b83d6148977663afc
MD5 387437ddf3b667a20b8e427470f45573
BLAKE2b-256 c86c9d4e44860dfa1ec1731c1a05d44395da2ec6e40ee051f6669153fc7f5848

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page