Skip to main content

YDB integration for taskiq

Project description

taskiq + ydb

PyPI - Python Version PyPI Checks

Plugin for taskiq that adds a new result backend, broker and schedule source based on YDB.

Installation

This project can be installed using pip/poetry/uv (choose your preferred package manager):

pip install taskiq-ydb

Quick start

Basic task processing

  1. Define your broker with asyncpg:
# broker_example.py
import asyncio
from ydb.aio.driver import DriverConfig
from taskiq_ydb import YdbBroker, YdbResultBackend


driver_config = DriverConfig(
    endpoint='grpc://localhost:2136',
    database='/local',
)
broker = YdbBroker(
    driver_config=driver_config,
).with_result_backend(
    YdbResultBackend(driver_config=driver_config),
)


@broker.task('solve_all_problems')
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print('All problems are solved!')


async def main() -> None:
    await broker.startup()
    task = await best_task_ever.kiq()
    print(await task.wait_result())
    await broker.shutdown()


if __name__ == '__main__':
    asyncio.run(main())
  1. Start a worker to process tasks (by default taskiq runs two instances of worker):
taskiq worker broker_example:broker
  1. Run broker_example.py file to send a task to the worker:
python broker_example.py

Your experience with other drivers will be pretty similar. Just change the import statement and that's it.

Task scheduling

  1. Define your broker and schedule source:
# scheduler_example.py
import asyncio

from taskiq import TaskiqScheduler
from ydb.aio.driver import DriverConfig

from taskiq_ydb import YdbBroker, YdbScheduleSource


driver_config = DriverConfig(
    endpoint='grpc://localhost:2136',
    database='/local',
)
broker = YdbBroker(driver_config=driver_config)
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[
        YdbScheduleSource(
            driver_config=driver_config,
            broker=broker,
        ),
    ],
)


@broker.task(
    task_name='solve_all_problems',
    schedule=[
        {
            'cron': '*/1 * * * *',  # type: str, either cron or time should be specified.
            'cron_offset': None,  # type: str | timedelta | None, can be omitted.
            'time': None,  # type: datetime | None, either cron or time should be specified.
            'args': [],  # type list[Any] | None, can be omitted.
            'kwargs': {},  # type: dict[str, Any] | None, can be omitted.
            'labels': {},  # type: dict[str, Any] | None, can be omitted.
        },
    ],
)
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print('All problems are solved!')
  1. Start worker processes:
taskiq worker scheduler_example:broker
  1. Run scheduler process:
taskiq scheduler scheduler_example:scheduler

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskiq_ydb-0.4.0.tar.gz (7.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

taskiq_ydb-0.4.0-py3-none-any.whl (10.2 kB view details)

Uploaded Python 3

File details

Details for the file taskiq_ydb-0.4.0.tar.gz.

File metadata

  • Download URL: taskiq_ydb-0.4.0.tar.gz
  • Upload date:
  • Size: 7.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.5

File hashes

Hashes for taskiq_ydb-0.4.0.tar.gz
Algorithm Hash digest
SHA256 354c9a9769f2a6c164d0bab434caac78508b687db5d583523b53131b210e219e
MD5 2ea314a3205c003e995e200ccbfb997a
BLAKE2b-256 724382db49934230bb9b27f3b3c8ddbd82f93ebb6b2aae5bb17a2a7e4a6af31b

See more details on using hashes here.

File details

Details for the file taskiq_ydb-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: taskiq_ydb-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 10.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.5

File hashes

Hashes for taskiq_ydb-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 3ccf18b377aa96f4241d4c49f9144f642f14757c724181f126e67874c39607e1
MD5 1b9681f6f28370ef4a04e4a80a5efe40
BLAKE2b-256 7c2c5e970742b74757309a1173642204041492751a61fd191025b187dc148078

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page