Una librería para construir microservicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola y se despacha automáticamente mediante handlers tipados.
Project description
📦 Package Events Bus
Una librería para construir servicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola.
� Tabla de Contenidos
- 📦 Package Events Bus
- � Tabla de Contenidos
- 📝 Descripción
- ⚙️ Instalación
- 🗂️ Estructura del Proyecto
- 💡 ¿Cómo usarlo?
- ⚙️ Variables por Defecto
- 🏷️ Definición de Topics
- 🛠️ TODO
- 📝 Buenas Prácticas
- 📄 Licencia
📝 Descripción
Esta librería permite construir microservicios desacoplados en Python, facilitando la publicación y consumo de eventos a través de AWS EventBridge y SQS. Cada caso de uso maneja su propia cola, permitiendo escalabilidad y mantenibilidad.
⚙️ Instalación
Instala la librería usando pip:
pip install package-events-bus
O usando poetry:
poetry add package-events-bus
🗂️ Estructura del Proyecto
package_events_bus/
├── __init__.py
├── config.py
├── exceptions.py
├── .vscode/settings.json
├── aws/
│ ├── event_bridge_publisher.py
│ └── sqs_dispatcher.py
├── core/
│ ├── __init__.py
│ ├── contracts/
│ │ ├── base_event.py
│ │ ├── deduplication.py
│ │ └── handler.py
│ ├── infrastructure/
│ │ ├── event_bus_publisher.py
│ │ ├── event_serializer.py
│ │ └── redis_deduplication.py
│ └── runtime/
│ └── event_register.py
- /aws: Implementaciones específicas para servicios de AWS.
- event_bridge_publisher.py: Publica eventos a AWS EventBridge.
- sqs_dispatcher.py: Despacha mensajes de colas SQS y procesa eventos.
💡 ¿Cómo usarlo?
📤 Publicar eventos
- Crea una instancia de
EventBridgePublishery llama apublishcon el evento deseado. Puedes cambiar el mecanismo de failover y la región AWS en cualquier momento usando los métodosset_failoveryset_aws_region.
from events_bus.aws.event_bridge_publisher import EventBridgePublisher
event_bridge_publisher = EventBridgePublisher(
bus_name="finkargo-events",
source="my.service",
)
# Cambiar el mecanismo de failover en tiempo de ejecución
from events_bus.core.infrastructure.redis_failover import RedisFailover
failover = RedisFailover(url="redis://localhost:6379/0")
event_bridge_publisher.set_failover(failover)
# Cambiar la región de AWS en tiempo de ejecución
event_bridge_publisher.set_aws_region("us-west-2")
- Define tu evento heredando de
BaseEvent: ReemplazaDisbursementCreatedEventcon el nombre de tu evento, recuerda sobreescribir el métodoto_dictpara serializar los atributos del evento que deseas enviar.
from events_bus.core.contracts.base_event import BaseEvent
@dataclass
class DisbursementCreatedEvent(BaseEvent):
event_name = 'finkargo.portfolio.1.event.disbursement.created'
def __init__(self, disbursement):
super().__init__(event_name=self.event_name)
self.id_disbursement = disbursement.id_disbursement
def to_dict(self) -> dict:
return {
'id_disbursement': self.id_disbursement
}
@classmethod
def from_dict(cls, event_id: str, occurred_on, attributes: dict) -> "DisbursementCreatedEvent":
return cls(disbursement_id=attributes.get('id_disbursement'))
event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))
🔄 Cambiar failover y región AWS dinámicamente
Puedes modificar el mecanismo de failover y la región AWS en cualquier momento después de crear la instancia:
# Cambiar failover
event_bridge_publisher.set_failover(RedisFailover(url="redis://localhost:6379/0"))
# Cambiar región AWS
event_bridge_publisher.set_aws_region("us-west-2")
🛡️ Uso de Failover para eventos no publicados
Por defecto, la librería utiliza un failover local en memoria (LocalFailover) para almacenar eventos que no se pueden publicar en AWS. Si deseas un mecanismo persistente, puedes usar RedisFailover u otro personalizado.
from events_bus.aws.event_bridge_publisher import EventBridgePublisher
from events_bus.core.infrastructure.redis_failover import RedisFailover
# Configura el failover con Redis (opcional, por defecto es LocalFailover en memoria)
failover = RedisFailover(url="redis://localhost:6379/0")
event_bridge_publisher = EventBridgePublisher(
bus_name="finkargo-events",
source="my.service",
failover=failover # Si omites este argumento, usará LocalFailover por defecto
)
# Publica tu evento normalmente
event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))
# Para reintentar publicar los eventos almacenados en failover:
event_bridge_publisher.publish_from_failover(total_events=10)
- Si ocurre un error al publicar el evento en AWS, este se almacena en el failover configurado (por defecto, local en memoria).
- Puedes reintentar la publicación de los eventos almacenados usando
publish_from_failover.
También puedes implementar tu propio mecanismo de failover heredando de
BaseFailover.
📅 Publicación programada con EventBridge Scheduler
Puedes programar la publicación de eventos en un momento específico usando AWS EventBridge Scheduler. El schedule se elimina automáticamente después de ejecutarse.
Requisito: Configura la variable de entorno
SCHEDULER_ROLE_ARNcon el ARN del rol IAM que EventBridge Scheduler usará para publicar en el bus de eventos.
ScheduleConfig soporta 3 formas de especificar el tiempo (mutuamente excluyentes):
from datetime import datetime, timedelta
from events_bus.aws.event_bridge_publisher import EventBridgePublisher
from events_bus.core import ScheduleConfig
publisher = EventBridgePublisher(
bus_name="finkargo-events",
source="my.service",
)
# Opción 1: datetime específico
publisher.publish(
event=my_event,
schedule=ScheduleConfig(schedule_at=datetime(2025, 12, 25, 10, 0, 0)),
)
# Opción 2: timedelta relativo desde ahora
publisher.publish(
event=my_event,
schedule=ScheduleConfig(schedule_in=timedelta(minutes=30)),
)
# Opción 3: string ISO 8601
publisher.publish(
event=my_event,
schedule=ScheduleConfig(schedule_iso="2025-08-01T14:00:00"),
)
📥 Consumir eventos
Opción 1: Consumir eventos con múltiples colas por caso de uso
-
Hereda de
AsyncHandleroSyncHandlery sobrescribe el métodohandle. El métodohandlerecibe el evento como parámetro, recuerda que el evento debe heredar deBaseEvent, y el métodofrom_dictdebe estar implementado para poder deserializar el evento.Usa
AsyncHandlerpara manejar eventos de forma asíncrona ySyncHandlerpara eventos sincrónicos.
from events_bus.core import AsyncHandler
class CustomAsyncHandler(AsyncHandler):
async def handle(self, event: DisbursementCreatedEvent):
print(f"Handling event: {event.event_name}")
print(f"Finished handling event: {event.event_name}")
- Registra el handler en el bus de eventos:
Usa
EventHandlerRegisterpara registrar el handler y la cola SQS asociada, recuerda que el target debe ser configurado previamente en AWS. Para mas información sobre la configuración de AWS, consulta la documentación oficial.
from events_bus.core import EventHandlerRegister
from events_bus.aws.sqs_dispatcher import SQSDispatcher
dispatcher = SQSDispatcher()
EventHandlerRegister.register_by_queue(
queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
handler=CustomAsyncHandler()
)
Opción 2: Consumir eventos usando una sola cola por microservicio
Si quieres consumir solo una cola SQS por microservicio (y despachar a los handlers según el tipo de evento), registra tus handlers por nombre de evento y usa start_from_one_queue:
from events_bus.aws.sqs_dispatcher import SQSDispatcher
from events_bus.core import EventHandlerRegister, AsyncHandler
class CustomAsyncHandler(AsyncHandler):
async def handle(self, event):
print(f"Handling event: {event.event_name}")
# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
event_name='finkargo.portfolio.1.event.disbursement.created',
handler=CustomAsyncHandler()
)
dispatcher = SQSDispatcher()
import asyncio
async def main():
await dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')
asyncio.run(main())
- Así solo consumes una cola por microservicio, y despachas a los handlers según el tipo de evento recibido.
Opción 3: Usar Mangum para AWS Lambda (SQS/EventBridge)
Si tu microservicio se ejecuta como Lambda y recibe eventos desde SQS o EventBridge, puedes usar la integración con Mangum:
from events_bus.aws.mangum import MangumExtended
from events_bus.core import EventHandlerRegister, AsyncHandler
class CustomAsyncHandler(AsyncHandler):
async def handle(self, event):
print(f"Handling event: {event.event_name}")
# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
event_name='finkargo.portfolio.1.event.disbursement.created',
handler=CustomAsyncHandler()
)
app = FastAPI()
handler = MangumExtended(app)
- Registra tus handlers usando
EventHandlerRegister.register_handler. - Mangum detectará automáticamente si el evento proviene de SQS o EventBridge y despachará al handler correcto.
- AWS Lambda llamará a
handler(event, context)y la librería despachará el evento al handler correspondiente.
⚡ ¿Cómo implementar el dispatcher en FastAPI?
Opción 1: Cola de SQS por caso de uso
Puedes consumir eventos en FastAPI usando múltiples colas SQS por caso de uso. Registra tus handlers y usa start para iniciar el dispatcher.
@asynccontextmanager
async def lifespan(app: FastAPI):
await dispatcher.start()
yield
dispatcher.stop()
app = FastAPI(lifespan=lifespan)
Opción 2: Una cola SQS por microservicio
Puedes consumir eventos usando una sola cola SQS por microservicio en FastAPI utilizando el método start_from_one_queue de SQSDispatcher. Así, tu microservicio solo escucha una cola y despacha a los handlers registrados según el tipo de evento recibido.
import asyncio
dispatcher = SQSDispatcher()
@asynccontextmanager
async def lifespan(app: FastAPI):
# Consumir solo una cola por microservicio
task = asyncio.create_task(
dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')
)
yield
dispatcher.stop()
await task
app = FastAPI(lifespan=lifespan)
🛡️ Opcional: Configuración de Redis para deduplicación
from events_bus.core import RedisDeduplication
from events_bus.aws.sqs_dispatcher import SQSDispatcher
deduplication = RedisDeduplication(
url='redis://localhost:6379/0',
ttl=3600,
)
dispatcher = SQSDispatcher(deduplication=deduplication)
Si no deseas usar Redis, puedes heredar de
BaseDeduplicatione implementar tu propia lógica.
⚙️ Variables por Defecto
from events_bus import CONFIG
CONFIG.set_envs(os.environ, env_file='.env')
Puedes modificar las variables por defecto de la librería usando un archivo .env o configurando las variables de entorno directamente. Las variables configurables son:
| Variable | Tipo | Por defecto | Descripción |
|---|---|---|---|
| MAX_NUMBER_OF_MESSAGES | int | 5 | Número máximo de mensajes a recibir por llamada. |
| WAIT_TIME_SECONDS | int | 10 | Tiempo de espera para recibir mensajes. |
| VISIBILITY_TIMEOUT | int | 30 | Tiempo de visibilidad del mensaje en la cola. |
| AWS_CLIENT_URL | str | None | URL del cliente de AWS (útil para LocalStack u otros entornos locales). |
| AWS_REGION_NAME | str | us-east-1 | Región de AWS. |
| SLEEP_BETWEEN_MESSAGES_SECONDS | float | 0.1 | Tiempo de espera entre mensajes. |
| ERROR_SLEEP_SECONDS | int | 5 | Tiempo de espera en caso de error. |
| SCHEDULER_ROLE_ARN | str | None | ARN del rol IAM para EventBridge Scheduler (requerido para publicación programada). |
ℹ️ Tip: Usa
.envpara mantener tu configuración fuera del código fuente.
🏷️ Definición de Topics
Los topics deben seguir la convención:
COMPANY.SERVICE.VERSION.MESSAGE_TYPE.RESOURCE_NAME.(EVENT_COMMAND_NAME)
- COMPANY: Nombre de la empresa (
Finkargopara internos). - SERVICE: Servicio generador del evento.
- VERSION: Versión del topic.
- MESSAGE_TYPE:
commandoevent. - RESOURCE_NAME: Entidad relacionada.
- EVENT_COMMAND_NAME: Verbo en pasado para eventos (
created), infinitivo para comandos (create).
Regex de validación:
TOPIC_REGEX = "^[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\.\d+\.(command|event)\.[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+$"
Basado en: Topics Definition | AsyncApi
🛠️ TODO
- Mejorar el control de excepciones en la publicación de eventos.
- Agregar más ejemplos de integración con otros frameworks.
- Documentar casos de uso avanzados.
- Implementar métricas y logging para los eventos publicados y consumidos.
- Añadir pruebas unitarias y de integración.
- Mejorar la documentación en inglés y español.
📝 Buenas Prácticas
- Usa nombres de eventos claros y consistentes.
- Implementa deduplicación para evitar procesar eventos repetidos.
- Mantén tus handlers simples y enfocados en una sola responsabilidad.
- Documenta tus eventos y handlers.
📄 Licencia
Este proyecto está bajo la licencia MIT.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file package_events_bus-0.1.43.tar.gz.
File metadata
- Download URL: package_events_bus-0.1.43.tar.gz
- Upload date:
- Size: 21.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.4 CPython/3.13.13 Linux/6.17.0-1010-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a2c14fa34e5a4715fd701be987c0dd674007b9180cbb7bac08b5f27e52b1652f
|
|
| MD5 |
8cd0ebe8406dfbcbcf64c0b421300b82
|
|
| BLAKE2b-256 |
2c244986c12bfd34ff6040fa9ce7f65aa795612c5eaa3ec5b71fdd133327da31
|
File details
Details for the file package_events_bus-0.1.43-py3-none-any.whl.
File metadata
- Download URL: package_events_bus-0.1.43-py3-none-any.whl
- Upload date:
- Size: 25.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.4 CPython/3.13.13 Linux/6.17.0-1010-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
89946449c597e55160b73a047aa908bcf514c511b28d1c1666dfd25eeac1275d
|
|
| MD5 |
e379b2d6f8588bb127fd746cb2c14b05
|
|
| BLAKE2b-256 |
87d844dd82a099ebbd9223d23947452e0f2e0b86f0c297972c7938c66342a3de
|