Skip to main content

Una librería para construir microservicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola y se despacha automáticamente mediante handlers tipados.

Project description

📦 Package Events Bus

Una librería para construir servicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola.


� Tabla de Contenidos


📝 Descripción

Esta librería permite construir microservicios desacoplados en Python, facilitando la publicación y consumo de eventos a través de AWS EventBridge y SQS. Cada caso de uso maneja su propia cola, permitiendo escalabilidad y mantenibilidad.


⚙️ Instalación

Instala la librería usando pip:

pip install package-events-bus

O usando poetry:

poetry add package-events-bus

🗂️ Estructura del Proyecto

package_events_bus/
├── __init__.py
├── config.py
├── exceptions.py
├── .vscode/settings.json
├── aws/
│   ├── event_bridge_publisher.py
│   └── sqs_dispatcher.py
├── core/
│   ├── __init__.py
│   ├── contracts/
│   │   ├── base_event.py
│   │   ├── deduplication.py
│   │   └── handler.py
│   ├── infrastructure/
│   │   ├── event_bus_publisher.py
│   │   ├── event_serializer.py
│   │   └── redis_deduplication.py
│   └── runtime/
│       └── event_register.py
  • /aws: Implementaciones específicas para servicios de AWS.
    • event_bridge_publisher.py: Publica eventos a AWS EventBridge.
    • sqs_dispatcher.py: Despacha mensajes de colas SQS y procesa eventos.

💡 ¿Cómo usarlo?

📤 Publicar eventos

  1. Crea una instancia de EventBridgePublisher y llama a publish con el evento deseado.
from events_bus.aws.event_bridge_publisher import EventBridgePublisher

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
)
  1. Define tu evento heredando de BaseEvent: Reemplaza DisbursementCreatedEvent con el nombre de tu evento, recuerda sobreescribir el método to_dict para serializar los atributos del evento que deseas enviar.
from events_bus.core.contracts.base_event import BaseEvent

@dataclass
class DisbursementCreatedEvent(BaseEvent):
    event_name = 'finkargo.portfolio.1.event.disbursement.created'

    def __init__(self, disbursement):
        super().__init__(event_name=self.event_name)
        self.id_disbursement = disbursement.id_disbursement

    def to_dict(self) -> dict:
        return {
            'id_disbursement': self.id_disbursement
        }

    @classmethod
    def from_dict(cls, event_id: str, occurred_on, attributes: dict) -> "DisbursementCreatedEvent":
        return cls(disbursement_id=attributes.get('id_disbursement'))

event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

🛡️ Uso de Failover para eventos no publicados

Por defecto, la librería utiliza un failover local en memoria (LocalFailover) para almacenar eventos que no se pueden publicar en AWS. Si deseas un mecanismo persistente, puedes usar RedisFailover u otro personalizado.

from events_bus.aws.event_bridge_publisher import EventBridgePublisher
from events_bus.core.infrastructure.redis_failover import RedisFailover

# Configura el failover con Redis (opcional, por defecto es LocalFailover en memoria)
failover = RedisFailover(url="redis://localhost:6379/0")

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
    failover=failover  # Si omites este argumento, usará LocalFailover por defecto
)

# Publica tu evento normalmente
event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

# Para reintentar publicar los eventos almacenados en failover:
event_bridge_publisher.publish_from_failover(total_events=10)
  • Si ocurre un error al publicar el evento en AWS, este se almacena en el failover configurado (por defecto, local en memoria).
  • Puedes reintentar la publicación de los eventos almacenados usando publish_from_failover.

También puedes implementar tu propio mecanismo de failover heredando de BaseFailover.


📥 Consumir eventos

Opción 1: Consumir eventos con múltiples colas por caso de uso

  1. Hereda de AsyncHandler o SyncHandler y sobrescribe el método handle. El método handle recibe el evento como parámetro, recuerda que el evento debe heredar de BaseEvent, y el método from_dict debe estar implementado para poder deserializar el evento.

    Usa AsyncHandler para manejar eventos de forma asíncrona y SyncHandler para eventos sincrónicos.

from events_bus.core import AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event: DisbursementCreatedEvent):
        print(f"Handling event: {event.event_name}")
        print(f"Finished handling event: {event.event_name}")
  1. Registra el handler en el bus de eventos: Usa EventHandlerRegister para registrar el handler y la cola SQS asociada, recuerda que el target debe ser configurado previamente en AWS. Para mas información sobre la configuración de AWS, consulta la documentación oficial.
from events_bus.core import EventHandlerRegister
from events_bus.aws.sqs_dispatcher import SQSDispatcher

dispatcher = SQSDispatcher()
EventHandlerRegister.register_by_queue(
    queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    handler=CustomAsyncHandler()
)

Opción 2: Consumir eventos usando una sola cola por microservicio

Si quieres consumir solo una cola SQS por microservicio (y despachar a los handlers según el tipo de evento), registra tus handlers por nombre de evento y usa start_from_one_queue:

from events_bus.aws.sqs_dispatcher import SQSDispatcher
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

dispatcher = SQSDispatcher()

import asyncio

async def main():
    await dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')

asyncio.run(main())
  • Así solo consumes una cola por microservicio, y despachas a los handlers según el tipo de evento recibido.

Opción 3: Usar Mangum para AWS Lambda (SQS/EventBridge)

Si tu microservicio se ejecuta como Lambda y recibe eventos desde SQS o EventBridge, puedes usar la integración con Mangum:

from events_bus.aws.mangum import MangumExtended
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

app = FastAPI()

handler = MangumExtended(app)
  • Registra tus handlers usando EventHandlerRegister.register_handler.
  • Mangum detectará automáticamente si el evento proviene de SQS o EventBridge y despachará al handler correcto.
  • AWS Lambda llamará a handler(event, context) y la librería despachará el evento al handler correspondiente.

⚡ ¿Cómo implementar el dispatcher en FastAPI?

Opción 1: Cola de SQS por caso de uso

Puedes consumir eventos en FastAPI usando múltiples colas SQS por caso de uso. Registra tus handlers y usa start para iniciar el dispatcher.

@asynccontextmanager
async def lifespan(app: FastAPI):
    await dispatcher.start()
    yield
    dispatcher.stop()

app = FastAPI(lifespan=lifespan)

Opción 2: Una cola SQS por microservicio

Puedes consumir eventos usando una sola cola SQS por microservicio en FastAPI utilizando el método start_from_one_queue de SQSDispatcher. Así, tu microservicio solo escucha una cola y despacha a los handlers registrados según el tipo de evento recibido.

import asyncio

dispatcher = SQSDispatcher()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Consumir solo una cola por microservicio
    task = asyncio.create_task(
        dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')
    )
    yield
    dispatcher.stop()
    await task

app = FastAPI(lifespan=lifespan)

🛡️ Opcional: Configuración de Redis para deduplicación

from events_bus.core import RedisDeduplication
from events_bus.aws.sqs_dispatcher import SQSDispatcher

deduplication = RedisDeduplication(
    url='redis://localhost:6379/0',
    ttl=3600,
)

dispatcher = SQSDispatcher(deduplication=deduplication)

Si no deseas usar Redis, puedes heredar de BaseDeduplication e implementar tu propia lógica.


⚙️ Variables por Defecto

from events_bus import CONFIG
CONFIG.set_envs(os.environ, env_file='.env')

Puedes modificar las variables por defecto de la librería usando un archivo .env o configurando las variables de entorno directamente. Las variables configurables son:

Variable Tipo Por defecto Descripción
MAX_NUMBER_OF_MESSAGES int 5 Número máximo de mensajes a recibir por llamada.
WAIT_TIME_SECONDS int 10 Tiempo de espera para recibir mensajes.
VISIBILITY_TIMEOUT int 30 Tiempo de visibilidad del mensaje en la cola.
AWS_CLIENT_URL str None URL del cliente de AWS (útil para LocalStack u otros entornos locales).
AWS_REGION_NAME str us-east-1 Región de AWS.
SLEEP_BETWEEN_MESSAGES_SECONDS float 0.1 Tiempo de espera entre mensajes.
ERROR_SLEEP_SECONDS int 5 Tiempo de espera en caso de error.

ℹ️ Tip: Usa .env para mantener tu configuración fuera del código fuente.


🏷️ Definición de Topics

Los topics deben seguir la convención:

COMPANY.SERVICE.VERSION.MESSAGE_TYPE.RESOURCE_NAME.(EVENT_COMMAND_NAME)
  • COMPANY: Nombre de la empresa (Finkargo para internos).
  • SERVICE: Servicio generador del evento.
  • VERSION: Versión del topic.
  • MESSAGE_TYPE: command o event.
  • RESOURCE_NAME: Entidad relacionada.
  • EVENT_COMMAND_NAME: Verbo en pasado para eventos (created), infinitivo para comandos (create).

Regex de validación:

TOPIC_REGEX = "^[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\.\d+\.(command|event)\.[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+$"

Basado en: Topics Definition | AsyncApi


🛠️ TODO

  • Mejorar el control de excepciones en la publicación de eventos.
  • Agregar más ejemplos de integración con otros frameworks.
  • Documentar casos de uso avanzados.
  • Implementar métricas y logging para los eventos publicados y consumidos.
  • Añadir pruebas unitarias y de integración.
  • Mejorar la documentación en inglés y español.

📝 Buenas Prácticas

  • Usa nombres de eventos claros y consistentes.
  • Implementa deduplicación para evitar procesar eventos repetidos.
  • Mantén tus handlers simples y enfocados en una sola responsabilidad.
  • Documenta tus eventos y handlers.

📄 Licencia

Este proyecto está bajo la licencia MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

package_events_bus-0.1.37.tar.gz (18.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

package_events_bus-0.1.37-py3-none-any.whl (22.8 kB view details)

Uploaded Python 3

File details

Details for the file package_events_bus-0.1.37.tar.gz.

File metadata

  • Download URL: package_events_bus-0.1.37.tar.gz
  • Upload date:
  • Size: 18.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.13.4 Linux/6.11.0-1015-azure

File hashes

Hashes for package_events_bus-0.1.37.tar.gz
Algorithm Hash digest
SHA256 37bc63f0e1471cb8a73a7a93c3ee6bfaac7bb7df1762c4b043da8a3baf05d39c
MD5 63f66978db533e7c7f85e72140c44766
BLAKE2b-256 1a84294a5308cac52c75ad93bb8684e4cc1d435dd0dc4a6ebdd3b1961265fad8

See more details on using hashes here.

File details

Details for the file package_events_bus-0.1.37-py3-none-any.whl.

File metadata

  • Download URL: package_events_bus-0.1.37-py3-none-any.whl
  • Upload date:
  • Size: 22.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.13.4 Linux/6.11.0-1015-azure

File hashes

Hashes for package_events_bus-0.1.37-py3-none-any.whl
Algorithm Hash digest
SHA256 deac2b6d20219abf750daabbe269c7805411e01c14b2b3102874b64d310ccc93
MD5 d2de0492162ddd43ea45800b27033a8d
BLAKE2b-256 fc978b4f4f2edbca35bfd013ecbc4ca031cc3a889cf1e6eda85bfd0348e13e4c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page