Skip to main content

Una librería para construir microservicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola y se despacha automáticamente mediante handlers tipados.

Project description

📦 Package Events Bus

Una librería para construir servicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola.


� Tabla de Contenidos


📝 Descripción

Esta librería permite construir microservicios desacoplados en Python, facilitando la publicación y consumo de eventos a través de AWS EventBridge y SQS. Cada caso de uso maneja su propia cola, permitiendo escalabilidad y mantenibilidad.


⚙️ Instalación

Instala la librería usando pip:

pip install package-events-bus

O usando poetry:

poetry add package-events-bus

🗂️ Estructura del Proyecto

package_events_bus/
├── __init__.py
├── config.py
├── exceptions.py
├── .vscode/settings.json
├── aws/
│   ├── event_bridge_publisher.py
│   └── sqs_dispatcher.py
├── core/
│   ├── __init__.py
│   ├── contracts/
│   │   ├── base_event.py
│   │   ├── deduplication.py
│   │   └── handler.py
│   ├── infrastructure/
│   │   ├── event_bus_publisher.py
│   │   ├── event_serializer.py
│   │   └── redis_deduplication.py
│   └── runtime/
│       └── event_register.py
  • /aws: Implementaciones específicas para servicios de AWS.
    • event_bridge_publisher.py: Publica eventos a AWS EventBridge.
    • sqs_dispatcher.py: Despacha mensajes de colas SQS y procesa eventos.

💡 ¿Cómo usarlo?

📤 Publicar eventos

  1. Crea una instancia de EventBridgePublisher y llama a publish con el evento deseado. Puedes cambiar el mecanismo de failover y la región AWS en cualquier momento usando los métodos set_failover y set_aws_region.
from events_bus.aws.event_bridge_publisher import EventBridgePublisher

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
)
 
# Cambiar el mecanismo de failover en tiempo de ejecución
from events_bus.core.infrastructure.redis_failover import RedisFailover
failover = RedisFailover(url="redis://localhost:6379/0")
event_bridge_publisher.set_failover(failover)

# Cambiar la región de AWS en tiempo de ejecución
event_bridge_publisher.set_aws_region("us-west-2")
  1. Define tu evento heredando de BaseEvent: Reemplaza DisbursementCreatedEvent con el nombre de tu evento, recuerda sobreescribir el método to_dict para serializar los atributos del evento que deseas enviar.
from events_bus.core.contracts.base_event import BaseEvent

@dataclass
class DisbursementCreatedEvent(BaseEvent):
    event_name = 'finkargo.portfolio.1.event.disbursement.created'

    def __init__(self, disbursement):
        super().__init__(event_name=self.event_name)
        self.id_disbursement = disbursement.id_disbursement

    def to_dict(self) -> dict:
        return {
            'id_disbursement': self.id_disbursement
        }

    @classmethod
    def from_dict(cls, event_id: str, occurred_on, attributes: dict) -> "DisbursementCreatedEvent":
        return cls(disbursement_id=attributes.get('id_disbursement'))

event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

🔄 Cambiar failover y región AWS dinámicamente

Puedes modificar el mecanismo de failover y la región AWS en cualquier momento después de crear la instancia:

# Cambiar failover
event_bridge_publisher.set_failover(RedisFailover(url="redis://localhost:6379/0"))

# Cambiar región AWS
event_bridge_publisher.set_aws_region("us-west-2")

🛡️ Uso de Failover para eventos no publicados

Por defecto, la librería utiliza un failover local en memoria (LocalFailover) para almacenar eventos que no se pueden publicar en AWS. Si deseas un mecanismo persistente, puedes usar RedisFailover u otro personalizado.

from events_bus.aws.event_bridge_publisher import EventBridgePublisher
from events_bus.core.infrastructure.redis_failover import RedisFailover

# Configura el failover con Redis (opcional, por defecto es LocalFailover en memoria)
failover = RedisFailover(url="redis://localhost:6379/0")

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
    failover=failover  # Si omites este argumento, usará LocalFailover por defecto
)

# Publica tu evento normalmente
event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

# Para reintentar publicar los eventos almacenados en failover:
event_bridge_publisher.publish_from_failover(total_events=10)
  • Si ocurre un error al publicar el evento en AWS, este se almacena en el failover configurado (por defecto, local en memoria).
  • Puedes reintentar la publicación de los eventos almacenados usando publish_from_failover.

También puedes implementar tu propio mecanismo de failover heredando de BaseFailover.


📥 Consumir eventos

Opción 1: Consumir eventos con múltiples colas por caso de uso

  1. Hereda de AsyncHandler o SyncHandler y sobrescribe el método handle. El método handle recibe el evento como parámetro, recuerda que el evento debe heredar de BaseEvent, y el método from_dict debe estar implementado para poder deserializar el evento.

    Usa AsyncHandler para manejar eventos de forma asíncrona y SyncHandler para eventos sincrónicos.

from events_bus.core import AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event: DisbursementCreatedEvent):
        print(f"Handling event: {event.event_name}")
        print(f"Finished handling event: {event.event_name}")
  1. Registra el handler en el bus de eventos: Usa EventHandlerRegister para registrar el handler y la cola SQS asociada, recuerda que el target debe ser configurado previamente en AWS. Para mas información sobre la configuración de AWS, consulta la documentación oficial.
from events_bus.core import EventHandlerRegister
from events_bus.aws.sqs_dispatcher import SQSDispatcher

dispatcher = SQSDispatcher()
EventHandlerRegister.register_by_queue(
    queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    handler=CustomAsyncHandler()
)

Opción 2: Consumir eventos usando una sola cola por microservicio

Si quieres consumir solo una cola SQS por microservicio (y despachar a los handlers según el tipo de evento), registra tus handlers por nombre de evento y usa start_from_one_queue:

from events_bus.aws.sqs_dispatcher import SQSDispatcher
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

dispatcher = SQSDispatcher()

import asyncio

async def main():
    await dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')

asyncio.run(main())
  • Así solo consumes una cola por microservicio, y despachas a los handlers según el tipo de evento recibido.

Opción 3: Usar Mangum para AWS Lambda (SQS/EventBridge)

Si tu microservicio se ejecuta como Lambda y recibe eventos desde SQS o EventBridge, puedes usar la integración con Mangum:

from events_bus.aws.mangum import MangumExtended
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

app = FastAPI()

handler = MangumExtended(app)
  • Registra tus handlers usando EventHandlerRegister.register_handler.
  • Mangum detectará automáticamente si el evento proviene de SQS o EventBridge y despachará al handler correcto.
  • AWS Lambda llamará a handler(event, context) y la librería despachará el evento al handler correspondiente.

⚡ ¿Cómo implementar el dispatcher en FastAPI?

Opción 1: Cola de SQS por caso de uso

Puedes consumir eventos en FastAPI usando múltiples colas SQS por caso de uso. Registra tus handlers y usa start para iniciar el dispatcher.

@asynccontextmanager
async def lifespan(app: FastAPI):
    await dispatcher.start()
    yield
    dispatcher.stop()

app = FastAPI(lifespan=lifespan)

Opción 2: Una cola SQS por microservicio

Puedes consumir eventos usando una sola cola SQS por microservicio en FastAPI utilizando el método start_from_one_queue de SQSDispatcher. Así, tu microservicio solo escucha una cola y despacha a los handlers registrados según el tipo de evento recibido.

import asyncio

dispatcher = SQSDispatcher()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Consumir solo una cola por microservicio
    task = asyncio.create_task(
        dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')
    )
    yield
    dispatcher.stop()
    await task

app = FastAPI(lifespan=lifespan)

🛡️ Opcional: Configuración de Redis para deduplicación

from events_bus.core import RedisDeduplication
from events_bus.aws.sqs_dispatcher import SQSDispatcher

deduplication = RedisDeduplication(
    url='redis://localhost:6379/0',
    ttl=3600,
)

dispatcher = SQSDispatcher(deduplication=deduplication)

Si no deseas usar Redis, puedes heredar de BaseDeduplication e implementar tu propia lógica.


⚙️ Variables por Defecto

from events_bus import CONFIG
CONFIG.set_envs(os.environ, env_file='.env')

Puedes modificar las variables por defecto de la librería usando un archivo .env o configurando las variables de entorno directamente. Las variables configurables son:

Variable Tipo Por defecto Descripción
MAX_NUMBER_OF_MESSAGES int 5 Número máximo de mensajes a recibir por llamada.
WAIT_TIME_SECONDS int 10 Tiempo de espera para recibir mensajes.
VISIBILITY_TIMEOUT int 30 Tiempo de visibilidad del mensaje en la cola.
AWS_CLIENT_URL str None URL del cliente de AWS (útil para LocalStack u otros entornos locales).
AWS_REGION_NAME str us-east-1 Región de AWS.
SLEEP_BETWEEN_MESSAGES_SECONDS float 0.1 Tiempo de espera entre mensajes.
ERROR_SLEEP_SECONDS int 5 Tiempo de espera en caso de error.

ℹ️ Tip: Usa .env para mantener tu configuración fuera del código fuente.


🏷️ Definición de Topics

Los topics deben seguir la convención:

COMPANY.SERVICE.VERSION.MESSAGE_TYPE.RESOURCE_NAME.(EVENT_COMMAND_NAME)
  • COMPANY: Nombre de la empresa (Finkargo para internos).
  • SERVICE: Servicio generador del evento.
  • VERSION: Versión del topic.
  • MESSAGE_TYPE: command o event.
  • RESOURCE_NAME: Entidad relacionada.
  • EVENT_COMMAND_NAME: Verbo en pasado para eventos (created), infinitivo para comandos (create).

Regex de validación:

TOPIC_REGEX = "^[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\.\d+\.(command|event)\.[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+$"

Basado en: Topics Definition | AsyncApi


🛠️ TODO

  • Mejorar el control de excepciones en la publicación de eventos.
  • Agregar más ejemplos de integración con otros frameworks.
  • Documentar casos de uso avanzados.
  • Implementar métricas y logging para los eventos publicados y consumidos.
  • Añadir pruebas unitarias y de integración.
  • Mejorar la documentación en inglés y español.

📝 Buenas Prácticas

  • Usa nombres de eventos claros y consistentes.
  • Implementa deduplicación para evitar procesar eventos repetidos.
  • Mantén tus handlers simples y enfocados en una sola responsabilidad.
  • Documenta tus eventos y handlers.

📄 Licencia

Este proyecto está bajo la licencia MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

package_events_bus-0.1.40.tar.gz (19.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

package_events_bus-0.1.40-py3-none-any.whl (23.6 kB view details)

Uploaded Python 3

File details

Details for the file package_events_bus-0.1.40.tar.gz.

File metadata

  • Download URL: package_events_bus-0.1.40.tar.gz
  • Upload date:
  • Size: 19.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.1 CPython/3.13.11 Linux/6.11.0-1018-azure

File hashes

Hashes for package_events_bus-0.1.40.tar.gz
Algorithm Hash digest
SHA256 b0d59767113960d6e6873daec47d70ca31019bada396fe2da3f5822b38cd472a
MD5 fb4f825fe7a152f642e130fe8402090b
BLAKE2b-256 a2eca0edb4f7abc26a10544c3ade29c9697cc52fc59f01a4d68b102e45f75917

See more details on using hashes here.

File details

Details for the file package_events_bus-0.1.40-py3-none-any.whl.

File metadata

  • Download URL: package_events_bus-0.1.40-py3-none-any.whl
  • Upload date:
  • Size: 23.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.1 CPython/3.13.11 Linux/6.11.0-1018-azure

File hashes

Hashes for package_events_bus-0.1.40-py3-none-any.whl
Algorithm Hash digest
SHA256 7692cadc8dfb738470c1fb7a31123cbe45b2bbc24be792800da9b5c6a5a898f9
MD5 01d258d75b5993c312d88462d9ad8b9d
BLAKE2b-256 e5f393048ef61d5c2e0726585a1dda4d49a618ff8b460c421e8a1862c5b60189

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page