Skip to main content

Task cancellation for taskiq

Project description

taskiq-cancellation logo

PyPI - Version PyPI - Python Version

taskiq-cancellation aims to be a drop-in task cancellation solution for taskiq as the original package doesn't provide a cancellation API.

Contents:

Installation

This package can be install from PyPI with your package manager of choice.

pip install taskiq-cancellation
pipx install taskiq-cancellation
poetry add taskiq-cancellation
uv add taskiq-cancellation

taskiq-cancellation currently provides integrations with Redis and RabbitMQ that are installable with redis and aiopika extras respectfully.

pip install taskiq-cancellation[redis,aiopika]

Usage

To do task cancellation, you need to:

  1. Create a cancellation backend
  2. Wrap a function with cancellable decorator
  3. Cancel the task with cancel(task_id)
broker = PubSubBroker(url).with_result_backend(RedisAsyncResultBackend(url))
cancellation_backend = RedisCancellationBackend(url).with_broker(broker)

@broker.task
@cancellation_backend.cancellable
async def sleep(seconds: int):
    await asyncio.sleep(seconds)
    print("Slept!")  # Won't be printed on worker side because of the cancellation

async def main():
    await broker.startup()

    task = await sleep.kiq(5)
    await cancellation_backend.cancel(task.task_id)

    await broker.shutdown()

asyncio.run(main())

What is a cancellation backend?

Cancellation backend can be seen as combination of a broker and result backend for cancellation messages that works underneath taskiq's broker. Cancellation backend won't run tasks marked as cancelled and will listen for cancellation messages for already running tasks.

Cancellation backend example scheme

Modular cancellation backend

To easily create cancellation backends taskiq-cancellation provides ModularCancellationBackend. Modular cancellation backend consists of two parts: state holder and notifier.

  • State holder is used to check for task cancellation status before running the task.
  • Notifier is used to listen for cancellation messages while running the task

This allows to use any techonology for task cancellation. For example, if one uses SQL database and RabbitMQ message broker, they can make a custom state holder with SQL library of their choice and use provided RabbitMQ notifier.

from taskiq_cancellation import ModularCancellationBackend
from taskiq_cancellation.state_holders.redis import RedisCancellationStateHolder
from taskiq_cancellation.notifiers.aiopika import AioPikaCancellationNotifier

backend = ModularCancellationBackend(
    RedisCancellationStateHolder("redis://localhost:6379"),
    AioPikaCancellationNotifier("amqp://guest:guest@localhost:5672")
)

Available integrations

taskiq-cancellation provides:

  • state holder for Redis (RedisCancellationStateHolder)
  • notifiers for Redis pub/sub (PubSubCancellationNotifier) and RabbitMQ (AioPikaCancellationNotifier)

Also there are NullCancellationStateHolder and NullCancellationNotifier that do absolutely nothing, if there's no need to not check for task cancellation before starting the task or no need to listen for cancellation of already running tasks.

Level and edge cancellation

By default, taskiq-cancellation uses anyio and its level cancellation. Level cancellation raises a cancellation exception on every asynchronous wait in a function.

As external libraries might not support level cancellation, task-cancellation also provides edge cancellation via asyncio. Edge cancellation raises an exception only once. To enable it, add cancellation_type=CancellationType.EDGE parameter to cancellable decorator.

[!WARNING] Currently edge cancellation is supported only for Python 3.11+ because it uses asyncio.TaskGroup

Example:

from sqlalchemy.ext.asyncio import AsyncSession
from taskiq_cancellation import CancellationType

@broker.task
@cancellation_backend.cancellable(cancellation_type=CancellationType.EDGE)
async def sleep(seconds: int):
    session = AsyncSession(engine)

    try:
        async with session.begin():
            await asyncio.sleep(seconds)
            session.add(SleptFor(seconds))
    except asyncio.CancelledError:
        # Won't raise cancelled exception
        await session.close()
        raise

Retry middlewares with task cancellation

If you use SimpleRetryMiddleware or SmartRetryMiddleware, make sure to add TaskCancellationException to types_of_exceptions parameter to not trigger additional retries.

from taskiq_cancellation.exceptions import TaskCancellationException

broker = PubSubBroker(url)
    .with_result_backend(RedisAsyncResultBackend(url))
    .with_middlewares(
        SimpleRetryMiddleware(
            types_of_exceptions=[TaskCancellationException, ]
        )
    )

Development

For linting, ruff is used

ruff check
ruff format

For testing, pytest is used

pytest tests/unit # Unit tests

# Integration tests
docker compose -f docker-compose-tests.yml up --wait
pytest tests/integration

Contributing

If you have any issues with this package or have an idea for improvement, please don't hesitate to open an issue! This is my first open-source project so I would like to ask to be a little patient with me though 🙏

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskiq_cancellation-0.0.1.tar.gz (14.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

taskiq_cancellation-0.0.1-py3-none-any.whl (24.0 kB view details)

Uploaded Python 3

File details

Details for the file taskiq_cancellation-0.0.1.tar.gz.

File metadata

  • Download URL: taskiq_cancellation-0.0.1.tar.gz
  • Upload date:
  • Size: 14.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.7

File hashes

Hashes for taskiq_cancellation-0.0.1.tar.gz
Algorithm Hash digest
SHA256 1fe60a4213b8c3189f3519db3ca8379d66f389858229e3250dc4425154d442f8
MD5 178afd09dba2a91730fa51d40d46d97f
BLAKE2b-256 63d8ae662c1bb5eef1021d620e5a9d968cb01c906d7689ecb1c65380fa5ce9c0

See more details on using hashes here.

File details

Details for the file taskiq_cancellation-0.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for taskiq_cancellation-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 0f3e8cb37e67e973f406124b93fdc7000d720a12f0187344a37e04dd3f195085
MD5 f87889f81694b460ad0c31bc798153f7
BLAKE2b-256 044c16eb1df685312f2e0e518abe523ec3387f9fd15063d449a9b10ddf88950b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page