Task cancellation for taskiq
Project description
taskiq-cancellation aims to be a drop-in task cancellation solution for taskiq as the original package doesn't provide a cancellation API.
Contents:
Installation
This package can be install from PyPI with your package manager of choice.
pip install taskiq-cancellation
pipx install taskiq-cancellation
poetry add taskiq-cancellation
uv add taskiq-cancellation
taskiq-cancellation currently provides integrations with Redis and RabbitMQ that are installable with redis and aiopika extras respectfully.
pip install taskiq-cancellation[redis,aiopika]
Usage
To do task cancellation, you need to:
- Create a cancellation backend
- Wrap a function with
cancellabledecorator - Cancel the task with
cancel(task_id)
broker = PubSubBroker(url).with_result_backend(RedisAsyncResultBackend(url))
cancellation_backend = RedisCancellationBackend(url).with_broker(broker)
@broker.task
@cancellation_backend.cancellable
async def sleep(seconds: int):
await asyncio.sleep(seconds)
print("Slept!") # Won't be printed on worker side because of the cancellation
async def main():
await broker.startup()
task = await sleep.kiq(5)
await cancellation_backend.cancel(task.task_id)
await broker.shutdown()
asyncio.run(main())
What is a cancellation backend?
Cancellation backend can be seen as combination of a broker and result backend for cancellation messages that works underneath taskiq's broker. Cancellation backend won't run tasks marked as cancelled and will listen for cancellation messages for already running tasks.
Modular cancellation backend
To easily create cancellation backends taskiq-cancellation provides ModularCancellationBackend. Modular cancellation backend consists of two parts: state holder and notifier.
- State holder is used to check for task cancellation status before running the task.
- Notifier is used to listen for cancellation messages while running the task
This allows to use any techonology for task cancellation. For example, if one uses SQL database and RabbitMQ message broker, they can make a custom state holder with SQL library of their choice and use provided RabbitMQ notifier.
from taskiq_cancellation import ModularCancellationBackend
from taskiq_cancellation.state_holders.redis import RedisCancellationStateHolder
from taskiq_cancellation.notifiers.aiopika import AioPikaCancellationNotifier
backend = ModularCancellationBackend(
RedisCancellationStateHolder("redis://localhost:6379"),
AioPikaCancellationNotifier("amqp://guest:guest@localhost:5672")
)
Available integrations
taskiq-cancellation provides:
- state holder for Redis (
RedisCancellationStateHolder) - notifiers for Redis pub/sub (
PubSubCancellationNotifier) and RabbitMQ (AioPikaCancellationNotifier)
Also there are NullCancellationStateHolder and NullCancellationNotifier that do absolutely nothing, if there's no need to not check for task cancellation before starting the task or no need to listen for cancellation of already running tasks.
Level and edge cancellation
By default, taskiq-cancellation uses anyio and its level cancellation. Level cancellation raises a cancellation exception on every asynchronous wait in a function.
As external libraries might not support level cancellation, task-cancellation also provides edge cancellation via asyncio. Edge cancellation raises an exception only once. To enable it, add cancellation_type=CancellationType.EDGE parameter to cancellable decorator.
[!WARNING] Currently edge cancellation is supported only for Python 3.11+ because it uses
asyncio.TaskGroup
Example:
from sqlalchemy.ext.asyncio import AsyncSession
from taskiq_cancellation import CancellationType
@broker.task
@cancellation_backend.cancellable(cancellation_type=CancellationType.EDGE)
async def sleep(seconds: int):
session = AsyncSession(engine)
try:
async with session.begin():
await asyncio.sleep(seconds)
session.add(SleptFor(seconds))
except asyncio.CancelledError:
# Won't raise cancelled exception
await session.close()
raise
Retry middlewares with task cancellation
If you use SimpleRetryMiddleware or SmartRetryMiddleware, make sure to add TaskCancellationException to types_of_exceptions parameter to not trigger additional retries.
from taskiq_cancellation.exceptions import TaskCancellationException
broker = PubSubBroker(url)
.with_result_backend(RedisAsyncResultBackend(url))
.with_middlewares(
SimpleRetryMiddleware(
types_of_exceptions=[TaskCancellationException, ]
)
)
Development
For linting, ruff is used
ruff check
ruff format
For testing, pytest is used
pytest tests/unit # Unit tests
# Integration tests
docker compose -f docker-compose-tests.yml up --wait
pytest tests/integration
Contributing
If you have any issues with this package or have an idea for improvement, please don't hesitate to open an issue! This is my first open-source project so I would like to ask to be a little patient with me though 🙏
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file taskiq_cancellation-0.0.1.tar.gz.
File metadata
- Download URL: taskiq_cancellation-0.0.1.tar.gz
- Upload date:
- Size: 14.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1fe60a4213b8c3189f3519db3ca8379d66f389858229e3250dc4425154d442f8
|
|
| MD5 |
178afd09dba2a91730fa51d40d46d97f
|
|
| BLAKE2b-256 |
63d8ae662c1bb5eef1021d620e5a9d968cb01c906d7689ecb1c65380fa5ce9c0
|
File details
Details for the file taskiq_cancellation-0.0.1-py3-none-any.whl.
File metadata
- Download URL: taskiq_cancellation-0.0.1-py3-none-any.whl
- Upload date:
- Size: 24.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0f3e8cb37e67e973f406124b93fdc7000d720a12f0187344a37e04dd3f195085
|
|
| MD5 |
f87889f81694b460ad0c31bc798153f7
|
|
| BLAKE2b-256 |
044c16eb1df685312f2e0e518abe523ec3387f9fd15063d449a9b10ddf88950b
|