Eventide

A fast, simple, and extensible queue worker framework for Python.

Overview

Eventide is a modern, lightweight framework for building robust queue-based worker systems in Python. It provides a clean, modular, and provider-agnostic architecture for consuming and processing messages from a variety of queue backends (SQS, Cloudflare, and more).

Key Features:

  • Multiprocess architecture for high throughput and resilience
  • Provider-agnostic queue abstraction with built-in and custom queue support
  • Declarative, decorator-based message handler registration
  • Robust retry/backoff and dead-letter logic
  • Graceful startup and shutdown with signal handling
  • Type-safe configuration using Pydantic models
  • Extensible handler matching and routing

Architecture

Eventide orchestrates the lifecycle of your queue worker system:

  • Main Process: Manages configuration, queue instantiation, worker process lifecycle, and graceful shutdown.
  • Queue Threads: Continuously pull messages from external queues (SQS, Cloudflare, etc.) into internal buffers.
  • Worker Processes: Each worker consumes messages from the buffer and routes them to user-defined handlers, which run inside that worker's process.
  • Handlers: User functions decorated with @eventide_handler that process messages matching specific patterns.

All configuration is done via Pydantic models, ensuring type safety and validation.
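
As a rough illustration of the flow described above (not Eventide's actual implementation), the sketch below pulls messages from a source into a bounded buffer and lets several workers route them to handlers. It uses threads in place of worker processes to stay short, and every name in it (`queue_thread`, `worker`, `run`, `SENTINEL`) is hypothetical.

```python
import queue
import threading

SENTINEL = object()  # hypothetical marker telling a worker to stop


def queue_thread(source, buffer, worker_count):
    """Pull messages from an external source into the internal buffer."""
    for message in source:
        buffer.put(message)
    # Tell every worker that no more messages will arrive.
    for _ in range(worker_count):
        buffer.put(SENTINEL)


def worker(buffer, handlers, results):
    """Consume buffered messages and route each to the first matching handler."""
    while True:
        message = buffer.get()
        if message is SENTINEL:
            break
        for matches, handle in handlers:
            if matches(message):
                results.append(handle(message))
                break


def run(source, handlers, worker_count=2):
    """Wire one puller and several workers around a bounded buffer."""
    buffer = queue.Queue(maxsize=100)
    results = []
    workers = [
        threading.Thread(target=worker, args=(buffer, handlers, results))
        for _ in range(worker_count)
    ]
    puller = threading.Thread(
        target=queue_thread, args=(source, buffer, worker_count)
    )
    for thread in workers + [puller]:
        thread.start()
    for thread in workers + [puller]:
        thread.join()
    return results
```

The bounded buffer is what keeps the puller from fetching far more messages than the workers can process.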

Installation

pip install eventide

# With SQS support:
pip install eventide[sqs]

# With Cloudflare Queues support:
pip install eventide[cloudflare]

Quick Start

from eventide import (
    Eventide,
    EventideConfig,
    Message,
    SQSQueueConfig,
    eventide_handler,
)

# Define a handler
@eventide_handler("body.type == 'greeting'")
def handle_greeting(message: Message):
    print(f"Received greeting: {message.body.get('content')}")

# Configure Eventide
config = EventideConfig(
    queue=SQSQueueConfig(
        region="us-east-1",
        url="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
    ),
    concurrency=2,  # Number of worker processes
)

# Start Eventide
Eventide(config).run()

Configuration

Configuration is expressed through two kinds of Pydantic models:

  • EventideConfig: Main config (queue, concurrency, handler paths, timeouts, retry policies, etc.)
  • SQSQueueConfig, CloudflareQueueConfig, ...: Provider-specific queue configs

Example:

from eventide import EventideConfig, SQSQueueConfig

config = EventideConfig(
    queue=SQSQueueConfig(
        region="us-east-1",
        url="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
    ),
    concurrency=4,
    timeout=30.0,  # Handler timeout (seconds)
    retry_limit=5,  # Max retries per message
    retry_min_backoff=1.0,  # Min backoff in seconds
    retry_max_backoff=60.0, # Max backoff in seconds
    handler_paths=["myapp/handlers"],  # Auto-discover handler modules
)
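
The README does not state how the delay between retries is derived from retry_min_backoff and retry_max_backoff. One common scheme is exponential backoff clamped to those bounds. The sketch below shows that scheme as an assumption, not as Eventide's documented behavior; `backoff_delay` is a hypothetical helper.

```python
def backoff_delay(attempt, min_backoff=1.0, max_backoff=60.0):
    """Return the delay in seconds before retry number `attempt` (1-based).

    Assumed scheme: start at min_backoff, double on each retry,
    and never exceed max_backoff.
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    return min(max_backoff, min_backoff * 2 ** (attempt - 1))


# Delays for the five retries allowed by retry_limit=5 in the config above.
delays = [backoff_delay(n) for n in range(1, 6)]
```

Under this assumption the delay reaches the 60-second ceiling from the seventh retry onward.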

Message Handlers

Handlers are registered using the @eventide_handler decorator. You can match on message attributes, set retry/backoff policies, and more.

from eventide import eventide_handler

@eventide_handler("body.type == 'email'", retry_limit=3, retry_for=[ValueError])
def process_email(message):
    # Process email
    print(f"Processing email: {message.body}")
    return True  # Acknowledge
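
One plausible reading of retry_limit and retry_for is that a failed message is retried only while retries remain and only when the raised exception is an instance of a listed type. The sketch below encodes that reading. It is an assumption about the semantics, and `should_retry` is a hypothetical helper, not part of Eventide.

```python
def should_retry(error, retries_used, retry_limit=3, retry_for=(ValueError,)):
    """Decide whether a failed message gets another attempt.

    retries_used counts the retries already spent on this message.
    """
    if retries_used >= retry_limit:
        return False  # out of retries: a candidate for dead-lettering
    # isinstance also accepts subclasses of the listed exception types.
    return isinstance(error, tuple(retry_for))


class BadAddress(ValueError):
    """Hypothetical application error raised by an email handler."""
```

With these definitions, a `BadAddress` raised on the first attempt would be retried, while a `KeyError` would not.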

Advanced matching (multiple matchers, logical operators):

from eventide import eventide_handler

@eventide_handler(
    "body.type == 'notification'",
    "body.priority == 'high'",
    operator="all"  # or "any"
)
def process_notification(_message):
    ...

Advanced Usage

  • Graceful Shutdown: Eventide handles SIGINT/SIGTERM for clean shutdown.
  • Retries & Backoff: Handlers can specify retry policies and backoff intervals.
  • Dead-lettering: Messages that exceed retry limits are sent to a DLQ (if supported).
  • Extensible Matching: Handler matcher logic can be customized for advanced routing.
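
To make the graceful-shutdown bullet concrete, here is a minimal sketch of the usual pattern: the signal handler only sets a flag, and the work loop checks that flag between messages so the message in flight can finish. This shows the general technique with hypothetical names; Eventide's own shutdown code may differ.

```python
import signal
import threading

shutdown_requested = threading.Event()


def request_shutdown(signum, frame):
    """Signal handler: record the request instead of exiting immediately."""
    shutdown_requested.set()


def install_signal_handlers():
    """Register the handler for SIGINT and SIGTERM.

    Must be called from the main thread; signal.signal raises
    ValueError otherwise.
    """
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, request_shutdown)


def work_loop(messages, handle):
    """Process messages until they run out or a shutdown is requested."""
    processed = []
    for message in messages:
        if shutdown_requested.is_set():
            break  # stop taking new work; earlier messages already finished
        processed.append(handle(message))
    return processed
```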

SQS Queue

from eventide import SQSQueueConfig

sqs_config = SQSQueueConfig(
    region="us-east-1",
    url="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",
    visibility_timeout=30,  # seconds
    max_number_of_messages=10,  # max messages to fetch
    wait_time_seconds=20,  # for long polling
    buffer_size=100,  # internal buffer size
)

Cloudflare Queue

from eventide import CloudflareQueueConfig

cf_config = CloudflareQueueConfig(
    account_id="my-account-id",
    queue_id="my-queue-id",
    buffer_size=100,  # internal buffer size
    batch_size=10,  # max messages to fetch
    visibility_timeout_ms=30000,  # milliseconds
)

Message Routing with JMESPath

Eventide uses JMESPath expressions to route messages to the appropriate handlers. This provides a powerful and flexible way to match messages based on their content.

What is JMESPath?

JMESPath is a query language for JSON that allows you to extract and transform elements from a JSON document. In Eventide, it's used to match messages to handlers based on their content.

Examples of JMESPath Expressions

# Match messages with a specific type
"body.type == 'email'"

# Match messages with a specific attribute value
"body.customer_id == '12345'"

# Match messages whose tags array contains a value
"contains(body.tags, 'urgent')"

# Match messages with a numeric comparison
# (JMESPath writes number and boolean literals in backticks)
"body.priority > `5`"

# Match messages with a specific structure
"body.user.verified == `true`"

# Complex condition with multiple operators
"body.type == 'order' && body.total > `100`"

Combining Multiple Expressions

You can combine multiple JMESPath expressions with logical operators:

from eventide import eventide_handler

# Match messages that satisfy ALL conditions
@eventide_handler(
    "body.type == 'notification'",
    "body.priority == 'high'",
    operator="and"  # Default is "all", which is equivalent to "and"
)
def priority_notifications_handler(_message):
    pass

# Match messages that satisfy ANY condition
@eventide_handler(
    "body.type == 'email'",
    "body.type == 'sms'",
    operator="or"  # Same as "any"
)
def email_or_sms_handler(_message):
    pass

This approach gives you fine-grained control over which messages are routed to which handlers, allowing for clean separation of concerns in your application.
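
The way several matchers might be combined can be sketched in plain Python. The example below uses ordinary predicates in place of compiled JMESPath expressions and maps the operator names mentioned above onto `all` and `any`. The `matches` function and `COMBINERS` table are hypothetical illustrations, not Eventide internals.

```python
# "and" and "or" are treated as aliases of "all" and "any".
COMBINERS = {"all": all, "and": all, "any": any, "or": any}


def matches(message, matchers, operator="all"):
    """Return True if `message` satisfies the matchers under `operator`."""
    try:
        combine = COMBINERS[operator]
    except KeyError:
        raise ValueError(f"unknown operator: {operator!r}") from None
    return combine(matcher(message) for matcher in matchers)


def is_notification(message):
    return message["body"].get("type") == "notification"


def is_high_priority(message):
    return message["body"].get("priority") == "high"
```

For a low-priority notification, the two matchers fail under "all" and succeed under "any".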

Practical Example: Order Processing System

Here's a complete example of using Eventide to build an order processing system:

# app.py
from eventide import Eventide, EventideConfig, SQSQueueConfig, eventide_handler

# Define handlers for different message types
@eventide_handler("body.type == 'new_order'")
def process_new_order(message):
    order = message.body.get('order', {})
    order_id = order.get('id')
    print(f"Processing new order: {order_id}")
    # Your order processing logic here
    return True

@eventide_handler("body.type == 'payment_confirmed'")
def process_payment(message):
    order_id = message.body.get('order_id')
    amount = message.body.get('amount')
    print(f"Payment of ${amount} confirmed for order: {order_id}")
    # Update order status, trigger shipping, etc.
    return True

@eventide_handler(
    "body.type == 'order_status_update'",
    "body.status == 'shipped'"
)
def handle_shipped_order(message):
    order_id = message.body.get('order_id')
    tracking_number = message.body.get('tracking_number')
    print(f"Order {order_id} shipped with tracking number: {tracking_number}")
    # Send confirmation email to customer, update database, etc.
    return True

# Configure and run Eventide
if __name__ == "__main__":
    config = EventideConfig(
        queue=SQSQueueConfig(
            region="us-east-1",
            url="https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue",
            # Increase visibility timeout for longer processing tasks
            visibility_timeout=120,
        ),
        # Use multiple workers for better throughput
        concurrency=4,
    )

    # Start processing messages
    print("Starting order processing system...")
    Eventide(config).run()

To run this application:

# Install dependencies
pip install eventide[sqs]

# Run the application
python app.py

This example demonstrates how to:

  1. Define multiple handlers for different types of messages
  2. Use JMESPath expressions to route messages to the appropriate handlers
  3. Configure the application with the appropriate queue settings
  4. Run multiple workers for better throughput

Roadmap

  • Comprehensive test suite
  • Message scheduling (cron and one-off)
  • Lifecycle hooks

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
