Skip to main content

Event-driven system with acknowledgment tracking using Pydantic and ReactiveX

Project description

Rx-Events

A flexible event-driven system with acknowledgment tracking using Pydantic and ReactiveX.

Features

  • Extensible Event Classes - Create custom event types with Pydantic BaseModel
  • Acknowledgment Tracking - Built-in support for event processing acknowledgments
  • Reactive Streams - Powered by ReactiveX (RxPY) for reactive event processing
  • Type Safety - Full type hints and Pydantic validation
  • Event Bus Pattern - Centralized event management with channels
  • Timeout Handling - Automatic timeout detection and handling

Installation

# Install from local directory (development)
pip install -e /path/to/rx-events

# Or install from source
cd /path/to/rx-events
pip install .

Quick Start

Basic Usage

from rx_events import EventBus, BaseEvent, EventStatus, EventAck
from pydantic import Field
import time
import uuid as uuid_lib
from typing import Dict, Any, Optional

# Create a custom event
class MyEvent(BaseEvent):
    user_id: str
    data: Dict[str, Any]
    
    uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    correlation_id: Optional[str] = None
    
    @property
    def event_type(self) -> str:
        return "my_event"
    
    def to_payload(self) -> Dict[str, Any]:
        return {"user_id": self.user_id, "data": self.data}

# Create event bus and channel
bus = EventBus()
bus.create_channel("my_channel", timeout_seconds=180)

# Publish event
event = MyEvent(user_id="123", data={"key": "value"})
await bus.publish("my_channel", event)

# Subscribe to events
channel = bus.get_channel("my_channel")
channel.get_event_stream().subscribe(
    on_next=lambda event: print(f"Received: {event.event_type}"),
    on_error=lambda error: print(f"Error: {error}")
)

# Send acknowledgment
ack = EventAck.create(
    event_uuid=event.uuid,
    status=EventStatus.COMPLETED,
    result={"processed": True}
)
await bus.acknowledge("my_channel", ack)

Custom Event with Validation

from pydantic import Field, field_validator

class UserEvent(BaseEvent):
    user_id: str
    email: str
    
    uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    correlation_id: Optional[str] = None
    
    @field_validator('email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        if '@' not in v:
            raise ValueError("Invalid email")
        return v
    
    @property
    def event_type(self) -> str:
        return "user_event"
    
    def to_payload(self) -> Dict[str, Any]:
        return {"user_id": self.user_id, "email": self.email}

Testing

Run tests with pytest:

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=rx_events --cov-report=html

API Reference

Core Classes

  • BaseEvent - Abstract base class for custom events
  • Event - Standard event class for general-purpose events
  • BaseEventAck - Abstract base class for custom acknowledgments
  • EventAck - Standard acknowledgment class
  • EventStatus - Enum for event statuses (PENDING, PROCESSING, COMPLETED, FAILED, TIMEOUT)
  • EventBus - Central event bus for managing channels
  • AckChannel - Channel for event publishing and acknowledgment tracking

EventBus Methods

  • create_channel(name, allow_duplicates=False, timeout_seconds=300) - Create a new channel
  • get_channel(name) - Get an existing channel
  • publish(channel_name, event) - Publish an event to a channel
  • acknowledge(channel_name, ack) - Send an acknowledgment

AckChannel Methods

  • publish(event) - Publish an event
  • acknowledge(ack) - Receive an acknowledgment
  • get_event_stream() - Get ReactiveX Observable for events
  • get_ack_stream() - Get ReactiveX Observable for acknowledgments
  • get_stats() - Get channel statistics
  • get_event_status(event_uuid) - Get status of a specific event

Examples

See the tests/ directory for comprehensive examples.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rx_events-0.1.0.tar.gz (33.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rx_events-0.1.0-py3-none-any.whl (7.8 kB view details)

Uploaded Python 3

File details

Details for the file rx_events-0.1.0.tar.gz.

File metadata

  • Download URL: rx_events-0.1.0.tar.gz
  • Upload date:
  • Size: 33.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for rx_events-0.1.0.tar.gz
Algorithm Hash digest
SHA256 6efd62399e0d922b2e1ef4d7a56a1bcacdadbaf813e91bea4b691a847341902f
MD5 a4ea9a8a185e95d8ff968cd4b8ca9e90
BLAKE2b-256 3c9885aa6cf566bd56b1dc365392ef177dfee9c793ab5b88df33a5afcf6521b6

See more details on using hashes here.

File details

Details for the file rx_events-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rx_events-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 7.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for rx_events-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4d4fe7e15064ffa0911259fc5a8bfbee60c59f26a62b236220fe6e6b19cd1bed
MD5 dcc320d2667b2f342d08fa49ff08bb6f
BLAKE2b-256 7cb84ce5b3bfdec2ef0f6ef43b7e9a981a1cf4af1c50c061e495b1bb976c555b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page