Event-driven system with acknowledgment tracking using Pydantic and ReactiveX
Project description
Rx-Events
A flexible event-driven system with acknowledgment tracking using Pydantic and ReactiveX.
Features
- ✅ Extensible Event Classes - Create custom event types with Pydantic BaseModel
- ✅ Acknowledgment Tracking - Built-in support for event processing acknowledgments
- ✅ Reactive Streams - Powered by ReactiveX (RxPY) for reactive event processing
- ✅ Type Safety - Full type hints and Pydantic validation
- ✅ Event Bus Pattern - Centralized event management with channels
- ✅ Timeout Handling - Automatic timeout detection and handling
Installation
# Install from local directory (development)
pip install -e /path/to/rx-events
# Or install from source
cd /path/to/rx-events
pip install .
Quick Start
Basic Usage
from rx_events import EventBus, BaseEvent, EventStatus, EventAck
from pydantic import Field
import time
import uuid as uuid_lib
from typing import Dict, Any, Optional
# Create a custom event
class MyEvent(BaseEvent):
user_id: str
data: Dict[str, Any]
uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
timestamp: float = Field(default_factory=time.time)
correlation_id: Optional[str] = None
@property
def event_type(self) -> str:
return "my_event"
def to_payload(self) -> Dict[str, Any]:
return {"user_id": self.user_id, "data": self.data}
# Create event bus and channel
bus = EventBus()
bus.create_channel("my_channel", timeout_seconds=180)
# Publish event
event = MyEvent(user_id="123", data={"key": "value"})
await bus.publish("my_channel", event)
# Subscribe to events
channel = bus.get_channel("my_channel")
channel.get_event_stream().subscribe(
on_next=lambda event: print(f"Received: {event.event_type}"),
on_error=lambda error: print(f"Error: {error}")
)
# Send acknowledgment
ack = EventAck.create(
event_uuid=event.uuid,
status=EventStatus.COMPLETED,
result={"processed": True}
)
await bus.acknowledge("my_channel", ack)
Custom Event with Validation
from pydantic import Field, field_validator
class UserEvent(BaseEvent):
user_id: str
email: str
uuid: str = Field(default_factory=lambda: str(uuid_lib.uuid4()))
timestamp: float = Field(default_factory=time.time)
correlation_id: Optional[str] = None
@field_validator('email')
@classmethod
def validate_email(cls, v: str) -> str:
if '@' not in v:
raise ValueError("Invalid email")
return v
@property
def event_type(self) -> str:
return "user_event"
def to_payload(self) -> Dict[str, Any]:
return {"user_id": self.user_id, "email": self.email}
Testing
Run tests with pytest:
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run with coverage
pytest --cov=rx_events --cov-report=html
API Reference
Core Classes
BaseEvent- Abstract base class for custom eventsEvent- Standard event class for general-purpose eventsBaseEventAck- Abstract base class for custom acknowledgmentsEventAck- Standard acknowledgment classEventStatus- Enum for event statuses (PENDING, PROCESSING, COMPLETED, FAILED, TIMEOUT)EventBus- Central event bus for managing channelsAckChannel- Channel for event publishing and acknowledgment tracking
EventBus Methods
create_channel(name, allow_duplicates=False, timeout_seconds=300)- Create a new channelget_channel(name)- Get an existing channelpublish(channel_name, event)- Publish an event to a channelacknowledge(channel_name, ack)- Send an acknowledgment
AckChannel Methods
publish(event)- Publish an eventacknowledge(ack)- Receive an acknowledgmentget_event_stream()- Get ReactiveX Observable for eventsget_ack_stream()- Get ReactiveX Observable for acknowledgmentsget_stats()- Get channel statisticsget_event_status(event_uuid)- Get status of a specific event
Examples
See the tests/ directory for comprehensive examples.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
rx_events-0.1.0.tar.gz
(33.7 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rx_events-0.1.0.tar.gz.
File metadata
- Download URL: rx_events-0.1.0.tar.gz
- Upload date:
- Size: 33.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6efd62399e0d922b2e1ef4d7a56a1bcacdadbaf813e91bea4b691a847341902f
|
|
| MD5 |
a4ea9a8a185e95d8ff968cd4b8ca9e90
|
|
| BLAKE2b-256 |
3c9885aa6cf566bd56b1dc365392ef177dfee9c793ab5b88df33a5afcf6521b6
|
File details
Details for the file rx_events-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rx_events-0.1.0-py3-none-any.whl
- Upload date:
- Size: 7.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4d4fe7e15064ffa0911259fc5a8bfbee60c59f26a62b236220fe6e6b19cd1bed
|
|
| MD5 |
dcc320d2667b2f342d08fa49ff08bb6f
|
|
| BLAKE2b-256 |
7cb84ce5b3bfdec2ef0f6ef43b7e9a981a1cf4af1c50c061e495b1bb976c555b
|