Skip to main content

Apache Kafka event bus backend for varco — KafkaEventBus built on aiokafka

Project description

varco-kafka

Apache Kafka event bus backend for varco.

KafkaEventBus implements AbstractEventBus from varco_core using aiokafka. Published events are serialized to JSON (via JsonEventSerializer) and sent to Kafka topics. A background consumer task reads from those topics and dispatches messages to locally registered handlers.


Installation

pip install varco-kafka
# or with uv:
uv add varco-kafka

Quick start

from varco_kafka import KafkaEventBus, KafkaEventBusSettings
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str
    total: float

# Configure the bus
settings = KafkaEventBusSettings(
    bootstrap_servers="localhost:9092",
    group_id="order-service",
)

async def main():
    async with KafkaEventBus(settings) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer._produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

Configuration

Event bus

from varco_kafka import KafkaEventBusSettings

settings = KafkaEventBusSettings(
    bootstrap_servers="kafka.internal:9092",   # broker address(es)
    group_id="my-service",                     # consumer group ID
    channel_prefix="prod.",                    # optional — "orders" → "prod.orders"
    auto_offset_reset="latest",                # "latest" or "earliest"
    enable_auto_commit=True,                   # at-least-once delivery
)
Field Default Env var Description
bootstrap_servers "localhost:9092" VARCO_KAFKA_BOOTSTRAP_SERVERS Kafka broker address(es)
group_id "varco-default" VARCO_KAFKA_GROUP_ID Consumer group ID
channel_prefix "" VARCO_KAFKA_CHANNEL_PREFIX Prepended to every topic name
auto_offset_reset "latest" VARCO_KAFKA_AUTO_OFFSET_RESET Offset policy for new consumer groups
enable_auto_commit True VARCO_KAFKA_ENABLE_AUTO_COMMIT Auto-commit consumer offsets
producer_kwargs {} Extra kwargs for AIOKafkaProducer
consumer_kwargs {} Extra kwargs for AIOKafkaConsumer

Channel manager (topic administration)

KafkaChannelManager handles Kafka topic creation and deletion. It uses a separate settings class so admin credentials never bleed into the bus:

from varco_kafka import KafkaChannelManager, KafkaChannelManagerSettings

admin_settings = KafkaChannelManagerSettings(
    bootstrap_servers="kafka.internal:9092",
    # env prefix: VARCO_KAFKA_ADMIN_
)

async with KafkaChannelManager(admin_settings) as manager:
    await manager.declare_channel("orders")      # create topic if absent
    await manager.delete_channel("orders")       # delete topic
Field Default Env var Description
bootstrap_servers "localhost:9092" VARCO_KAFKA_ADMIN_BOOTSTRAP_SERVERS Kafka broker address(es)
admin_kwargs {} Extra kwargs for AIOKafkaAdminClient

Lifecycle

# Explicit lifecycle
bus = KafkaEventBus(settings)
await bus.start()     # connects producer, starts consumer task
# ... use bus ...
await bus.stop()      # flushes producer, cancels consumer task

# Context manager (recommended)
async with KafkaEventBus(settings) as bus:
    ...

DI integration

varco_kafka ships two @Configuration classes so you can install only what each service needs:

from providify import DIContainer
from varco_core.event import AbstractEventBus, ChannelManager
from varco_kafka import KafkaEventBusConfiguration, KafkaChannelManagerConfiguration

# Services that only publish/consume events
container = DIContainer()
await container.ainstall(KafkaEventBusConfiguration)
bus = await container.aget(AbstractEventBus)

# Admin services that also manage topics
await container.ainstall(KafkaChannelManagerConfiguration)
manager = await container.aget(ChannelManager)

await container.ashutdown()

Connection settings

KafkaConnectionSettings is a structured, env-var loadable config object that produces kwargs for AIOKafkaProducer and AIOKafkaConsumer.

Plain connection (PLAINTEXT)

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from varco_kafka.connection import KafkaConnectionSettings

conn = KafkaConnectionSettings(
    bootstrap_servers="broker1:9092,broker2:9092",
    group_id="order-service",
)

producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())
consumer = AIOKafkaConsumer("orders", **conn.to_aiokafka_kwargs())

await producer.start()
await consumer.start()

From environment variables

KAFKA_BOOTSTRAP_SERVERS=broker1:9092,broker2:9092
KAFKA_GROUP_ID=order-service
conn = KafkaConnectionSettings.from_env()
producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())

You can also configure a single broker via KAFKA_HOST + KAFKA_PORT without setting KAFKA_BOOTSTRAP_SERVERS — the settings synthesise it automatically:

KAFKA_HOST=my-broker
KAFKA_PORT=9093
# bootstrap_servers → "my-broker:9093"

With TLS / SSL (no auth)

from varco_core.connection import SSLConfig
from pathlib import Path

ssl = SSLConfig(ca_cert=Path("/etc/ssl/kafka-ca.pem"), verify=True)
conn = KafkaConnectionSettings.with_ssl(
    ssl,
    bootstrap_servers="broker:9093",
    group_id="my-service",
)
# security_protocol → "SSL"
producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())

Or from env:

KAFKA_BOOTSTRAP_SERVERS=broker:9093
KAFKA_SSL__CA_CERT=/etc/ssl/kafka-ca.pem
KAFKA_SSL__VERIFY=true

With SASL authentication (SASL_PLAINTEXT)

from varco_core.connection import SaslConfig

conn = KafkaConnectionSettings(
    bootstrap_servers="broker:9092",
    group_id="my-service",
    auth=SaslConfig(
        mechanism="SCRAM-SHA-256",
        username="alice",
        password="secret",
    ),
)
# security_protocol → "SASL_PLAINTEXT"
producer = AIOKafkaProducer(**conn.to_aiokafka_kwargs())

Or from env:

KAFKA_AUTH__TYPE=sasl
KAFKA_AUTH__MECHANISM=SCRAM-SHA-256
KAFKA_AUTH__USERNAME=alice
KAFKA_AUTH__PASSWORD=secret

With SASL + TLS (SASL_SSL)

ssl = SSLConfig(ca_cert=Path("/etc/ssl/ca.pem"), verify=True)
conn = KafkaConnectionSettings.with_ssl(
    ssl,
    bootstrap_servers="broker:9093",
    group_id="my-service",
    auth=SaslConfig(mechanism="SCRAM-SHA-256", username="alice", password="secret"),
)
# security_protocol → "SASL_SSL"

SASL PLAIN via BasicAuthConfig

BasicAuthConfig (type "basic") is automatically mapped to SASL PLAIN:

from varco_core.connection import BasicAuthConfig

conn = KafkaConnectionSettings(
    bootstrap_servers="broker:9092",
    auth=BasicAuthConfig(username="alice", password="secret"),
)
# sasl_mechanism → "PLAIN"

Or from env:

KAFKA_AUTH__TYPE=basic
KAFKA_AUTH__USERNAME=alice
KAFKA_AUTH__PASSWORD=secret

security_protocol matrix

ssl auth security_protocol
not set not set PLAINTEXT
set not set SSL
not set set SASL_PLAINTEXT
set set SASL_SSL

Connection settings reference

Env var Default Description
KAFKA_BOOTSTRAP_SERVERS localhost:9092 Comma-separated broker addresses
KAFKA_HOST localhost Single broker hostname (synthesises bootstrap_servers)
KAFKA_PORT 9092 Single broker port (synthesises bootstrap_servers)
KAFKA_GROUP_ID varco-default Consumer group ID
KAFKA_SSL__CA_CERT Path to CA certificate
KAFKA_SSL__CLIENT_CERT Path to client certificate (mTLS)
KAFKA_SSL__CLIENT_KEY Path to client private key (mTLS)
KAFKA_SSL__VERIFY true TLS peer verification
KAFKA_AUTH__TYPE sasl or basic
KAFKA_AUTH__MECHANISM PLAIN SASL mechanism (PLAIN, SCRAM-SHA-256, etc.)
KAFKA_AUTH__USERNAME SASL username
KAFKA_AUTH__PASSWORD SASL password

Note: KafkaConnectionSettings is a general-purpose connection config. KafkaEventBusSettings (used by KafkaEventBus) is a separate, independent class — existing event bus code is unaffected.


Running tests

# Unit tests (no Kafka required)
uv sync
uv run pytest

# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration

Delivery semantics

KafkaEventBus provides at-least-once delivery with enable_auto_commit=True. Consumer errors are logged and do not stop the consumer loop.

For exactly-once semantics, configure Kafka transactions in producer_kwargs / consumer_kwargs — this is out of scope for the bus itself.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

varco_kafka-1.0.6.tar.gz (42.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

varco_kafka-1.0.6-py3-none-any.whl (34.3 kB view details)

Uploaded Python 3

File details

Details for the file varco_kafka-1.0.6.tar.gz.

File metadata

  • Download URL: varco_kafka-1.0.6.tar.gz
  • Upload date:
  • Size: 42.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_kafka-1.0.6.tar.gz
Algorithm Hash digest
SHA256 a206450f5854abf540144bbec36a551af1f7281bcd571e0d1d51a56f71994dcb
MD5 21d0e4a1a99218cb3efb968d68f5ad57
BLAKE2b-256 30d108349a2597ffd499a2a5ba76f869e2eff5cee8933f882d9808c8271a1284

See more details on using hashes here.

File details

Details for the file varco_kafka-1.0.6-py3-none-any.whl.

File metadata

  • Download URL: varco_kafka-1.0.6-py3-none-any.whl
  • Upload date:
  • Size: 34.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_kafka-1.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 5af953735aa0c7ad9472896ebf1d177c98454903a20db8ebb5c18eaa590e074f
MD5 5a27db695c8d0015a1014dbc07a90360
BLAKE2b-256 9701265e448731c1b844c779fe0cf52c93ef0a973bf935527fef5eb3897d8092

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page