
PGWerk


A Postgres-backed job queue. Durable, visible, transactional.

Jobs are rows. Workers poll with SELECT … FOR UPDATE SKIP LOCKED. No external broker, no sidecar, just your existing Postgres instance. The schema is created automatically on first connect.
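The claim pattern above can be sketched in plain SQL (wrapped in Python for illustration). The table and column names here are assumptions for the example, not PGWerk's actual schema:

```python
# Illustrative only: how a SKIP LOCKED worker claims one job atomically.
def build_claim_query(table: str = "jobs") -> str:
    """Take the highest-priority queued job, skipping rows other workers
    have already locked, and mark it active in the same statement."""
    return (
        f"UPDATE {table} SET status = 'active' "
        f"WHERE id = ("
        f"  SELECT id FROM {table}"
        f"  WHERE status = 'queued' AND queue = ANY($1)"
        f"  ORDER BY priority DESC, id"
        f"  LIMIT 1 FOR UPDATE SKIP LOCKED"
        f") RETURNING id"
    )
```

Because `SKIP LOCKED` silently passes over rows locked by concurrent transactions, many workers can poll the same table without blocking each other or double-claiming a job.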



Install

pip install pgwerk

# Cron support (optional)
pip install "pgwerk[cron]"

Requires Python 3.11+ and a Postgres 14+ database.

Quickstart

Define your app and handlers:

from pgwerk import Werk, Context

app = Werk("postgresql://user:pass@localhost/mydb")

# Call connect() once at startup; disconnect() at shutdown.
# async with app: is shorthand for the same pair.
await app.connect()

async def send_email(to: str):
    ...  # plain handler — ctx is optional

async def send_email_with_context(ctx: Context, to: str):
    ...  # opt in by naming/typing the first parameter as ctx

Enqueue jobs from anywhere in your application:

await app.enqueue(send_email, to="user@example.com")

Run a worker in a separate process:

import asyncio
from pgwerk import AsyncWorker

async def main():
    worker = AsyncWorker(app=app, queues=["default"], concurrency=10)
    await worker.run()

asyncio.run(main())

Handlers are identified by their dotted import path (myapp.tasks.send_email). You pass the function object itself to enqueue; PGWerk records its import path and re-imports it on the worker side. ctx is injected when the handler's first parameter is named ctx or annotated as Context.
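The dotted-path round trip can be sketched as follows (a simplification, not PGWerk's actual internals):

```python
import importlib

def func_to_path(func) -> str:
    """Record a function's import path at enqueue time."""
    return f"{func.__module__}.{func.__qualname__}"

def path_to_func(path: str):
    """Re-import the function on the worker side."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)

# Round trip with a stdlib function standing in for a handler:
from json import dumps
path = func_to_path(dumps)          # "json.dumps"
assert path_to_func(path) is dumps
```

This is why handlers must live at an importable module path on the worker: a lambda or a function defined inside another function has no resolvable dotted path.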

Enqueueing

from pgwerk import Retry, Repeat, Dependency
from datetime import datetime, timezone, timedelta

# Basic
await app.enqueue(my_func, x=1)

# With options
await app.enqueue(
    my_func,
    x=1,
    _queue="high",
    _priority=10,
    _delay=30,                         # seconds from now
    _at=datetime(2025, 1, 1, tzinfo=timezone.utc),
    _retry=Retry(max=3, intervals=[10, 60, 300]),  # total attempts, including the first
    _timeout=120,                      # fail after 2 minutes
    _heartbeat=30,                     # worker auto-renews while the job is running
    _key="unique-key",                 # idempotency key — duplicate enqueues are silently dropped
    _group="user:42",                  # at most one active job per group
    _meta={"source": "api"},
    _on_success=notify_done,
    _on_failure=alert_team,
)

# Repeat a job N more times after the first run
await app.enqueue(cleanup, _repeat=Repeat(times=5, interval=3600))

# Depend on another job finishing first
job_a = await app.enqueue(step_one)
await app.enqueue(step_two, _depends_on=job_a)          # waits for job_a
await app.enqueue(step_two, _depends_on=Dependency(job_a, allow_failure=True))

# Bulk enqueue in one round-trip
from pgwerk import EnqueueParams
await app.enqueue_many([
    EnqueueParams(func=my_func, kwargs={"n": i}, queue="bulk") for i in range(100)
])

Workers

from pgwerk import AsyncWorker, ThreadWorker, ProcessWorker, ForkWorker

# Asyncio (default — best for I/O-bound work)
worker = AsyncWorker(app=app, queues=["default", "high"], concurrency=20)

# Thread pool (CPU-light work, blocking libraries)
worker = ThreadWorker(app=app, concurrency=8)

# Process pool (CPU-bound work, true parallelism)
worker = ProcessWorker(app=app, concurrency=4)

# Fork per job (maximum isolation)
worker = ForkWorker(app=app, concurrency=4)

await worker.run()

Workers register themselves in the database, send periodic heartbeats, auto-touch jobs that opt into _heartbeat, and use LISTEN/NOTIFY for instant wake-up when jobs are enqueued.
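The LISTEN/NOTIFY wake-up pattern can be emulated in plain asyncio (this is a sketch, not PGWerk's worker loop; an asyncio.Event stands in for the Postgres notification channel):

```python
import asyncio

async def poll_loop(notify: asyncio.Event, polls: list, rounds: int, interval: float):
    """Poll on a timer, but wake immediately when a notification arrives."""
    for _ in range(rounds):
        polls.append("poll")  # the SKIP LOCKED claim query would run here
        try:
            await asyncio.wait_for(notify.wait(), timeout=interval)
            notify.clear()    # woken early by an enqueue notification
        except asyncio.TimeoutError:
            pass              # quiet period: fall back to the poll interval

async def demo():
    notify, polls = asyncio.Event(), []
    task = asyncio.create_task(poll_loop(notify, polls, rounds=2, interval=0.05))
    await asyncio.sleep(0)    # let the loop start and block on the event
    notify.set()              # simulate NOTIFY fired by an enqueue
    await task
    return polls

polls = asyncio.run(demo())
```

The worker stays responsive to new jobs (the set() path) without spinning, while the timeout path guarantees progress even if a notification is missed.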

Cron

from pgwerk import CronScheduler, CronJob

scheduler = CronScheduler(app)
scheduler.register(CronJob(func=my_func, cron="*/15 * * * *"))  # every 15 min
scheduler.register(CronJob(func=other_func, interval=3600))      # every hour

async with app:
    await scheduler.run()

CronScheduler uses a Postgres advisory lock so only one process runs the scheduler at a time. Requires croniter for cron expressions.
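Postgres advisory locks are keyed by a 64-bit integer, so a scheduler name has to be hashed down to one. A possible sketch (an assumption for illustration, not PGWerk's internals):

```python
import hashlib

def advisory_lock_key(name: str) -> int:
    """Derive a stable signed 64-bit key for pg_try_advisory_lock($1)
    from a human-readable lock name."""
    digest = hashlib.sha256(name.encode()).digest()
    return int.from_bytes(digest[:8], "big", signed=True)

key = advisory_lock_key("pgwerk:cron")
# SELECT pg_try_advisory_lock($1) -- only the holder runs the scheduler;
# other processes see `false` and stand by.
```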

Serializers

from pgwerk import Werk, PickleSerializer

app = Werk(dsn, serializer=PickleSerializer())  # default is JSONSerializer

The configured serializer is used for job payloads, job results, and execution results.
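The serializer interface implied above can be sketched as a pair of dumps/loads methods over bytes (the method names are an assumption; check the docs before writing a custom serializer):

```python
import json
import pickle

class JSONSerializerSketch:
    def dumps(self, obj) -> bytes:
        return json.dumps(obj).encode()
    def loads(self, data: bytes):
        return json.loads(data)

class PickleSerializerSketch:
    def dumps(self, obj) -> bytes:
        return pickle.dumps(obj)
    def loads(self, data: bytes):
        return pickle.loads(data)

# Round-trip must hold for payloads, results, and execution records:
payload = {"to": "user@example.com"}
for s in (JSONSerializerSketch(), PickleSerializerSketch()):
    assert s.loads(s.dumps(payload)) == payload
```

JSON keeps payloads inspectable from SQL but limits arguments to JSON types; pickle accepts arbitrary Python objects but is opaque and unsafe with untrusted data.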

Job inspection

job = await app.get_job(job_id)
executions = await app.get_executions(job_id)
await app.cancel_job(job_id)

CLI

# Start a worker
werk worker myapp.tasks:app --queues default,high --concurrency 10 --worker-type async

# Show queue statistics and active workers
werk info myapp.tasks:app

# Delete finished jobs
werk purge myapp.tasks:app --status complete,failed

APP is a module:attribute path to a Werk instance.


Schema

All tables are prefixed (default _pgwerk_) and optionally placed in a named schema. Migrations run automatically on connect using an advisory lock to prevent races across multiple processes starting simultaneously.

Table                     Purpose
_pgwerk_worker            Registered workers + heartbeat tracking
_pgwerk_jobs              Job queue — all state lives here
_pgwerk_worker_jobs       Active claim tracking
_pgwerk_jobs_executions   Per-attempt execution history
_pgwerk_job_deps          Job dependency graph

Job lifecycle

queued → active → complete
                ↘ failed   (retries exhausted)
       → waiting           (blocked on dependencies, Python only)
       → aborted           (cancelled before execution)
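The diagram above can be read as a transition table. This is a sketch from the diagram only; the exact internal transitions (e.g. active back to queued on retry) are assumptions:

```python
# Allowed state transitions, reconstructed from the lifecycle diagram.
TRANSITIONS = {
    "queued":   {"active", "waiting", "aborted"},
    "waiting":  {"queued", "aborted"},            # dependencies resolved or cancelled
    "active":   {"complete", "failed", "queued"}, # re-queued on retry (assumed)
    "complete": set(),
    "failed":   set(),
    "aborted":  set(),
}

def can_transition(src: str, dst: str) -> bool:
    return dst in TRANSITIONS.get(src, set())
```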

License

MIT
