A reliability-first event processing runtime for Python microservices.

Evora

A production-grade event processing runtime with strict reliability guarantees.

Python 3.11+ · License: MIT · Status: Alpha

📚 Documentation | Examples | Contributing


🎯 What is Evora?

Evora is not another Kafka wrapper. It's a complete event processing runtime that enforces reliability patterns at the framework level.

The Problem

Most event-driven systems fail in production because:

  • โŒ Developers forget to implement idempotency
  • โŒ Retry logic is inconsistent across services
  • โŒ Failed messages disappear into the void
  • โŒ Consumer crashes leave messages stuck forever
  • โŒ Schema changes break everything silently
  • โŒ No one knows what "exactly-once" actually means

The Solution

Evora makes reliability impossible to ignore:

  • ✅ Strict mode by default - Idempotency and versioning are mandatory
  • ✅ Explicit error classification - RetryableError vs FatalError vs ContractError
  • ✅ Durable retry queues - Crash-safe exponential backoff
  • ✅ Poison message detection - Automatic recovery from stuck processing
  • ✅ Structured DLQ - Full forensics for every failure
  • ✅ Self-healing consumers - Automatic reclaim of idle messages

๐Ÿ—๏ธ Architecture

Evora consists of three layers:

┌─────────────────────────────────────┐
│          Application Layer          │
│  • Event decoding/validation        │
│  • Idempotency enforcement          │
│  • Error classification             │
│  • Handler dispatch                 │
└──────────────────┬──────────────────┘
                   │
┌──────────────────▼──────────────────┐
│            Broker Layer             │
│  • Redis Streams (production)       │
│  • Durable retry scheduler          │
│  • Poison message detector          │
│  • Consumer group coordination      │
└──────────────────┬──────────────────┘
                   │
┌──────────────────▼──────────────────┐
│        Storage Layer (Redis)        │
│  • Event streams                    │
│  • Retry delay queues (ZSET)        │
│  • Idempotency store (SET)          │
│  • Dead letter queue                │
└─────────────────────────────────────┘

Key Components:

  • evora.core: Event contracts, envelopes, registry
  • evora.app: Application runtime, handler registry, @subscribe decorator
  • evora.brokers: Broker implementations (Redis, Memory)
  • evora.runtime: Retry policies, execution logic
  • evora.idempotency: Deduplication interfaces and Redis store
  • evora.errors: Error classification (Retryable/Fatal/Contract)
  • evora.schema: Schema governance tools and CLI
  • evora.observability: Telemetry hooks (OpenTelemetry ready)

🚀 Quick Start

Installation

pip install evora

Your First Event Handler

from evora.app import App, subscribe
from evora.brokers.redis_streams import RedisStreamsBroker
from evora.idempotency import IdempotencyPolicy
from evora.idempotency_redis import RedisIdempotencyStore
from evora.core import Event
from pydantic import BaseModel
import redis.asyncio as redis

# 1. Define your event
class UserRegisteredEvent(Event):
    __version__ = 1  # Explicit versioning required
    
    class Data(BaseModel):
        user_id: int
        email: str
        name: str
    
    data: Data
    
    @classmethod
    def event_type(cls) -> str:
        return "users.registered"

# 2. Define your handler
@subscribe(
    UserRegisteredEvent,
    retry="exponential",
    max_attempts=5,
    dlq=True,
    idempotency=IdempotencyPolicy(mode="event_id", ttl_seconds=86400),
)
async def send_welcome_email(event: UserRegisteredEvent, ctx):
    """
    This handler will:
    - Run exactly once (idempotency)
    - Retry up to 5 times on transient failures
    - Go to DLQ if all retries fail
    - Survive consumer crashes (poison detection)
    """
    await email_service.send(
        to=event.data.email,
        template="welcome",
        data={"name": event.data.name},
    )

# 3. Setup the application
async def main():
    r = redis.Redis(host="localhost", port=6379, decode_responses=False)
    
    broker = RedisStreamsBroker(client=r, group_id="email-service")
    idempotency = RedisIdempotencyStore(client=r)
    
    app = App(
        broker=broker,
        source="email-service",
        idempotency_store=idempotency,
        strict=True,  # Enforces best practices
    )
    
    app.add_handler(send_welcome_email)
    
    await app.run()  # Start consuming

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Publishing Events

async def register_user(user_data):
    # Your business logic
    user = await db.create_user(user_data)
    
    # Publish event
    event = UserRegisteredEvent(
        data=UserRegisteredEvent.Data(
            user_id=user.id,
            email=user.email,
            name=user.name,
        )
    )
    
    await app.publish(event, key=f"user-{user.id}")

That's it! You now have:

  • ✅ Guaranteed exactly-once processing
  • ✅ Automatic retry with exponential backoff
  • ✅ Poison message detection
  • ✅ Structured DLQ for failures

💎 What Evora Offers

1. Strict Reliability Enforcement

Problem: Developers often forget critical reliability patterns.

Solution: Evora makes them mandatory.

# โŒ This won't compile in strict mode
@subscribe(MyEvent)  # Missing idempotency policy!
async def handler(event, ctx):
    pass

# โœ… This is required
@subscribe(MyEvent, idempotency=IdempotencyPolicy(...))
async def handler(event, ctx):
    pass

Result: No production incidents from forgotten patterns.


2. Intelligent Error Classification

Problem: Most systems treat all errors the same.

Solution: Explicit error types drive different behaviors.

from evora.errors import RetryableError, FatalError, ContractError

@subscribe(OrderEvent, idempotency=IdempotencyPolicy(...))
async def process_order(event, ctx):
    try:
        await payment_service.charge(event.data.order_id)
    
    except PaymentTimeoutError:
        # Transient → Retry with exponential backoff
        raise RetryableError("Payment service timeout")
    
    except InvalidCardError:
        # Business rule violation → DLQ immediately, no retry
        raise FatalError("Invalid payment method")
    
    except MalformedEventError:
        # Bad schema → DLQ, alert developers
        raise ContractError("Invalid event structure")

Result:

  • Transient failures retry automatically
  • Permanent failures go to DLQ immediately
  • No wasted retries on unretryable errors

3. Durable Retry Queues

Problem: In-memory retries are lost on crash.

Solution: Redis ZSET-based delay queue.

Normal Processing:
  XREADGROUP → Handler → Success → XACK

Retryable Failure:
  XREADGROUP → Handler → RetryableError
    ↓
  schedule_retry():
    • Compute backoff: 1s, 2s, 4s, 8s...
    • ZADD retry queue (scored by due time)
    • XACK original message
    ↓
  Background Scheduler:
    • Every 500ms: ZRANGEBYSCORE (find due)
    • XADD back to stream (attempt + 1)
    • ZREM from retry queue

Result:

  • Retries survive crashes
  • Exponential backoff (500ms → 30s)
  • No blocking the main consumer loop
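The backoff schedule above can be sketched in a few lines. This is an illustration of a capped exponential policy using the documented defaults (base 500ms, cap 30s), not Evora's actual implementation:

```python
def backoff_ms(attempt: int, base_ms: int = 500, cap_ms: int = 30_000) -> int:
    """Capped exponential backoff: base * 2^(attempt - 1), clamped at the cap."""
    return min(base_ms * 2 ** (attempt - 1), cap_ms)

# First attempts: 500ms, 1s, 2s, 4s, 8s, 16s, then capped at 30s
schedule = [backoff_ms(a) for a in range(1, 9)]
```

The scheduler would then ZADD the message with score `now_ms + backoff_ms(attempt)`, and the background poller re-enqueues entries whose score is due.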

4. Poison Message Detection

Problem: Crashed consumers leave messages stuck in PEL forever.

Solution: Automatic detection and recovery.

Background Poison Checker (every 10s):

  1. Scan XPENDING for idle messages
  2. For each idle message:

     if idle_time > 60s AND delivery_count < 10:
       → XCLAIM (reclaim to active consumer)

     elif delivery_count >= 10:
       → Route to DLQ with metadata:
          • reason: "poison_message"
          • delivery_count: 10
          • idle_ms: 65000
          • original_channel: "orders.events"
          • original_msg_id: "1234567890-0"
       → XACK (clear from PEL)

Result:

  • Consumer crashes don't block processing
  • Messages can't get stuck forever
  • Self-healing consumer groups
  • Full forensics in DLQ
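The decision the checker makes for each pending entry can be expressed as a small pure function. The names here are illustrative, not Evora's internals; the thresholds mirror the defaults described above:

```python
from dataclasses import dataclass

@dataclass
class PendingEntry:
    """Simplified XPENDING row: message id, idle time, delivery count."""
    msg_id: str
    idle_ms: int
    delivery_count: int

def poison_action(entry: PendingEntry,
                  poison_idle_ms: int = 60_000,
                  poison_max_deliveries: int = 10) -> str:
    """Decide what the poison checker should do with one pending message."""
    if entry.delivery_count >= poison_max_deliveries:
        return "dlq"      # exhausted: route to DLQ, then XACK
    if entry.idle_ms > poison_idle_ms:
        return "reclaim"  # stuck but retryable: XCLAIM to a live consumer
    return "skip"         # still in-flight: leave it in the PEL

# Idle 65s on the 3rd delivery: reclaim. Idle 65s on the 10th: dead-letter.
actions = [poison_action(PendingEntry("1234567890-0", 65_000, n)) for n in (3, 10)]
```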

5. Guaranteed Idempotency

Problem: Duplicate processing causes data corruption.

Solution: Built-in deduplication by event ID.

@subscribe(
    PaymentEvent,
    idempotency=IdempotencyPolicy(
        mode="event_id",      # Dedupe by event.id
        ttl_seconds=86400,    # 24 hours
    ),
)
async def charge_customer(event, ctx):
    # This will run EXACTLY ONCE per event ID
    # Even if:
    # - Message is redelivered
    # - Consumer crashes and restarts
    # - Multiple consumers process same partition
    await payment_gateway.charge(event.data.amount)

Storage:

Redis SET:
  Key: evora:idempotency:{handler}:{event_id}
  Value: "processed"
  TTL: 86400 seconds

Result: Financial-grade exactly-once guarantees.
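The dedupe gate boils down to an atomic first-writer-wins check on the key layout shown above. A minimal in-memory stand-in (the real store would use Redis `SET key value NX EX ttl`; the class and method names here are hypothetical):

```python
def idempotency_key(handler: str, event_id: str) -> str:
    # Key layout from the storage note above
    return f"evora:idempotency:{handler}:{event_id}"

class InMemoryIdempotencyStore:
    """Stand-in for the Redis store; mimics SET ... NX EX semantics."""
    def __init__(self) -> None:
        self._seen: dict[str, int] = {}

    def first_seen(self, key: str, ttl_seconds: int) -> bool:
        # True only for the first caller, like a successful SET ... NX
        if key in self._seen:
            return False
        self._seen[key] = ttl_seconds  # a real store would also apply the TTL
        return True

store = InMemoryIdempotencyStore()
key = idempotency_key("send_welcome_email", "evt-42")
first = store.first_seen(key, 86_400)    # True: run the handler
second = store.first_seen(key, 86_400)   # False: duplicate, skip
```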


6. Structured Dead Letter Queue

Problem: Failed messages disappear with no context.

Solution: DLQ with full forensics.

{
  "v": "<original event bytes>",
  "reason": "poison_message",
  "delivery_count": "10",
  "idle_ms": "65000",
  "original_channel": "users.events",
  "original_msg_id": "1234567890-0",
  "error_type": "RetryableError",
  "failed_handler": "email_service.send_welcome_email",
  "timestamp": "2026-03-01T10:30:00Z"
}

Result:

  • Full debugging context
  • Easy to replay after fixes
  • Operational visibility
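A DLQ entry with those forensic fields could be assembled like this. The helper and field set mirror the example above but are illustrative, not Evora's API:

```python
from datetime import datetime, timezone

def build_dlq_entry(raw_event: bytes, reason: str, delivery_count: int,
                    idle_ms: int, channel: str, msg_id: str,
                    error_type: str, handler: str) -> dict:
    """Assemble the forensic fields shown in the example entry."""
    return {
        "v": raw_event,                          # original event bytes, replayable
        "reason": reason,
        "delivery_count": str(delivery_count),   # stream field values are strings
        "idle_ms": str(idle_ms),
        "original_channel": channel,
        "original_msg_id": msg_id,
        "error_type": error_type,
        "failed_handler": handler,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

entry = build_dlq_entry(b"...", "poison_message", 10, 65_000,
                        "users.events", "1234567890-0",
                        "RetryableError", "email_service.send_welcome_email")
```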

7. Consumer Group Coordination

Problem: Manual offset management is error-prone.

Solution: Redis Streams consumer groups.

# Service A
broker = RedisStreamsBroker(
    client=r,
    group_id="email-service",  # Consumer group
)

# Service B
broker = RedisStreamsBroker(
    client=r,
    group_id="analytics-service",  # Different group
)

# Both receive same events independently

Result:

  • Multiple services can subscribe to same events
  • Horizontal scaling (add more consumers)
  • Automatic load balancing
  • PEL-based crash recovery

8. Typed Event Contracts

Problem: Runtime schema mismatches cause crashes.

Solution: Pydantic-based validation + versioning.

class UserEvent(Event):
    __version__ = 2  # Explicit version required
    
    class Data(BaseModel):
        user_id: int
        email: str
        name: str | None = None  # Made optional in v2
    
    data: Data

Result:

  • Schema validation at decode time
  • Contract errors go to DLQ (not retried)
  • Version enforcement (strict mode)
  • Ready for schema evolution governance

๐Ÿ—๏ธ Architecture

Message Flow

┌────────────────────────────────────────────────────────────┐
│                         Publisher                          │
└─────────────────────────────┬──────────────────────────────┘
                              │
                              ▼
                      ┌───────────────┐
                      │  XADD stream  │
                      └───────┬───────┘
                              │
                              ▼
┌────────────────────────────────────────────────────────────┐
│                       Consumer Group                       │
│                                                            │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Main Consumer Loop                              │      │
│  │  • XREADGROUP                                    │      │
│  │  • Decode envelope                               │      │
│  │  • Check idempotency                             │      │
│  │  • Execute handler                               │      │
│  │  • XACK on success                               │      │
│  └──────────────────────────────────────────────────┘      │
│                                                            │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Retry Scheduler (background)                    │      │
│  │  • Poll ZSET every 500ms                         │      │
│  │  • XADD due retries back to stream               │      │
│  │  • ZREM from retry queue                         │      │
│  └──────────────────────────────────────────────────┘      │
│                                                            │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Poison Checker (background)                     │      │
│  │  • Scan XPENDING every 10s                       │      │
│  │  • XCLAIM idle messages (< max deliveries)       │      │
│  │  • DLQ poison messages (>= max deliveries)       │      │
│  └──────────────────────────────────────────────────┘      │
└─────────────────────────────┬──────────────────────────────┘
                              │
                      ┌───────┴───────┐
                      │               │
                      ▼               ▼
                ┌──────────┐    ┌──────────┐
                │   XACK   │    │   DLQ    │
                └──────────┘    └──────────┘

Three-Layer Design

  1. Application Layer (evora/app.py)

    • Event decoding/validation
    • Handler dispatch
    • Error classification
    • Idempotency gate
    • Telemetry hooks
  2. Broker Layer (evora/brokers/)

    • Message transport (Redis Streams, Kafka, etc.)
    • Durable retry scheduling
    • Poison message detection
    • Consumer group coordination
  3. Storage Layer

    • Idempotency store (Redis)
    • Retry queue (Redis ZSET)
    • DLQ (Redis Stream)
    • PEL (Redis Streams native)

📊 Reliability Guarantees

Property                   Mechanism                           Crash-Safe?
─────────────────────────────────────────────────────────────────────────
At-least-once delivery     Consumer groups + XACK              ✅
Exactly-once processing    Idempotency store                   ✅
Ordered processing         Partition keys                      ✅
Durable retry              ZSET delay queue                    ✅
Exponential backoff        Computed delay                      ✅
Retry exhaustion           Max attempts → DLQ                  ✅
Poison detection           XPENDING scan                       ✅
Delivery limits            Redis PEL tracking                  ✅
Consumer coordination      Consumer groups                     ✅
Crash recovery             Automatic (PEL + poison checker)    ✅

🎯 Use Cases

1. Microservices Communication

# Order Service → Payment Service → Fulfillment Service
# Each service subscribes to relevant events
# Automatic retry on transient failures
# DLQ for permanent failures

2. Event Sourcing

# Append-only event log
# Multiple projections (read models)
# Idempotent replay
# Version-safe schema evolution

3. CQRS

# Command side publishes events
# Query side builds read models
# Guaranteed exactly-once updates
# Crash-safe processing

4. Saga Orchestration

# Multi-step distributed transactions
# Each step is idempotent
# Automatic retry on failures
# Compensation via DLQ handlers

5. CDC (Change Data Capture)

# Database changes → Events
# Multiple downstream consumers
# Guaranteed delivery
# Order preservation per entity

🔧 Configuration Reference

Broker Configuration

broker = RedisStreamsBroker(
    client=redis_client,
    group_id="my-service",              # Consumer group name
    consumer_name=None,                 # Auto: hostname-pid
    
    # Consumption
    block_ms=2000,                      # XREADGROUP block time
    batch_size=50,                      # Messages per poll
    mkstream=True,                      # Auto-create streams
    
    # Retry (app-level)
    base_delay_ms=500,                  # Initial retry delay
    max_delay_ms=30_000,                # Max delay (exponential cap)
    retry_zset_suffix=".retry.z",      # ZSET key suffix
    
    # Poison detection (broker-level)
    poison_idle_ms=60_000,              # 60s idle → reclaim
    poison_max_deliveries=10,           # 10 deliveries → DLQ
    poison_check_interval_s=10.0,       # Check PEL every 10s
    
    # DLQ
    dlq_suffix=".dlq",                  # DLQ stream suffix
)

App Configuration

app = App(
    broker=broker,
    source="my-service",                # Service name
    idempotency_store=store,
    strict=True,                        # Enforce best practices
    dlq_suffix=".dlq",                  # Must match broker
    telemetry=None,                     # Custom telemetry
)

Handler Configuration

@subscribe(
    MyEvent,
    channel=None,                       # Override event_type()
    retry="exponential",                # Or "constant"
    max_attempts=5,                     # Retry limit
    dlq=True,                           # Enable DLQ routing
    idempotency=IdempotencyPolicy(
        mode="event_id",                # Dedupe strategy
        ttl_seconds=86400,              # 24 hours
    ),
)
async def my_handler(event, ctx):
    pass

📈 Performance

Throughput

  • Single consumer: 5,000-10,000 msgs/sec (handler dependent)
  • Multi-consumer: Linear scaling with consumer count
  • Bottleneck: Handler execution time

Latency

  • Normal path: 1-5ms (XREADGROUP → handler → XACK)
  • Retry path: Configured delay (500ms, 1s, 2s, 4s...)
  • Poison detection: 10s interval + idle threshold

Resource Usage

  • Redis memory: ~1KB per pending message
  • ZSET entries: 1 per scheduled retry
  • Idempotency keys: TTL-based cleanup

๐Ÿ” Monitoring & Observability

Key Metrics to Track

# Processing
evora.consume.success          # Successful processing
evora.consume.retry            # App-level retries
evora.consume.dlq              # DLQ routing
evora.consume.duration         # Handler latency

# Poison detection
evora.poison.reclaimed         # XCLAIM operations
evora.poison.dlq               # Poison → DLQ

# Health
evora.pel.depth{stream}        # PEL size per stream
evora.dlq.depth{stream}        # DLQ size per stream
evora.retry.scheduled          # Retry queue additions

Redis Commands

# Stream health
redis-cli XLEN users.events
redis-cli XINFO GROUPS users.events

# PEL inspection
redis-cli XPENDING users.events my-service
redis-cli XPENDING users.events my-service - + 100

# DLQ inspection
redis-cli XLEN users.events.dlq
redis-cli XRANGE users.events.dlq - + COUNT 10

# Retry queue
redis-cli ZCARD users.events.retry.z
redis-cli ZRANGE users.events.retry.z 0 -1 WITHSCORES

🧪 Testing

Unit Tests

from evora.brokers.memory import MemoryBroker

async def test_my_handler():
    # Use in-memory broker for fast tests
    broker = MemoryBroker()
    app = App(broker=broker, ...)
    
    event = MyEvent(...)
    await app.publish(event)
    
    # Assert handler effects

Integration Tests

# Use real Redis, test full flow
broker = RedisStreamsBroker(...)
app = App(broker=broker, ...)

await app.publish(event)
await asyncio.sleep(0.5)  # Allow processing

# Assert side effects (DB, external APIs)

Chaos Tests

# Kill consumer mid-processing
python my_consumer.py &
sleep 2          # give it time to pick up a message
kill -9 $!

# Verify:
# - Message enters PEL
# - Poison checker reclaims after 60s
# - Message reprocessed by new consumer

🚀 Getting Started

1. Run Examples

# Start Redis
docker run -d -p 6379:6379 redis:7-alpine

# Run basic example
python examples/memory_ex.py

# Run Redis example with poison detection
python examples/redis_poison_demo.py --consumer

2. Build Your Service

# 1. Define events (evora/events.py)
# 2. Define handlers (evora/handlers.py)
# 3. Setup app (evora/main.py)
# 4. Deploy with monitoring

🎓 Philosophy

1. Fail-Fast Configuration

"If you can forget it, it should be required."

Evora enforces critical patterns at handler registration time, not after they fail in production.

2. Explicit Over Implicit

"Retryable errors look different from fatal errors."

Error classification is part of your code, not buried in config.

3. Crash-Safe By Default

"If a process dies, work continues."

Durable retry queues and poison detection are built-in, not add-ons.

4. Observability First

"You can't fix what you can't see."

Structured DLQ, telemetry hooks, and full message forensics.

5. Progressive Complexity

"Start simple, scale when needed."

In-memory broker for tests, Redis for production, Kafka when you outgrow Redis.


📚 Documentation

Essential Docs:

Reference Docs:

Deep Dives:

Examples:

  • examples/ - Working code examples with README

🔮 Roadmap

✅ Completed (v0.1.0 - v0.2.0)

  • Strict mode enforcement
  • Error classification
  • Durable retry (ZSET)
  • Idempotency (Redis)
  • Poison message detection
  • Consumer groups
  • Structured DLQ
  • Schema governance CLI
  • Breaking change detection
  • Compatibility checks

🚧 In Progress (v0.3.0)

  • RabbitMQ broker implementation
  • OpenTelemetry integration
  • Prometheus metrics
  • Structured logging

📋 Planned (v0.4.0+)

  • Outbox pattern
  • Kafka broker implementation
  • Admin CLI tools
  • Performance benchmarks
  • Message replay utilities

๐Ÿค Contributing

We welcome contributions! See CONTRIBUTING.md for guidelines.

Quick Start for Contributors

# Clone and setup
git clone https://github.com/tase-nikol/evora.git
cd evora
python -m venv venv
source venv/bin/activate
pip install -e ".[dev]"

# Run tests
pytest

# Run linting
ruff check evora/

📄 License

Evora is released under the MIT License. See LICENSE file for details.


๐Ÿ™ Acknowledgments

Inspired by:

  • AWS SQS - Visibility timeout, DLQ patterns
  • Kafka - Consumer groups, offset management
  • Celery - Task retry logic
  • Temporal - Durable execution
  • Redis Streams - PEL, consumer groups

Evora aims to be simpler than all of them while maintaining production-grade reliability.


🎯 Status

Current Version: 0.2.2 (Alpha)
Production Status: Redis backend production-ready
Next Milestone: RabbitMQ broker (v0.3.0)


Evora: Event processing that doesn't fail silently.

Stop treating events like fire-and-forget. Start treating them like financial transactions.
