Pydantic-typed Kafka pub/sub with RPC semantics — Producer / Consumer roles over kfk_mng
Project description
kafka_rpc_pubsub
Pydantic-typed Kafka pub/sub с RPC-семантикой. Тонкий слой поверх kfk_mng — даёт типизированный KafkaProducer[T] / KafkaConsumer[T] для inter-service async events.
Установка
pip install kafka_rpc_pubsub
Возможности
- ✅ Generic над Pydantic v2:
KafkaProducer[MyMessage],KafkaConsumer[MyMessage] - ✅ ABC + concrete subclasses: Producer (только publish), Consumer (только subscribe)
- ✅ JSON сериализация через
model_dump_json()/model_validate_json() - ✅ Manual offset commit
- ✅ Consumer groups для параллельной обработки
- ✅ Layered API:
KafkaProtocol → PubSub[T] → BaseKafkaService[T] → Producer/Consumer
Быстрый старт
import kfk_mng
from pydantic import BaseModel
from kafka_rpc_pubsub import KafkaProducer, KafkaConsumer
# 1. Регистрация подключения через kfk_mng
kfk_mng.register(
alias="default",
bootstrap_servers="kafka:9092",
enable_producer=True,
enable_consumer=True,
consumer_topics=["events"],
consumer_group_id="my-workers",
)
# 2. Описать сообщение через Pydantic
class EventMessage(BaseModel):
id: str
payload: dict
# 3. Создать роль (Producer / Consumer)
class EventProducer(KafkaProducer[EventMessage]):
model = EventMessage
class EventConsumer(KafkaConsumer[EventMessage]):
model = EventMessage
# 4. Использовать
producer = EventProducer(topic="events")
partition, offset = await producer.call(EventMessage(id="x", payload={"a": 1}))
consumer = EventConsumer(topic="events", group_id="my-workers")
while True:
result = await consumer.process()
if result.message:
await handle(result.message)
await consumer.commit(result)
API
Layered архитектура
kfk_mng.KafkaComponents (raw aiokafka)
↓
KafkaProtocol (raw send / consume / commit)
↓
PubSub[T] (Pydantic-typed publish / subscribe / commit)
↓
BaseKafkaService[T] (ABC, integration с alias, group_id, model)
↓
KafkaProducer[T] / KafkaConsumer[T] (роли)
Классы
| Класс | Описание |
|---|---|
KafkaProducer[T] |
Только публикация: await call(message) → (partition, offset) |
KafkaConsumer[T] |
Только потребление: await process() → ConsumeResult[T] |
BaseKafkaService[T] |
ABC с model: type[T] classvar и get_key(message) abstractmethod |
PubSub[T] |
Низкоуровневый publish / subscribe / commit |
ConsumeResult[T] |
dataclass: offset / partition / topic / message |
Partitioning key
По умолчанию get_key(message) возвращает str(message.id) или str(message.oid). Override:
class OrderProducer(KafkaProducer[OrderMessage]):
model = OrderMessage
def get_key(self, message: OrderMessage) -> str:
return message.customer_id # партиционирование по клиенту
Связанные библиотеки
kfk_mng— connection manager (обязательная зависимость)- Inspired by
redis_rpc_pubsub(тот же API, но через Redis)
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kafka_rpc_pubsub-1.0.0.tar.gz.
File metadata
- Download URL: kafka_rpc_pubsub-1.0.0.tar.gz
- Upload date:
- Size: 5.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3f49cfc3509fd44003d380768c773737331824cab9182967d20085885b2a84ea
|
|
| MD5 |
bf16376ddab1c51a02b0a48ec9e3d1bc
|
|
| BLAKE2b-256 |
d2ae756bd8f02bcd1401cdd9e1f300440b5f52f4e86f8a14f7c1c939a4aff4c3
|
File details
Details for the file kafka_rpc_pubsub-1.0.0-py3-none-any.whl.
File metadata
- Download URL: kafka_rpc_pubsub-1.0.0-py3-none-any.whl
- Upload date:
- Size: 7.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ee2b3bd60cb56feb7f8611f07c485a17681e98dbd4538ae049eb3ae309b8bb7b
|
|
| MD5 |
0452e88fef9d77d2f02cca53e9436654
|
|
| BLAKE2b-256 |
b8ed555d14b627f045ca8153810ebc6ad270e8c5a5423b840178b2c123533637
|