Skip to main content

Kafka broker for taskiq

Project description

AioKafka broker for taskiq

PyPI - Python Version PyPI PyPI - Downloads

This library provides you with aiokafka broker for taskiq.

Usage:

from taskiq_aio_kafka import AioKafkaBroker

broker = AioKafkaBroker(bootstrap_servers="localhost")

@broker.task
async def test() -> None:
    print("The best task ever!")

Non-obvious things

You can configure kafka producer and consumer with special methods configure_producer and configure_consumer. Example:

from taskiq_aio_kafka import AioKafkaBroker

broker = AioKafkaBroker(bootstrap_servers="localhost")

# configure producer, you can set any parameter from
# base AIOKafkaProducer, except `loop` and `bootstrap_servers`
broker.configure_producer(request_timeout_ms=100000)

# configure consumer, you can set any parameter from
# base AIOKafkaConsumer, except `loop` and `bootstrap_servers`
broker.configure_consumer(group_id="the best group ever.")

Multiple topics

By default AioKafkaBroker sends all tasks to kafka_topic. You can also configure the broker to listen to multiple topics and bind different tasks to different default topics.

from taskiq_aio_kafka import AioKafkaBroker
from taskiq_aio_kafka.topic import Topic

broker = AioKafkaBroker(
    bootstrap_servers="localhost",
    kafka_topic="default-topic",
    kafka_topics=[
        Topic("emails"),
        Topic("reports"),
    ],
)


@broker.task_with_topic("emails")
async def send_email(user_id: int) -> None:
    print(f"Send email to {user_id}")


@broker.task_with_topic("reports")
async def build_report(report_id: int) -> None:
    print(f"Build report {report_id}")

In this example the worker listens to default-topic, emails, and reports. When you call send_email.kiq(...), the task is sent to emails by default. When you call build_report.kiq(...), the task is sent to reports by default.

You can override a task topic for a single kick with kicker().with_topic(...):

await send_email.kicker().with_topic("reports").kiq(user_id=1)

Tasks without a custom topic keep the old behavior and are sent to kafka_topic. The regular @broker.task decorator keeps the standard taskiq labels behavior.

@broker.task
async def regular_task() -> None:
    print("This task goes to default-topic.")


await regular_task.kiq()

Configuration

AioKafkaBroker parameters:

  • bootstrap_servers - url to kafka nodes. Can be either string or list of strings.
  • kafka_topic - default topic in kafka.
  • kafka_topics - additional topics that worker should listen to.
  • result_backend - custom result backend.
  • task_id_generator - custom task_id genertaor.
  • kafka_admin_client - custom kafka admin client.
  • delete_topic_on_shutdown - flag to delete topics on broker shutdown.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskiq_aio_kafka-1.0.0.tar.gz (122.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

taskiq_aio_kafka-1.0.0-py3-none-any.whl (10.2 kB view details)

Uploaded Python 3

File details

Details for the file taskiq_aio_kafka-1.0.0.tar.gz.

File metadata

  • Download URL: taskiq_aio_kafka-1.0.0.tar.gz
  • Upload date:
  • Size: 122.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for taskiq_aio_kafka-1.0.0.tar.gz
Algorithm Hash digest
SHA256 d2c378544abfbb54e79a4323322b4b402e9beeff6052aa82435c552bc7c435e3
MD5 309e458df044e8585b182415fba323a4
BLAKE2b-256 c2370e03ddc519f64e7f79b46ce860389b8aeaeefeed9e01351099010a2b0422

See more details on using hashes here.

File details

Details for the file taskiq_aio_kafka-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: taskiq_aio_kafka-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 10.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for taskiq_aio_kafka-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 af66e53b94a986744dab042f0765e833539e7ad3c648e67e9f92624b47a8399b
MD5 e7c7d03040d8006e6dae9b36cfe8532b
BLAKE2b-256 f2b621e76a922795893592c01190a78fe2e3dfcb6a1199daafd873132ab6986e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page