# varco-kafka
Apache Kafka event bus backend for varco.
`KafkaEventBus` implements `AbstractEventBus` from `varco_core` using
[aiokafka](https://github.com/aio-libs/aiokafka). Published events are serialized
to JSON (via `JsonEventSerializer`) and sent to Kafka topics. A background
consumer task reads from those topics and dispatches messages to locally
registered handlers.
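The exact wire format is internal to `JsonEventSerializer`; the sketch below only illustrates the general round-trip idea, and the field names are assumptions rather than the library's actual schema:

```python
import json

# Hypothetical wire format: field names here are assumptions, not the
# actual JsonEventSerializer schema.
def serialize_event(event_type: str, payload: dict) -> bytes:
    """Encode an event as a JSON message body for a Kafka topic."""
    return json.dumps({"event_type": event_type, "payload": payload}).encode("utf-8")

def deserialize_event(raw: bytes) -> tuple[str, dict]:
    """Decode a message body back into (event_type, payload)."""
    msg = json.loads(raw.decode("utf-8"))
    return msg["event_type"], msg["payload"]

raw = serialize_event("order.placed", {"order_id": "abc", "total": 99.0})
event_type, payload = deserialize_event(raw)
```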
## Installation

```bash
pip install varco-kafka
# or with uv:
uv add varco-kafka
```
## Quick start

```python
import asyncio

from varco_kafka import KafkaEventBus, KafkaEventBusSettings
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"

    order_id: str
    total: float

# Configure the bus
settings = KafkaEventBusSettings(
    bootstrap_servers="localhost:9092",
    group_id="order-service",
)

async def main():
    async with KafkaEventBus(settings) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer._produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

asyncio.run(main())
```
## Configuration

### Event bus

```python
from varco_kafka import KafkaEventBusSettings

settings = KafkaEventBusSettings(
    bootstrap_servers="kafka.internal:9092",  # broker address(es)
    group_id="my-service",                    # consumer group ID
    channel_prefix="prod.",                   # optional — "orders" → "prod.orders"
    auto_offset_reset="latest",               # "latest" or "earliest"
    enable_auto_commit=True,                  # at-least-once delivery
)
```
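The `channel_prefix` mapping is simple enough to sketch directly; the helper name below (`topic_for_channel`) is an illustration of the documented behaviour, not part of the varco_kafka API:

```python
# Sketch of the documented channel -> topic mapping: the bus prepends
# channel_prefix to the channel name to form the Kafka topic.
def topic_for_channel(channel: str, channel_prefix: str = "") -> str:
    return f"{channel_prefix}{channel}"

topic = topic_for_channel("orders", channel_prefix="prod.")  # "prod.orders"
```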
| Field | Default | Env var | Description |
|---|---|---|---|
| `bootstrap_servers` | `"localhost:9092"` | `VARCO_KAFKA_BOOTSTRAP_SERVERS` | Kafka broker address(es) |
| `group_id` | `"varco-default"` | `VARCO_KAFKA_GROUP_ID` | Consumer group ID |
| `channel_prefix` | `""` | `VARCO_KAFKA_CHANNEL_PREFIX` | Prepended to every topic name |
| `auto_offset_reset` | `"latest"` | `VARCO_KAFKA_AUTO_OFFSET_RESET` | Offset policy for new consumer groups |
| `enable_auto_commit` | `True` | `VARCO_KAFKA_ENABLE_AUTO_COMMIT` | Auto-commit consumer offsets |
| `producer_kwargs` | `{}` | — | Extra kwargs for `AIOKafkaProducer` |
| `consumer_kwargs` | `{}` | — | Extra kwargs for `AIOKafkaConsumer` |
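Every field in the table can also come from the environment via the listed variables. A rough sketch of that resolution order, with plain `os.environ` standing in for the settings class's actual machinery (which is not shown here):

```python
import os

# Simulate the environment the settings class would read.
os.environ["VARCO_KAFKA_BOOTSTRAP_SERVERS"] = "kafka.internal:9092"
os.environ["VARCO_KAFKA_GROUP_ID"] = "order-service"

# Sketch of env-var lookup with the documented defaults; the real
# KafkaEventBusSettings does the equivalent internally.
def load_setting(name: str, default: str) -> str:
    return os.environ.get(f"VARCO_KAFKA_{name.upper()}", default)

bootstrap_servers = load_setting("bootstrap_servers", "localhost:9092")  # from env
group_id = load_setting("group_id", "varco-default")                     # from env
channel_prefix = load_setting("channel_prefix", "")                      # default
```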
### Channel manager (topic administration)

`KafkaChannelManager` handles Kafka topic creation and deletion. It uses a
separate settings class so admin credentials never bleed into the bus:

```python
from varco_kafka import KafkaChannelManager, KafkaChannelManagerSettings

admin_settings = KafkaChannelManagerSettings(
    bootstrap_servers="kafka.internal:9092",
    # env prefix: VARCO_KAFKA_ADMIN_
)

async with KafkaChannelManager(admin_settings) as manager:
    await manager.declare_channel("orders")  # create topic if absent
    await manager.delete_channel("orders")   # delete topic
```
| Field | Default | Env var | Description |
|---|---|---|---|
| `bootstrap_servers` | `"localhost:9092"` | `VARCO_KAFKA_ADMIN_BOOTSTRAP_SERVERS` | Kafka broker address(es) |
| `admin_kwargs` | `{}` | — | Extra kwargs for `AIOKafkaAdminClient` |
## Lifecycle

```python
# Explicit lifecycle
bus = KafkaEventBus(settings)
await bus.start()  # connects producer, starts consumer task
# ... use bus ...
await bus.stop()   # flushes producer, cancels consumer task

# Context manager (recommended)
async with KafkaEventBus(settings) as bus:
    ...
```
## DI integration

`varco_kafka` ships two `@Configuration` classes so you can install only what
each service needs:

```python
from providify import DIContainer
from varco_core.event import AbstractEventBus, ChannelManager
from varco_kafka import KafkaEventBusConfiguration, KafkaChannelManagerConfiguration

# Services that only publish/consume events
container = DIContainer()
await container.ainstall(KafkaEventBusConfiguration)
bus = await container.aget(AbstractEventBus)

# Admin services that also manage topics
await container.ainstall(KafkaChannelManagerConfiguration)
manager = await container.aget(ChannelManager)

await container.ashutdown()
```
## Running tests

```bash
# Unit tests (no Kafka required)
uv sync
uv run pytest

# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration
```
## Delivery semantics

`KafkaEventBus` provides at-least-once delivery with `enable_auto_commit=True`.
Consumer errors are logged and do not stop the consumer loop.
For exactly-once semantics, configure Kafka transactions in
`producer_kwargs` / `consumer_kwargs`; this is out of scope for the bus itself.
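Because at-least-once delivery can hand a handler the same event more than once (for example after a rebalance before offsets were committed), handlers should be idempotent. A standalone sketch of one common approach, deduplicating on event ID (this helper is an illustration, not part of the varco_kafka API):

```python
# At-least-once delivery means a handler can see the same event twice.
# A common defence is an idempotent handler that tracks processed IDs.
processed: set[str] = set()
shipped: list[str] = []

def handle_order_placed(order_id: str) -> None:
    if order_id in processed:   # duplicate delivery -- skip side effects
        return
    processed.add(order_id)
    shipped.append(order_id)    # the real side effect (ship, bill, ...)

# The bus may redeliver "abc"; the second call is a no-op.
for order_id in ["abc", "xyz", "abc"]:
    handle_order_placed(order_id)
```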
## Download files
### Source distribution

Details for the file `varco_kafka-0.1.0.tar.gz`.

- Size: 37.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.12 (Debian GNU/Linux 12 "bookworm")

| Algorithm | Hash digest |
|---|---|
| SHA256 | `ee3579267cb60cb00c0c08260ef1158a47b158a406272591b442c8a3fe346f58` |
| MD5 | `d70a14b31511816bed2fd04abb260d75` |
| BLAKE2b-256 | `a68c6474a2b8e2dfd3695a80ad47ae959bb8e3050fd33f00e4d3e1712a6e72e0` |
### Built distribution

Details for the file `varco_kafka-0.1.0-py3-none-any.whl`.

- Size: 29.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.12 (Debian GNU/Linux 12 "bookworm")

| Algorithm | Hash digest |
|---|---|
| SHA256 | `d6665b9afe7bbe2c8bbd10a9b55c388dfdb53d442ca7833f7bd4d9c55af61b2a` |
| MD5 | `6308499e1de69408c6c24896966a1e02` |
| BLAKE2b-256 | `9d1daec8bbdc10056e31f4f54361f4bd1bd2c4ff1b046401999a13b10dbef255` |