Skip to main content

Una librería para construir microservicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola y se despacha automáticamente mediante handlers tipados.

Project description

📦 Package Events Bus

Una librería para construir servicios desacoplados en Python usando AWS EventBridge y SQS, donde cada caso de uso maneja su propia cola.


� Tabla de Contenidos


📝 Descripción

Esta librería permite construir microservicios desacoplados en Python, facilitando la publicación y consumo de eventos a través de AWS EventBridge y SQS. Cada caso de uso maneja su propia cola, permitiendo escalabilidad y mantenibilidad.


⚙️ Instalación

Instala la librería usando pip:

pip install package-events-bus

O usando poetry:

poetry add package-events-bus

🗂️ Estructura del Proyecto

package_events_bus/
├── __init__.py
├── config.py
├── exceptions.py
├── .vscode/settings.json
├── aws/
│   ├── event_bridge_publisher.py
│   └── sqs_dispatcher.py
├── core/
│   ├── __init__.py
│   ├── contracts/
│   │   ├── base_event.py
│   │   ├── deduplication.py
│   │   └── handler.py
│   ├── infrastructure/
│   │   ├── event_bus_publisher.py
│   │   ├── event_serializer.py
│   │   └── redis_deduplication.py
│   └── runtime/
│       └── event_register.py
  • /aws: Implementaciones específicas para servicios de AWS.
    • event_bridge_publisher.py: Publica eventos a AWS EventBridge.
    • sqs_dispatcher.py: Despacha mensajes de colas SQS y procesa eventos.

💡 ¿Cómo usarlo?

📤 Publicar eventos

  1. Crea una instancia de EventBridgePublisher y llama a publish con el evento deseado.
from events_bus.aws.event_bridge_publisher import EventBridgePublisher

event_bridge_publisher = EventBridgePublisher(
    bus_name="finkargo-events",
    source="my.service",
)
  1. Define tu evento heredando de BaseEvent: Reemplaza DisbursementCreatedEvent con el nombre de tu evento, recuerda sobreescribir el método to_dict para serializar los atributos del evento que deseas enviar.
from events_bus.core.contracts.base_event import BaseEvent

@dataclass
class DisbursementCreatedEvent(BaseEvent):
    event_name = 'finkargo.portfolio.1.event.disbursement.created'

    def __init__(self, disbursement):
        super().__init__(event_name=self.event_name)
        self.id_disbursement = disbursement.id_disbursement

    def to_dict(self) -> dict:
        return {
            'id_disbursement': self.id_disbursement
        }

    @classmethod
    def from_dict(cls, event_id: str, occurred_on, attributes: dict) -> "DisbursementCreatedEvent":
        return cls(disbursement_id=attributes.get('id_disbursement'))

event_bridge_publisher.publish(event=DisbursementCreatedEvent(instance))

📥 Consumir eventos

  1. Hereda de AsyncHandler o SyncHandler y sobrescribe el método handle. El método handle recibe el evento como parámetro, recuerda que el evento debe heredar de BaseEvent, y el método from_dict debe estar implementado para poder deserializar el evento.

    Usa AsyncHandler para manejar eventos de forma asíncrona y SyncHandler para eventos sincrónicos.

from events_bus.core import AsyncHandler

class CustomAsyncHandler(AsyncHandler):
    async def handle(self, data: DisbursementCreatedEvent):
        print(f"Handling event: {data.event_name}")
        print(f"Finished handling event: {data.event_name}")
  1. Registra el handler en el bus de eventos: Usa EventHandlerRegister para registrar el handler y la cola SQS asociada, recuerda que el target debe ser configurado previamente en AWS. Para mas información sobre la configuración de AWS, consulta la documentación oficial.
from events_bus.core import EventHandlerRegister
from events_bus.aws.sqs_dispatcher import SQSDispatcher

dispatcher = SQSDispatcher()
EventHandlerRegister.register(
    queue_url='https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
    handler=CustomAsyncHandler()
)

⚡ ¿Cómo implementar el dispatcher en FastAPI?

@asynccontextmanager
async def lifespan(app: FastAPI):
    await dispatcher.start()
    yield
    dispatcher.stop()

app = FastAPI(lifespan=lifespan)

🛡️ Opcional: Configuración de Redis para deduplicación

from events_bus.core import RedisDeduplication
from events_bus.aws.sqs_dispatcher import SQSDispatcher

deduplication = RedisDeduplication(
    url='redis://localhost:6379/0',
    ttl=3600,
)

dispatcher = SQSDispatcher(deduplication=deduplication)

Si no deseas usar Redis, puedes heredar de BaseDeduplication e implementar tu propia lógica.


⚙️ Variables por Defecto

from events_bus import CONFIG
CONFIG.set_envs(os.environ, env_file='.env')

Puedes modificar las variables por defecto de la librería usando un archivo .env o configurando las variables de entorno directamente. Las variables configurables son:

Variable Tipo Por defecto Descripción
MAX_NUMBER_OF_MESSAGES int 5 Número máximo de mensajes a recibir por llamada.
WAIT_TIME_SECONDS int 10 Tiempo de espera para recibir mensajes.
VISIBILITY_TIMEOUT int 30 Tiempo de visibilidad del mensaje en la cola.
AWS_CLIENT_URL str None URL del cliente de AWS (útil para LocalStack u otros entornos locales).
AWS_REGION_NAME str us-east-1 Región de AWS.
SLEEP_BETWEEN_MESSAGES_SECONDS float 0.1 Tiempo de espera entre mensajes.
ERROR_SLEEP_SECONDS int 5 Tiempo de espera en caso de error.

ℹ️ Tip: Usa .env para mantener tu configuración fuera del código fuente.


🏷️ Definición de Topics

Los topics deben seguir la convención:

COMPANY.SERVICE.VERSION.MESSAGE_TYPE.RESOURCE_NAME.(EVENT_COMMAND_NAME)
  • COMPANY: Nombre de la empresa (Finkargo para internos).
  • SERVICE: Servicio generador del evento.
  • VERSION: Versión del topic.
  • MESSAGE_TYPE: command o event.
  • RESOURCE_NAME: Entidad relacionada.
  • EVENT_COMMAND_NAME: Verbo en pasado para eventos (created), infinitivo para comandos (create).

Regex de validación:

TOPIC_REGEX = "^[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+\.\d+\.(command|event)\.[a-zA-Z0-9_]+\.[a-zA-Z0-9_]+$"

Basado en: Topics Definition | AsyncApi


🛠️ TODO

  • Mejorar el control de excepciones en la publicación de eventos.
  • Agregar más ejemplos de integración con otros frameworks.
  • Documentar casos de uso avanzados.
  • Implementar métricas y logging para los eventos publicados y consumidos.
  • Añadir pruebas unitarias y de integración.
  • Mejorar la documentación en inglés y español.

📝 Buenas Prácticas

  • Usa nombres de eventos claros y consistentes.
  • Implementa deduplicación para evitar procesar eventos repetidos.
  • Mantén tus handlers simples y enfocados en una sola responsabilidad.
  • Documenta tus eventos y handlers.

📄 Licencia

Este proyecto está bajo la licencia MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

package_events_bus-0.1.0.tar.gz (12.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

package_events_bus-0.1.0-py3-none-any.whl (15.1 kB view details)

Uploaded Python 3

File details

Details for the file package_events_bus-0.1.0.tar.gz.

File metadata

  • Download URL: package_events_bus-0.1.0.tar.gz
  • Upload date:
  • Size: 12.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.9.22

File hashes

Hashes for package_events_bus-0.1.0.tar.gz
Algorithm Hash digest
SHA256 c8ac6e4ef0bdf0eb139096993647b846a19ffc7ad74f948cc04331130a240aea
MD5 0a04810826f11b1f3a4bc9f9ecd76c29
BLAKE2b-256 59d61769ffb08737ebea8d670a004520a27f4db2b3dc4395f7b3aad80da1a147

See more details on using hashes here.

File details

Details for the file package_events_bus-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for package_events_bus-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 eee20fdffb00eb666d7f7ed4d126bafe647e49720fbb339a94cb4d058cc7a977
MD5 121ad065045ce1ffe2030d718d137a77
BLAKE2b-256 6edd78838bb8e3275b97a9a0f8efdfb94d2a0d991475c02fee43794615c7bcfa

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page