YDB integration for taskiq
taskiq + ydb
Plugin for taskiq that adds a new result backend, broker and schedule source based on YDB.
Installation
This project can be installed using pip/poetry/uv (choose your preferred package manager):
pip install taskiq-ydb
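To confirm that the package landed in the active environment, a quick check with the standard library can help. This is a minimal sketch: the helper name `installed_version` is made up for illustration, and the distribution name is taken from the install command above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == '__main__':
    found = installed_version('taskiq-ydb')
    print(found if found is not None else 'taskiq-ydb is not installed')
```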
Quick start
Basic task processing
- Define your broker with YDB:
# broker_example.py
import asyncio

from ydb.aio.driver import DriverConfig

from taskiq_ydb import YdbBroker, YdbResultBackend

driver_config = DriverConfig(
    endpoint='grpc://localhost:2136',
    database='/local',
)
broker = YdbBroker(
    driver_config=driver_config,
).with_result_backend(
    YdbResultBackend(driver_config=driver_config),
)


@broker.task('solve_all_problems')
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print('All problems are solved!')


async def main() -> None:
    await broker.startup()
    task = await best_task_ever.kiq()
    print(await task.wait_result())
    await broker.shutdown()


if __name__ == '__main__':
    asyncio.run(main())
- Start a worker to process tasks (by default, taskiq runs two worker instances):
taskiq worker broker_example:broker
- Run the broker_example.py file to send a task to the worker:
python broker_example.py
Your experience with other drivers will be pretty similar. Just change the import statement and that's it.
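The example above hard-codes the endpoint and database of a local YDB instance. One way to make that configurable is to read the values from the environment and fall back to the local defaults. This is a sketch under stated assumptions: `driver_settings`, `YDB_ENDPOINT` and `YDB_DATABASE` are hypothetical names chosen here, not something taken from the ydb or taskiq-ydb documentation.

```python
import os
from typing import Dict, Mapping

# Defaults mirror the local endpoint and database used in the example above.
DEFAULT_ENDPOINT = 'grpc://localhost:2136'
DEFAULT_DATABASE = '/local'


def driver_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Build keyword arguments for DriverConfig from environment variables.

    YDB_ENDPOINT and YDB_DATABASE are hypothetical variable names picked for
    this sketch; the libraries are not assumed to read them on their own.
    """
    return {
        'endpoint': env.get('YDB_ENDPOINT', DEFAULT_ENDPOINT),
        'database': env.get('YDB_DATABASE', DEFAULT_DATABASE),
    }


if __name__ == '__main__':
    # In broker_example.py this would become:
    #     driver_config = DriverConfig(**driver_settings())
    print(driver_settings())
```

Keeping the helper free of ydb imports means it can be exercised without a running database.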
Task scheduling
- Define your broker and schedule source:
# scheduler_example.py
import asyncio

from taskiq import TaskiqScheduler
from ydb.aio.driver import DriverConfig

from taskiq_ydb import YdbBroker, YdbScheduleSource

driver_config = DriverConfig(
    endpoint='grpc://localhost:2136',
    database='/local',
)
broker = YdbBroker(driver_config=driver_config)
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[
        YdbScheduleSource(
            driver_config=driver_config,
            broker=broker,
        ),
    ],
)


@broker.task(
    task_name='solve_all_problems',
    schedule=[
        {
            'cron': '*/1 * * * *',  # type: str, either cron or time should be specified.
            'cron_offset': None,  # type: str | timedelta | None, can be omitted.
            'time': None,  # type: datetime | None, either cron or time should be specified.
            'args': [],  # type: list[Any] | None, can be omitted.
            'kwargs': {},  # type: dict[str, Any] | None, can be omitted.
            'labels': {},  # type: dict[str, Any] | None, can be omitted.
        },
    ],
)
async def best_task_ever() -> None:
    """Solve all problems in the world."""
    await asyncio.sleep(2)
    print('All problems are solved!')
- Start worker processes:
taskiq worker scheduler_example:broker
- Run scheduler process:
taskiq scheduler scheduler_example:scheduler
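The schedule entry in scheduler_example.py lists every key, although most of them can be omitted. A small builder can keep entries compact and catch a missing trigger early. This is a sketch only: `schedule_entry` is a hypothetical helper, and rejecting entries that set both `cron` and `time` is a choice made here rather than a documented rule.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Union


def schedule_entry(
    cron: Optional[str] = None,
    time: Optional[datetime] = None,
    cron_offset: Union[str, timedelta, None] = None,
    args: Optional[List[Any]] = None,
    kwargs: Optional[Dict[str, Any]] = None,
    labels: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build one schedule dict, insisting on exactly one of cron or time."""
    if (cron is None) == (time is None):
        raise ValueError('specify exactly one of cron or time')
    entry: Dict[str, Any] = {'cron': cron} if cron is not None else {'time': time}
    optional = {
        'cron_offset': cron_offset,
        'args': args,
        'kwargs': kwargs,
        'labels': labels,
    }
    # Unset optional keys are left out entirely instead of being stored as None.
    entry.update({key: value for key, value in optional.items() if value is not None})
    return entry


if __name__ == '__main__':
    # A recurring entry, equivalent to the cron line in scheduler_example.py.
    print(schedule_entry(cron='*/1 * * * *'))
    # A one-off entry; a timezone-aware UTC datetime is assumed here.
    print(schedule_entry(time=datetime.now(timezone.utc) + timedelta(minutes=5)))
```

The resulting dicts can be placed in the `schedule=[...]` list of the task decorator in place of the hand-written entry.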