Skip to main content

Una librería para construir microservicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola y se despacha automáticamente mediante handlers tipados.

Project description

📦 Package Events Bus

Una librería para construir servicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola.


� Tabla de Contenidos


📝 Descripción

Esta librería permite construir microservicios desacoplados en Python, facilitando la publicación y consumo de eventos a través de AWS EventBridge y SQS. Cada caso de uso maneja su propia cola, permitiendo escalabilidad y mantenibilidad.


⚙️ Instalación

Instala la librería usando pip:

pip install package-events-bus

O usando poetry:

poetry add package-events-bus

🗂️ Estructura del Proyecto

package_events_bus/
├── __init__.py
├── config.py
├── exceptions.py
├── .vscode/settings.json
├── aws/
│   ├── event_bridge_publisher.py
│   └── sqs_dispatcher.py
├── core/
│   ├── __init__.py
│   ├── contracts/
│   │   ├── base_event.py
│   │   ├── deduplication.py
│   │   └── handler.py
│   ├── infrastructure/
│   │   ├── event_bus_publisher.py
│   │   ├── event_serializer.py
│   │   └── redis_deduplication.py
│   └── runtime/
│       └── event_register.py
  • /aws: Implementaciones específicas para servicios de AWS.
    • event_bridge_publisher.py: Publica eventos a AWS EventBridge.
    • sqs_dispatcher.py: Despacha mensajes de colas SQS y procesa eventos.

💡 ¿Cómo usarlo?

📤 Publicar eventos

  1. Crea una instancia de EventBridgePublisher y llama a publish con el evento deseado. Puedes cambiar el mecanismo de failover y la región AWS en cualquier momento usando los métodos set_failover y set_aws_region.
from events_bus.aws.event_bridge_publisher import EventBridgePublisher

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
)
 
# Cambiar el mecanismo de failover en tiempo de ejecución
from events_bus.core.infrastructure.redis_failover import RedisFailover
failover = RedisFailover(url="redis://localhost:6379/0")
event_bridge_publisher.set_failover(failover)

# Cambiar la región de AWS en tiempo de ejecución
event_bridge_publisher.set_aws_region("us-west-2")
  1. Define tu evento heredando de BaseEvent: Reemplaza DisbursementCreatedEvent con el nombre de tu evento, recuerda sobreescribir el método to_dict para serializar los atributos del evento que deseas enviar.
from events_bus.core.contracts.base_event import BaseEvent

@dataclass
class DisbursementCreatedEvent(BaseEvent):
    event_name = 'finkargo.portfolio.1.event.disbursement.created'

    def __init__(self, disbursement):
        super().__init__(event_name=self.event_name)
        self.id_disbursement = disbursement.id_disbursement

    def to_dict(self) -> dict:
        return {
            'id_disbursement': self.id_disbursement
        }

    @classmethod
    def from_dict(cls, event_id: str, occurred_on, attributes: dict) -> "DisbursementCreatedEvent":
        return cls(disbursement_id=attributes.get('id_disbursement'))

event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

🔄 Cambiar failover y región AWS dinámicamente

Puedes modificar el mecanismo de failover y la región AWS en cualquier momento después de crear la instancia:

# Cambiar failover
event_bridge_publisher.set_failover(RedisFailover(url="redis://localhost:6379/0"))

# Cambiar región AWS
event_bridge_publisher.set_aws_region("us-west-2")

🛡️ Uso de Failover para eventos no publicados

Por defecto, la librería utiliza un failover local en memoria (LocalFailover) para almacenar eventos que no se pueden publicar en AWS. Si deseas un mecanismo persistente, puedes usar RedisFailover u otro personalizado.

from events_bus.aws.event_bridge_publisher import EventBridgePublisher
from events_bus.core.infrastructure.redis_failover import RedisFailover

# Configura el failover con Redis (opcional, por defecto es LocalFailover en memoria)
failover = RedisFailover(url="redis://localhost:6379/0")

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
    failover=failover  # Si omites este argumento, usará LocalFailover por defecto
)

# Publica tu evento normalmente
event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

# Para reintentar publicar los eventos almacenados en failover:
event_bridge_publisher.publish_from_failover(total_events=10)
  • Si ocurre un error al publicar el evento en AWS, este se almacena en el failover configurado (por defecto, local en memoria).
  • Puedes reintentar la publicación de los eventos almacenados usando publish_from_failover.

También puedes implementar tu propio mecanismo de failover heredando de BaseFailover.


📅 Publicación programada con EventBridge Scheduler

Puedes programar la publicación de eventos en un momento específico usando AWS EventBridge Scheduler. El schedule se elimina automáticamente después de ejecutarse.

Requisito: Configura la variable de entorno SCHEDULER_ROLE_ARN con el ARN del rol IAM que EventBridge Scheduler usará para publicar en el bus de eventos.

ScheduleConfig soporta 3 formas de especificar el tiempo (mutuamente excluyentes):

from datetime import datetime, timedelta
from events_bus.aws.event_bridge_publisher import EventBridgePublisher
from events_bus.core import ScheduleConfig

publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
)

# Opción 1: datetime específico
publisher.publish(
    event=my_event,
    schedule=ScheduleConfig(schedule_at=datetime(2025, 12, 25, 10, 0, 0)),
)

# Opción 2: timedelta relativo desde ahora
publisher.publish(
    event=my_event,
    schedule=ScheduleConfig(schedule_in=timedelta(minutes=30)),
)

# Opción 3: string ISO 8601
publisher.publish(
    event=my_event,
    schedule=ScheduleConfig(schedule_iso="2025-08-01T14:00:00"),
)

📥 Consumir eventos

Opción 1: Consumir eventos con múltiples colas por caso de uso

  1. Hereda de AsyncHandler o SyncHandler y sobrescribe el método handle. El método handle recibe el evento como parámetro, recuerda que el evento debe heredar de BaseEvent, y el método from_dict debe estar implementado para poder deserializar el evento.

    Usa AsyncHandler para manejar eventos de forma asíncrona y SyncHandler para eventos sincrónicos.

from events_bus.core import AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event: DisbursementCreatedEvent):
        print(f"Handling event: {event.event_name}")
        print(f"Finished handling event: {event.event_name}")
  1. Registra el handler en el bus de eventos: Usa EventHandlerRegister para registrar el handler y la cola SQS asociada, recuerda que el target debe ser configurado previamente en AWS. Para mas información sobre la configuración de AWS, consulta la documentación oficial.
from events_bus.core import EventHandlerRegister
from events_bus.aws.sqs_dispatcher import SQSDispatcher

dispatcher = SQSDispatcher()
EventHandlerRegister.register_by_queue(
    queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    handler=CustomAsyncHandler()
)

Opción 2: Consumir eventos usando una sola cola por microservicio

Si quieres consumir solo una cola SQS por microservicio (y despachar a los handlers según el tipo de evento), registra tus handlers por nombre de evento y usa start_from_one_queue:

from events_bus.aws.sqs_dispatcher import SQSDispatcher
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

dispatcher = SQSDispatcher()

import asyncio

async def main():
    await dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')

asyncio.run(main())
  • Así solo consumes una cola por microservicio, y despachas a los handlers según el tipo de evento recibido.

Opción 3: Usar Mangum para AWS Lambda (SQS/EventBridge)

Si tu microservicio se ejecuta como Lambda y recibe eventos desde SQS o EventBridge, puedes usar la integración con Mangum:

from events_bus.aws.mangum import MangumExtended
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

app = FastAPI()

handler = MangumExtended(app)
  • Registra tus handlers usando EventHandlerRegister.register_handler.
  • Mangum detectará automáticamente si el evento proviene de SQS o EventBridge y despachará al handler correcto.
  • AWS Lambda llamará a handler(event, context) y la librería despachará el evento al handler correspondiente.

⚡ ¿Cómo implementar el dispatcher en FastAPI?

Opción 1: Cola de SQS por caso de uso

Puedes consumir eventos en FastAPI usando múltiples colas SQS por caso de uso. Registra tus handlers y usa start para iniciar el dispatcher.

@asynccontextmanager
async def lifespan(app: FastAPI):
    await dispatcher.start()
    yield
    dispatcher.stop()

app = FastAPI(lifespan=lifespan)

Opción 2: Una cola SQS por microservicio

Puedes consumir eventos usando una sola cola SQS por microservicio en FastAPI utilizando el método start_from_one_queue de SQSDispatcher. Así, tu microservicio solo escucha una cola y despacha a los handlers registrados según el tipo de evento recibido.

import asyncio

dispatcher = SQSDispatcher()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Consumir solo una cola por microservicio
    task = asyncio.create_task(
        dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')
    )
    yield
    dispatcher.stop()
    await task

app = FastAPI(lifespan=lifespan)

🛡️ Opcional: Configuración de Redis para deduplicación

from events_bus.core import RedisDeduplication
from events_bus.aws.sqs_dispatcher import SQSDispatcher

deduplication = RedisDeduplication(
    url='redis://localhost:6379/0',
    ttl=3600,
)

dispatcher = SQSDispatcher(deduplication=deduplication)

Si no deseas usar Redis, puedes heredar de BaseDeduplication e implementar tu propia lógica.


⚙️ Variables por Defecto

from events_bus import CONFIG
CONFIG.set_envs(os.environ, env_file='.env')

Puedes modificar las variables por defecto de la librería usando un archivo .env o configurando las variables de entorno directamente. Las variables configurables son:

Variable Tipo Por defecto Descripción
MAX_NUMBER_OF_MESSAGES int 5 Número máximo de mensajes a recibir por llamada.
WAIT_TIME_SECONDS int 10 Tiempo de espera para recibir mensajes.
VISIBILITY_TIMEOUT int 30 Tiempo de visibilidad del mensaje en la cola.
AWS_CLIENT_URL str None URL del cliente de AWS (útil para LocalStack u otros entornos locales).
AWS_REGION_NAME str us-east-1 Región de AWS.
SLEEP_BETWEEN_MESSAGES_SECONDS float 0.1 Tiempo de espera entre mensajes.
ERROR_SLEEP_SECONDS int 5 Tiempo de espera en caso de error.
SCHEDULER_ROLE_ARN str None ARN del rol IAM para EventBridge Scheduler (requerido para publicación programada).

ℹ️ Tip: Usa .env para mantener tu configuración fuera del código fuente.


🏷️ Definición de Topics

Los topics deben seguir la convención:

COMPANY.SERVICE.VERSION.MESSAGE_TYPE.RESOURCE_NAME.(EVENT_COMMAND_NAME)
  • COMPANY: Nombre de la empresa (Finkargo para internos).
  • SERVICE: Servicio generador del evento.
  • VERSION: Versión del topic.
  • MESSAGE_TYPE: command o event.
  • RESOURCE_NAME: Entidad relacionada.
  • EVENT_COMMAND_NAME: Verbo en pasado para eventos (created), infinitivo para comandos (create).

Regex de validación:

TOPIC_REGEX = "^[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\.\d+\.(command|event)\.[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+$"

Basado en: Topics Definition | AsyncApi


🛠️ TODO

  • Mejorar el control de excepciones en la publicación de eventos.
  • Agregar más ejemplos de integración con otros frameworks.
  • Documentar casos de uso avanzados.
  • Implementar métricas y logging para los eventos publicados y consumidos.
  • Añadir pruebas unitarias y de integración.
  • Mejorar la documentación en inglés y español.

📝 Buenas Prácticas

  • Usa nombres de eventos claros y consistentes.
  • Implementa deduplicación para evitar procesar eventos repetidos.
  • Mantén tus handlers simples y enfocados en una sola responsabilidad.
  • Documenta tus eventos y handlers.

📄 Licencia

Este proyecto está bajo la licencia MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

package_events_bus-0.1.43.tar.gz (21.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

package_events_bus-0.1.43-py3-none-any.whl (25.6 kB view details)

Uploaded Python 3

File details

Details for the file package_events_bus-0.1.43.tar.gz.

File metadata

  • Download URL: package_events_bus-0.1.43.tar.gz
  • Upload date:
  • Size: 21.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.13.13 Linux/6.17.0-1010-azure

File hashes

Hashes for package_events_bus-0.1.43.tar.gz
Algorithm Hash digest
SHA256 a2c14fa34e5a4715fd701be987c0dd674007b9180cbb7bac08b5f27e52b1652f
MD5 8cd0ebe8406dfbcbcf64c0b421300b82
BLAKE2b-256 2c244986c12bfd34ff6040fa9ce7f65aa795612c5eaa3ec5b71fdd133327da31

See more details on using hashes here.

File details

Details for the file package_events_bus-0.1.43-py3-none-any.whl.

File metadata

  • Download URL: package_events_bus-0.1.43-py3-none-any.whl
  • Upload date:
  • Size: 25.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.13.13 Linux/6.17.0-1010-azure

File hashes

Hashes for package_events_bus-0.1.43-py3-none-any.whl
Algorithm Hash digest
SHA256 89946449c597e55160b73a047aa908bcf514c511b28d1c1666dfd25eeac1275d
MD5 e379b2d6f8588bb127fd746cb2c14b05
BLAKE2b-256 87d844dd82a099ebbd9223d23947452e0f2e0b86f0c297972c7938c66342a3de

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page