Skip to main content

PostgreSQL integration for taskiq

Project description

PyPI - Python Version PyPI Checks


PostgreSQL integration for Taskiq with support for asyncpg, psqlpy, psycopg and aiopg drivers.

Features

  • PostgreSQL Broker - high-performance message broker using PostgreSQL LISTEN/NOTIFY;
  • Result Backend - persistent task result storage with configurable retention;
  • Scheduler Source - cron-like task scheduling with PostgreSQL persistence;
  • Multiple Drivers - support for asyncpg, psycopg3, psqlpy and aiopg;
  • Flexible Configuration - customizable table names, field types, and connection options;
  • Multiple Serializers - support for different serialization methods (Pickle, JSON, etc.).

See usage guide in documentation or explore examples in separate directory.

Installation

Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:

# with asyncpg
pip install taskiq-postgres[asyncpg]

# with psqlpy
pip install taskiq-postgres[psqlpy]

# with psycopg3
pip install taskiq-postgres[psycopg]

# with aiopg
pip install taskiq-postgres[aiopg]

Quick start

Basic task processing

  1. Define your broker with asyncpg:
# broker_example.py
import asyncio
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))


@broker.task("solve_all_problems")
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print("All problems are solved!")


async def main():
    await broker.startup()
    task = await best_task_ever.kiq()
    print(await task.wait_result())
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  1. Start a worker to process tasks (by default taskiq runs two instances of worker):
taskiq worker broker_example:broker
  1. Run broker_example.py file to send a task to the worker:
python broker_example.py

Your experience with other drivers will be pretty similar. Just change the import statement and that's it.

Task scheduling

  1. Define your broker and schedule source:
# scheduler_example.py
import asyncio
from taskiq import TaskiqScheduler
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn)
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[AsyncpgScheduleSource(
        dsn=dsn,
        broker=broker,
    )],
)


@broker.task(
    task_name="solve_all_problems",
    schedule=[
        {
            "cron": "*/1 * * * *",  # type: str, either cron or time should be specified.
            "cron_offset": None,  # type: str | None, can be omitted. For example "Europe/Berlin".
            "time": None,  # type: datetime | None, either cron or time should be specified.
            "args": [], # type list[Any] | None, can be omitted.
            "kwargs": {}, # type: dict[str, Any] | None, can be omitted.
            "labels": {}, # type: dict[str, Any] | None, can be omitted.
        },
    ],
)
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print("All problems are solved!")
  1. Start worker processes:
taskiq worker scheduler_example:broker
  1. Run scheduler process:
taskiq scheduler scheduler_example:scheduler

Motivation

There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality. To address this issue I created this library with a common interface for most popular PostgreSQL drivers that handle similarity across functionality of result backends, brokers and schedule sources.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskiq_postgres-0.7.0.tar.gz (14.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

taskiq_postgres-0.7.0-py3-none-any.whl (30.6 kB view details)

Uploaded Python 3

File details

Details for the file taskiq_postgres-0.7.0.tar.gz.

File metadata

  • Download URL: taskiq_postgres-0.7.0.tar.gz
  • Upload date:
  • Size: 14.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.16 {"installer":{"name":"uv","version":"0.9.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for taskiq_postgres-0.7.0.tar.gz
Algorithm Hash digest
SHA256 7bb4c5094db379e7319230f589e54a32f525a5fb606d792c7c0a25530ed70a31
MD5 f6ceeffa6b0f37de935fe102289dddbf
BLAKE2b-256 128d73b7cbc8366539b9180d7603d5d885722d0908de9cefa91c1a08ad2742d9

See more details on using hashes here.

File details

Details for the file taskiq_postgres-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: taskiq_postgres-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 30.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.16 {"installer":{"name":"uv","version":"0.9.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for taskiq_postgres-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 05531a4afe3e8a3afa52f65ba23699c03e3b4ec77bbbc7b722231110860df1c6
MD5 312aedbbba3b9246a3601292f43ffbf2
BLAKE2b-256 ccdb91bb2020d5c010d55cc99182bdb2edd420f8ea985705158af6003e1ef00e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page