Event-driven system with acknowledgment tracking using Pydantic and ReactiveX
Rx-Events
A flexible event-driven system with acknowledgment tracking using Pydantic and ReactiveX.
Features
- ✅ Extensible Event Classes - Create custom event types with Pydantic BaseModel
- ✅ Acknowledgment Tracking - Built-in support for event processing acknowledgments
- ✅ Reactive Streams - Powered by ReactiveX (RxPY) for reactive event processing
- ✅ Type Safety - Full type hints and Pydantic validation
- ✅ Event Bus Pattern - Centralized event management with channels
- ✅ Timeout Handling - Automatic timeout detection and handling
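To make the acknowledgment and timeout features concrete, here is a minimal standard-library sketch of the idea: remember when each event was published, complete it when an acknowledgment arrives, and mark it as timed out otherwise. `PendingTracker`, its methods, and the reduced `Status` enum are hypothetical names invented for this illustration; this is not how rx-events is implemented internally.

```python
import enum
from dataclasses import dataclass, field
from typing import Dict, List


class Status(enum.Enum):
    # Reduced, illustrative subset of the statuses an event can have.
    PENDING = "pending"
    COMPLETED = "completed"
    TIMEOUT = "timeout"


@dataclass
class PendingTracker:
    """Hypothetical tracker: remembers when each event was published."""

    timeout_seconds: float
    _published_at: Dict[str, float] = field(default_factory=dict)
    _status: Dict[str, Status] = field(default_factory=dict)

    def register(self, event_uuid: str, now: float) -> None:
        # Record the publication time and start the event as PENDING.
        self._published_at[event_uuid] = now
        self._status[event_uuid] = Status.PENDING

    def acknowledge(self, event_uuid: str) -> bool:
        # Only a still-pending event can be completed; late acks are ignored.
        if self._status.get(event_uuid) is not Status.PENDING:
            return False
        self._status[event_uuid] = Status.COMPLETED
        return True

    def expire(self, now: float) -> List[str]:
        # Mark every pending event at least timeout_seconds old as TIMEOUT.
        expired = [
            event_uuid
            for event_uuid, status in self._status.items()
            if status is Status.PENDING
            and now - self._published_at[event_uuid] >= self.timeout_seconds
        ]
        for event_uuid in expired:
            self._status[event_uuid] = Status.TIMEOUT
        return expired

    def status(self, event_uuid: str) -> Status:
        return self._status[event_uuid]


tracker = PendingTracker(timeout_seconds=180)
tracker.register("a", now=0.0)
tracker.register("b", now=100.0)
acked_a = tracker.acknowledge("a")       # "a" is acknowledged in time
first_sweep = tracker.expire(now=200.0)  # "b" is only 100 s old
second_sweep = tracker.expire(now=280.0) # "b" is now 180 s old
late_ack_b = tracker.acknowledge("b")    # arrives after the timeout
```

The clock is passed in explicitly (`now`) so the behavior is deterministic and easy to test; a real channel would more likely read a monotonic clock itself.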
Installation
# Install from local directory (development)
pip install -e /path/to/rx-events
# Or install from source
cd /path/to/rx-events
pip install .
Quick Start
Basic Usage
import asyncio
import time
import uuid as uuid_lib
from typing import Any, Dict, Optional

from pydantic import Field
from rx_events import EventBus, BaseEvent, EventStatus, EventAck

# Create a custom event
class MyEvent(BaseEvent):
    user_id: str
    data: Dict[str, Any]
    uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    correlation_id: Optional[str] = None

    @property
    def event_type(self) -> str:
        return "my_event"

    def to_payload(self) -> Dict[str, Any]:
        return {"user_id": self.user_id, "data": self.data}

# publish() and acknowledge() are awaited, so they must run inside a coroutine
async def main() -> None:
    # Create event bus and channel
    bus = EventBus()
    bus.create_ack_channel("my_channel", timeout_seconds=180)

    # Subscribe to events (before publishing, so the subscriber cannot miss the event)
    channel = bus.get_channel("my_channel")
    channel.get_event_stream().subscribe(
        on_next=lambda event: print(f"Received: {event.event_type}"),
        on_error=lambda error: print(f"Error: {error}")
    )

    # Publish event
    event = MyEvent(user_id="123", data={"key": "value"})
    await bus.publish("my_channel", event)

    # Send acknowledgment
    ack = EventAck.create(
        event_uuid=event.uuid,
        status=EventStatus.COMPLETED,
        result={"processed": True}
    )
    await bus.acknowledge("my_channel", ack)

asyncio.run(main())
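A publisher often wants to wait until a specific event has been acknowledged. The sketch below shows one way to do that with plain `asyncio`: keep one `Future` per event UUID and resolve it when the matching acknowledgment arrives. `AckWaiter`, `expect`, `resolve`, and `wait` are hypothetical names for this illustration and are not part of rx-events.

```python
import asyncio
from typing import Any, Dict


class AckWaiter:
    """Hypothetical helper: one Future per published event UUID."""

    def __init__(self) -> None:
        self._futures: Dict[str, asyncio.Future] = {}

    def expect(self, event_uuid: str) -> None:
        # Must be called from within a running event loop.
        self._futures[event_uuid] = asyncio.get_running_loop().create_future()

    def resolve(self, event_uuid: str, result: Any) -> None:
        # Called when an acknowledgment for event_uuid arrives.
        future = self._futures.get(event_uuid)
        if future is not None and not future.done():
            future.set_result(result)

    async def wait(self, event_uuid: str, timeout: float) -> Any:
        # Return the acknowledgment result, or None if none arrives in time.
        try:
            return await asyncio.wait_for(self._futures[event_uuid], timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            self._futures.pop(event_uuid, None)


async def demo() -> tuple:
    waiter = AckWaiter()
    waiter.expect("evt-1")
    waiter.expect("evt-2")
    # Simulate a worker acknowledging evt-1 shortly after publication.
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, waiter.resolve, "evt-1", {"processed": True})
    acked = await waiter.wait("evt-1", timeout=1.0)
    missed = await waiter.wait("evt-2", timeout=0.05)  # nobody acknowledges evt-2
    return acked, missed


result = asyncio.run(demo())
```

In an application built on rx-events, `resolve` would presumably be called from a subscriber on the channel's acknowledgment stream (`get_ack_stream()`); how the acknowledgment object exposes its event UUID and result is not shown here and would need to be checked against the library.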
Custom Event with Validation
import time
import uuid as uuid_lib
from typing import Any, Dict, Optional

from pydantic import Field, field_validator
from rx_events import BaseEvent

class UserEvent(BaseEvent):
    user_id: str
    email: str
    uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    correlation_id: Optional[str] = None

    # Reject obviously malformed addresses when the event is constructed
    @field_validator('email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        if '@' not in v:
            raise ValueError("Invalid email")
        return v

    @property
    def event_type(self) -> str:
        return "user_event"

    def to_payload(self) -> Dict[str, Any]:
        return {"user_id": self.user_id, "email": self.email}
Testing
Run tests with pytest:
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run with coverage
pytest --cov=rx_events --cov-report=html
API Reference
Core Classes
- BaseEvent - Abstract base class for custom events
- Event - Standard event class for general-purpose events
- BaseEventAck - Abstract base class for custom acknowledgments
- EventAck - Standard acknowledgment class
- EventStatus - Enum for event statuses (PENDING, PROCESSING, COMPLETED, FAILED, TIMEOUT)
- EventBus - Central event bus for managing channels
- AckChannel - Channel for event publishing and acknowledgment tracking
EventBus Methods
- create_ack_channel(name, allow_duplicates=False, timeout_seconds=300) - Create a new acknowledgment channel
- get_channel(name) - Get an existing channel
- publish(channel_name, event) - Publish an event to a channel
- acknowledge(channel_name, ack) - Send an acknowledgment
AckChannel Methods
- publish(event) - Publish an event
- acknowledge(ack) - Receive an acknowledgment
- get_event_stream() - Get ReactiveX Observable for events
- get_ack_stream() - Get ReactiveX Observable for acknowledgments
- get_stats() - Get channel statistics
- get_event_status(event_uuid) - Get status of a specific event
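When working with the event and acknowledgment streams, subscription order can matter. A plain ReactiveX subject is "hot": a subscriber only sees items emitted after it subscribed. The standard-library sketch below imitates that behavior with a hypothetical `MiniSubject` class. Whether the streams returned by `get_event_stream()` and `get_ack_stream()` behave this way or replay earlier items is an assumption not confirmed by this README, so subscribing before publishing is the safe order.

```python
from typing import Any, Callable, List


class MiniSubject:
    """Hypothetical stand-in for a hot stream: no replay of past items."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def subscribe(self, on_next: Callable[[Any], None]) -> None:
        # Register a callback for items emitted from now on.
        self._observers.append(on_next)

    def on_next(self, item: Any) -> None:
        # Deliver the item to everyone subscribed at this moment.
        for observer in list(self._observers):
            observer(item)


early: List[str] = []
late: List[str] = []

stream = MiniSubject()
stream.subscribe(early.append)   # subscribed before anything is emitted
stream.on_next("event-1")
stream.subscribe(late.append)    # subscribed after event-1 was emitted
stream.on_next("event-2")
```

After this runs, `early` holds both events while `late` holds only `event-2`, because the late subscriber joined after `event-1` had already been delivered.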
Examples
See the tests/ directory for comprehensive examples.
License
MIT