Skip to main content

Event-driven system with acknowledgment tracking using Pydantic and ReactiveX

Project description

Rx-Events

A flexible event-driven system with acknowledgment tracking using Pydantic and ReactiveX.

Features

  • Extensible Event Classes - Create custom event types with Pydantic BaseModel
  • Acknowledgment Tracking - Built-in support for event processing acknowledgments
  • Reactive Streams - Powered by ReactiveX (RxPY) for reactive event processing
  • Type Safety - Full type hints and Pydantic validation
  • Event Bus Pattern - Centralized event management with channels
  • Timeout Handling - Automatic timeout detection and handling

Installation

# Install from local directory (development)
pip install -e /path/to/rx-events

# Or install from source
cd /path/to/rx-events
pip install .

Quick Start

Basic Usage

from rx_events import EventBus, BaseEvent, EventStatus, EventAck
from pydantic import Field
import time
import uuid as uuid_lib
from typing import Dict, Any, Optional

# Create a custom event
class MyEvent(BaseEvent):
    user_id: str
    data: Dict[str, Any]
    
    uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    correlation_id: Optional[str] = None
    
    @property
    def event_type(self) -> str:
        return "my_event"
    
    def to_payload(self) -> Dict[str, Any]:
        return {"user_id": self.user_id, "data": self.data}

# Create event bus and channel
bus = EventBus()
bus.create_ack_channel("my_channel", timeout_seconds=180)

# Publish event
event = MyEvent(user_id="123", data={"key": "value"})
await bus.publish("my_channel", event)

# Subscribe to events
channel = bus.get_channel("my_channel")
channel.get_event_stream().subscribe(
    on_next=lambda event: print(f"Received: {event.event_type}"),
    on_error=lambda error: print(f"Error: {error}")
)

# Send acknowledgment
ack = EventAck.create(
    event_uuid=event.uuid,
    status=EventStatus.COMPLETED,
    result={"processed": True}
)
await bus.acknowledge("my_channel", ack)

Custom Event with Validation

from pydantic import Field, field_validator

class UserEvent(BaseEvent):
    user_id: str
    email: str
    
    uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    correlation_id: Optional[str] = None
    
    @field_validator('email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        if '@' not in v:
            raise ValueError("Invalid email")
        return v
    
    @property
    def event_type(self) -> str:
        return "user_event"
    
    def to_payload(self) -> Dict[str, Any]:
        return {"user_id": self.user_id, "email": self.email}

Testing

Run tests with pytest:

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=rx_events --cov-report=html

API Reference

Core Classes

  • BaseEvent - Abstract base class for custom events
  • Event - Standard event class for general-purpose events
  • BaseEventAck - Abstract base class for custom acknowledgments
  • EventAck - Standard acknowledgment class
  • EventStatus - Enum for event statuses (PENDING, PROCESSING, COMPLETED, FAILED, TIMEOUT)
  • EventBus - Central event bus for managing channels
  • AckChannel - Channel for event publishing and acknowledgment tracking

EventBus Methods

  • create_ack_channel(name, allow_duplicates=False, timeout_seconds=300) - Create a new acknowledgment channel
  • get_channel(name) - Get an existing channel
  • publish(channel_name, event) - Publish an event to a channel
  • acknowledge(channel_name, ack) - Send an acknowledgment

AckChannel Methods

  • publish(event) - Publish an event
  • acknowledge(ack) - Receive an acknowledgment
  • get_event_stream() - Get ReactiveX Observable for events
  • get_ack_stream() - Get ReactiveX Observable for acknowledgments
  • get_stats() - Get channel statistics
  • get_event_status(event_uuid) - Get status of a specific event

Examples

See the tests/ directory for comprehensive examples.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rx_events-0.1.1.tar.gz (111.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rx_events-0.1.1-py3-none-any.whl (8.6 kB view details)

Uploaded Python 3

File details

Details for the file rx_events-0.1.1.tar.gz.

File metadata

  • Download URL: rx_events-0.1.1.tar.gz
  • Upload date:
  • Size: 111.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for rx_events-0.1.1.tar.gz
Algorithm Hash digest
SHA256 9cc542f64cc0e7ae81b234a866ac4abd99dab6609cf6f7ecf3828fb681ec1801
MD5 a61cadbd9a6cebefd1eb4fe6ae8871a7
BLAKE2b-256 8cbee43e3ba9054f7b8159ec330075d81fdc3e0971f8172447cea91f5a06788c

See more details on using hashes here.

File details

Details for the file rx_events-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: rx_events-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 8.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for rx_events-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 efb01a6b611291616a05a43c153f38db7fb23f24963fb8c641bd0be8e2a541cc
MD5 2cd7a116600f63085c00d82e915455a6
BLAKE2b-256 cb31f59af9f29e6c42c2e9397e71a8ba3a39ec4c5f3684a710c152b6baaa3ea5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page