Spakky Kafka

Apache Kafka plugin for Spakky Framework.

Installation

pip install spakky-kafka

Or install via Spakky extras:

pip install "spakky[kafka]"  # quotes stop shells such as zsh from globbing the brackets

Configuration

Set environment variables with the SPAKKY_KAFKA_ prefix:

export SPAKKY_KAFKA_GROUP_ID="my-consumer-group"
export SPAKKY_KAFKA_CLIENT_ID="my-app"
export SPAKKY_KAFKA_BOOTSTRAP_SERVERS="localhost:9092"
export SPAKKY_KAFKA_AUTO_OFFSET_RESET="earliest"  # earliest, latest, none
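
As a rough illustration of the prefix convention, the sketch below collects every SPAKKY_KAFKA_-prefixed variable into a dictionary. `read_prefixed_env` is a hypothetical helper written for this example and is not part of spakky-kafka; the plugin's own loader (KafkaConnectionConfig) may parse and validate values differently.

```python
import os
from typing import Mapping

PREFIX = "SPAKKY_KAFKA_"


def read_prefixed_env(environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Return {lower-cased name without the prefix: value} for matching variables.

    Hypothetical helper for illustration only.
    """
    return {
        key[len(PREFIX):].lower(): value
        for key, value in environ.items()
        if key.startswith(PREFIX)
    }


# A plain dict stands in for the process environment so the example is repeatable.
example_env = {
    "SPAKKY_KAFKA_GROUP_ID": "my-consumer-group",
    "SPAKKY_KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "PATH": "/usr/bin",  # ignored: no SPAKKY_KAFKA_ prefix
}
settings = read_prefixed_env(example_env)
print(settings)
```

Variables without the prefix are ignored, so unrelated environment settings cannot leak into the Kafka configuration.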

SASL Authentication (Optional)

export SPAKKY_KAFKA_SECURITY_PROTOCOL="SASL_SSL"
export SPAKKY_KAFKA_SASL_MECHANISM="PLAIN"
export SPAKKY_KAFKA_SASL_USERNAME="username"
export SPAKKY_KAFKA_SASL_PASSWORD="password"
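
The four SASL variables line up with client properties that confluent-kafka (via librdkafka) documents: security.protocol, sasl.mechanism, sasl.username and sasl.password. The sketch below shows one way such a mapping could look, plus a redaction step for logging. Both `sasl_client_config` and `redacted` are hypothetical helpers; whether the plugin forwards exactly these keys is an assumption.

```python
from typing import Mapping

# Environment variable -> librdkafka property name.
# The mapping itself is an assumption about how the plugin might translate them.
SASL_KEYS = {
    "SPAKKY_KAFKA_SECURITY_PROTOCOL": "security.protocol",
    "SPAKKY_KAFKA_SASL_MECHANISM": "sasl.mechanism",
    "SPAKKY_KAFKA_SASL_USERNAME": "sasl.username",
    "SPAKKY_KAFKA_SASL_PASSWORD": "sasl.password",
}


def sasl_client_config(environ: Mapping[str, str]) -> dict[str, str]:
    """Pick out the SASL variables and rename them to client property names."""
    return {SASL_KEYS[key]: value for key, value in environ.items() if key in SASL_KEYS}


def redacted(config: Mapping[str, str]) -> dict[str, str]:
    """Copy of the config that is safe to log: the password is masked."""
    return {k: ("***" if k == "sasl.password" else v) for k, v in config.items()}


sasl_config = sasl_client_config(
    {
        "SPAKKY_KAFKA_SECURITY_PROTOCOL": "SASL_SSL",
        "SPAKKY_KAFKA_SASL_MECHANISM": "PLAIN",
        "SPAKKY_KAFKA_SASL_USERNAME": "username",
        "SPAKKY_KAFKA_SASL_PASSWORD": "password",
    }
)
print(redacted(sasl_config))
```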

Topic Configuration (Optional)

export SPAKKY_KAFKA_NUMBER_OF_PARTITIONS="3"
export SPAKKY_KAFKA_REPLICATION_FACTOR="1"
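
Environment variables are always strings, so the topic settings need converting to integers before use. A minimal sketch of that conversion follows. `TopicSettings` and `parse_topic_settings` are hypothetical names, and the fallback value of 1 for both settings is an assumption made for the example, not a documented default.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TopicSettings:
    number_of_partitions: int
    replication_factor: int


def parse_topic_settings(environ: Mapping[str, str]) -> TopicSettings:
    """Convert the string-valued variables to validated integers.

    The fallback of "1" is an assumption for this sketch.
    """
    partitions = int(environ.get("SPAKKY_KAFKA_NUMBER_OF_PARTITIONS", "1"))
    replication = int(environ.get("SPAKKY_KAFKA_REPLICATION_FACTOR", "1"))
    if partitions < 1 or replication < 1:
        raise ValueError("partitions and replication factor must both be >= 1")
    return TopicSettings(partitions, replication)


topic_settings = parse_topic_settings(
    {
        "SPAKKY_KAFKA_NUMBER_OF_PARTITIONS": "3",
        "SPAKKY_KAFKA_REPLICATION_FACTOR": "1",
    }
)
print(topic_settings)
```

A replication factor of 1 suits a single-broker development cluster; Kafka rejects topic creation when the factor exceeds the number of available brokers.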

Usage

Event Publishing

from spakky.domain.models.event import AbstractDomainEvent
from spakky.domain.ports.event.event_publisher import IEventPublisher
from spakky.pod.annotations.pod import Pod

# `User` is the application's own domain model; its definition is not shown here.

class UserCreatedEvent(AbstractDomainEvent):
    user_id: int
    email: str

@Pod()
class UserService:
    def __init__(self, publisher: IEventPublisher) -> None:
        self.publisher = publisher

    def create_user(self, email: str) -> User:
        user = User(email=email)
        self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
        return user
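
Because the service receives its publisher through the constructor, it can be unit-tested without a broker by passing in a stand-in that records events. The sketch below is framework-free: `InMemoryPublisher`, `FakeUserCreated` and `SignupService` are hypothetical names, and the only thing assumed about the real publisher is the `publish(event)` method used in the example above.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class InMemoryPublisher:
    """Test double that records events instead of sending them to Kafka."""

    published: list[Any] = field(default_factory=list)

    def publish(self, event: Any) -> None:
        self.published.append(event)


@dataclass(frozen=True)
class FakeUserCreated:
    """Plain stand-in for a domain event (no Spakky base class)."""

    user_id: int
    email: str


class SignupService:
    """Same shape as UserService above, minus the framework annotations."""

    def __init__(self, publisher: InMemoryPublisher) -> None:
        self.publisher = publisher

    def create_user(self, email: str) -> int:
        user_id = 1  # a real service would obtain this from persistence
        self.publisher.publish(FakeUserCreated(user_id=user_id, email=email))
        return user_id


publisher = InMemoryPublisher()
new_id = SignupService(publisher).create_user("ada@example.com")
print(publisher.published)
```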

Event Consuming

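from spakky.stereotype.event_handler import EventHandler, on_event

# UserCreatedEvent is the event class defined in the publishing example;
# NotificationService is an application-defined dependency (not shown).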

@EventHandler()
class UserEventHandler:
    def __init__(self, notification_service: NotificationService) -> None:
        self.notification_service = notification_service

    @on_event(UserCreatedEvent)
    async def on_user_created(self, event: UserCreatedEvent) -> None:
        await self.notification_service.send_welcome_email(event.email)
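
The handler above is selected by event type. As a toy model of that idea, the sketch below keeps a registry from event class to coroutine handlers and awaits each match. `subscribe` and `dispatch` are hypothetical names; the internals of Spakky's `@on_event` decorator and Kafka consumer are not described in this README and may work differently.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]

# Registry: event class -> handlers registered for it.
_handlers: dict[type, list[Handler]] = {}


def subscribe(event_type: type) -> Callable[[Handler], Handler]:
    """Decorator that registers a coroutine function for one event type."""

    def decorator(func: Handler) -> Handler:
        _handlers.setdefault(event_type, []).append(func)
        return func

    return decorator


async def dispatch(event: Any) -> None:
    """Await every handler registered for the exact type of `event`."""
    for handler in _handlers.get(type(event), []):
        await handler(event)


class Greeted:
    def __init__(self, name: str) -> None:
        self.name = name


received: list[str] = []


@subscribe(Greeted)
async def on_greeted(event: Greeted) -> None:
    received.append(event.name)


asyncio.run(dispatch(Greeted("Ada")))
print(received)
```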

Async Variants

For async applications, use IAsyncEventPublisher:

from spakky.domain.ports.event.event_publisher import IAsyncEventPublisher
from spakky.pod.annotations.pod import Pod

@Pod()
class AsyncUserService:
    def __init__(self, publisher: IAsyncEventPublisher) -> None:
        self.publisher = publisher

    async def create_user(self, email: str) -> User:
        user = User(email=email)
        await self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
        return user

Features

  • Automatic topic creation: Topics are created based on event type names
  • Sync and Async support: Both synchronous and asynchronous publishers/consumers
  • Background service pattern: Consumer polling runs as a background service
  • Pydantic serialization: Events are serialized/deserialized using Pydantic
  • Confluent Kafka client: Built on the confluent-kafka library
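
To make the first and fourth features concrete, the sketch below derives a topic name from an event class and round-trips an event through JSON bytes. It swaps in the standard library's dataclasses and json modules in place of Pydantic so that it runs without extra dependencies. Using the bare class name as the topic is an assumption about the naming scheme, and `topic_for`, `encode` and `decode` are hypothetical helpers.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class UserCreatedEvent:
    """Dataclass stand-in for the Pydantic-based event in the usage section."""

    user_id: int
    email: str


def topic_for(event_type: type) -> str:
    """Assumed naming scheme: the topic is the event class name."""
    return event_type.__name__


def encode(event: UserCreatedEvent) -> bytes:
    """Serialize an event to UTF-8 JSON bytes, the kind of payload Kafka carries."""
    return json.dumps(asdict(event)).encode("utf-8")


def decode(event_type: type, payload: bytes):
    """Rebuild an event of `event_type` from JSON bytes."""
    return event_type(**json.loads(payload.decode("utf-8")))


original = UserCreatedEvent(user_id=7, email="ada@example.com")
payload = encode(original)
restored = decode(UserCreatedEvent, payload)
print(topic_for(UserCreatedEvent), payload, restored == original)
```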

Components

Component                  Description
KafkaEventPublisher        Synchronous event publisher
AsyncKafkaEventPublisher   Asynchronous event publisher
KafkaEventConsumer         Synchronous event consumer (background service)
AsyncKafkaEventConsumer    Asynchronous event consumer (background service)
KafkaConnectionConfig      Configuration via environment variables
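
The consumers are described as background services. One common shape for that pattern is a worker thread that polls with a short timeout and checks a stop flag between polls, sketched below. A `queue.Queue` stands in for the Kafka consumer so the example runs without a broker, and `BackgroundPoller` is a hypothetical class, not the plugin's implementation.

```python
import queue
import threading
from typing import Callable


class BackgroundPoller:
    """Polls a message source on a worker thread until asked to stop."""

    def __init__(self, source: "queue.Queue[str]", handle: Callable[[str], None]) -> None:
        self._source = source
        self._handle = handle
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Signal the loop to exit, then wait for the worker to finish.
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                # A short timeout keeps the loop responsive to stop().
                message = self._source.get(timeout=0.05)
            except queue.Empty:
                continue
            self._handle(message)
            self._source.task_done()


messages: "queue.Queue[str]" = queue.Queue()
seen: list[str] = []

poller = BackgroundPoller(messages, seen.append)
poller.start()
messages.put("hello")
messages.put("world")
messages.join()  # blocks until both messages have been handled
poller.stop()
print(seen)
```

Polling with a timeout rather than blocking indefinitely lets the service shut down promptly when the application stops.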

License

MIT License
