Skip to main content

OpenFrame Microservice Suite - Apache Kafka queue adapter.

Project description

openframe-adapters-queue-kafka

Apache Kafka queue adapter for the OpenFrame Microservice Development Suite.

Part of the openframe-adapters monorepo.


What it provides

Symbol Purpose
KafkaSettings Pydantic-settings subclass — reads all config from env vars
KafkaProducer[T] Generic async message producer — BaseProducer[T]
KafkaConsumer[T] Generic async message consumer — BaseConsumer[T]
KafkaPlugin OpenFramePlugin — structured lifecycle via PluginRegistry

Installation

# Via meta-package (recommended)
pip install "openframe-adapters[kafka]"

# Or directly
pip install openframe-adapters-queue-kafka

Quick start

from openframe.adapters.queue.kafka import KafkaSettings, KafkaProducer, KafkaConsumer

settings = KafkaSettings(kafka_bootstrap_servers="localhost:9092")

# Produce
producer = KafkaProducer(settings)
await producer.start()
await producer.publish({"event": "item.created", "id": "abc"})
await producer.publish_batch([{"event": "x"}, {"event": "y"}])
await producer.close()

# Consume
consumer = KafkaConsumer(settings)

async def handle(event: dict) -> None:
    print(f"Received: {event}")

await consumer.subscribe(handle)   # runs until consumer.close() called

Configuration

Env var Default Description
KAFKA_BOOTSTRAP_SERVERS required host:port,host:port
KAFKA_TOPIC "openframe" Default topic for producer and consumer
KAFKA_GROUP_ID "openframe-group" Consumer group ID
KAFKA_AUTO_OFFSET_RESET "earliest" "earliest" or "latest"
KAFKA_MAX_POLL_RECORDS 10 Max messages per poll
KAFKA_SESSION_TIMEOUT_MS 30000 Consumer session timeout
KAFKA_REQUEST_TIMEOUT_MS 30000 Broker request timeout
KAFKA_SECURITY_PROTOCOL "PLAINTEXT" "PLAINTEXT", "SSL", "SASL_PLAINTEXT"
KAFKA_SASL_MECHANISM "" "PLAIN", "SCRAM-SHA-256", etc.
KAFKA_SASL_USERNAME "" SASL username
KAFKA_SASL_PASSWORD "" SASL password

Typed domain objects

from openframe.adapters.queue.kafka import KafkaProducer, KafkaConsumer, KafkaSettings
from dataclasses import dataclass, asdict

@dataclass
class OrderEvent:
    order_id: str
    event_type: str

class OrderProducer(KafkaProducer[OrderEvent]):
    def _serialise(self, message: OrderEvent) -> bytes:
        import json
        return json.dumps(asdict(message)).encode("utf-8")

class OrderConsumer(KafkaConsumer[OrderEvent]):
    def _deserialise(self, raw: bytes) -> OrderEvent:
        import json
        return OrderEvent(**json.loads(raw.decode("utf-8")))

Plugin lifecycle (optional)

from openframe.core.plugins import PluginRegistry
from openframe.adapters.queue.kafka import KafkaPlugin, KafkaSettings

registry = PluginRegistry()
registry.register(KafkaPlugin(KafkaSettings()))
await registry.initialize_all()

plugin = registry.get("queue")
producer = plugin.get_producer()
await producer.publish({"event": "item.created"})

consumer = plugin.make_consumer()
await consumer.subscribe(handler)

Consumer acknowledgement semantics

Outcome Behaviour
Handler returns ack() called → offset committed → message consumed
Handler raises nack() called → no commit → message redelivered
consumer.close() polling loop exits → consumer stopped cleanly

License

MIT — © Furious Meteors Engineering

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

openframe_adapters_queue_kafka-1.1.0.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

openframe_adapters_queue_kafka-1.1.0-py3-none-any.whl (11.8 kB view details)

Uploaded Python 3

File details

Details for the file openframe_adapters_queue_kafka-1.1.0.tar.gz.

File metadata

File hashes

Hashes for openframe_adapters_queue_kafka-1.1.0.tar.gz
Algorithm Hash digest
SHA256 a82830e89dc8bd3a7ec5a2e78b1f1f076f324ff49c22c8629b1f6e4f14e64b34
MD5 e11470d1825beb0b0759b87be716224d
BLAKE2b-256 4de4bb99375eb6b808954c56ef927a9d70802ea5a6236a0ba7ed0f624945b88e

See more details on using hashes here.

File details

Details for the file openframe_adapters_queue_kafka-1.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for openframe_adapters_queue_kafka-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cb8f0f4874ec28be13998506c0444518904b9052cd532c1467f54c39eb0968b7
MD5 3e6445ffc42b735263a349a37061acdb
BLAKE2b-256 2b8ec65a6d48087abf458a8e10ffb0a9a2fcff2b9f95b3b80864cf9902a22a28

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page