
varco-kafka

Apache Kafka event bus backend for varco.

KafkaEventBus implements AbstractEventBus from varco_core using aiokafka. Published events are serialized to JSON (via JsonEventSerializer) and sent to Kafka topics. A background consumer task reads from those topics and dispatches messages to locally registered handlers.
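Conceptually, each published event travels as a JSON message body keyed by its event type. The sketch below illustrates that round trip; the field names `type` and `data` are illustrative assumptions, not the actual wire format of `JsonEventSerializer`:

```python
import json

# Hypothetical sketch of a JSON event envelope. The real JsonEventSerializer
# format may differ; this only shows the shape of the round trip.
def serialize(event_type: str, payload: dict) -> bytes:
    """Encode an event as a JSON message body."""
    return json.dumps({"type": event_type, "data": payload}).encode("utf-8")

def deserialize(raw: bytes) -> tuple[str, dict]:
    """Decode a message body back into (event_type, payload)."""
    doc = json.loads(raw)
    return doc["type"], doc["data"]

raw = serialize("order.placed", {"order_id": "abc", "total": 99.0})
event_type, data = deserialize(raw)
```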


Installation

pip install varco-kafka
# or with uv:
uv add varco-kafka

Quick start

from varco_kafka import KafkaEventBus, KafkaConfig
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str
    total: float

# Configure the bus
config = KafkaConfig(
    bootstrap_servers="localhost:9092",
    group_id="order-service",
)

import asyncio

async def main():
    async with KafkaEventBus(config) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer.produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

asyncio.run(main())

Configuration

from varco_kafka import KafkaConfig

config = KafkaConfig(
    bootstrap_servers="kafka.internal:9092",   # broker address(es)
    group_id="my-service",                     # consumer group ID
    topic_prefix="prod.",                      # optional — "orders" → "prod.orders"
    auto_offset_reset="latest",                # "latest" or "earliest"
    enable_auto_commit=True,                   # at-least-once delivery
)
Field               Default            Description
------------------  -----------------  --------------------------------------
bootstrap_servers   "localhost:9092"   Kafka broker address(es)
group_id            "varco-default"    Consumer group ID
topic_prefix        ""                 Prepended to every topic name
auto_offset_reset   "latest"           Offset policy for new consumer groups
enable_auto_commit  True               Auto-commit consumer offsets
producer_kwargs     {}                 Extra kwargs for AIOKafkaProducer
consumer_kwargs     {}                 Extra kwargs for AIOKafkaConsumer
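Assuming `producer_kwargs` and `consumer_kwargs` are forwarded verbatim to the underlying aiokafka clients, a throughput-tuned configuration might look like this (the option names are standard aiokafka parameters):

```python
from varco_kafka import KafkaConfig

# Sketch only: assumes producer_kwargs/consumer_kwargs pass straight
# through to AIOKafkaProducer and AIOKafkaConsumer.
config = KafkaConfig(
    bootstrap_servers="kafka.internal:9092",
    group_id="my-service",
    producer_kwargs={
        "acks": "all",               # wait for all in-sync replicas
        "linger_ms": 5,              # small batching window
        "compression_type": "gzip",  # compress batches on the wire
    },
    consumer_kwargs={
        "max_poll_records": 500,      # cap records returned per poll
        "session_timeout_ms": 30000,  # rebalance sensitivity
    },
)
```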

Lifecycle

# Explicit lifecycle
bus = KafkaEventBus(config)
await bus.start()     # connects producer, starts consumer task
# ... use bus ...
await bus.stop()      # flushes producer, cancels consumer task

# Context manager (recommended)
async with KafkaEventBus(config) as bus:
    ...

Running tests

# Unit tests (no Kafka required)
uv sync
uv run pytest

# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration

Delivery semantics

KafkaEventBus provides at-least-once delivery with enable_auto_commit=True. Consumer errors are logged and do not stop the consumer loop.
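In practice, at-least-once means a handler may see the same event twice (for example, after a crash or rebalance before offsets were committed), so handlers should be idempotent. A minimal illustration, using an in-memory set where a real service would need durable, shared storage:

```python
# Deduplicate deliveries by a stable event ID. The in-memory set is for
# illustration only; production code needs a durable store shared across
# consumer instances.
processed: set[str] = set()
results: list[str] = []

def handle_order_placed(order_id: str) -> None:
    if order_id in processed:
        return  # duplicate delivery: skip
    processed.add(order_id)
    results.append(order_id)

for oid in ["abc", "abc", "def"]:  # "abc" delivered twice
    handle_order_placed(oid)
# results == ["abc", "def"]
```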

For exactly-once semantics, configure Kafka transactions in producer_kwargs / consumer_kwargs — this is out of scope for the bus itself.
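If you go that route, the relevant knobs are standard aiokafka options; a sketch of how they might be threaded through `KafkaConfig` (the bus does not manage transaction begin/commit for you):

```python
from varco_kafka import KafkaConfig

# Sketch only: transactional_id and isolation_level are standard aiokafka
# options, but driving the transaction lifecycle is left to your code.
config = KafkaConfig(
    bootstrap_servers="kafka.internal:9092",
    group_id="my-service",
    enable_auto_commit=False,  # offsets must be committed within the transaction
    producer_kwargs={"transactional_id": "my-service-tx"},
    consumer_kwargs={"isolation_level": "read_committed"},
)
```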
