varco-kafka
Apache Kafka event bus backend for varco.
KafkaEventBus implements AbstractEventBus from varco_core using
aiokafka. Published events are serialized
to JSON (via JsonEventSerializer) and sent to Kafka topics. A background
consumer task reads from those topics and dispatches messages to locally
registered handlers.
Installation
pip install varco-kafka
# or with uv:
uv add varco-kafka
Quick start
import asyncio

from varco_kafka import KafkaEventBus, KafkaEventBusSettings
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"

    order_id: str
    total: float

# Configure the bus
settings = KafkaEventBusSettings(
    bootstrap_servers="localhost:9092",
    group_id="order-service",
)

async def main():
    async with KafkaEventBus(settings) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer._produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

asyncio.run(main())
Configuration
Event bus
from varco_kafka import KafkaEventBusSettings
settings = KafkaEventBusSettings(
    bootstrap_servers="kafka.internal:9092",  # broker address(es)
    group_id="my-service",                    # consumer group ID
    channel_prefix="prod.",                   # optional — "orders" → "prod.orders"
    auto_offset_reset="latest",               # "latest" or "earliest"
    enable_auto_commit=True,                  # at-least-once delivery
)
| Field | Default | Env var | Description |
|---|---|---|---|
| bootstrap_servers | "localhost:9092" | VARCO_KAFKA_BOOTSTRAP_SERVERS | Kafka broker address(es) |
| group_id | "varco-default" | VARCO_KAFKA_GROUP_ID | Consumer group ID |
| channel_prefix | "" | VARCO_KAFKA_CHANNEL_PREFIX | Prepended to every topic name |
| auto_offset_reset | "latest" | VARCO_KAFKA_AUTO_OFFSET_RESET | Offset policy for new consumer groups |
| enable_auto_commit | True | VARCO_KAFKA_ENABLE_AUTO_COMMIT | Auto-commit consumer offsets |
| producer_kwargs | {} | — | Extra kwargs for AIOKafkaProducer |
| consumer_kwargs | {} | — | Extra kwargs for AIOKafkaConsumer |
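The Env var column above suggests every field can also be supplied through the environment. A minimal sketch of that, assuming the settings class reads those variables when constructed with no arguments (the exact loading mechanism is not spelled out in this README):

import os

from varco_kafka import KafkaEventBusSettings

# Variable names are taken from the table above; env-based loading is an assumption.
os.environ["VARCO_KAFKA_BOOTSTRAP_SERVERS"] = "kafka.internal:9092"
os.environ["VARCO_KAFKA_GROUP_ID"] = "order-service"
os.environ["VARCO_KAFKA_CHANNEL_PREFIX"] = "prod."

settings = KafkaEventBusSettings()  # assumed to fall back to the env vars above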
Channel manager (topic administration)
KafkaChannelManager handles Kafka topic creation and deletion. It uses a
separate settings class so admin credentials never bleed into the bus:
from varco_kafka import KafkaChannelManager, KafkaChannelManagerSettings
admin_settings = KafkaChannelManagerSettings(
    bootstrap_servers="kafka.internal:9092",
    # env prefix: VARCO_KAFKA_ADMIN_
)

async with KafkaChannelManager(admin_settings) as manager:
    await manager.declare_channel("orders")  # create topic if absent
    await manager.delete_channel("orders")   # delete topic
| Field | Default | Env var | Description |
|---|---|---|---|
| bootstrap_servers | "localhost:9092" | VARCO_KAFKA_ADMIN_BOOTSTRAP_SERVERS | Kafka broker address(es) |
| admin_kwargs | {} | — | Extra kwargs for AIOKafkaAdminClient |
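A common pattern is to declare every channel a service needs once at startup, before the bus begins consuming. A minimal sketch using only the declare_channel call shown above and the admin_settings defined earlier; the ensure_topics helper name is hypothetical:

async def ensure_topics(channels: list[str]) -> None:
    # Create each topic if it does not already exist.
    async with KafkaChannelManager(admin_settings) as manager:
        for channel in channels:
            await manager.declare_channel(channel)

await ensure_topics(["orders", "payments"])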
Lifecycle
# Explicit lifecycle
bus = KafkaEventBus(settings)
await bus.start() # connects producer, starts consumer task
# ... use bus ...
await bus.stop() # flushes producer, cancels consumer task
# Context manager (recommended)
async with KafkaEventBus(settings) as bus:
    ...
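If you manage the lifecycle explicitly, it is worth pairing start() and stop() with try/finally so the producer is still flushed when the work in between raises. A minimal sketch, reusing the settings object from above:

bus = KafkaEventBus(settings)
await bus.start()
try:
    ...  # publish and consume
finally:
    await bus.stop()  # runs even if the block above raises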
DI integration
varco_kafka ships two @Configuration classes so you can install only what
each service needs:
from providify import DIContainer
from varco_core.event import AbstractEventBus, ChannelManager
from varco_kafka import KafkaEventBusConfiguration, KafkaChannelManagerConfiguration
# Services that only publish/consume events
container = DIContainer()
await container.ainstall(KafkaEventBusConfiguration)
bus = await container.aget(AbstractEventBus)
# Admin services that also manage topics
await container.ainstall(KafkaChannelManagerConfiguration)
manager = await container.aget(ChannelManager)
await container.ashutdown()
Connection settings
KafkaConnectionSettings is a structured, env-var loadable config object
that produces kwargs for AIOKafkaProducer and AIOKafkaConsumer.
Plain connection (PLAINTEXT)
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from varco_kafka.connection import KafkaConnectionSettings
conn = KafkaConnectionSettings(
    bootstrap_servers="broker1:9092,broker2:9092",
    group_id="order-service",
)
producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())
consumer = AIOKafkaConsumer("orders", **conn.to_aiokafka_kwargs())
await producer.start()
await consumer.start()
From environment variables
KAFKA_BOOTSTRAP_SERVERS=broker1:9092,broker2:9092
KAFKA_GROUP_ID=order-service
conn = KafkaConnectionSettings.from_env()
producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())
You can also configure a single broker via KAFKA_HOST + KAFKA_PORT without
setting KAFKA_BOOTSTRAP_SERVERS — the settings synthesise it automatically:
KAFKA_HOST=my-broker
KAFKA_PORT=9093
# bootstrap_servers → "my-broker:9093"
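The same thing from Python, as a small sketch assuming from_env() reads the process environment at call time:

import os

from varco_kafka.connection import KafkaConnectionSettings

os.environ["KAFKA_HOST"] = "my-broker"
os.environ["KAFKA_PORT"] = "9093"

conn = KafkaConnectionSettings.from_env()
print(conn.bootstrap_servers)  # expected: "my-broker:9093"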
With TLS / SSL (no auth)
from varco_core.connection import SSLConfig
from pathlib import Path
ssl = SSLConfig(ca_cert=Path("/etc/ssl/kafka-ca.pem"), verify=True)
conn = KafkaConnectionSettings.with_ssl(
    ssl,
    bootstrap_servers="broker:9093",
    group_id="my-service",
)
# security_protocol → "SSL"
producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())
Or from env:
KAFKA_BOOTSTRAP_SERVERS=broker:9093
KAFKA_SSL__CA_CERT=/etc/ssl/kafka-ca.pem
KAFKA_SSL__VERIFY=true
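For mutual TLS, the reference table further below lists KAFKA_SSL__CLIENT_CERT and KAFKA_SSL__CLIENT_KEY. A sketch in code, assuming SSLConfig exposes matching client_cert / client_key fields (field names inferred from the env vars, not confirmed above):

from pathlib import Path

from varco_core.connection import SSLConfig
from varco_kafka.connection import KafkaConnectionSettings

mtls = SSLConfig(
    ca_cert=Path("/etc/ssl/kafka-ca.pem"),
    client_cert=Path("/etc/ssl/client.pem"),     # assumed field name
    client_key=Path("/etc/ssl/client-key.pem"),  # assumed field name
    verify=True,
)
conn = KafkaConnectionSettings.with_ssl(
    mtls,
    bootstrap_servers="broker:9093",
    group_id="my-service",
)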
With SASL authentication (SASL_PLAINTEXT)
from varco_core.connection import SaslConfig
conn = KafkaConnectionSettings(
    bootstrap_servers="broker:9092",
    group_id="my-service",
    auth=SaslConfig(
        mechanism="SCRAM-SHA-256",
        username="alice",
        password="secret",
    ),
)
# security_protocol → "SASL_PLAINTEXT"
producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())
Or from env:
KAFKA_AUTH__TYPE=sasl
KAFKA_AUTH__MECHANISM=SCRAM-SHA-256
KAFKA_AUTH__USERNAME=alice
KAFKA_AUTH__PASSWORD=secret
With SASL + TLS (SASL_SSL)
ssl = SSLConfig(ca_cert=Path("/etc/ssl/ca.pem"), verify=True)
conn = KafkaConnectionSettings.with_ssl(
    ssl,
    bootstrap_servers="broker:9093",
    group_id="my-service",
    auth=SaslConfig(mechanism="SCRAM-SHA-256", username="alice", password="secret"),
)
# security_protocol → "SASL_SSL"
SASL PLAIN via BasicAuthConfig
BasicAuthConfig (type "basic") is automatically mapped to SASL PLAIN:
from varco_core.connection import BasicAuthConfig
conn = KafkaConnectionSettings(
    bootstrap_servers="broker:9092",
    auth=BasicAuthConfig(username="alice", password="secret"),
)
# sasl_mechanism → "PLAIN"
Or from env:
KAFKA_AUTH__TYPE=basic
KAFKA_AUTH__USERNAME=alice
KAFKA_AUTH__PASSWORD=secret
security_protocol matrix
| ssl | auth | security_protocol |
|---|---|---|
| not set | not set | PLAINTEXT |
| set | not set | SSL |
| not set | set | SASL_PLAINTEXT |
| set | set | SASL_SSL |
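A quick sketch exercising two rows of the matrix, assuming the derived security_protocol shows up in the kwargs returned by to_aiokafka_kwargs() (the comments in the examples above suggest it does):

from varco_core.connection import SaslConfig
from varco_kafka.connection import KafkaConnectionSettings

plain = KafkaConnectionSettings(bootstrap_servers="broker:9092")
sasl_only = KafkaConnectionSettings(
    bootstrap_servers="broker:9092",
    auth=SaslConfig(mechanism="SCRAM-SHA-256", username="alice", password="secret"),
)

print(plain.to_aiokafka_kwargs().get("security_protocol"))      # "PLAINTEXT"
print(sasl_only.to_aiokafka_kwargs().get("security_protocol"))  # "SASL_PLAINTEXT"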
Connection settings reference
| Env var | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | localhost:9092 | Comma-separated broker addresses |
| KAFKA_HOST | localhost | Single broker hostname (synthesises bootstrap_servers) |
| KAFKA_PORT | 9092 | Single broker port (synthesises bootstrap_servers) |
| KAFKA_GROUP_ID | varco-default | Consumer group ID |
| KAFKA_SSL__CA_CERT | — | Path to CA certificate |
| KAFKA_SSL__CLIENT_CERT | — | Path to client certificate (mTLS) |
| KAFKA_SSL__CLIENT_KEY | — | Path to client private key (mTLS) |
| KAFKA_SSL__VERIFY | true | TLS peer verification |
| KAFKA_AUTH__TYPE | — | sasl or basic |
| KAFKA_AUTH__MECHANISM | PLAIN | SASL mechanism (PLAIN, SCRAM-SHA-256, etc.) |
| KAFKA_AUTH__USERNAME | — | SASL username |
| KAFKA_AUTH__PASSWORD | — | SASL password |
Note:
KafkaConnectionSettings is a general-purpose connection config. KafkaEventBusSettings (used by KafkaEventBus) is a separate, independent class, so existing event bus code is unaffected.
Running tests
# Unit tests (no Kafka required)
uv sync
uv run pytest
# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration
Delivery semantics
KafkaEventBus provides at-least-once delivery with enable_auto_commit=True.
Consumer errors are logged and do not stop the consumer loop.
For exactly-once semantics, configure Kafka transactions in
producer_kwargs / consumer_kwargs — this is out of scope for the bus itself.
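A hedged sketch of what that might look like: enable_idempotence, transactional_id, and isolation_level are standard aiokafka parameters, but wiring actual transaction begin/commit calls is left to your own code.

from varco_kafka import KafkaEventBusSettings

settings = KafkaEventBusSettings(
    bootstrap_servers="kafka.internal:9092",
    group_id="order-service",
    producer_kwargs={
        "enable_idempotence": True,
        "transactional_id": "order-service-tx",  # hypothetical id
    },
    consumer_kwargs={"isolation_level": "read_committed"},
)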