Apache Kafka event bus backend for varco — KafkaEventBus built on aiokafka
Project description
varco-kafka
Apache Kafka event bus backend for varco.
KafkaEventBus implements AbstractEventBus from varco_core using
aiokafka. Published events are serialized
to JSON (via JsonEventSerializer) and sent to Kafka topics. A background
consumer task reads from those topics and dispatches messages to locally
registered handlers.
Installation
pip install varco-kafka
# or with uv:
uv add varco-kafka
Quick start
from varco_kafka import KafkaEventBus, KafkaConfig
from varco_core.event import BusEventProducer, EventConsumer, listen, Event
# Define your events
class OrderPlacedEvent(Event):
__event_type__ = "order.placed"
order_id: str
total: float
# Configure the bus
config = KafkaConfig(
bootstrap_servers="localhost:9092",
group_id="order-service",
)
async def main():
async with KafkaEventBus(config) as bus:
# --- Consumer side ---
class OrderConsumer(EventConsumer):
@listen(OrderPlacedEvent, channel="orders")
async def on_placed(self, event: OrderPlacedEvent) -> None:
print(f"Order placed: {event.order_id}")
OrderConsumer().register_to(bus)
# --- Producer side ---
producer = BusEventProducer(bus)
await producer._produce(
OrderPlacedEvent(order_id="abc", total=99.0),
channel="orders",
)
Configuration
from varco_kafka import KafkaConfig
config = KafkaConfig(
bootstrap_servers="kafka.internal:9092", # broker address(es)
group_id="my-service", # consumer group ID
topic_prefix="prod.", # optional — "orders" → "prod.orders"
auto_offset_reset="latest", # "latest" or "earliest"
enable_auto_commit=True, # at-least-once delivery
)
| Field | Default | Description |
|---|---|---|
bootstrap_servers |
"localhost:9092" |
Kafka broker address(es) |
group_id |
"varco-default" |
Consumer group ID |
topic_prefix |
"" |
Prepended to every topic name |
auto_offset_reset |
"latest" |
Offset policy for new consumer groups |
enable_auto_commit |
True |
Auto-commit consumer offsets |
producer_kwargs |
{} |
Extra kwargs for AIOKafkaProducer |
consumer_kwargs |
{} |
Extra kwargs for AIOKafkaConsumer |
Lifecycle
# Explicit lifecycle
bus = KafkaEventBus(config)
await bus.start() # connects producer, starts consumer task
# ... use bus ...
await bus.stop() # flushes producer, cancels consumer task
# Context manager (recommended)
async with KafkaEventBus(config) as bus:
...
Running tests
# Unit tests (no Kafka required)
uv sync
uv run pytest
# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration
Delivery semantics
KafkaEventBus provides at-least-once delivery with enable_auto_commit=True.
Consumer errors are logged and do not stop the consumer loop.
For exactly-once semantics, configure Kafka transactions in
producer_kwargs / consumer_kwargs — this is out of scope for the bus itself.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file varco_kafka-0.0.1.tar.gz.
File metadata
- Download URL: varco_kafka-0.0.1.tar.gz
- Upload date:
- Size: 18.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
be47db45fdf458f27ca04d37fb2325fa8446a480bca6a98bb9da721e5e6b8f8d
|
|
| MD5 |
0f99c3df980d91a8a37ee938d67aa807
|
|
| BLAKE2b-256 |
c0b721a72764b5a532de041a4461d668fde5718d21cfd9abd9d8787c27b08908
|
File details
Details for the file varco_kafka-0.0.1-py3-none-any.whl.
File metadata
- Download URL: varco_kafka-0.0.1-py3-none-any.whl
- Upload date:
- Size: 14.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8e29cb55d9d00f3dd29144cec10e50b47c1cf2b0eb76a9646fa8cf15d2cdfc03
|
|
| MD5 |
a727992dcdea7015ed901056ace3de89
|
|
| BLAKE2b-256 |
9a250641caa5a6d15ac38d1fd6928c11f70c3a8067f6d841593be3bb137f0db7
|