Skip to main content

Una librería para construir microservicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola y se despacha automáticamente mediante handlers tipados.

Project description

📦 Package Events Bus

Una librería para construir servicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola.


� Tabla de Contenidos


📝 Descripción

Esta librería permite construir microservicios desacoplados en Python, facilitando la publicación y consumo de eventos a través de AWS EventBridge y SQS. Cada caso de uso maneja su propia cola, permitiendo escalabilidad y mantenibilidad.


⚙️ Instalación

Instala la librería usando pip:

pip install package-events-bus

O usando poetry:

poetry add package-events-bus

🗂️ Estructura del Proyecto

package_events_bus/
├── __init__.py
├── config.py
├── exceptions.py
├── .vscode/settings.json
├── aws/
│   ├── event_bridge_publisher.py
│   └── sqs_dispatcher.py
├── core/
│   ├── __init__.py
│   ├── contracts/
│   │   ├── base_event.py
│   │   ├── deduplication.py
│   │   └── handler.py
│   ├── infrastructure/
│   │   ├── event_bus_publisher.py
│   │   ├── event_serializer.py
│   │   └── redis_deduplication.py
│   └── runtime/
│       └── event_register.py
  • /aws: Implementaciones específicas para servicios de AWS.
    • event_bridge_publisher.py: Publica eventos a AWS EventBridge.
    • sqs_dispatcher.py: Despacha mensajes de colas SQS y procesa eventos.

💡 ¿Cómo usarlo?

📤 Publicar eventos

  1. Crea una instancia de EventBridgePublisher y llama a publish con el evento deseado.
from events_bus.aws.event_bridge_publisher import EventBridgePublisher

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
)
  1. Define tu evento heredando de BaseEvent: Reemplaza DisbursementCreatedEvent con el nombre de tu evento, recuerda sobreescribir el método to_dict para serializar los atributos del evento que deseas enviar.
from events_bus.core.contracts.base_event import BaseEvent

@dataclass
class DisbursementCreatedEvent(BaseEvent):
    event_name = 'finkargo.portfolio.1.event.disbursement.created'

    def __init__(self, disbursement):
        super().__init__(event_name=self.event_name)
        self.id_disbursement = disbursement.id_disbursement

    def to_dict(self) -> dict:
        return {
            'id_disbursement': self.id_disbursement
        }

    @classmethod
    def from_dict(cls, event_id: str, occurred_on, attributes: dict) -> "DisbursementCreatedEvent":
        return cls(disbursement_id=attributes.get('id_disbursement'))

event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

📥 Consumir eventos

Opción 1: Consumir eventos con múltiples colas por caso de uso

  1. Hereda de AsyncHandler o SyncHandler y sobrescribe el método handle. El método handle recibe el evento como parámetro, recuerda que el evento debe heredar de BaseEvent, y el método from_dict debe estar implementado para poder deserializar el evento.

    Usa AsyncHandler para manejar eventos de forma asíncrona y SyncHandler para eventos sincrónicos.

from events_bus.core import AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event: DisbursementCreatedEvent):
        print(f"Handling event: {event.event_name}")
        print(f"Finished handling event: {event.event_name}")
  1. Registra el handler en el bus de eventos: Usa EventHandlerRegister para registrar el handler y la cola SQS asociada, recuerda que el target debe ser configurado previamente en AWS. Para mas información sobre la configuración de AWS, consulta la documentación oficial.
from events_bus.core import EventHandlerRegister
from events_bus.aws.sqs_dispatcher import SQSDispatcher

dispatcher = SQSDispatcher()
EventHandlerRegister.register_by_queue(
    queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    handler=CustomAsyncHandler()
)

Opción 2: Consumir eventos usando una sola cola por microservicio

Si quieres consumir solo una cola SQS por microservicio (y despachar a los handlers según el tipo de evento), registra tus handlers por nombre de evento y usa start_from_one_queue:

from events_bus.aws.sqs_dispatcher import SQSDispatcher
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

dispatcher = SQSDispatcher()

import asyncio

async def main():
    await dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')

asyncio.run(main())
  • Así solo consumes una cola por microservicio, y despachas a los handlers según el tipo de evento recibido.

Opción 3: Usar Mangum para AWS Lambda (SQS/EventBridge)

Si tu microservicio se ejecuta como Lambda y recibe eventos desde SQS o EventBridge, puedes usar la integración con Mangum:

from events_bus.aws.mangum import MangumExtended
from events_bus.core import EventHandlerRegister, AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, event):
        print(f"Handling event: {event.event_name}")

# Registra el handler para el tipo de evento
EventHandlerRegister.register_handler(
    event_name='finkargo.portfolio.1.event.disbursement.created',
    handler=CustomAsyncHandler()
)

handler = MangumExtended()
  • Registra tus handlers usando EventHandlerRegister.register_handler.
  • Mangum detectará automáticamente si el evento proviene de SQS o EventBridge y despachará al handler correcto.
  • AWS Lambda llamará a handler(event, context) y la librería despachará el evento al handler correspondiente.

⚡ ¿Cómo implementar el dispatcher en FastAPI?

Opción 1: Cola de SQS por caso de uso

Puedes consumir eventos en FastAPI usando múltiples colas SQS por caso de uso. Registra tus handlers y usa start para iniciar el dispatcher.

@asynccontextmanager
async def lifespan(app: FastAPI):
    await dispatcher.start()
    yield
    dispatcher.stop()

app = FastAPI(lifespan=lifespan)

Opción 2: Una cola SQS por microservicio

Puedes consumir eventos usando una sola cola SQS por microservicio en FastAPI utilizando el método start_from_one_queue de SQSDispatcher. Así, tu microservicio solo escucha una cola y despacha a los handlers registrados según el tipo de evento recibido.

import asyncio

dispatcher = SQSDispatcher()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Consumir solo una cola por microservicio
    task = asyncio.create_task(
        dispatcher.start_from_one_queue('https://sqs.us-east-1.amazonaws.com/123456789012/my-queue')
    )
    yield
    dispatcher.stop()
    await task

app = FastAPI(lifespan=lifespan)

🛡️ Opcional: Configuración de Redis para deduplicación

from events_bus.core import RedisDeduplication
from events_bus.aws.sqs_dispatcher import SQSDispatcher

deduplication = RedisDeduplication(
    url='redis://localhost:6379/0',
    ttl=3600,
)

dispatcher = SQSDispatcher(deduplication=deduplication)

Si no deseas usar Redis, puedes heredar de BaseDeduplication e implementar tu propia lógica.


⚙️ Variables por Defecto

from events_bus import CONFIG
CONFIG.set_envs(os.environ, env_file='.env')

Puedes modificar las variables por defecto de la librería usando un archivo .env o configurando las variables de entorno directamente. Las variables configurables son:

Variable Tipo Por defecto Descripción
MAX_NUMBER_OF_MESSAGES int 5 Número máximo de mensajes a recibir por llamada.
WAIT_TIME_SECONDS int 10 Tiempo de espera para recibir mensajes.
VISIBILITY_TIMEOUT int 30 Tiempo de visibilidad del mensaje en la cola.
AWS_CLIENT_URL str None URL del cliente de AWS (útil para LocalStack u otros entornos locales).
AWS_REGION_NAME str us-east-1 Región de AWS.
SLEEP_BETWEEN_MESSAGES_SECONDS float 0.1 Tiempo de espera entre mensajes.
ERROR_SLEEP_SECONDS int 5 Tiempo de espera en caso de error.

ℹ️ Tip: Usa .env para mantener tu configuración fuera del código fuente.


🏷️ Definición de Topics

Los topics deben seguir la convención:

COMPANY.SERVICE.VERSION.MESSAGE_TYPE.RESOURCE_NAME.(EVENT_COMMAND_NAME)
  • COMPANY: Nombre de la empresa (Finkargo para internos).
  • SERVICE: Servicio generador del evento.
  • VERSION: Versión del topic.
  • MESSAGE_TYPE: command o event.
  • RESOURCE_NAME: Entidad relacionada.
  • EVENT_COMMAND_NAME: Verbo en pasado para eventos (created), infinitivo para comandos (create).

Regex de validación:

TOPIC_REGEX = "^[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\.\d+\.(command|event)\.[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+$"

Basado en: Topics Definition | AsyncApi


🛠️ TODO

  • Mejorar el control de excepciones en la publicación de eventos.
  • Agregar más ejemplos de integración con otros frameworks.
  • Documentar casos de uso avanzados.
  • Implementar métricas y logging para los eventos publicados y consumidos.
  • Añadir pruebas unitarias y de integración.
  • Mejorar la documentación en inglés y español.

📝 Buenas Prácticas

  • Usa nombres de eventos claros y consistentes.
  • Implementa deduplicación para evitar procesar eventos repetidos.
  • Mantén tus handlers simples y enfocados en una sola responsabilidad.
  • Documenta tus eventos y handlers.

📄 Licencia

Este proyecto está bajo la licencia MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

package_events_bus-0.1.29.tar.gz (16.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

package_events_bus-0.1.29-py3-none-any.whl (18.9 kB view details)

Uploaded Python 3

File details

Details for the file package_events_bus-0.1.29.tar.gz.

File metadata

  • Download URL: package_events_bus-0.1.29.tar.gz
  • Upload date:
  • Size: 16.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.13.3 Linux/6.11.0-1015-azure

File hashes

Hashes for package_events_bus-0.1.29.tar.gz
Algorithm Hash digest
SHA256 f223c7119243ba4dfe075ba555a6f74ded09d95530e37c49992945cac3a607f0
MD5 63dfc8196c13f49b137fc664376711da
BLAKE2b-256 b384de275043c3715a2400af3d6e0f9f79b1163f84f69366a76df825e8866ca0

See more details on using hashes here.

File details

Details for the file package_events_bus-0.1.29-py3-none-any.whl.

File metadata

  • Download URL: package_events_bus-0.1.29-py3-none-any.whl
  • Upload date:
  • Size: 18.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.13.3 Linux/6.11.0-1015-azure

File hashes

Hashes for package_events_bus-0.1.29-py3-none-any.whl
Algorithm Hash digest
SHA256 5a935a95aca676e2dc932d554fe218361b7d0c65bb3ac6d62966ade887f54882
MD5 187968e0ad4439ceb9f1ce4bf37f6e75
BLAKE2b-256 a0a776f0151ad5f2b3c984dc569e8f3dfcd62c0227e636d3568c8daff56ac06e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page