Event Hub shared components for Python services
FlexWurx Event Hub Python Library
A Python library for building event-driven microservices with Azure Event Hubs and Service Bus.
Development Process
Publishing New Features
- Make changes to your code
- Test locally: uv run pytest (if you have tests)
- Bump version in pyproject.toml (1.0.0 → 1.0.1)
- Commit and push to GitHub
- GitHub Actions automatically builds and publishes
- Services update with uv add flexwurx-event-hub-python@latest
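The version bump in the steps above (1.0.0 → 1.0.1) is a patch increment. As a minimal sketch, assuming plain MAJOR.MINOR.PATCH version strings with no pre-release suffix, it could be computed like this. The helper name bump_patch is hypothetical and is not part of this library or of uv.

```python
def bump_patch(version: str) -> str:
    """Return the version string with its patch component incremented.

    Assumes exactly three dot-separated integers, e.g. "1.0.0".
    """
    major, minor, patch = (int(part) for part in version.split("."))
    return f"{major}.{minor}.{patch + 1}"


if __name__ == "__main__":
    print(bump_patch("1.0.0"))
```

The result still has to be written into pyproject.toml by hand or by your own tooling before committing.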
Manual Publishing
uv build
uv publish
Installation
# Using UV (recommended)
uv add flexwurx-event-hub-python
# Upgrading
uv sync --upgrade-package flexwurx-event-hub-python
# Using pip
pip install flexwurx-event-hub-python
Quick Start
import asyncio
from event_hub_shared import (
    EventEmitter, EventConsumer, ServiceBusScheduler,
    EventRegistry, BaseHandler, EventFilterUtil
)
# Setup
emitter = EventEmitter('your-namespace.servicebus.windows.net')
consumer = EventConsumer(
    'your-namespace.servicebus.windows.net',
    'storageaccount',
    'checkpoints-container'
)
scheduler = ServiceBusScheduler('your-servicebus-namespace', emitter)
AI Service Implementation Example
import asyncio
import re
from event_hub_shared import BaseHandler, EventRegistry, EventFilterUtil
# emitter and consumer are the objects created in Quick Start
class AITaskHandler(BaseHandler):
    """Handle AI processing tasks."""
    async def handle(self, event, context):
        event_type = event.body.get("event_type", "")
        # Event types are expected to look like domain.action.entity
        parts = event_type.split('.')
        if len(parts) != 3:
            return  # Ignore malformed event types instead of raising
        domain, action, entity = parts
        if action == "process" and entity == "ai":
            await self.process_ai_request(event.body)
        elif action == "generate" and entity == "response":
            # generate_ai_response is your own method (not shown here)
            await self.generate_ai_response(event.body)

    async def process_ai_request(self, data):
        # Your AI processing logic; run_ai_model is your own method (not shown here)
        response = await self.run_ai_model(data.get("input"))
        # Emit response event
        await self.emit_event("sms-events", "sms.response.generated", {
            "phone_number": data.get("phone_number"),
            "response": response,
            "original_message_id": data.get("message_id")
        })
# Setup registry
registry = EventRegistry()
ai_handler = AITaskHandler(emitter)
registry.register_pattern(re.compile(r'^sms\.(process|generate)\..*'), ai_handler)
# Event filtering for AI service
ai_filter = EventFilterUtil([
    re.compile(r'^sms\.process\.ai'),
    re.compile(r'^task\.execute\.ai.*')
])
# Main event processor
async def process_events(events, context):
    relevant_events = ai_filter.filter_events(events)
    for event in relevant_events:
        try:
            await registry.route(event, context)
            await context.update_checkpoint(event)
        except Exception as error:
            print(f"Error processing event: {error}")
            # Still checkpoint so the failing event is not read again on restart
            await context.update_checkpoint(event)
# Start AI service
async def start_ai_service():
    client = await consumer.create_consumer('sms-events', 'ai-service')
    async with client:
        await client.receive(on_event=process_events)

if __name__ == "__main__":
    asyncio.run(start_ai_service())
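To make the filter-then-route flow in the example above easier to follow, here is a self-contained sketch that uses only the standard library. SketchRegistry and filter_event_types are hypothetical stand-ins written for illustration. They are not the implementations of EventRegistry or EventFilterUtil, whose internals are not shown in this README, and they operate on bare event type strings rather than Event Hub event objects.

```python
import asyncio
import re


class SketchRegistry:
    """Hypothetical stand-in for a pattern registry; not the library's code."""

    def __init__(self):
        self._routes = []  # list of (compiled pattern, async handler) pairs

    def register_pattern(self, pattern, handler):
        self._routes.append((pattern, handler))

    async def route(self, event_type, payload):
        """Await every handler whose pattern matches; return how many ran."""
        matched = 0
        for pattern, handler in self._routes:
            if pattern.match(event_type):
                await handler(payload)
                matched += 1
        return matched


def filter_event_types(patterns, event_types):
    """Keep only the event types matched by at least one pattern."""
    return [t for t in event_types if any(p.match(t) for p in patterns)]


async def demo():
    seen = []

    async def record(payload):
        seen.append(payload)

    registry = SketchRegistry()
    registry.register_pattern(re.compile(r"^sms\.(process|generate)\..*"), record)

    patterns = [
        re.compile(r"^sms\.process\.ai"),
        re.compile(r"^task\.execute\.ai.*"),
    ]
    incoming = ["sms.process.ai", "sms.send.tenant", "task.execute.ai.summary"]

    # Step 1: drop events this service does not care about.
    relevant = filter_event_types(patterns, incoming)
    # Step 2: route each remaining event to the matching handlers.
    counts = [await registry.route(t, {"event_type": t}) for t in relevant]
    return relevant, counts, seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that an event can pass the filter and still match no registered pattern, as task.execute.ai.summary does here. In a real service you may want to log or dead-letter such events rather than silently checkpoint past them.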
SMS Processing Workflow
# AI Service processes incoming SMS
from datetime import datetime, timedelta
from event_hub_shared import BaseHandler

class SMSAIHandler(BaseHandler):
    # Assumes self.scheduler (a ServiceBusScheduler) and self.generate_response
    # are provided by your own subclass setup; neither is defined here.
    async def handle(self, event, context):
        if event.body.get("event_type") == "sms.process.ai":
            phone_number = event.body.get("phone_number")
            message = event.body.get("message")
            # Process with AI
            ai_response = await self.generate_response(message)
            # Check if response should be delayed
            if ai_response.get("delay_minutes"):
                # Schedule delayed response
                correlation_id = await self.scheduler.schedule_task({
                    "event_type": "sms.send.tenant",
                    "data": {
                        "phone_number": phone_number,
                        "message": ai_response["text"]
                    },
                    "execute_at": datetime.now() + timedelta(minutes=ai_response["delay_minutes"])
                })
            else:
                # Send immediately
                await self.emit_event("sms-events", "sms.send.tenant", {
                    "phone_number": phone_number,
                    "message": ai_response["text"]
                })
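The delayed branch above builds a task payload with an execute_at timestamp. The sketch below isolates that step so it can be tested without Azure. build_scheduled_task is a hypothetical helper, not part of the library, and the payload keys simply mirror the example above. It uses a timezone-aware UTC timestamp on the assumption that this is safer across services than a naive datetime.now(); whether ServiceBusScheduler expects aware or naive datetimes is not stated in this README.

```python
from datetime import datetime, timedelta, timezone


def build_scheduled_task(event_type, data, delay_minutes, now=None):
    """Build a task payload shaped like the one in the example above.

    `now` can be injected for testing; it defaults to the current UTC time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return {
        "event_type": event_type,
        "data": data,
        "execute_at": now + timedelta(minutes=delay_minutes),
    }


if __name__ == "__main__":
    task = build_scheduled_task(
        "sms.send.tenant",
        {"phone_number": "+15550100", "message": "Hello"},
        delay_minutes=15,
    )
    print(task["execute_at"].isoformat())
```

Injecting `now` keeps the helper deterministic in unit tests.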
Environment Configuration
# Event Hub
EVENT_HUB_NAMESPACE=your-namespace.servicebus.windows.net
STORAGE_ACCOUNT=yourstorageaccount
CHECKPOINT_CONTAINER=eventhub-checkpoints
# Service Bus
SERVICE_BUS_NAMESPACE=your-servicebus-namespace
# Azure credentials
AZURE_TENANT_ID=your-tenant-id
AZURE_CLIENT_ID=your-client-id
AZURE_CLIENT_SECRET=your-client-secret
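One way a service might read the Event Hub and Service Bus variables above at startup is sketched below. EventHubSettings and load_settings are hypothetical names introduced for illustration; the library itself may load configuration differently. The AZURE_* variables are left out on the assumption that the Azure credential classes read them directly from the environment.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EventHubSettings:
    event_hub_namespace: str
    storage_account: str
    checkpoint_container: str
    service_bus_namespace: str


# Maps each dataclass field to the environment variable listed above.
_ENV_NAMES = {
    "event_hub_namespace": "EVENT_HUB_NAMESPACE",
    "storage_account": "STORAGE_ACCOUNT",
    "checkpoint_container": "CHECKPOINT_CONTAINER",
    "service_bus_namespace": "SERVICE_BUS_NAMESPACE",
}


def load_settings(env=None):
    """Read required settings from `env` (defaults to os.environ).

    Raises RuntimeError naming every missing or empty variable.
    """
    env = os.environ if env is None else env
    missing = [name for name in _ENV_NAMES.values() if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return EventHubSettings(**{field: env[name] for field, name in _ENV_NAMES.items()})
```

The resulting values could then be passed to the EventEmitter, EventConsumer, and ServiceBusScheduler constructors shown in Quick Start.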
Development Setup
# Clone repository
git clone https://github.com/flexwurx/event-hub-python.git
cd event-hub-python
# Install with UV
uv sync
# Install development dependencies
uv sync --group dev
# Run tests
uv run pytest
# Format code
uv run black src/
# Type checking
uv run mypy src/
Key Features
✅ Event Hub Integration - Full async support with blob checkpointing
✅ Service Bus Scheduling - Schedule tasks with correlation ID cancellation
✅ Dead Letter Handling - Failed events go to Service Bus for analysis
✅ Type Safety - Pydantic models for all event data
✅ Async/Await - Modern Python async patterns
✅ Event Filtering - Service-specific pattern matching
✅ Auto Partitioning - Intelligent event distribution
File details
Details for the file flexwurx_event_hub_python-1.0.3.tar.gz.
File metadata
- Download URL: flexwurx_event_hub_python-1.0.3.tar.gz
- Upload date:
- Size: 10.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.28.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 955b45ca52cc874b8962828ebca67849441086a153bfb2f10f1efff1cf00d9ee |
| MD5 | 65940eb3a64ed4f840bb5df37ad9059f |
| BLAKE2b-256 | 93f7bebc7cfdd7c10212bf1363a5908e4fc2bb4b5f66952760708397591e6430 |
File details
Details for the file flexwurx_event_hub_python-1.0.3-py3-none-any.whl.
File metadata
- Download URL: flexwurx_event_hub_python-1.0.3-py3-none-any.whl
- Upload date:
- Size: 16.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.28.1
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 98f43ee07764f5bd4c53edaedf8027790d9f26d3befcb3992d5779e25d65a570 |
| MD5 | a8dbbe09836ba754218a53dce4a04306 |
| BLAKE2b-256 | 349a15648cca6eb5e6d98bbfee6c066cf564054557c1ce85a3899f30707afd73 |