Asynchronous task queue with async support
Project description
Taskiq
Taskiq is an asynchronous distributed task queue. This project takes inspiration from big projects such as Celery and Dramatiq. But taskiq can send and run both the sync and async functions. Also, we use PEP-612 to provide the best autosuggestions possible. But since it's a new PEP, I encourage you to use taskiq with VS code because Pylance understands all types correctly.
Installation
This project can be installed using pip:
pip install taskiq
Or it can be installed directly from git:
pip install git+https://github.com/taskiq-python/taskiq
Usage
Let's see the example with the in-memory broker:
import asyncio
from taskiq.brokers.inmemory_broker import InMemoryBroker
# This is broker that can be used for
# development or for demo purpose.
# In production environment consider using
# real distributed brokers, such as taskiq-aio-pika
# for rabbitmq.
broker = InMemoryBroker()
# Or you can optionally
# pass task_name as the parameter in the decorator.
@broker.task
async def my_async_task() -> None:
"""My lovely task."""
await asyncio.sleep(1)
print("Hello")
async def main():
# Kiq is the method that actually
# sends task over the network.
task = await my_async_task.kiq()
# Now we print result of execution.
print(await task.get_result())
asyncio.run(main())
You can run it with python without any extra actions,
since this script uses the InMemoryBroker
.
It won't send any data over the network, and you cannot use this type of broker in a real-world scenario, but it's useful for local development if you do not want to set up a taskiq worker.
Brokers
Brokers are simple. They don't execute functions, but they can send messages and listen to new messages.
Every broker implements the taskiq.abc.broker.AsyncBroker abstract class. All tasks are assigned to brokers, so every time you call the kiq
method, you send this task to the assigned broker. (This behavior can be changed, by using Kicker
directly).
Also you can add middlewares to brokers using add_middlewares
method.
Like this:
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
# This is broker that can be used for
# development or for demo purpose.
# In production environment consider using
# real distributed brokers, such as taskiq-aio-pika
# for rabbitmq.
broker = InMemoryBroker()
broker.add_middlewares(
[
SimpleRetryMiddleware(
default_retry_count=4,
)
]
)
To run middlewares properly you must add them using the add_middlewares
method.
It lead to errors if you try to add them by modifying broker directly.
Also brokers have formatters. You can change format of a message to be compitable with other task execution systems, so your migration to taskiq can be smoother.
Result backends
After task is complete it will try to save the results of execution
in result backends. By default brokers
use DummyResultBackend
wich doesn't do anything. It
won't print the result in logs and it always returns
None
as the return_value
, and 0 for execution_time
.
But some brokers can override it. For example InMemoryBroker
by default uses InMemoryResultBackend
and returns correct results.
CLI
Taskiq has a command line interface to run workers. It's very simple to get it to work.
You just have to provide path to your broker. As an example, if you want to start listen to new tasks
with broker in module my_project.broker
you just
have to run:
taskiq my_project.broker:broker
taskiq can discover tasks modules to import,
if you add the -fsd
(file system discover) option.
Let's assume we have project with the following structure:
test_project
├── broker.py
├── submodule
│ └── tasks.py
└── utils
└── tasks.py
You can specify all tasks modules to import manually.
taskiq test_project.broker:broker test_projec.submodule.tasks test_projec.utils.tasks
Or you can let taskiq find all python modules named tasks in current directory recursively.
taskiq test_project.broker:broker -fsd
You can always run --help
to see all possible options.
Middlewares
Middlewares are used to modify message, or take some actions after task is complete.
You can write your own middlewares by subclassing
the taskiq.abc.middleware.TaskiqMiddleware
.
Every hook can be sync or async. Taskiq will execute it.
For example, this is a valid middleware.
import asyncio
from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.message import TaskiqMessage
class MyMiddleware(TaskiqMiddleware):
async def pre_send(self, message: "TaskiqMessage") -> TaskiqMessage:
await asyncio.sleep(1)
message.labels["my_label"] = "my_value"
return message
def post_send(self, message: "TaskiqMessage") -> None:
print(f"Message {message} was sent.")
You can use sync or async hooks without changing aything, but adding async to the hook signature.
Middlewares can store information in message.labels for
later use. For example SimpleRetryMiddleware
uses labels
to remember number of failed attempts.
Messages
Every message has labels. You can define labels
using task
decorator, or you can add them using kicker.
For example:
@broker.task(my_label=1, label2="something")
async def my_async_task() -> None:
"""My lovely task."""
await asyncio.sleep(1)
print("Hello")
async def main():
await my_async_task.kiq()
It's equivalent to this
@broker.task
async def my_async_task() -> None:
"""My lovely task."""
await asyncio.sleep(1)
print("Hello")
async def main():
await my_async_task.kicker().with_labels(
my_label=1,
label2="something",
).kiq()
Kicker
The kicker is the object that sends tasks. When you call kiq it generates a Kicker instance, remembering current broker and message labels. You can change the labels you want to use for this particular task or you can even change broker.
For example:
import asyncio
from taskiq.brokers.inmemory_broker import InMemoryBroker
broker = InMemoryBroker()
second_broker = InMemoryBroker()
@broker.task
async def my_async_task() -> None:
"""My lovely task."""
await asyncio.sleep(1)
print("Hello")
async def main():
task = await my_async_task.kicker().with_broker(second_broker).kiq()
print(await task.get_result())
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.