Skip to main content

Apache Kafka plugin for Spakky framework

Project description

Spakky Kafka

Apache Kafka plugin for Spakky Framework.

Installation

pip install spakky-kafka

Or install via Spakky extras:

pip install spakky[kafka]

Configuration

Set environment variables with the SPAKKY_KAFKA__ prefix:

export SPAKKY_KAFKA__GROUP_ID="my-consumer-group"
export SPAKKY_KAFKA__CLIENT_ID="my-app"
export SPAKKY_KAFKA__BOOTSTRAP_SERVERS="localhost:9092"
export SPAKKY_KAFKA__AUTO_OFFSET_RESET="earliest"  # earliest, latest, none

SASL Authentication (Optional)

export SPAKKY_KAFKA__SECURITY_PROTOCOL="SASL_SSL"
export SPAKKY_KAFKA__SASL_MECHANISM="PLAIN"
export SPAKKY_KAFKA__SASL_USERNAME="username"
export SPAKKY_KAFKA__SASL_PASSWORD="password"

Topic Configuration (Optional)

export SPAKKY_KAFKA__NUMBER_OF_PARTITIONS="3"
export SPAKKY_KAFKA__REPLICATION_FACTOR="1"

Usage

Event Publishing

from spakky.core.common.mutability import immutable
from spakky.domain.models.event import AbstractIntegrationEvent
from spakky.event.event_publisher import IEventPublisher
from spakky.core.pod.annotations.pod import Pod

@immutable
class UserCreatedEvent(AbstractIntegrationEvent):
    user_id: int
    email: str

@Pod()
class UserService:
    def __init__(self, publisher: IEventPublisher) -> None:
        self.publisher = publisher

    def create_user(self, email: str) -> User:
        user = User(email=email)
        self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
        return user

Event Consuming

from spakky.event.stereotype.event_handler import EventHandler, on_event

@EventHandler()
class UserEventHandler:
    def __init__(self, notification_service: NotificationService) -> None:
        self.notification_service = notification_service

    @on_event(UserCreatedEvent)
    async def on_user_created(self, event: UserCreatedEvent) -> None:
        await self.notification_service.send_welcome_email(event.email)

Async Variants

For async applications, use IAsyncEventPublisher:

from spakky.event.event_publisher import IAsyncEventPublisher

@Pod()
class AsyncUserService:
    def __init__(self, publisher: IAsyncEventPublisher) -> None:
        self.publisher = publisher

    async def create_user(self, email: str) -> User:
        user = User(email=email)
        await self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
        return user

Distributed Tracing

spakky-tracing은 필수 의존성으로 자동 설치됩니다. ITracePropagator가 컨테이너에 등록되어 있으면 이벤트 발행/소비 시 TraceContext가 자동으로 전파됩니다.

  • 발행 측: IEventTransport.send() 시 현재 TraceContext를 Kafka 메시지 헤더에 주입합니다
  • 소비 측: 수신 메시지에서 TraceContext를 추출하여 자식 스팬을 생성합니다
  • 헤더가 없으면 새로운 루트 트레이스를 시작합니다

Features

  • Automatic topic creation: Topics are created based on event type names
  • Sync and Async support: Both synchronous and asynchronous publishers/consumers
  • Background service pattern: Consumer polling runs as a background service
  • Pydantic serialization: Events are serialized/deserialized using Pydantic
  • Confluent Kafka client: Built on the robust confluent-kafka library
  • Distributed tracing: spakky-tracing integration for cross-service trace propagation

Components

Component Description
KafkaEventTransport Synchronous event transport (IEventTransport)
AsyncKafkaEventTransport Asynchronous event transport (IAsyncEventTransport)
KafkaEventConsumer Synchronous event consumer (background service)
AsyncKafkaEventConsumer Asynchronous event consumer (background service)
KafkaConnectionConfig Configuration via environment variables

License

MIT License

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spakky_kafka-6.3.0.tar.gz (8.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

spakky_kafka-6.3.0-py3-none-any.whl (12.2 kB view details)

Uploaded Python 3

File details

Details for the file spakky_kafka-6.3.0.tar.gz.

File metadata

  • Download URL: spakky_kafka-6.3.0.tar.gz
  • Upload date:
  • Size: 8.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for spakky_kafka-6.3.0.tar.gz
Algorithm Hash digest
SHA256 dbba599f3a019286e4d845f172876f399b89f1868bfcc46acf20bb33e99f7823
MD5 0afcf7cc4758e35eb04ea5f7dfd436db
BLAKE2b-256 9b3ce2878daa3fd041fc4ed764042a225c3d065cd0680500c8207d24446a8269

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_kafka-6.3.0.tar.gz:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file spakky_kafka-6.3.0-py3-none-any.whl.

File metadata

  • Download URL: spakky_kafka-6.3.0-py3-none-any.whl
  • Upload date:
  • Size: 12.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for spakky_kafka-6.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b562de51dd18e1a348590c1f689bc0349e9bc62f536fdca2df603ccc146ac2a9
MD5 9607a2ee4ac1eb6c924eb2d49031c291
BLAKE2b-256 e9ab2c8bb3ca1465866546cb94ef6c2f1fb88b22f6865f47dea34e09812c8cc8

See more details on using hashes here.

Provenance

The following attestation bundles were made for spakky_kafka-6.3.0-py3-none-any.whl:

Publisher: release.yml on E5presso/spakky-framework

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page