Library for event management in Python

Papa Events

A Python library for event-driven communication between services with a declarative interface.

Features

  • Declarative Event Handling: Define event handlers with simple decorators
  • Flexible Event Routing: Support for exact event names and wildcard patterns
  • Built-in Retry Mechanism: Automatic retries with configurable limits
  • Dead Letter Queue: Failed events are moved to DLQ after max retries
  • Failover Support: Database backup when RabbitMQ is unavailable
  • Pydantic Integration: Type-safe event payloads with Pydantic models
  • OpenTelemetry Support: Built-in tracing for event processing

Installation

pip install papa-events

Quick Start

Basic Setup

from fastapi import FastAPI
from contextlib import asynccontextmanager
from papa_events import PapaApp

# Initialize the event application
event_app = PapaApp(broker_uri="amqp://user:password@localhost/")

@asynccontextmanager
async def lifespan(_: FastAPI):
    await event_app.start()
    yield
    await event_app.stop()

app = FastAPI(lifespan=lifespan)

Event Consumption

Define your event models and handlers:

from pydantic import BaseModel

class UserCreated(BaseModel):
    name: str
    email: str
    age: int

# Handler with event name parameter (optional)
@event_app.on_event(['user.created'], use_case_name="welcome_email")
async def send_welcome_email(event_name: str, event: UserCreated):
    print(f"Processing {event_name} for user: {event.name}")

# Handler without event name parameter
@event_app.on_event(['user.created'], use_case_name="analytics_track")
async def track_user_creation(event: UserCreated):
    print(f"Tracking user: {event.name}")

# Multiple events with wildcards
@event_app.on_event(['user.*', 'company.deleted'], use_case_name="activity_log")
async def log_activity(event_name: str, event: UserCreated):
    print(f"Activity: {event_name} - {event.name}")
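
The README does not spell out how wildcard patterns are matched. Assuming they follow RabbitMQ topic-exchange rules (`*` matches exactly one dot-separated word, `#` matches zero or more words), the matching could be sketched as below. `matches` is a hypothetical helper for illustration and is not part of papa_events.

```python
def matches(pattern: str, event_name: str) -> bool:
    """Return True if event_name matches an AMQP-topic-style pattern.

    Hypothetical helper; assumes RabbitMQ topic-exchange semantics.
    """
    def walk(p: list[str], n: list[str]) -> bool:
        if not p:
            # Pattern exhausted: match only if the name is exhausted too
            return not n
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(walk(rest, n[i:]) for i in range(len(n) + 1))
        if not n:
            return False
        if head == "*" or head == n[0]:
            # '*' consumes exactly one word; a literal must be equal
            return walk(rest, n[1:])
        return False

    return walk(pattern.split("."), event_name.split("."))
```

Under these assumptions, `user.*` matches `user.created` but not `user.created.v2`, since `*` covers a single word only.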

Event Publication

Publishing Single Events

# From any async context in your application
user_event = UserCreated(name="John Doe", email="john@example.com", age=30)
await event_app.new_event("user.created", user_event)

Publishing Events from Handlers

class EmailSent(BaseModel):
    destination: str
    subject: str

@event_app.on_event(['user.created'], use_case_name="send_welcome_email")
async def send_welcome_email(event: UserCreated):
    # Process the event
    print(f"Sending welcome email to {event.email}")
    
    # Return new events to publish
    return [{
        "name": "email.sent",
        "payload": EmailSent(
            destination=event.email,
            subject="Welcome!"
        )
    }]
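
To make the return-value convention concrete, here is a minimal sketch of how a dispatcher might forward the returned list to a publisher. `dispatch`, `publish`, and the in-memory `published` list are hypothetical stand-ins, not the library's internals, and plain dicts replace the Pydantic models to keep the sketch dependency-free.

```python
import asyncio
from typing import Any, Awaitable, Callable

# In-memory stand-in for the broker (assumption for illustration)
published: list[tuple[str, Any]] = []

async def publish(name: str, payload: Any) -> None:
    # A real implementation would send the payload to the exchange
    published.append((name, payload))

async def dispatch(handler: Callable[[Any], Awaitable[Any]], event: Any) -> None:
    # Run the handler, then publish every event it returned (None means no follow-ups)
    result = await handler(event)
    for item in result or []:
        await publish(item["name"], item["payload"])

async def welcome_handler(event: dict) -> list[dict]:
    return [{
        "name": "email.sent",
        "payload": {"destination": event["email"], "subject": "Welcome!"},
    }]

asyncio.run(dispatch(welcome_handler, {"email": "john@example.com"}))
```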

Advanced Configuration

Retry Configuration

# Custom retry settings
@event_app.on_event(
    ['user.created'], 
    use_case_name="critical_process",
    retries=10  # Default is 5
)
async def critical_handler(event: UserCreated):
    # Your critical business logic
    pass

Failover Setup

# Initialize with failover database support
event_app = PapaApp(
    broker_uri="amqp://user:password@localhost/",
    failover_uri="postgresql://user:password@localhost/dbname"
)
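
The README does not describe how failover works internally. One common pattern, sketched here as an assumption, is to write the event to a database table when the broker publish fails so it can be replayed later. The sketch uses `sqlite3` as a stand-in for PostgreSQL; the `pending_events` table, `BrokerDown`, and both functions are hypothetical names.

```python
import json
import sqlite3

class BrokerDown(Exception):
    """Raised by the stand-in publisher when the broker is unreachable."""

def publish_to_broker(name: str, body: str, broker_up: bool) -> None:
    # Stand-in for a real AMQP publish; fails when broker_up is False
    if not broker_up:
        raise BrokerDown("broker unavailable")

def publish_with_failover(db: sqlite3.Connection, name: str,
                          payload: dict, broker_up: bool) -> str:
    body = json.dumps(payload)
    try:
        publish_to_broker(name, body, broker_up)
        return "broker"
    except BrokerDown:
        # Broker unreachable: persist the event for later replay
        db.execute(
            "INSERT INTO pending_events (name, body) VALUES (?, ?)",
            (name, body),
        )
        db.commit()
        return "database"

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE pending_events (id INTEGER PRIMARY KEY, name TEXT, body TEXT)"
)
```

Calling `publish_with_failover(db, "user.created", {"name": "John"}, broker_up=False)` stores the event in `pending_events` instead of raising.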

Advanced Initialization

import logging

# Custom logger and configuration
logger = logging.getLogger("my_events")
event_app = PapaApp(
    broker_uri="amqp://user:password@localhost/",
    max_jobs=50,  # Maximum concurrent event processors
    logger=logger
)
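
A cap such as `max_jobs` is commonly enforced with a semaphore. The following sketch shows that general technique with `asyncio.Semaphore`; whether papa_events implements it this way is an assumption, and `run_with_limit` and `fake_handler` are hypothetical names.

```python
import asyncio

async def run_with_limit(jobs, max_jobs: int) -> int:
    """Run all jobs, never more than max_jobs at once; return the peak concurrency."""
    semaphore = asyncio.Semaphore(max_jobs)
    running = 0
    peak = 0

    async def guarded(job):
        nonlocal running, peak
        async with semaphore:  # Waits here once max_jobs handlers are active
            running += 1
            peak = max(peak, running)
            try:
                await job()
            finally:
                running -= 1

    await asyncio.gather(*(guarded(job) for job in jobs))
    return peak

async def fake_handler():
    # Stand-in for real event processing
    await asyncio.sleep(0.01)

peak = asyncio.run(run_with_limit([fake_handler] * 10, max_jobs=3))
```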

Event Flow

The system creates dedicated queues for each use case. Events are automatically routed to the appropriate queues based on routing keys.

Processing Flow

  1. Event Received: Event arrives at the main exchange
  2. Queue Binding: Event is routed to relevant use case queues
  3. Processing: Handler processes the event
  4. Success: Event is acknowledged and removed from queue
  5. Failure: Event is sent to retry queue with exponential backoff
  6. Max Retries Exceeded: Event is moved to Dead Letter Queue (DLQ)
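
The steps above can be sketched as a small in-memory simulation. This is not the library's code. It assumes that `retries` counts redeliveries after the initial attempt (so `retries=5` allows six attempts in total), which the README does not confirm. `run_flow`, `make_flaky`, and `always_fail` are hypothetical names.

```python
def run_flow(event, handler, retries: int = 5):
    """Simulate steps 3-6 for one event; return (outcome, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            handler(event)
            return "acked", attempts      # Step 4: success, event is acknowledged
        except Exception:
            if attempts > retries:
                return "dlq", attempts    # Step 6: retry budget exhausted
            # Step 5: the event would wait in the retry queue, then be redelivered

def make_flaky(failures: int):
    """Build a handler that fails `failures` times, then succeeds."""
    remaining = failures
    def handler(event):
        nonlocal remaining
        if remaining > 0:
            remaining -= 1
            raise RuntimeError("transient failure")
    return handler

def always_fail(event):
    raise RuntimeError("permanent failure")
```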

Queue Structure

  • use_case_name: Main processing queue
  • use_case_name.retry: Retry queue with delayed delivery
  • use_case_name.dlq: Dead letter queue for manual intervention
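
The naming scheme above can be expressed as a small helper, which is handy when inspecting queues in the RabbitMQ management UI. `queue_names` is a hypothetical function written for this example, not a papa_events API.

```python
def queue_names(use_case_name: str) -> dict[str, str]:
    """Derive the three queue names listed above for one use case."""
    return {
        "main": use_case_name,
        "retry": f"{use_case_name}.retry",
        "dlq": f"{use_case_name}.dlq",
    }
```

For example, `queue_names("welcome_email")["dlq"]` gives `welcome_email.dlq`.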

Error Handling

Automatic Retries

Events that fail processing are retried automatically, up to the handler's retries limit (default 5). The retry mechanism includes:

  • Exponential backoff between retries
  • Configurable maximum retry attempts
  • Automatic dead letter queue routing after max retries
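
The README does not state the actual delay values. As an illustration of exponential backoff in general, the sketch below doubles the delay on each retry and caps it. The base delay, factor, and cap are assumptions chosen for the example, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(retries: int, base_seconds: float = 1.0,
                   factor: float = 2.0, cap_seconds: float = 60.0) -> list[float]:
    """Return the delay before each retry: base * factor**attempt, capped."""
    return [
        min(base_seconds * factor ** attempt, cap_seconds)
        for attempt in range(retries)
    ]
```

With these assumed defaults, five retries wait 1, 2, 4, 8, and 16 seconds. The cap keeps later retries from growing without bound.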

Dead Letter Queue

Events that exceed the maximum retry count are moved to the DLQ where they can be:

  • Manually inspected
  • Reprocessed after fixing the underlying issue
  • Archived for audit purposes
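
Reprocessing after a fix can be pictured as replaying each DLQ entry through the corrected handler and keeping whatever still fails. The sketch below works on a plain list; a real DLQ would be consumed from RabbitMQ. `replay_dlq` and `fixed_handler` are hypothetical names.

```python
def replay_dlq(dlq: list[dict], handler) -> tuple[int, list[dict]]:
    """Replay DLQ entries; return (number replayed, entries that still fail)."""
    replayed = 0
    still_failing = []
    for event in dlq:
        try:
            handler(event)
            replayed += 1
        except Exception:
            # Leave the event for further manual inspection
            still_failing.append(event)
    return replayed, still_failing

def fixed_handler(event: dict) -> None:
    # Example handler that requires an "email" field
    if "email" not in event:
        raise ValueError("missing email")
```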

Development

Running Tests

# Install development dependencies
uv sync --group dev

# Run all tests
uv run pytest tests/ -v

# Run specific test file
uv run pytest tests/test_events.py -v

Code Quality

# Format code
uvx ruff format

# Check code style
uvx ruff check

# Type checking
uvx ty check

API Reference

PapaApp Class

PapaApp(
    broker_uri: str,
    failover_uri: str | None = None,
    max_jobs: int = 20,
    logger: logging.Logger | None = None
)

Methods

  • on_event(event_names: list[str], use_case_name: str, retries: int = 5): Event handler decorator
  • start(): Initialize connections and start processing (coroutine; must be awaited)
  • stop(): Gracefully shut down event processing (coroutine; must be awaited)
  • new_event(event_name: str, payload: bytes | BaseModel): Publish a new event (coroutine; must be awaited)

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests for new functionality
  5. Ensure all tests pass
  6. Submit a pull request

License

This project is licensed under the terms of the LICENSE file included in the distribution.
