PostgreSQL integration for taskiq


PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.

See the documentation or the examples directory for more usage examples.

Installation

Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:

# with asyncpg
pip install taskiq-postgres[asyncpg]

# with psqlpy
pip install taskiq-postgres[psqlpy]

# with aiopg
pip install taskiq-postgres[aiopg]
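
Each extra pulls in one of the three drivers, so it can be handy to confirm which ones are importable in the current environment before choosing an import path. The snippet below is a minimal sketch that uses only the standard library; the helper name `installed_drivers` is hypothetical and not part of taskiq-postgres.

```python
from importlib.util import find_spec

# Top-level module names of the drivers listed in the installation extras.
DRIVERS = ("asyncpg", "psqlpy", "aiopg")


def installed_drivers(candidates=DRIVERS):
    """Return the candidate module names that can be imported here.

    Hypothetical helper for illustration; it only checks importability
    and does not import or configure the drivers.
    """
    return [name for name in candidates if find_spec(name) is not None]


if __name__ == "__main__":
    print("Available PostgreSQL drivers:", installed_drivers())
```

An empty list suggests the library was installed without any of the driver extras.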

Quick start

Basic task processing

  1. Define your broker with asyncpg:
# broker_example.py
import asyncio
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))


@broker.task("solve_all_problems")
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print("All problems are solved!")


async def main():
    await broker.startup()
    task = await best_task_ever.kiq()
    print(await task.wait_result())
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
  2. Start a worker to process tasks (by default, taskiq runs two worker processes):
taskiq worker broker_example:broker
  3. Run broker_example.py to send a task to the worker:
python broker_example.py

The other drivers work in much the same way: change the import statement to the driver you installed and leave the rest of the code as it is.
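
The DSN in the example above is hardcoded with a placeholder password. One possible way to keep credentials out of the source file is to assemble the DSN from environment variables. The sketch below assumes the conventional libpq variable names (`PGUSER`, `PGPASSWORD`, `PGHOST`, `PGPORT`, `PGDATABASE`); the `build_dsn` helper is hypothetical and not part of taskiq-postgres, and the defaults simply mirror the values used in the example.

```python
import os
from collections.abc import Mapping
from urllib.parse import quote


def build_dsn(env: Mapping[str, str] = os.environ) -> str:
    """Assemble a PostgreSQL DSN from environment-style settings.

    Hypothetical helper for illustration. User and password are
    percent-encoded so that characters such as "@" or "/" do not
    break the URL structure.
    """
    user = env.get("PGUSER", "taskiq_postgres")
    password = env.get("PGPASSWORD", "")
    host = env.get("PGHOST", "localhost")
    port = env.get("PGPORT", "5432")
    database = env.get("PGDATABASE", "taskiq_postgres")

    auth = quote(user, safe="")
    if password:
        auth += ":" + quote(password, safe="")
    return f"postgres://{auth}@{host}:{port}/{database}"


if __name__ == "__main__":
    print(build_dsn())
```

The resulting string can then be passed wherever the example uses `dsn`.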

Task scheduling

  1. Define your broker and schedule source:
# scheduler_example.py
import asyncio
from taskiq import TaskiqScheduler
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn)
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[AsyncpgScheduleSource(
        dsn=dsn,
        broker=broker,
    )],
)


@broker.task(
    task_name="solve_all_problems",
    schedule=[
        {
            "cron": "*/1 * * * *",  # type: str, either cron or time should be specified.
            "cron_offset": None,  # type: str | timedelta | None, can be omitted.
            "time": None,  # type: datetime | None, either cron or time should be specified.
            "args": [],  # type: list[Any] | None, can be omitted.
            "kwargs": {},  # type: dict[str, Any] | None, can be omitted.
            "labels": {},  # type: dict[str, Any] | None, can be omitted.
        },
    ],
)
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print("All problems are solved!")
  2. Start the worker processes:
taskiq worker scheduler_example:broker
  3. Run the scheduler process:
taskiq scheduler scheduler_example:scheduler
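
The comments in the schedule dictionary above note that either `cron` or `time` should be specified and that the remaining keys can be omitted. The sketch below shows one way to build such an entry while enforcing that rule. The `make_schedule_entry` helper is hypothetical, and treating the rule as "at least one of `cron` or `time`" is an assumption about how the constraint is meant.

```python
from datetime import datetime, timedelta
from typing import Any, Optional, Union


def make_schedule_entry(
    *,
    cron: Optional[str] = None,
    time: Optional[datetime] = None,
    cron_offset: Union[str, timedelta, None] = None,
    args: Optional[list] = None,
    kwargs: Optional[dict] = None,
    labels: Optional[dict] = None,
) -> dict[str, Any]:
    """Build one schedule entry, omitting keys that were not provided.

    Hypothetical helper for illustration; assumes at least one of
    `cron` or `time` is required.
    """
    if cron is None and time is None:
        raise ValueError("either 'cron' or 'time' must be specified")
    candidate = {
        "cron": cron,
        "cron_offset": cron_offset,
        "time": time,
        "args": args,
        "kwargs": kwargs,
        "labels": labels,
    }
    # Keep only the keys that carry a value; optional keys can be left out.
    return {key: value for key, value in candidate.items() if value is not None}


if __name__ == "__main__":
    print(make_schedule_entry(cron="*/1 * * * *"))
```

An entry built this way could be placed in the `schedule=[...]` list of the task decorator in place of the literal dictionary.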

Motivation

There are many libraries that integrate PostgreSQL with Taskiq, but they differ in their interfaces and in the functionality they provide. To address this, I created this library, which offers a common interface across the most popular PostgreSQL drivers and keeps the functionality of result backends, brokers, and schedule sources consistent between them.
