Skip to main content

Asynchronous task queue with async support

Project description

Taskiq

Taskiq is an asynchronous distributed task queue. This project takes inspiration from big projects such as Celery and Dramatiq. But taskiq can send and run both the sync and async functions. Also, we use PEP-612 to provide the best autosuggestions possible. But since it's a new PEP, I encourage you to use taskiq with VS code because Pylance understands all types correctly.

Installation

This project can be installed using pip:

pip install taskiq

Or it can be installed directly from git:

pip install git+https://github.com/taskiq-python/taskiq

Usage

Let's see the example with the in-memory broker:

import asyncio

from taskiq.brokers.inmemory_broker import InMemoryBroker


# This is broker that can be used for
# development or for demo purpose.
# In production environment consider using
# real distributed brokers, such as taskiq-aio-pika
# for rabbitmq.
broker = InMemoryBroker()


# Or you can optionally
# pass task_name as the parameter in the decorator.
@broker.task
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")


async def main():
    # Kiq is the method that actually
    # sends task over the network.
    task = await my_async_task.kiq()
    # Now we print result of execution.
    print(await task.get_result())


asyncio.run(main())

You can run it with python without any extra actions, since this script uses the InMemoryBroker.

It won't send any data over the network, and you cannot use this type of broker in a real-world scenario, but it's useful for local development if you do not want to set up a taskiq worker.

Brokers

Brokers are simple. They don't execute functions, but they can send messages and listen to new messages.

Every broker implements the taskiq.abc.broker.AsyncBroker abstract class. All tasks are assigned to brokers, so every time you call the kiq method, you send this task to the assigned broker. (This behavior can be changed, by using Kicker directly).

Also you can add middlewares to brokers using add_middlewares method.

Like this:

from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware

# This is broker that can be used for
# development or for demo purpose.
# In production environment consider using
# real distributed brokers, such as taskiq-aio-pika
# for rabbitmq.
broker = InMemoryBroker()
broker.add_middlewares(
    [
        SimpleRetryMiddleware(
            default_retry_count=4,
        )
    ]
)

To run middlewares properly you must add them using the add_middlewares method. It lead to errors if you try to add them by modifying broker directly.

Also brokers have formatters. You can change format of a message to be compitable with other task execution systems, so your migration to taskiq can be smoother.

Result backends

After task is complete it will try to save the results of execution in result backends. By default brokers use DummyResultBackend wich doesn't do anything. It won't print the result in logs and it always returns None as the return_value, and 0 for execution_time. But some brokers can override it. For example InMemoryBroker by default uses InMemoryResultBackend and returns correct results.

CLI

Taskiq has a command line interface to run workers. It's very simple to get it to work.

You just have to provide path to your broker. As an example, if you want to start listen to new tasks with broker in module my_project.broker you just have to run:

taskiq my_project.broker:broker

taskiq can discover tasks modules to import, if you add the -fsd (file system discover) option.

Let's assume we have project with the following structure:

test_project
├── broker.py
├── submodule
│   └── tasks.py
└── utils
    └── tasks.py

You can specify all tasks modules to import manually.

taskiq test_project.broker:broker test_projec.submodule.tasks test_projec.utils.tasks

Or you can let taskiq find all python modules named tasks in current directory recursively.

taskiq test_project.broker:broker -fsd

You can always run --help to see all possible options.

Middlewares

Middlewares are used to modify message, or take some actions after task is complete.

You can write your own middlewares by subclassing the taskiq.abc.middleware.TaskiqMiddleware.

Every hook can be sync or async. Taskiq will execute it.

For example, this is a valid middleware.

import asyncio

from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.message import TaskiqMessage


class MyMiddleware(TaskiqMiddleware):
    async def pre_send(self, message: "TaskiqMessage") -> TaskiqMessage:
        await asyncio.sleep(1)
        message.labels["my_label"] = "my_value"
        return message

    def post_send(self, message: "TaskiqMessage") -> None:
        print(f"Message {message} was sent.")

You can use sync or async hooks without changing aything, but adding async to the hook signature.

Middlewares can store information in message.labels for later use. For example SimpleRetryMiddleware uses labels to remember number of failed attempts.

Messages

Every message has labels. You can define labels using task decorator, or you can add them using kicker.

For example:

@broker.task(my_label=1, label2="something")
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")

async def main():
    await my_async_task.kiq()

It's equivalent to this

@broker.task
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")

async def main():
    await my_async_task.kicker().with_labels(
        my_label=1,
        label2="something",
    ).kiq()

Kicker

The kicker is the object that sends tasks. When you call kiq it generates a Kicker instance, remembering current broker and message labels. You can change the labels you want to use for this particular task or you can even change broker.

For example:

import asyncio

from taskiq.brokers.inmemory_broker import InMemoryBroker

broker = InMemoryBroker()
second_broker = InMemoryBroker()


@broker.task
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")


async def main():
    task = await my_async_task.kicker().with_broker(second_broker).kiq()
    print(await task.get_result())


asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskiq-0.0.1.tar.gz (23.8 kB view hashes)

Uploaded Source

Built Distribution

taskiq-0.0.1-py3-none-any.whl (29.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page