RabbitMQ plugin for Spakky framework
Spakky RabbitMQ
A RabbitMQ plugin for the Spakky Framework.
Installation
pip install spakky-rabbitmq
Or install it as a Spakky extra:
pip install "spakky[rabbitmq]"
Configuration
Set environment variables with the SPAKKY_RABBITMQ__ prefix:
export SPAKKY_RABBITMQ__USE_SSL="false"
export SPAKKY_RABBITMQ__HOST="localhost"
export SPAKKY_RABBITMQ__PORT="5672"
export SPAKKY_RABBITMQ__USER="guest"
export SPAKKY_RABBITMQ__PASSWORD="guest"
export SPAKKY_RABBITMQ__EXCHANGE_NAME="my-exchange" # Optional
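To make the mapping from these variables to a connection concrete, here is a minimal sketch. It is not the plugin's own code: RabbitMQSettings, load_settings, and amqp_url are hypothetical names introduced for illustration, and only the variable names come from the list above. It assumes the standard amqp and amqps URI schemes.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional
from urllib.parse import quote

PREFIX = "SPAKKY_RABBITMQ__"


@dataclass(frozen=True)
class RabbitMQSettings:
    """Hypothetical container for the settings listed above."""

    use_ssl: bool
    host: str
    port: int
    user: str
    password: str
    exchange_name: Optional[str] = None

    @property
    def amqp_url(self) -> str:
        # amqps is the TLS variant of the AMQP URI scheme.
        scheme = "amqps" if self.use_ssl else "amqp"
        # Percent-encode credentials so characters like "@" stay valid.
        user = quote(self.user, safe="")
        password = quote(self.password, safe="")
        return f"{scheme}://{user}:{password}@{self.host}:{self.port}"


def load_settings(env: Mapping[str, str] = os.environ) -> RabbitMQSettings:
    """Read the prefixed variables from a mapping (defaults to os.environ)."""
    return RabbitMQSettings(
        use_ssl=env[PREFIX + "USE_SSL"].strip().lower() == "true",
        host=env[PREFIX + "HOST"],
        port=int(env[PREFIX + "PORT"]),
        user=env[PREFIX + "USER"],
        password=env[PREFIX + "PASSWORD"],
        # EXCHANGE_NAME is optional, so a missing key becomes None.
        exchange_name=env.get(PREFIX + "EXCHANGE_NAME"),
    )
```

Passing a plain dict instead of os.environ keeps the sketch easy to test without touching the real process environment.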
Usage
Publishing events
from spakky.core.common.mutability import immutable
from spakky.domain.models.event import AbstractIntegrationEvent
from spakky.event.event_publisher import IEventPublisher
from spakky.core.pod.annotations.pod import Pod

@immutable
class UserCreatedEvent(AbstractIntegrationEvent):
    user_id: int
    email: str

@Pod()
class UserService:
    def __init__(self, publisher: IEventPublisher) -> None:
        self.publisher = publisher

    # User is an application-defined model that is not shown here.
    def create_user(self, email: str) -> User:
        user = User(email=email)
        self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
        return user
Consuming events
from spakky.event.stereotype.event_handler import EventHandler, on_event

# NotificationService is an application-defined dependency that is not shown here.
@EventHandler()
class UserEventHandler:
    def __init__(self, notification_service: NotificationService) -> None:
        self.notification_service = notification_service

    @on_event(UserCreatedEvent)
    async def on_user_created(self, event: UserCreatedEvent) -> None:
        await self.notification_service.send_welcome_email(event.email)
Async variant
In async applications, use IAsyncEventPublisher:
from spakky.event.event_publisher import IAsyncEventPublisher

@Pod()
class AsyncUserService:
    def __init__(self, publisher: IAsyncEventPublisher) -> None:
        self.publisher = publisher

    async def create_user(self, email: str) -> User:
        user = User(email=email)
        await self.publisher.publish(UserCreatedEvent(user_id=user.id, email=email))
        return user
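Distributed tracing
spakky-tracing is installed automatically as a required dependency. When an ITracePropagator is registered in the container, the TraceContext is propagated automatically whenever events are published or consumed.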
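- Publisher side: IEventTransport.send() injects the current TraceContext into the message headers
- Consumer side: the TraceContext is extracted from the received message and a child span is created
- If the header is missing, a new root trace is started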
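The inject and extract flow can be sketched as follows. This is an illustration under stated assumptions, not the spakky-tracing implementation: the TraceContext dataclass, inject, and extract_or_start are hypothetical stand-ins, and the header name and value layout follow the W3C traceparent format, which may differ from what the plugin actually writes.

```python
import secrets
from dataclasses import dataclass
from typing import Dict, Optional

HEADER = "traceparent"  # assumption: W3C Trace Context header name


@dataclass(frozen=True)
class TraceContext:
    """Stand-in for illustration; not the spakky-tracing class."""

    trace_id: str  # 32 lowercase hex characters
    span_id: str  # 16 lowercase hex characters
    parent_span_id: Optional[str] = None

    @staticmethod
    def new_root() -> "TraceContext":
        return TraceContext(secrets.token_hex(16), secrets.token_hex(8))

    def child(self) -> "TraceContext":
        # A child keeps the trace id and records this span as its parent.
        return TraceContext(self.trace_id, secrets.token_hex(8), self.span_id)


def inject(ctx: TraceContext, headers: Dict[str, str]) -> None:
    """Publisher side: write the current context into the message headers."""
    headers[HEADER] = f"00-{ctx.trace_id}-{ctx.span_id}-01"


def extract_or_start(headers: Dict[str, str]) -> TraceContext:
    """Consumer side: continue the incoming trace, or start a new root."""
    value = headers.get(HEADER)
    parts = value.split("-") if value is not None else []
    if len(parts) != 4:
        # Missing or malformed header: begin a fresh root trace.
        return TraceContext.new_root()
    _version, trace_id, span_id, _flags = parts
    return TraceContext(trace_id, span_id).child()
```

Because the child keeps the incoming trace_id, spans created in the publishing and consuming services can be joined into one trace.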
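Key features
- Automatic queue declaration: creates a durable queue based on the event type name
- Sync/async support: both synchronous and asynchronous publishers and consumers are supported
- Background service pattern: consumer polling runs as a background service
- Pydantic serialization: events are serialized and deserialized with Pydantic
- Exchange routing: an optional exchange for pub/sub messaging patterns
- SSL support: secure connections over the AMQPS protocol
- Distributed tracing: spakky-tracing integration for trace propagation across services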
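As a rough picture of how a queue per event type and a serialization round trip fit together, consider the sketch below. It makes two loud assumptions: the queue is named exactly after the class (the plugin's real naming rule is not stated here), and stdlib dataclasses with json stand in for Pydantic so the example has no third-party dependencies. queue_name_for, REGISTRY, encode, and decode are hypothetical helpers.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Type


@dataclass(frozen=True)
class UserCreatedEvent:
    user_id: int
    email: str


def queue_name_for(event_type: type) -> str:
    # Assumption: the queue is named after the class; the real rule may differ.
    return event_type.__name__


# Registry that maps a queue name back to the type used for decoding.
REGISTRY: Dict[str, Type] = {queue_name_for(UserCreatedEvent): UserCreatedEvent}


def encode(event: object) -> bytes:
    """Serialize a dataclass event into a JSON message body."""
    return json.dumps(asdict(event)).encode("utf-8")


def decode(queue: str, body: bytes) -> object:
    """Rebuild the event using the type registered for the queue."""
    return REGISTRY[queue](**json.loads(body.decode("utf-8")))
```

Keying the registry by queue name means a consumer can pick the right type from the queue a message arrived on, without inspecting the body first.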
Components
| Component | Description |
|---|---|
| RabbitMQEventTransport | Synchronous event transport (IEventTransport) |
| AsyncRabbitMQEventTransport | Asynchronous event transport (IAsyncEventTransport) |
| RabbitMQEventConsumer | Synchronous event consumer (background service) |
| AsyncRabbitMQEventConsumer | Asynchronous event consumer (background service) |
| RabbitMQConnectionConfig | Environment-variable-based configuration |
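The consumers above are described as background services. A minimal sketch of that pattern, assuming a thread-based polling loop, might look like this. PollingConsumer is a hypothetical class, and an in-memory queue.Queue stands in for the broker; the plugin's actual consumer classes may be structured differently.

```python
import queue
import threading
from typing import Callable


class PollingConsumer:
    """Hypothetical background consumer; an in-memory queue replaces the broker."""

    def __init__(
        self,
        source: "queue.Queue[bytes]",
        handler: Callable[[bytes], None],
    ) -> None:
        self._source = source
        self._handler = handler
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Signal the loop, then wait for it to exit.
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                # A short timeout keeps the loop responsive to stop().
                body = self._source.get(timeout=0.05)
            except queue.Empty:
                continue
            self._handler(body)
            self._source.task_done()
```

Polling with a timeout rather than blocking indefinitely is what lets stop() return promptly during application shutdown.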
Error handling
InvalidMessageError: raised for a message that lacks required metadata (consumer_tag or delivery_tag)
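The check described above can be pictured with the following sketch. The exception class here is a local stand-in that only shares the name of the plugin's error, and require_tags plus the dict-shaped metadata are assumptions made for illustration.

```python
from typing import Any, Mapping, Tuple


class InvalidMessageError(Exception):
    """Local stand-in with the same name as the plugin's error."""


def require_tags(metadata: Mapping[str, Any]) -> Tuple[str, int]:
    """Return (consumer_tag, delivery_tag) or raise if either is absent."""
    consumer_tag = metadata.get("consumer_tag")
    delivery_tag = metadata.get("delivery_tag")
    # Reject the message when either required field is missing.
    if consumer_tag is None or delivery_tag is None:
        raise InvalidMessageError("message is missing consumer_tag or delivery_tag")
    return consumer_tag, delivery_tag
```

Validating before the handler runs lets a caller catch InvalidMessageError in one place and decide whether to log, drop, or reroute the message.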
License
MIT License