varco-kafka

Apache Kafka event bus backend for varco.

KafkaEventBus implements AbstractEventBus from varco_core using aiokafka. Published events are serialized to JSON (via JsonEventSerializer) and sent to Kafka topics. A background consumer task reads from those topics and dispatches messages to locally registered handlers.
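The publish-and-dispatch flow described above can be pictured with a small in-memory model. This is only a sketch: an asyncio.Queue stands in for the Kafka topic, and ToyBus, subscribe and publish are hypothetical names chosen for illustration. It is not how varco_kafka or aiokafka is implemented.

```python
import asyncio
import json
from collections import defaultdict


class ToyBus:
    """In-memory stand-in: one asyncio.Queue plays the role of the broker."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()
        self._handlers = defaultdict(list)  # channel -> list of async handlers
        self._task = None

    def subscribe(self, channel, handler) -> None:
        self._handlers[channel].append(handler)

    async def publish(self, channel, payload) -> None:
        # Serialize to JSON bytes, as a producer would before sending.
        await self._queue.put((channel, json.dumps(payload).encode("utf-8")))

    async def _consume(self) -> None:
        # Background loop: read, deserialize, dispatch to local handlers.
        while True:
            channel, raw = await self._queue.get()
            try:
                event = json.loads(raw)
                for handler in self._handlers[channel]:
                    try:
                        await handler(event)
                    except Exception as exc:
                        # A failing handler is reported but does not stop the loop.
                        print(f"handler failed: {exc!r}")
            finally:
                self._queue.task_done()

    async def __aenter__(self):
        self._task = asyncio.create_task(self._consume())
        return self

    async def __aexit__(self, *exc_info) -> None:
        await self._queue.join()  # wait until every queued message was handled
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def demo():
    received = []

    async def on_placed(event):
        received.append(event["order_id"])

    async with ToyBus() as bus:
        bus.subscribe("orders", on_placed)
        await bus.publish("orders", {"order_id": "abc", "total": 99.0})
    return received


result = asyncio.run(demo())
```

With a real broker the consumer and producer may live in different processes; the model only shows the serialize, transport, dispatch order.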


Installation

pip install varco-kafka
# or with uv:
uv add varco-kafka

Quick start

import asyncio

from varco_kafka import KafkaEventBus, KafkaEventBusSettings
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str
    total: float

# Configure the bus
settings = KafkaEventBusSettings(
    bootstrap_servers="localhost:9092",
    group_id="order-service",
)

async def main() -> None:
    # The context manager starts the bus on entry and stops it on exit
    async with KafkaEventBus(settings) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer._produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

if __name__ == "__main__":
    asyncio.run(main())

Configuration

Event bus

from varco_kafka import KafkaEventBusSettings

settings = KafkaEventBusSettings(
    bootstrap_servers="kafka.internal:9092",   # broker address(es)
    group_id="my-service",                     # consumer group ID
    channel_prefix="prod.",                    # optional — "orders" → "prod.orders"
    auto_offset_reset="latest",                # "latest" or "earliest"
    enable_auto_commit=True,                   # at-least-once delivery
)

| Field | Default | Env var | Description |
| --- | --- | --- | --- |
| bootstrap_servers | "localhost:9092" | VARCO_KAFKA_BOOTSTRAP_SERVERS | Kafka broker address(es) |
| group_id | "varco-default" | VARCO_KAFKA_GROUP_ID | Consumer group ID |
| channel_prefix | "" | VARCO_KAFKA_CHANNEL_PREFIX | Prepended to every topic name |
| auto_offset_reset | "latest" | VARCO_KAFKA_AUTO_OFFSET_RESET | Offset policy for new consumer groups |
| enable_auto_commit | True | VARCO_KAFKA_ENABLE_AUTO_COMMIT | Auto-commit consumer offsets |
| producer_kwargs | {} | | Extra kwargs for AIOKafkaProducer |
| consumer_kwargs | {} | | Extra kwargs for AIOKafkaConsumer |
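
The channel_prefix mapping noted in the settings example ("orders" becomes "prod.orders") appears to be plain string concatenation, so any separator such as the trailing dot has to be part of the prefix itself. A minimal sketch of that rule follows; topic_for is a hypothetical helper written for illustration and is not part of varco_kafka.

```python
def topic_for(channel: str, channel_prefix: str = "") -> str:
    """Hypothetical helper: build the Kafka topic name for a channel."""
    # The prefix is prepended as-is; no separator is added automatically.
    return f"{channel_prefix}{channel}"


prod_topic = topic_for("orders", channel_prefix="prod.")
default_topic = topic_for("orders")  # empty default prefix leaves the name unchanged
```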

Channel manager (topic administration)

KafkaChannelManager handles Kafka topic creation and deletion. It uses a separate settings class so admin credentials never bleed into the bus:

from varco_kafka import KafkaChannelManager, KafkaChannelManagerSettings

admin_settings = KafkaChannelManagerSettings(
    bootstrap_servers="kafka.internal:9092",
    # env prefix: VARCO_KAFKA_ADMIN_
)

async with KafkaChannelManager(admin_settings) as manager:
    await manager.declare_channel("orders")      # create topic if absent
    await manager.delete_channel("orders")       # delete topic

| Field | Default | Env var | Description |
| --- | --- | --- | --- |
| bootstrap_servers | "localhost:9092" | VARCO_KAFKA_ADMIN_BOOTSTRAP_SERVERS | Kafka broker address(es) |
| admin_kwargs | {} | | Extra kwargs for AIOKafkaAdminClient |

Lifecycle

# Explicit lifecycle
bus = KafkaEventBus(settings)
await bus.start()     # connects producer, starts consumer task
try:
    ...               # use bus
finally:
    await bus.stop()  # flushes producer, cancels consumer task, even on error

# Context manager (recommended)
async with KafkaEventBus(settings) as bus:
    ...

DI integration

varco_kafka ships two @Configuration classes so you can install only what each service needs:

from providify import DIContainer
from varco_core.event import AbstractEventBus, ChannelManager
from varco_kafka import KafkaEventBusConfiguration, KafkaChannelManagerConfiguration

# Services that only publish/consume events
container = DIContainer()
await container.ainstall(KafkaEventBusConfiguration)
bus = await container.aget(AbstractEventBus)

# Admin services that also manage topics
await container.ainstall(KafkaChannelManagerConfiguration)
manager = await container.aget(ChannelManager)

await container.ashutdown()

Running tests

# Unit tests (no Kafka required)
uv sync
uv run pytest

# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration

Delivery semantics

With enable_auto_commit=True, KafkaEventBus provides at-least-once delivery: an event may reach a handler more than once, so handlers should tolerate duplicates. Consumer errors are logged and do not stop the consumer loop.
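
Because at-least-once delivery can hand the same event to a handler twice, handlers are commonly written to be idempotent. One approach is to remember which events were already processed and skip repeats. The sketch below assumes each event carries a unique event_id field, which this README does not specify. IdempotentHandler is a hypothetical wrapper, and the in-memory set stands in for a durable store.

```python
import asyncio
from typing import Awaitable, Callable


class IdempotentHandler:
    """Wrap a handler so that a redelivered event is processed only once."""

    def __init__(self, inner: Callable[[dict], Awaitable[None]]) -> None:
        self._inner = inner
        self._seen = set()  # assumption: a real service would persist this

    async def __call__(self, event: dict) -> bool:
        event_id = event["event_id"]  # assumed unique per event
        if event_id in self._seen:
            return False  # duplicate delivery, skipped
        await self._inner(event)
        # Record the ID only after success, so a failed attempt can be retried.
        self._seen.add(event_id)
        return True


async def demo() -> list:
    processed = []

    async def on_placed(event: dict) -> None:
        processed.append(event["order_id"])

    handler = IdempotentHandler(on_placed)
    event = {"event_id": "e-1", "order_id": "abc"}
    first = await handler(event)
    second = await handler(event)  # simulated redelivery of the same event
    return [first, second, processed]


outcome = asyncio.run(demo())
```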

For exactly-once semantics, configure Kafka transactions through producer_kwargs and consumer_kwargs; this is out of scope for the bus itself.
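
As a rough sketch of what such configuration could look like: transactional_id and enable_idempotence are AIOKafkaProducer options, and isolation_level is an AIOKafkaConsumer option. The ID value is a placeholder, and the dictionaries are meant to be passed as the producer_kwargs and consumer_kwargs settings fields. These options only enable the client-side features; beginning and committing transactions remains application code.

```python
# Extra keyword arguments forwarded to AIOKafkaProducer.
producer_kwargs = {
    "transactional_id": "order-service-tx-1",  # placeholder; unique per producer instance
    "enable_idempotence": True,                # implied by transactional_id, stated for clarity
}

# Extra keyword arguments forwarded to AIOKafkaConsumer.
consumer_kwargs = {
    "isolation_level": "read_committed",  # do not read messages from aborted transactions
}
```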
