Event-driven messaging via exceptions - Ultra-lightweight event system
Project description
โก EventX
Event-driven messaging via exceptions - Because
raise Event()is simpler than async hell
Stop wrestling with async/await complexity and message broker configurations. EventX transforms Python exceptions into an elegant, high-performance event system.
๐ฏ The Problem
Python's event systems are unnecessarily complex:
# Traditional async hell ๐ต
import asyncio
from aioredis import Redis
async def setup():
redis = await Redis.from_url("redis://localhost")
pubsub = redis.pubsub()
await pubsub.subscribe("events")
async for message in pubsub.listen():
await process_event(message) # More async...
# Celery complexity ๐คฏ
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def handle_event(data):
# Heavy setup for simple events
โจ The EventX Solution
Natural exception-based events:
from eventx import EventBus, Event
bus = EventBus()
@bus.on("user_signup")
def send_welcome_email(event):
print(f"๐ง Welcome {event.data['email']}!")
raise Event("email_sent", {"user_id": event.data["id"]})
@bus.on("email_sent")
def track_analytics(event):
print(f"๐ Tracking email for user {event.data['user_id']}")
def signup_user():
user = {"id": 123, "email": "alice@example.com"}
raise Event("user_signup", user)
bus.dispatch(signup_user)
# ๐ง Welcome alice@example.com!
# ๐ Tracking email for user 123
๐ Installation & Quick Start
Install
pip install eventx
30-Second Demo
from eventx import EventBus, Event
# Create event bus
bus = EventBus()
# Listen for events (multiple handlers allowed)
@bus.on("order_placed")
def process_payment(event):
order = event.data
print(f"๐ณ Processing ${order['total']} payment")
raise Event("payment_processed", order) # Chain events naturally
@bus.on("order_placed")
def update_inventory(event):
print(f"๐ฆ Updating inventory for {len(event.data['items'])} items")
@bus.on("payment_processed")
def send_receipt(event):
print(f"๐ง Receipt sent for order #{event.data['id']}")
# Trigger the workflow
def place_order():
order = {"id": "12345", "total": 99.99, "items": ["laptop"]}
raise Event("order_placed", order)
bus.dispatch(place_order)
# Output:
# ๐ณ Processing $99.99 payment
# ๐ฆ Updating inventory for 1 items
# ๐ง Receipt sent for order #12345
๐ Why Choose EventX?
| Feature | EventX | Celery | asyncio | Traditional |
|---|---|---|---|---|
| Setup | pip install eventx |
Redis + config | Complex | Manual |
| Syntax | raise Event() |
@task |
async/await |
callback() |
| Dependencies | 0 | Redis/RabbitMQ | Built-in | Varies |
| Learning | 2 minutes | Hours | Days | Medium |
| Performance | 40K+ events/sec | Network bound | High | High |
| Debugging | Stack traces | Distributed | Complex | Manual |
| Error handling | Native | Complex | Try/catch hell | Manual |
๐ Real-World Examples
Web API Events
from flask import Flask, request
from eventx import EventBus, Event
app = Flask(__name__)
bus = EventBus()
@bus.on("user_registered")
def send_welcome_email(event):
user = event.data
print(f"๐ง Sending welcome email to {user['email']}")
@bus.on("user_registered")
def setup_user_profile(event):
user = event.data
print(f"๐ค Creating profile for {user['name']}")
raise Event("profile_created", {"user_id": user["id"]})
@bus.on("profile_created")
def start_onboarding(event):
print(f"๐ฏ Starting onboarding for user {event.data['user_id']}")
@app.route("/register", methods=["POST"])
def register():
user_data = request.json
user = {"id": 123, "name": user_data["name"], "email": user_data["email"]}
# Single line triggers entire workflow
bus.dispatch(lambda: raise Event("user_registered", user))
return {"status": "success"}
Data Processing Pipeline
from eventx import EventBus, Event
pipeline = EventBus("data_pipeline")
@pipeline.on("data_received")
def validate_data(event):
data = event.data
print(f"๐ Validating {len(data)} records")
if len(data) > 0:
raise Event("data_valid", data)
else:
raise Event("data_invalid", {"reason": "empty_dataset"})
@pipeline.on("data_valid")
def transform_data(event):
data = event.data
print(f"๐ Transforming {len(data)} records")
transformed = [{"processed": True, **item} for item in data]
raise Event("data_transformed", transformed)
@pipeline.on("data_transformed")
def save_results(event):
data = event.data
print(f"๐พ Saving {len(data)} transformed records")
raise Event("processing_complete", {"count": len(data)})
@pipeline.on("processing_complete")
def notify_completion(event):
count = event.data["count"]
print(f"โ
Pipeline complete! Processed {count} records")
@pipeline.on("data_invalid")
def handle_invalid_data(event):
reason = event.data["reason"]
print(f"โ Data processing failed: {reason}")
# Run the pipeline
def process_dataset(data):
pipeline.dispatch(lambda: raise Event("data_received", data))
# Test with valid data
process_dataset([{"name": "Alice"}, {"name": "Bob"}])
# Test with invalid data
process_dataset([])
Microservices Communication
from eventx import EventBus, Event
# Shared bus across services
services = EventBus("microservices")
# User Service
@services.on("create_user_account")
def user_service(event):
user_data = event.data
user_id = f"user_{hash(user_data['email']) % 10000}"
print(f"๐ค User Service: Created account {user_id}")
raise Event("user_account_created", {
"user_id": user_id,
"email": user_data["email"],
"plan": user_data.get("plan", "free")
})
# Email Service
@services.on("user_account_created")
def email_service(event):
user = event.data
print(f"๐ง Email Service: Welcome sequence for {user['email']}")
raise Event("welcome_email_sent", {"user_id": user["user_id"]})
# Analytics Service
@services.on("user_account_created")
def analytics_service(event):
user = event.data
print(f"๐ Analytics: New {user['plan']} user signup")
raise Event("signup_tracked", {"plan": user["plan"]})
# Billing Service (conditional)
@services.on("user_account_created")
def billing_service(event):
user = event.data
if user["plan"] != "free":
print(f"๐ณ Billing: Setting up {user['plan']} subscription")
raise Event("billing_configured", {"user_id": user["user_id"]})
# Trigger microservices workflow
def register_user(email, plan="free"):
services.dispatch(lambda: raise Event("create_user_account", {
"email": email,
"plan": plan
}))
# Test microservices
register_user("alice@example.com", "premium")
๐๏ธ Advanced Features
Global API (Convenience)
from eventx import on, dispatch, emit, Event
# Use global bus for simple cases
@on("quick_event")
def quick_handler(event):
print(f"โก Quick: {event.data}")
# Method 1: Exception style
dispatch(lambda: raise Event("quick_event", "hello"))
# Method 2: Direct emit
emit("quick_event", "world")
Event History & Debugging
bus = EventBus()
# Process some events
bus.emit("test1", "data1")
bus.emit("test2", "data2")
bus.emit("test3", "data3")
# Check what happened
history = bus.get_event_history(limit=5)
for event in history:
print(f"๐ {event.name} at {event.source_info}")
# Get comprehensive stats
stats = bus.get_stats()
print(f"๐ Events: {stats['events_raised']}, Handlers: {stats['handlers_executed']}")
Wildcard Handlers
bus = EventBus()
# Listen to ALL events
@bus.on_any
def log_everything(event):
print(f"๐ Logger: {event.name} with {type(event.data).__name__}")
# Specific handlers still work
@bus.on("important_event")
def handle_important(event):
print(f"๐ฅ Important: {event.data}")
bus.emit("user_login", {"user": "alice"})
bus.emit("important_event", {"priority": "high"})
# Output:
# ๐ Logger: user_login with dict
# ๐ Logger: important_event with dict
# ๐ฅ Important: {'priority': 'high'}
Error Handling Best Practices
bus = EventBus()
@bus.on("risky_operation")
def handle_risky(event):
try:
# Potentially failing operation
result = risky_computation(event.data)
raise Event("operation_success", {"result": result})
except ValueError as e:
# Convert errors to events for clean handling
raise Event("operation_failed", {"error": str(e), "retry": True})
except Exception as e:
# Unexpected errors
raise Event("operation_error", {"error": str(e), "retry": False})
@bus.on("operation_failed")
def retry_operation(event):
if event.data["retry"]:
print(f"๐ Retrying after: {event.data['error']}")
@bus.on("operation_error")
def log_critical_error(event):
print(f"๐จ Critical error: {event.data['error']}")
โก Performance
EventX is surprisingly fast for an exception-based system:
- ๐ฅ 40,000+ events/second (local processing)
- โก Sub-millisecond event propagation
- ๐พ Minimal memory footprint (~100KB)
- ๐ Zero startup time (no external dependencies)
Perfect for high-frequency events in web applications, data pipelines, and real-time systems.
๐ก๏ธ Production Ready
Thread Safety
import threading
from eventx import EventBus, Event
bus = EventBus()
@bus.on("concurrent_event")
def thread_safe_handler(event):
# Handlers are automatically thread-safe
print(f"๐งต Thread {threading.current_thread().name}: {event.data}")
# Multiple threads can safely emit events
for i in range(10):
threading.Thread(
target=lambda i=i: bus.emit("concurrent_event", f"data_{i}")
).start()
Error Recovery
bus = EventBus(max_cascade=50) # Prevent infinite loops
@bus.on("handler_error")
def monitor_errors(event):
error_info = event.data
print(f"๐ Error detected in {error_info['handler_name']}: {error_info['error_message']}")
# Automatic error recovery strategies
if error_info["error_type"] == "ConnectionError":
raise Event("retry_with_backoff", error_info)
# EventX automatically emits "handler_error" events when handlers fail
๐ฎ Upcoming Features
ETL Data Pipeline
from eventx import EventBus, Event
etl = EventBus("data_pipeline")
@etl.on("data_extract")
def extract_data(event):
source = event.data["source"]
print(f"๐ฅ Extracting from {source}")
# Simulate extraction
raw_data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
raise Event("data_extracted", {"source": source, "data": raw_data})
@etl.on("data_extracted")
def transform_data(event):
data = event.data["data"]
print(f"๐ Transforming {len(data)} records")
transformed = [{"user_id": item["id"], "username": item["name"]} for item in data]
raise Event("data_transformed", transformed)
@etl.on("data_transformed")
def load_data(event):
data = event.data
print(f"๐พ Loading {len(data)} records to database")
raise Event("etl_complete", {"count": len(data)})
# Run pipeline
etl.dispatch(lambda: raise Event("data_extract", {"source": "users.csv"}))
AI/ML Training Events
from eventx import EventBus, Event
ml = EventBus("ml_pipeline")
@ml.on("training_start")
def prepare_data(event):
config = event.data
print(f"๐ค Preparing {config['model_type']} training")
raise Event("data_ready", {"model": config["model_type"], "samples": 10000})
@ml.on("data_ready")
def train_model(event):
model = event.data["model"]
samples = event.data["samples"]
print(f"๐ง Training {model} with {samples} samples")
# Simulate training
accuracy = 0.95
raise Event("training_complete", {"model": model, "accuracy": accuracy})
@ml.on("training_complete")
def deploy_model(event):
model = event.data["model"]
accuracy = event.data["accuracy"]
print(f"๐ Deploying {model} (accuracy: {accuracy:.2%})")
# Start ML pipeline
ml.dispatch(lambda: raise Event("training_start", {"model_type": "classifier"}))
v0.2.0 - Enhanced (Coming Soon)
- Async handler support:
@bus.async_on("event") - Event middleware: Transform events before handling
- Event filtering:
@bus.on("event", filter=lambda e: e.data > 100) - Performance optimizations: Even faster event dispatch
- Enhanced debugging: Visual event flow tracing
v0.3.0 - Distributed (Q1 2025)
- Relay system: Events across network boundaries
- Event persistence: Redis/PostgreSQL backends
- Load balancing: Intelligent event routing
- Monitoring dashboard: Real-time event visualization
๐งช Testing
EventX includes comprehensive test utilities:
from eventx import EventBus, Event
def test_order_workflow():
bus = EventBus()
events_received = []
@bus.on("order_placed")
def capture_event(event):
events_received.append(event.name)
raise Event("order_processed", event.data)
@bus.on("order_processed")
def capture_processed(event):
events_received.append(event.name)
# Execute test
bus.dispatch(lambda: raise Event("order_placed", {"id": "test"}))
# Verify
assert events_received == ["order_placed", "order_processed"]
print("โ
Test passed!")
test_order_workflow()
๐ค Contributing
EventX is open for contributions!
Current priorities:
- Performance optimizations
- Async/await support
- Documentation improvements
- Real-world usage examples
Contact: nexusstudio100@gmail.com
๐ Comparison with Alternatives
vs asyncio Events
# asyncio (complex)
import asyncio
async def setup_asyncio_events():
event = asyncio.Event()
await event.wait() # Complex async management
# EventX (simple)
@bus.on("ready")
def handle_ready(event):
print("Ready!") # No async required
vs Traditional Callbacks
# Traditional callbacks (verbose)
class EventEmitter:
def __init__(self):
self.callbacks = {}
def on(self, event, callback):
if event not in self.callbacks:
self.callbacks[event] = []
self.callbacks[event].append(callback)
def emit(self, event, data):
for callback in self.callbacks.get(event, []):
callback(data)
# EventX (elegant)
@bus.on("event")
def handler(event): pass
bus.dispatch(lambda: raise Event("event", "data"))
๐ฃ๏ธ Migration Guide
From Celery
# Before (Celery)
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task
def process_data(data):
# Heavy setup
# After (EventX)
from eventx import EventBus, Event
bus = EventBus()
@bus.on("process_data")
def process_data(event):
# Same logic, simpler setup
From asyncio Events
# Before (asyncio)
import asyncio
async def wait_for_event():
event = asyncio.Event()
await event.wait()
# After (EventX)
@bus.on("ready")
def handle_ready(event):
# No async needed
๐ก Best Practices
Event Naming
# Good naming conventions
"user_created" # past_tense for completed actions
"data_processing" # present_continuous for ongoing operations
"validate_input" # imperative for commands
"system_error" # category_type for classifications
Event Data Structure
# Consistent event payload structure
raise Event("user_action", {
"user_id": "123",
"action_type": "click",
"target": "signup_button",
"metadata": {
"page": "homepage",
"session_id": "abc123",
"timestamp": time.time()
}
})
Error Boundaries
# Isolate risky operations
@bus.on("external_api_call")
def safe_api_call(event):
try:
result = external_api.call(event.data)
raise Event("api_success", result)
except Exception as e:
raise Event("api_error", {"error": str(e), "original_data": event.data})
๐ท๏ธ Version History
v0.1.1 (Current)
- ๐ก๏ธ Improved stability: Better error handling and validation
- ๐ Enhanced performance: Optimized event dispatch
- ๐ Better documentation: Comprehensive examples and API docs
- ๐งช Testing utilities: Built-in testing support
- ๐ง Thread safety: Robust concurrent operation
v0.1.0
- ๐ Initial release
- โก Basic event system via exceptions
- ๐ Event cascading support
๐ Support & Community
- ๐ง Email: nexusstudio100@gmail.com
- ๐ Issues: GitHub Issues
- ๐ Documentation: Full docs
- ๐ก Examples: Comprehensive examples
๐ License
MIT License - see LICENSE for details.
๐ฅ Transform your Python into event-driven architecture today!
pip install eventx
โก Powered by Nexus Studio
Making event-driven Python effortless and elegant
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file eventx-0.1.1.tar.gz.
File metadata
- Download URL: eventx-0.1.1.tar.gz
- Upload date:
- Size: 14.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cefcf0a47979db0bd0818a240e0d62121047cd02abfcd115b12a7c8ad7dd93f4
|
|
| MD5 |
7dcce98fdebeef85eda3e3d3bb182742
|
|
| BLAKE2b-256 |
18f5f3645c9491821bb43c20a22d8288964e5834df9f1297394814706d9fb601
|
File details
Details for the file eventx-0.1.1-py3-none-any.whl.
File metadata
- Download URL: eventx-0.1.1-py3-none-any.whl
- Upload date:
- Size: 12.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
535079b2efe6eeec6746c3821ed43b881288879926db27916d0c00d5f9339bef
|
|
| MD5 |
61263c4b09b1c3ca4c5f3f6bcacad11e
|
|
| BLAKE2b-256 |
a9f737a5da7ab9eb737d003b776198df40bb63ae4602871b57e23e690f141812
|