OpenFrame Microservice Suite - Apache Kafka queue adapter.
openframe-adapters-queue-kafka
Apache Kafka queue adapter for the OpenFrame Microservice Development Suite.
Part of the openframe-adapters monorepo.
What it provides
| Symbol | Purpose |
|---|---|
| KafkaSettings | Pydantic-settings subclass; reads all config from env vars |
| KafkaProducer[T] | Generic async message producer (BaseProducer[T]) |
| KafkaConsumer[T] | Generic async message consumer (BaseConsumer[T]) |
| KafkaPlugin | OpenFramePlugin with structured lifecycle via PluginRegistry |
Installation
# Via meta-package (recommended)
pip install "openframe-adapters[kafka]"
# Or directly
pip install openframe-adapters-queue-kafka
Quick start
from openframe.adapters.queue.kafka import KafkaSettings, KafkaProducer, KafkaConsumer
settings = KafkaSettings(kafka_bootstrap_servers="localhost:9092")
# Produce
producer = KafkaProducer(settings)
await producer.start()
await producer.publish({"event": "item.created", "id": "abc"})
await producer.publish_batch([{"event": "x"}, {"event": "y"}])
await producer.close()
# Consume
consumer = KafkaConsumer(settings)
async def handle(event: dict) -> None:
    print(f"Received: {event}")
await consumer.subscribe(handle)  # runs until consumer.close() is called
Configuration
| Env var | Default | Description |
|---|---|---|
| KAFKA_BOOTSTRAP_SERVERS | required | host:port,host:port |
| KAFKA_TOPIC | "openframe" | Default topic for producer and consumer |
| KAFKA_GROUP_ID | "openframe-group" | Consumer group ID |
| KAFKA_AUTO_OFFSET_RESET | "earliest" | "earliest" or "latest" |
| KAFKA_MAX_POLL_RECORDS | 10 | Max messages per poll |
| KAFKA_SESSION_TIMEOUT_MS | 30000 | Consumer session timeout |
| KAFKA_REQUEST_TIMEOUT_MS | 30000 | Broker request timeout |
| KAFKA_SECURITY_PROTOCOL | "PLAINTEXT" | "PLAINTEXT", "SSL", "SASL_PLAINTEXT" |
| KAFKA_SASL_MECHANISM | "" | "PLAIN", "SCRAM-SHA-256", etc. |
| KAFKA_SASL_USERNAME | "" | SASL username |
| KAFKA_SASL_PASSWORD | "" | SASL password |
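To make the env-var mapping in the table concrete, here is a minimal stdlib-only sketch of how such settings could be resolved. `SketchKafkaSettings` and `from_env` are hypothetical stand-ins written for illustration; they are not the library's `KafkaSettings`, which is built on pydantic-settings and may behave differently. Only the variable names and defaults come from the table above.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchKafkaSettings:
    """Hypothetical stand-in for illustration; not the library's KafkaSettings."""

    bootstrap_servers: str
    topic: str
    group_id: str
    auto_offset_reset: str
    max_poll_records: int

    @classmethod
    def from_env(cls, env=None):
        # Fall back to the real process environment when no mapping is passed.
        env = os.environ if env is None else env
        if "KAFKA_BOOTSTRAP_SERVERS" not in env:
            # The table marks this variable as required (no default).
            raise ValueError("KAFKA_BOOTSTRAP_SERVERS is required")
        return cls(
            bootstrap_servers=env["KAFKA_BOOTSTRAP_SERVERS"],
            topic=env.get("KAFKA_TOPIC", "openframe"),
            group_id=env.get("KAFKA_GROUP_ID", "openframe-group"),
            auto_offset_reset=env.get("KAFKA_AUTO_OFFSET_RESET", "earliest"),
            # Env values are strings, so numeric settings need conversion.
            max_poll_records=int(env.get("KAFKA_MAX_POLL_RECORDS", "10")),
        )


# A plain dict stands in for the environment to keep the example reproducible.
settings = SketchKafkaSettings.from_env(
    {"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092", "KAFKA_MAX_POLL_RECORDS": "50"}
)
```

Unset variables fall back to the documented defaults, while a missing `KAFKA_BOOTSTRAP_SERVERS` fails early instead of producing a half-configured client.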
Typed domain objects
import json
from dataclasses import dataclass, asdict

from openframe.adapters.queue.kafka import KafkaProducer, KafkaConsumer, KafkaSettings


@dataclass
class OrderEvent:
    order_id: str
    event_type: str


class OrderProducer(KafkaProducer[OrderEvent]):
    def _serialise(self, message: OrderEvent) -> bytes:
        # Encode the dataclass as UTF-8 JSON bytes
        return json.dumps(asdict(message)).encode("utf-8")


class OrderConsumer(KafkaConsumer[OrderEvent]):
    def _deserialise(self, raw: bytes) -> OrderEvent:
        # Rebuild the dataclass from UTF-8 JSON bytes
        return OrderEvent(**json.loads(raw.decode("utf-8")))
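The serialise/deserialise pair can be checked without a broker. The sketch below lifts the same JSON logic into two standalone functions (the names `serialise` and `deserialise` are illustrative, not part of the library) and round-trips one event through bytes, using only the standard library.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class OrderEvent:
    order_id: str
    event_type: str


def serialise(message: OrderEvent) -> bytes:
    # Same logic as OrderProducer._serialise, as a free function.
    return json.dumps(asdict(message)).encode("utf-8")


def deserialise(raw: bytes) -> OrderEvent:
    # Same logic as OrderConsumer._deserialise, as a free function.
    return OrderEvent(**json.loads(raw.decode("utf-8")))


original = OrderEvent(order_id="o-1", event_type="order.created")
wire = serialise(original)      # bytes as they would travel through the topic
restored = deserialise(wire)    # equal to `original` by dataclass equality
```

A round-trip check like this is a cheap unit test for any custom message type before wiring it into a real producer and consumer.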
Plugin lifecycle (optional)
from openframe.core.plugins import PluginRegistry
from openframe.adapters.queue.kafka import KafkaPlugin, KafkaSettings
registry = PluginRegistry()
registry.register(KafkaPlugin(KafkaSettings()))  # settings are read from KAFKA_* env vars
await registry.initialize_all()
plugin = registry.get("queue")
producer = plugin.get_producer()
await producer.publish({"event": "item.created"})
consumer = plugin.make_consumer()
await consumer.subscribe(handler)  # handler: an async callable, e.g. handle() from Quick start
Consumer acknowledgement semantics
| Outcome | Behaviour |
|---|---|
| Handler returns | ack() called → offset committed → message consumed |
| Handler raises | nack() called → no commit → message redelivered |
| consumer.close() | polling loop exits → consumer stopped cleanly |
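The table above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the adapter's implementation: `consume`, `max_attempts`, and `flaky_handler` are hypothetical names, a Python list stands in for a topic partition, and redelivery is modelled as an immediate retry. How and when the real adapter redelivers is not specified here.

```python
import asyncio


async def consume(messages, handler, max_attempts=3):
    """Deliver messages in order; commit an offset only when the handler returns."""
    committed = 0   # offset of the next message to deliver
    attempts = 0    # failed deliveries of the current message
    while committed < len(messages):
        try:
            await handler(messages[committed])
        except Exception:
            # nack: no commit, so the same message is delivered again.
            attempts += 1
            if attempts >= max_attempts:
                break  # assumption: give up so the sketch always terminates
            continue
        # ack: commit the offset and move on to the next message.
        committed += 1
        attempts = 0
    return committed


seen = []


async def flaky_handler(event):
    seen.append(event)
    # Fail only the first time "b" is delivered.
    if event == "b" and seen.count("b") == 1:
        raise RuntimeError("transient failure")


committed = asyncio.run(consume(["a", "b", "c"], flaky_handler))
```

Because a failed message is seen again, handlers should be idempotent: processing the same event twice must not produce a duplicate side effect.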
License
MIT — © Furious Meteors Engineering