Kafka broker for taskiq
Project description
AioKafka broker for taskiq
This library provides you with aiokafka broker for taskiq.
Usage:
from taskiq_aio_kafka import AioKafkaBroker
broker = AioKafkaBroker(bootstrap_servers="localhost")
@broker.task
async def test() -> None:
print("The best task ever!")
Non-obvious things
You can configure kafka producer and consumer with special methods configure_producer and configure_consumer.
Example:
from taskiq_aio_kafka import AioKafkaBroker
broker = AioKafkaBroker(bootstrap_servers="localhost")
# configure producer, you can set any parameter from
# base AIOKafkaProducer, except `loop` and `bootstrap_servers`
broker.configure_producer(request_timeout_ms=100000)
# configure consumer, you can set any parameter from
# base AIOKafkaConsumer, except `loop` and `bootstrap_servers`
broker.configure_consumer(group_id="the best group ever.")
Multiple topics
By default AioKafkaBroker sends all tasks to kafka_topic.
You can also configure the broker to listen to multiple topics and bind
different tasks to different default topics.
from taskiq_aio_kafka import AioKafkaBroker
from taskiq_aio_kafka.topic import Topic
broker = AioKafkaBroker(
bootstrap_servers="localhost",
kafka_topic="default-topic",
kafka_topics=[
Topic("emails"),
Topic("reports"),
],
)
@broker.task_with_topic("emails")
async def send_email(user_id: int) -> None:
print(f"Send email to {user_id}")
@broker.task_with_topic("reports")
async def build_report(report_id: int) -> None:
print(f"Build report {report_id}")
In this example the worker listens to default-topic, emails, and reports.
When you call send_email.kiq(...), the task is sent to emails by default.
When you call build_report.kiq(...), the task is sent to reports by default.
You can override a task topic for a single kick with kicker().with_topic(...):
await send_email.kicker().with_topic("reports").kiq(user_id=1)
Tasks without a custom topic keep the old behavior and are sent to kafka_topic.
The regular @broker.task decorator keeps the standard taskiq labels behavior.
@broker.task
async def regular_task() -> None:
print("This task goes to default-topic.")
await regular_task.kiq()
Configuration
AioKafkaBroker parameters:
bootstrap_servers- url to kafka nodes. Can be either string or list of strings.kafka_topic- default topic in kafka.kafka_topics- additional topics that worker should listen to.result_backend- custom result backend.task_id_generator- custom task_id genertaor.kafka_admin_client- customkafkaadmin client.delete_topic_on_shutdown- flag to delete topics on broker shutdown.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file taskiq_aio_kafka-1.0.0.tar.gz.
File metadata
- Download URL: taskiq_aio_kafka-1.0.0.tar.gz
- Upload date:
- Size: 122.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d2c378544abfbb54e79a4323322b4b402e9beeff6052aa82435c552bc7c435e3
|
|
| MD5 |
309e458df044e8585b182415fba323a4
|
|
| BLAKE2b-256 |
c2370e03ddc519f64e7f79b46ce860389b8aeaeefeed9e01351099010a2b0422
|
File details
Details for the file taskiq_aio_kafka-1.0.0-py3-none-any.whl.
File metadata
- Download URL: taskiq_aio_kafka-1.0.0-py3-none-any.whl
- Upload date:
- Size: 10.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
af66e53b94a986744dab042f0765e833539e7ad3c648e67e9f92624b47a8399b
|
|
| MD5 |
e7c7d03040d8006e6dae9b36cfe8532b
|
|
| BLAKE2b-256 |
f2b621e76a922795893592c01190a78fe2e3dfcb6a1199daafd873132ab6986e
|