
PostgreSQL-backed job queues


pgjobq

A job queue built on top of Postgres.

Project status

Please do not use this for anything other than experimentation or inspiration. At some point I may decide to support this long term (at which point this warning will be removed), but until then this is just a playground subject to breaking changes (including breaking schema changes).

Purpose

Sometimes you have a Postgres database and need a queue. You could stand up more infrastructure (SQS, Redis, etc.), or you could use your existing database. There are plenty of use cases for a persistent queue that do not require infinite scalability, snapshots, or any of the other advanced features that full-fledged queues, event buses, and job brokers have.

Features

  • Best-effort at-most-once delivery (each job is delivered to only one worker at a time)
  • Automatic redelivery of failed jobs (even if your process crashes)
  • Low latency delivery (near real-time, uses PostgreSQL's NOTIFY feature)
  • Low latency completion tracking (using NOTIFY)
  • Dead letter queuing
  • Job attributes and attribute filtering
  • Job dependencies (for processing DAG-like workflows or making jobs process FIFO)
  • Persistent scheduled jobs (scheduled in the database, not the client application)
  • Job cancellation (guaranteed for jobs in the queue and best effort for checked-out jobs)
  • Bulk sending and polling to support large workloads
  • Back pressure / bounded queues
  • Fully typed async Python client (using asyncpg)
  • Exponential backoff for retries
  • Telemetry hooks for sampling queries with EXPLAIN or integration with OpenTelemetry
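
To illustrate the "exponential backoff for retries" item above, here is a minimal sketch of how such a retry delay is commonly computed. It is not pgjobq's implementation: the function names (backoff_delay, backoff_delay_with_jitter) and the parameters (base, factor, max_delay) are hypothetical and chosen only for this example.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    factor: float = 2.0,
    max_delay: float = 60.0,
) -> float:
    """Seconds to wait before delivery attempt number `attempt` (1-based).

    Hypothetical helper: the delay starts at `base`, is multiplied by
    `factor` after every failed attempt, and is capped at `max_delay`.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    delay = base
    for _ in range(attempt - 1):
        delay = min(max_delay, delay * factor)
        if delay >= max_delay:
            break  # already capped, further multiplication changes nothing
    return delay


def backoff_delay_with_jitter(
    attempt: int,
    rng: Callable[[], float] = random.random,
    **kwargs: float,
) -> float:
    """Scale the delay by a random value in [0, 1) ("full jitter").

    Jitter spreads out retries so that many failed jobs do not all
    become visible again at the same moment.
    """
    return rng() * backoff_delay(attempt, **kwargs)


if __name__ == "__main__":
    for attempt in range(1, 9):
        print(attempt, backoff_delay(attempt))
```

With the defaults shown, attempts 1 through 8 wait 1, 2, 4, 8, 16, 32, 60 and 60 seconds. Whether pgjobq caps the delay or applies jitter is not stated here, so treat both as assumptions of the sketch.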

Possible features:

  • Reply-to queues and response handling

Examples

from contextlib import AsyncExitStack

import anyio
import asyncpg  # type: ignore
from pgjobq import create_queue, connect_to_queue, migrate_to_latest_version


async def main() -> None:
    async with AsyncExitStack() as stack:
        # connection pool shared by the sender and the workers
        pool: asyncpg.Pool = await stack.enter_async_context(
            asyncpg.create_pool(  # type: ignore
                "postgres://postgres:postgres@localhost/postgres"
            )
        )
        # apply schema migrations, then create the queue and connect to it
        await migrate_to_latest_version(pool)
        await create_queue("myq", pool)
        queue = await stack.enter_async_context(
            connect_to_queue("myq", pool)
        )
        async with anyio.create_task_group() as tg:

            async def worker() -> None:
                async with queue.receive() as msg_handle_rcv_stream:
                    # receive a single job
                    async with (await msg_handle_rcv_stream.receive()).acquire():
                        print("received")
                        # do some work
                        await anyio.sleep(1)
                        print("done processing")
                        # the job is acknowledged when this block exits
                        print("acked")

            # two workers compete for jobs; only one of them receives the job
            tg.start_soon(worker)
            tg.start_soon(worker)

            async with queue.send(b'{"foo":"bar"}') as completion_handle:
                print("sent")
                # wait until a worker has finished processing the job
                await completion_handle.wait()
                print("completed")
                # stop the worker that is still waiting for a job
                tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(main)
    # prints:
    # "sent"
    # "received"
    # "done processing"
    # "acked"
    # "completed"
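
The worker above handles a single job and then returns. A worker that processes several jobs could be sketched as a loop around the same calls. This is a sketch under assumptions, not part of pgjobq: process_jobs, handler and max_jobs are hypothetical names, and the only queue calls used are the ones that appear in the example above (queue.receive(), the stream's receive() and the handle's acquire()).

```python
from typing import Any, Awaitable, Callable


async def process_jobs(
    queue: Any,
    handler: Callable[[], Awaitable[None]],
    max_jobs: int,
) -> int:
    """Receive and process up to `max_jobs` jobs, one at a time.

    `queue` is expected to be the object returned by connect_to_queue().
    `handler` is a hypothetical coroutine function that does the work
    for one job; it takes no arguments to keep the sketch independent of
    whatever acquire() yields.
    """
    processed = 0
    async with queue.receive() as job_handle_stream:
        while processed < max_jobs:
            # wait for the next job handle
            job_handle = await job_handle_stream.receive()
            # the handler runs while the job is checked out
            async with job_handle.acquire():
                await handler()
            processed += 1
    return processed
```

In the example above, this could replace the body of worker(), for instance as await process_jobs(queue, do_work, max_jobs=10) with do_work being your own coroutine function. If the handler raises, the exception propagates out of the acquire() block; how pgjobq then treats the job (for example redelivery) is up to the library and is not modeled here.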

Development

  1. Clone the repo
  2. Start a disposable PostgreSQL instance (e.g., docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres)
  3. Run make test

See this release on GitHub: v0.10.0
