
Pydantic-typed Kafka pub/sub with RPC semantics — Producer / Consumer roles over kfk_mng

Project description

kafka_rpc_pubsub

Pydantic-typed Kafka pub/sub with RPC semantics. A thin layer on top of kfk_mng that provides typed KafkaProducer[T] / KafkaConsumer[T] for inter-service async events.

Installation

pip install kafka_rpc_pubsub

Features

  • ✅ Generic over Pydantic v2: KafkaProducer[MyMessage], KafkaConsumer[MyMessage]
  • ✅ ABC + concrete subclasses: Producer (publish only), Consumer (subscribe only)
  • ✅ JSON serialization via model_dump_json() / model_validate_json()
  • ✅ Manual offset commit
  • ✅ Consumer groups for parallel processing
  • ✅ Layered API: KafkaProtocol → PubSub[T] → BaseKafkaService[T] → Producer/Consumer
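A minimal, Kafka-free illustration of the serialization contract above, using plain Pydantic v2 (EventMessage here is just an example model, not part of the library):

```python
from pydantic import BaseModel


class EventMessage(BaseModel):
    id: str
    payload: dict


# Producer side: the record value is the model's JSON
msg = EventMessage(id="x", payload={"a": 1})
wire = msg.model_dump_json()

# Consumer side: validation happens on decode
restored = EventMessage.model_validate_json(wire)
assert restored == msg
```

Because validation runs in model_validate_json(), malformed or schema-violating payloads fail at the consumer boundary rather than deep inside handler code.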

Quick start

import kfk_mng
from pydantic import BaseModel
from kafka_rpc_pubsub import KafkaProducer, KafkaConsumer

# 1. Register a connection via kfk_mng
kfk_mng.register(
    alias="default",
    bootstrap_servers="kafka:9092",
    enable_producer=True,
    enable_consumer=True,
    consumer_topics=["events"],
    consumer_group_id="my-workers",
)

# 2. Define the message as a Pydantic model
class EventMessage(BaseModel):
    id: str
    payload: dict


# 3. Create a role (Producer / Consumer)
class EventProducer(KafkaProducer[EventMessage]):
    model = EventMessage


class EventConsumer(KafkaConsumer[EventMessage]):
    model = EventMessage


# 4. Use them (inside an async function)
producer = EventProducer(topic="events")
partition, offset = await producer.call(EventMessage(id="x", payload={"a": 1}))

consumer = EventConsumer(topic="events", group_id="my-workers")
while True:
    result = await consumer.process()
    if result.message:
        await handle(result.message)
        await consumer.commit(result)

API

Layered architecture

kfk_mng.KafkaComponents       (raw aiokafka)
    ↓
KafkaProtocol                 (raw send / consume / commit)
    ↓
PubSub[T]                     (Pydantic-typed publish / subscribe / commit)
    ↓
BaseKafkaService[T]           (ABC; integrates alias, group_id, model)
    ↓
KafkaProducer[T] / KafkaConsumer[T]   (roles)
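To make the layering concrete, here is a toy mirror of the two middle layers over an in-memory queue instead of aiokafka (all names are illustrative sketches, not the library's code):

```python
import asyncio
from dataclasses import dataclass
from typing import Generic, TypeVar

from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class InMemoryProtocol:
    """Stand-in for KafkaProtocol: raw bytes in, raw bytes out."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, value: bytes) -> None:
        await self._queue.put(value)

    async def consume(self) -> bytes:
        return await self._queue.get()


@dataclass
class ConsumeResult(Generic[T]):
    message: T


class PubSub(Generic[T]):
    """Pydantic-typed layer: JSON on the wire, models at the edges."""

    def __init__(self, protocol: InMemoryProtocol, model: type[T]) -> None:
        self._protocol = protocol
        self._model = model

    async def publish(self, message: T) -> None:
        await self._protocol.send(message.model_dump_json().encode())

    async def subscribe(self) -> ConsumeResult[T]:
        raw = await self._protocol.consume()
        return ConsumeResult(message=self._model.model_validate_json(raw))


class Event(BaseModel):
    id: str


async def demo() -> str:
    pubsub = PubSub(InMemoryProtocol(), Event)
    await pubsub.publish(Event(id="x"))
    result = await pubsub.subscribe()
    return result.message.id


assert asyncio.run(demo()) == "x"
```

The real library swaps InMemoryProtocol for an aiokafka-backed transport; the typed boundary stays the same.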

Classes

Class                Description
KafkaProducer[T]     Publish only: await call(message) → (partition, offset)
KafkaConsumer[T]     Consume only: await process() → ConsumeResult[T]
BaseKafkaService[T]  ABC with a model: type[T] classvar and a get_key(message) abstractmethod
PubSub[T]            Low-level publish / subscribe / commit
ConsumeResult[T]     dataclass: offset / partition / topic / message

Partitioning key

By default, get_key(message) returns str(message.id) or str(message.oid). To override:

class OrderProducer(KafkaProducer[OrderMessage]):
    model = OrderMessage

    def get_key(self, message: OrderMessage) -> str:
        return message.customer_id  # partition by customer
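For intuition, the documented default ("str(message.id) or str(message.oid)") could be sketched like this (a plausible reconstruction, not the library's actual implementation):

```python
def default_get_key(message: object) -> str:
    """Return str(message.id) if present, else str(message.oid)."""
    for attr in ("id", "oid"):
        value = getattr(message, attr, None)
        if value is not None:
            return str(value)
    raise AttributeError("message has neither 'id' nor 'oid'")


class WithId:
    id = "order-1"


class WithOid:
    oid = 42


assert default_get_key(WithId()) == "order-1"
assert default_get_key(WithOid()) == "42"
```

Because Kafka routes records with the same key to the same partition, keying on a stable identifier preserves per-entity ordering.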

Related libraries

  • kfk_mng — connection manager (required dependency)
  • Inspired by redis_rpc_pubsub (the same API, but over Redis)

License

MIT

Project details


Download files

Download the file for your platform.

Source Distribution

kafka_rpc_pubsub-1.0.1.tar.gz (4.9 kB)

Uploaded Source

Built Distribution


kafka_rpc_pubsub-1.0.1-py3-none-any.whl (7.1 kB)

Uploaded Python 3

File details

Details for the file kafka_rpc_pubsub-1.0.1.tar.gz.

File metadata

  • Download URL: kafka_rpc_pubsub-1.0.1.tar.gz
  • Upload date:
  • Size: 4.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.0

File hashes

Hashes for kafka_rpc_pubsub-1.0.1.tar.gz
Algorithm Hash digest
SHA256 ff50bcfa8924016da42f92d3914844137b667f59e5c32fee90091369b12f740a
MD5 a1e061375ae8cd01fc7d406dbd4b0fa0
BLAKE2b-256 eca7bc00dd1913b66afc1adf1caa74eb7887d6180f4d957a0fac87302a68b695


File details

Details for the file kafka_rpc_pubsub-1.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for kafka_rpc_pubsub-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 047eaacbbc63c53f5c4e1fe50a4873f6a638c6c66194f5e69c4731a35ea83e00
MD5 7c3ad318d3c4f87a04a58363418a69b9
BLAKE2b-256 7c35430e22fc63bcb0a3d2c6c5e4b13f798c684b93691452499e7a6a7cf02fa5

