Skip to main content

FastAPI-like, modern async SQS message processing for Python with advanced features

Project description

FastSQS

FastAPI-like, production-ready async SQS message processing for Python.

PyPI version License: MIT


Version 0.3.0 - Production Ready Features

๐Ÿš€ New Enterprise Features

  • Idempotency: Prevent duplicate message processing with memory or DynamoDB storage
  • Advanced Error Handling: Exponential backoff, circuit breaker, and DLQ management
  • Visibility Timeout Management: Automatic monitoring and extension for long-running processes
  • Parallelization: Concurrent processing with semaphore-based limiting and thread pools

Key Features

  • ๐Ÿš€ FastAPI-like API: Familiar decorator-based routing with automatic type inference
  • ๐Ÿ”’ Pydantic Validation: Automatic message validation and serialization using SQSEvent models
  • ๐Ÿ”„ Auto Async/Sync: Write handlers as sync or async functions - framework handles both automatically
  • ๏ฟฝ๏ธ Middleware Support: Built-in and custom middleware for logging, timing, and more
  • ๐Ÿฆพ Partial Batch Failure: Native support for AWS Lambda batch failure responses
  • ๐Ÿ”€ FIFO & Standard Queues: Full support for both SQS queue types with proper ordering
  • ๐ŸŽฏ Flexible Matching: Automatic field name normalization (camelCase โ†” snake_case)
  • ๐Ÿ—๏ธ Nested Routing: QueueRouter support for complex routing scenarios
  • ๐Ÿ Type Safety: Full type hints and editor support throughout

Requirements

  • Python 3.8+
  • Pydantic (installed automatically)

Installation

pip install fastsqs

Quick Start

Basic FastAPI-like Example

from fastsqs import FastSQS, SQSEvent

class UserCreated(SQSEvent):
    user_id: str
    email: str
    name: str

class OrderProcessed(SQSEvent):
    order_id: str
    amount: float

# Create FastSQS app
app = FastSQS(debug=True)

# Route messages using SQSEvent models
@app.route(UserCreated)
async def handle_user_created(msg: UserCreated):
    print(f"User created: {msg.name} ({msg.email})")

@app.route(OrderProcessed)
def handle_order_processed(msg: OrderProcessed):
    print(f"Order {msg.order_id}: ${msg.amount}")

# Default handler for unmatched messages
@app.default()
def handle_unknown(payload, ctx):
    print(f"Unknown message: {payload}")

# AWS Lambda handler
def lambda_handler(event, context):
    return app.handler(event, context)

Example SQS Message Payloads

{
  "type": "user_created",
  "user_id": "123",
  "email": "user@example.com",
  "name": "John Doe"
}
{
  "type": "order_processed",
  "order_id": "ord-456",
  "amount": 99.99
}

Advanced Features

FIFO Queue Support

from fastsqs import FastSQS, QueueType

app = FastSQS(
    queue_type=QueueType.FIFO,
    debug=True
)

# Messages in the same messageGroupId are processed sequentially
# Different groups are processed in parallel

Flexible Field Matching

FastSQS automatically handles field name variations:

class UserEvent(SQSEvent):
    user_id: str  # Will match: user_id, userId, USER_ID, etc.
    first_name: str  # Will match: first_name, firstName, etc.

@app.route(UserEvent)
def handle_user(msg: UserEvent):
    print(f"User: {msg.user_id}, Name: {msg.first_name}")

SQS messages with different field formats work automatically:

{"type": "user_event", "userId": "123", "firstName": "John"}

Middleware

from fastsqs.middleware import TimingMiddleware, LoggingMiddleware

# Built-in middleware
app.add_middleware(TimingMiddleware())
app.add_middleware(LoggingMiddleware())

# Custom middleware
from fastsqs.middleware import Middleware

class AuthMiddleware(Middleware):
    async def before(self, payload, record, context, ctx):
        if not payload.get("auth_token"):
            raise ValueError("Missing auth token")
    
    async def after(self, payload, record, context, ctx, error):
        if error:
            print(f"Handler failed: {error}")

app.add_middleware(AuthMiddleware())

Complex Routing with QueueRouter

For complex nested routing scenarios, you can still use QueueRouter:

from fastsqs import FastSQS, QueueRouter

# Main FastSQS app
app = FastSQS()

# Complex router for nested routing
user_router = QueueRouter(key="action")

@user_router.route("create")
def create_user(msg, ctx):
    print(f"Creating user: {msg}")

@user_router.route("delete") 
def delete_user(msg, ctx):
    print(f"Deleting user: {msg}")

# Include the router for complex scenarios
app.include_router(user_router)

Example payload for nested routing:

{
  "service": "users",
  "action": "create",
  "user_data": {...}
}

Package Structure

The library is organized into clean, modular components:

fastsqs/
โ”œโ”€โ”€ __init__.py          # Main exports (FastSQS, QueueRouter, etc.)
โ”œโ”€โ”€ app.py               # Main FastSQS class  
โ”œโ”€โ”€ events.py            # SQSEvent base class with field normalization
โ”œโ”€โ”€ types.py             # Type definitions
โ”œโ”€โ”€ exceptions.py        # Custom exceptions
โ”œโ”€โ”€ utils.py             # Utility functions
โ”œโ”€โ”€ middleware/          # Middleware components
โ”‚   โ”œโ”€โ”€ __init__.py
โ”‚   โ”œโ”€โ”€ base.py         # Base middleware class
โ”‚   โ”œโ”€โ”€ timing.py       # Timing middleware
โ”‚   โ””โ”€โ”€ logging.py      # Logging middleware
โ””โ”€โ”€ routing/            # QueueRouter for complex routing
    โ”œโ”€โ”€ __init__.py
    โ”œโ”€โ”€ entry.py        # Route entry
    โ””โ”€โ”€ router.py       # Router implementation

How it Works

Message Processing Flow

  1. Message Parsing: JSON message body is parsed and validated
  2. Route Matching: Message type is extracted and matched to registered routes
  3. Model Validation: SQSEvent model validates and normalizes the message data
  4. Handler Execution: Your handler function is called with the validated model
  5. Error Handling: Any errors result in batch item failures for SQS retry

Key Concepts

  • FastAPI-like Decorators: Use @app.route(SQSEventModel) to register handlers
  • Automatic Type Inference: Handler signature determines what parameters to pass
  • Field Normalization: camelCase โ†” snake_case conversion happens automatically
  • Flexible Matching: Multiple field name variants are supported out of the box
  • Standard Error Handling: All errors result in message failure and SQS retry/DLQ

Error Handling

FastSQS uses a simple, predictable error handling strategy:

Error Type Behavior SQS Result
Invalid JSON InvalidMessage exception Message fails โ†’ retry/DLQ
Validation Error InvalidMessage exception Message fails โ†’ retry/DLQ
No Route Found RouteNotFound exception Message fails โ†’ retry/DLQ
Handler Exception Exception propagates Message fails โ†’ retry/DLQ

All failures are added to batchItemFailures in the Lambda response, allowing SQS to handle retries and dead letter queues according to your queue configuration.


Performance Features

  • FastAPI-like Efficiency: Minimal overhead with fast dictionary-based routing
  • Parallel Processing: Standard SQS messages are processed concurrently
  • FIFO Ordering: FIFO messages maintain order within message groups
  • Partial Batch Failures: Only failed messages are retried, not entire batches
  • Type Safety: Full type checking and validation with Pydantic
  • Memory Efficient: Minimal overhead per message with automatic cleanup

Documentation

  • Examples: See the examples/ directory for complete working examples
  • Type Safety: Full type hints throughout for excellent IDE support
  • API Reference: All classes and methods include comprehensive docstrings

Contributing

Contributions, issues, and feature requests are welcome! Please open an issue or submit a pull request.


License

This project is licensed under the terms of the MIT license.


Ready to build type-safe, FastAPI-like SQS processors? Try FastSQS today!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastsqs-0.3.1.tar.gz (26.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastsqs-0.3.1-py3-none-any.whl (28.0 kB view details)

Uploaded Python 3

File details

Details for the file fastsqs-0.3.1.tar.gz.

File metadata

  • Download URL: fastsqs-0.3.1.tar.gz
  • Upload date:
  • Size: 26.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for fastsqs-0.3.1.tar.gz
Algorithm Hash digest
SHA256 d9dc7cc1bc80f54210193a8fb7a12e2b28f9e005c30240a9b293099094889924
MD5 52519fbe426936c90f733efc5d552056
BLAKE2b-256 cf504be5c09e02bb6ca03965076cadc71938cc798ff46bd2f6ced604491f1942

See more details on using hashes here.

File details

Details for the file fastsqs-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: fastsqs-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 28.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for fastsqs-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 86fdfba5ffe4fd4dade507e0eaa25e508a349677c2ebf7c4d7a58c9a6c3dee8f
MD5 a00feb83eb40ee2cd390ea03fb852ce6
BLAKE2b-256 b12bf0763a48002322d0984fdf45b0b6fe9db8920e44f2fcad44481b9ab5d4f3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page