
Pydantic-typed Kafka pub/sub with RPC semantics — Producer / Consumer roles over kfk_mng

Project description

kafka_rpc_pubsub

Pydantic-typed Kafka pub/sub with RPC semantics. A thin layer on top of kfk_mng that provides typed KafkaProducer[T] / KafkaConsumer[T] for inter-service async events.

Installation

pip install kafka_rpc_pubsub

Features

  • ✅ Generic over Pydantic v2: KafkaProducer[MyMessage], KafkaConsumer[MyMessage]
  • ✅ ABC + concrete subclasses: Producer (publish only), Consumer (subscribe only)
  • ✅ JSON serialization via model_dump_json() / model_validate_json()
  • ✅ Manual offset commit
  • ✅ Consumer groups for parallel processing
  • ✅ Layered API: KafkaProtocol → PubSub[T] → BaseKafkaService[T] → Producer/Consumer

Quick start

import kfk_mng
from pydantic import BaseModel
from kafka_rpc_pubsub import KafkaProducer, KafkaConsumer

# 1. Register the connection via kfk_mng
kfk_mng.register(
    alias="default",
    bootstrap_servers="kafka:9092",
    enable_producer=True,
    enable_consumer=True,
    consumer_topics=["events"],
    consumer_group_id="my-workers",
)

# 2. Define the message with Pydantic
class EventMessage(BaseModel):
    id: str
    payload: dict


# 3. Create a role (Producer / Consumer)
class EventProducer(KafkaProducer[EventMessage]):
    model = EventMessage


class EventConsumer(KafkaConsumer[EventMessage]):
    model = EventMessage


# 4. Use the roles (await requires an async context)
async def main() -> None:
    producer = EventProducer(topic="events")
    partition, offset = await producer.call(EventMessage(id="x", payload={"a": 1}))

    consumer = EventConsumer(topic="events", group_id="my-workers")
    while True:
        result = await consumer.process()
        if result.message:
            await handle(result.message)  # your handler
            await consumer.commit(result)

API

Layered architecture

kfk_mng.KafkaComponents       (raw aiokafka)
    ↓
KafkaProtocol                 (raw send / consume / commit)
    ↓
PubSub[T]                     (Pydantic-typed publish / subscribe / commit)
    ↓
BaseKafkaService[T]           (ABC, integration with alias, group_id, model)
    ↓
KafkaProducer[T] / KafkaConsumer[T]   (roles)
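As a rough illustration of this layering only (RawProtocol, TypedPubSub, and their methods below are invented stand-ins, not the real API), the typed layer's sole job is to serialize a model to bytes before delegating to the raw layer. The real library serializes with Pydantic's model_dump_json(); plain json keeps this sketch dependency-free:

```python
import json
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class RawProtocol:
    """Stand-in for the KafkaProtocol layer: moves raw bytes, knows nothing of types."""

    def __init__(self) -> None:
        self.log: list[bytes] = []

    def send(self, payload: bytes) -> int:
        self.log.append(payload)
        return len(self.log) - 1  # pretend this is the committed offset


class TypedPubSub(Generic[T]):
    """Stand-in for PubSub[T]: serializes a typed message, then hands bytes down."""

    def __init__(self, raw: RawProtocol, serialize: Callable[[T], bytes]) -> None:
        self.raw = raw
        self.serialize = serialize

    def publish(self, message: T) -> int:
        return self.raw.send(self.serialize(message))


@dataclass
class EventMessage:
    id: str
    payload: dict


raw = RawProtocol()
pubsub: TypedPubSub[EventMessage] = TypedPubSub(
    raw, lambda m: json.dumps(m.__dict__).encode()
)
offset = pubsub.publish(EventMessage(id="x", payload={"a": 1}))
print(offset)  # 0
```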

Classes

Class                 Description
KafkaProducer[T]      Publish only: await call(message) → (partition, offset)
KafkaConsumer[T]      Consume only: await process() → ConsumeResult[T]
BaseKafkaService[T]   ABC with a model: type[T] classvar and a get_key(message) abstractmethod
PubSub[T]             Low-level publish / subscribe / commit
ConsumeResult[T]      dataclass: offset / partition / topic / message
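The ConsumeResult[T] row above suggests roughly the following shape; the field order and the Optional message are assumptions of this sketch, not the library's definition:

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class ConsumeResult(Generic[T]):
    """Hypothetical shape of ConsumeResult[T], inferred from the table above."""

    topic: str
    partition: int
    offset: int
    message: Optional[T]  # None when the poll returned nothing to handle


result = ConsumeResult(topic="events", partition=0, offset=42, message=None)
print(result.offset)  # 42
```

This is why the quick-start loop checks `if result.message:` before handling and committing.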

Partitioning key

By default, get_key(message) returns str(message.id) or str(message.oid). Override it:

class OrderProducer(KafkaProducer[OrderMessage]):
    model = OrderMessage

    def get_key(self, message: OrderMessage) -> str:
        return message.customer_id  # partition by customer
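The default fallback described above (str(message.id), then str(message.oed... rather, str(message.oid)) can be sketched as a standalone function; the name default_get_key and the exact error behavior are assumptions of this sketch:

```python
from dataclasses import dataclass


def default_get_key(message) -> str:
    # Prefer `id`, fall back to `oid` (attribute names per the README above);
    # the error raised when neither exists is an assumption.
    for attr in ("id", "oid"):
        value = getattr(message, attr, None)
        if value is not None:
            return str(value)
    raise AttributeError("message has neither 'id' nor 'oid'")


@dataclass
class WithId:
    id: str


@dataclass
class WithOid:
    oid: int


print(default_get_key(WithId(id="x")))   # x
print(default_get_key(WithOid(oid=42)))  # 42
```

Messages with the same key land on the same partition, which is what makes the customer_id override above preserve per-customer ordering.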

Related libraries

  • kfk_mng — connection manager (required dependency)
  • Inspired by redis_rpc_pubsub (same API, but over Redis)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kafka_rpc_pubsub-1.0.0.tar.gz (5.0 kB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kafka_rpc_pubsub-1.0.0-py3-none-any.whl (7.3 kB)

Uploaded Python 3

File details

Details for the file kafka_rpc_pubsub-1.0.0.tar.gz.

File metadata

  • Download URL: kafka_rpc_pubsub-1.0.0.tar.gz
  • Upload date:
  • Size: 5.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.0

File hashes

Hashes for kafka_rpc_pubsub-1.0.0.tar.gz
Algorithm Hash digest
SHA256 3f49cfc3509fd44003d380768c773737331824cab9182967d20085885b2a84ea
MD5 bf16376ddab1c51a02b0a48ec9e3d1bc
BLAKE2b-256 d2ae756bd8f02bcd1401cdd9e1f300440b5f52f4e86f8a14f7c1c939a4aff4c3

See more details on using hashes here.

File details

Details for the file kafka_rpc_pubsub-1.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for kafka_rpc_pubsub-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ee2b3bd60cb56feb7f8611f07c485a17681e98dbd4538ae049eb3ae309b8bb7b
MD5 0452e88fef9d77d2f02cca53e9436654
BLAKE2b-256 b8ed555d14b627f045ca8153810ebc6ad270e8c5a5423b840178b2c123533637

See more details on using hashes here.
