Apache Kafka plugin for Spakky framework
Project description
Spakky Kafka
Apache Kafka plugin for Spakky Framework.
Installation
pip install spakky-kafka
Or install via Spakky extras:
pip install spakky[kafka]
Configuration
Set environment variables with the SPAKKY_KAFKA__ prefix:
export SPAKKY_KAFKA__GROUP_ID="my-consumer-group"
export SPAKKY_KAFKA__CLIENT_ID="my-app"
export SPAKKY_KAFKA__BOOTSTRAP_SERVERS="localhost:9092"
export SPAKKY_KAFKA__AUTO_OFFSET_RESET="earliest" # earliest, latest, none
SASL Authentication (Optional)
export SPAKKY_KAFKA__SECURITY_PROTOCOL="SASL_SSL"
export SPAKKY_KAFKA__SASL_MECHANISM="PLAIN"
export SPAKKY_KAFKA__SASL_USERNAME="username"
export SPAKKY_KAFKA__SASL_PASSWORD="password"
Topic Configuration (Optional)
export SPAKKY_KAFKA__NUMBER_OF_PARTITIONS="3"
export SPAKKY_KAFKA__REPLICATION_FACTOR="1"
Usage
Event Publishing
from spakky.core.common.mutability import immutable
from spakky.domain.models.event import AbstractIntegrationEvent
from spakky.event.event_publisher import IEventPublisher
from spakky.core.pod.annotations.pod import Pod
@immutable
class UserCreatedEvent(AbstractIntegrationEvent):
user_id: int
email: str
@Pod()
class UserService:
def __init__(self, publisher: IEventPublisher) -> None:
self.publisher = publisher
def create_user(self, email: str) -> User:
user = User(email=email)
self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
return user
Event Consuming
from spakky.event.stereotype.event_handler import EventHandler, on_event
@EventHandler()
class UserEventHandler:
def __init__(self, notification_service: NotificationService) -> None:
self.notification_service = notification_service
@on_event(UserCreatedEvent)
async def on_user_created(self, event: UserCreatedEvent) -> None:
await self.notification_service.send_welcome_email(event.email)
Async Variants
For async applications, use IAsyncEventPublisher:
from spakky.event.event_publisher import IAsyncEventPublisher
@Pod()
class AsyncUserService:
def __init__(self, publisher: IAsyncEventPublisher) -> None:
self.publisher = publisher
async def create_user(self, email: str) -> User:
user = User(email=email)
await self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
return user
Distributed Tracing
spakky-tracing은 필수 의존성으로 자동 설치됩니다. ITracePropagator가 컨테이너에 등록되어 있으면 이벤트 발행/소비 시 TraceContext가 자동으로 전파됩니다.
- 발행 측:
IEventTransport.send()시 현재TraceContext를 Kafka 메시지 헤더에 주입합니다 - 소비 측: 수신 메시지에서
TraceContext를 추출하여 자식 스팬을 생성합니다 - 헤더가 없으면 새로운 루트 트레이스를 시작합니다
Features
- Automatic topic creation: Topics are created based on event type names
- Sync and Async support: Both synchronous and asynchronous publishers/consumers
- Background service pattern: Consumer polling runs as a background service
- Pydantic serialization: Events are serialized/deserialized using Pydantic
- Confluent Kafka client: Built on the robust
confluent-kafkalibrary - Distributed tracing:
spakky-tracingintegration for cross-service trace propagation
Components
| Component | Description |
|---|---|
KafkaEventTransport |
Synchronous event transport (IEventTransport) |
AsyncKafkaEventTransport |
Asynchronous event transport (IAsyncEventTransport) |
KafkaEventConsumer |
Synchronous event consumer (background service) |
AsyncKafkaEventConsumer |
Asynchronous event consumer (background service) |
KafkaConnectionConfig |
Configuration via environment variables |
License
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file spakky_kafka-6.4.0.tar.gz.
File metadata
- Download URL: spakky_kafka-6.4.0.tar.gz
- Upload date:
- Size: 8.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
45125d83072566c3edb06a4a372833f7445bdb3b45b9c60500b5efaf10a4abd6
|
|
| MD5 |
f8cac003602589335217cb100be9b6f8
|
|
| BLAKE2b-256 |
796e79483221e899891c8ef5385ab7f231b2356fc3048b56eda9ce8706194901
|
Provenance
The following attestation bundles were made for spakky_kafka-6.4.0.tar.gz:
Publisher:
release.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_kafka-6.4.0.tar.gz -
Subject digest:
45125d83072566c3edb06a4a372833f7445bdb3b45b9c60500b5efaf10a4abd6 - Sigstore transparency entry: 1435904480
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@9b55f2f729fb6e6397bbf0d1ce584eaec72ffef8 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@9b55f2f729fb6e6397bbf0d1ce584eaec72ffef8 -
Trigger Event:
workflow_dispatch
-
Statement type:
File details
Details for the file spakky_kafka-6.4.0-py3-none-any.whl.
File metadata
- Download URL: spakky_kafka-6.4.0-py3-none-any.whl
- Upload date:
- Size: 12.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3fc1f25dc21841ae11a5c022ba2e6aa529042963bca8234fd63cfa3b26f18d62
|
|
| MD5 |
734e8ee9efcfcc574ae866f40b0ff34d
|
|
| BLAKE2b-256 |
06501e13a78de2defc2e1fc3a74b4b7f5267aaf86046f33897d4589912b9f450
|
Provenance
The following attestation bundles were made for spakky_kafka-6.4.0-py3-none-any.whl:
Publisher:
release.yml on E5presso/spakky-framework
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
spakky_kafka-6.4.0-py3-none-any.whl -
Subject digest:
3fc1f25dc21841ae11a5c022ba2e6aa529042963bca8234fd63cfa3b26f18d62 - Sigstore transparency entry: 1435904553
- Sigstore integration time:
-
Permalink:
E5presso/spakky-framework@9b55f2f729fb6e6397bbf0d1ce584eaec72ffef8 -
Branch / Tag:
refs/heads/main - Owner: https://github.com/E5presso
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@9b55f2f729fb6e6397bbf0d1ce584eaec72ffef8 -
Trigger Event:
workflow_dispatch
-
Statement type: