FastAPI-like, modern async SQS message processing for Python with advanced features
FastSQS
FastAPI-like, production-ready async SQS message processing for Python.
Version 0.3.0 - Production Ready Features
New Enterprise Features
- Idempotency: Prevent duplicate message processing with memory or DynamoDB storage
- Advanced Error Handling: Exponential backoff, circuit breaker, and DLQ management
- Visibility Timeout Management: Automatic monitoring and extension for long-running processes
- Parallelization: Concurrent processing with semaphore-based limiting and thread pools
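Two of the ideas in this list, idempotency and semaphore-based limiting, can be illustrated without the library. The following is a minimal standard-library sketch, not FastSQS's implementation: `InMemoryIdempotencyStore`, `process_batch`, and the `"id"` field are hypothetical names chosen for the example.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Set


class InMemoryIdempotencyStore:
    """Hypothetical in-memory store that remembers message IDs already seen."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def mark_if_new(self, message_id: str) -> bool:
        """Return True the first time an ID is seen, False on duplicates."""
        if message_id in self._seen:
            return False
        self._seen.add(message_id)
        return True


async def process_batch(
    messages: List[Dict[str, str]],
    handler: Callable[[Dict[str, str]], Awaitable[None]],
    store: InMemoryIdempotencyStore,
    max_concurrency: int = 2,
) -> int:
    """Run the handler on each new message, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)
    processed = 0

    async def run_one(message: Dict[str, str]) -> None:
        nonlocal processed
        if not store.mark_if_new(message["id"]):
            return  # duplicate delivery: skip without calling the handler
        async with semaphore:  # blocks while max_concurrency handlers are running
            await handler(message)
            processed += 1

    await asyncio.gather(*(run_one(m) for m in messages))
    return processed
```

An in-memory set only deduplicates within one process, which is presumably why a shared store such as DynamoDB is offered for use across Lambda invocations. This sketch also marks an ID before the handler succeeds; a production store would need to decide how failed attempts are recorded.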
Key Features
- FastAPI-like API: Familiar decorator-based routing with automatic type inference
- Pydantic Validation: Automatic message validation and serialization using SQSEvent models
- Auto Async/Sync: Write handlers as sync or async functions; the framework handles both automatically
- Middleware Support: Built-in and custom middleware for logging, timing, and more
- Partial Batch Failure: Native support for AWS Lambda batch failure responses
- FIFO & Standard Queues: Full support for both SQS queue types with proper ordering
- Flexible Matching: Automatic field name normalization (camelCase ↔ snake_case)
- Nested Routing: QueueRouter support for complex routing scenarios
- Type Safety: Full type hints and editor support throughout
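The "Auto Async/Sync" feature can be pictured with a small dispatcher. This is a sketch of one common technique (checking `inspect.iscoroutinefunction` and offloading sync callables to a thread), not a description of how FastSQS does it internally; `call_handler` and the two sample handlers are hypothetical.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_handler(handler: Callable[..., Any], *args: Any) -> Any:
    """Await async handlers directly; run sync handlers in a worker thread."""
    if inspect.iscoroutinefunction(handler):
        return await handler(*args)
    loop = asyncio.get_running_loop()
    # run_in_executor keeps a blocking sync handler from stalling the event loop
    return await loop.run_in_executor(None, handler, *args)


async def async_handler(msg: dict) -> str:
    return f"async:{msg['name']}"


def sync_handler(msg: dict) -> str:
    return f"sync:{msg['name']}"
```

Both handlers can then be invoked the same way, for example `asyncio.run(call_handler(sync_handler, {"name": "b"}))`.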
Requirements
- Python 3.8+
- Pydantic (installed automatically)
Installation
pip install fastsqs
Quick Start
Basic FastAPI-like Example
from fastsqs import FastSQS, SQSEvent

class UserCreated(SQSEvent):
    user_id: str
    email: str
    name: str

class OrderProcessed(SQSEvent):
    order_id: str
    amount: float

# Create FastSQS app
app = FastSQS(debug=True)

# Route messages using SQSEvent models
@app.route(UserCreated)
async def handle_user_created(msg: UserCreated):
    print(f"User created: {msg.name} ({msg.email})")

@app.route(OrderProcessed)
def handle_order_processed(msg: OrderProcessed):
    print(f"Order {msg.order_id}: ${msg.amount}")

# Default handler for unmatched messages
@app.default()
def handle_unknown(payload, ctx):
    print(f"Unknown message: {payload}")

# AWS Lambda handler
def lambda_handler(event, context):
    return app.handler(event, context)
Example SQS Message Payloads
{
  "type": "user_created",
  "user_id": "123",
  "email": "user@example.com",
  "name": "John Doe"
}

{
  "type": "order_processed",
  "order_id": "ord-456",
  "amount": 99.99
}
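To try payloads like these locally, they need to be wrapped in the record envelope that Lambda receives from SQS, where each body arrives as a JSON string. The helper below is a sketch for local testing; `make_sqs_event` and the placeholder receipt handle are hypothetical, and whether FastSQS reads record fields beyond those shown is an assumption to verify.

```python
import json
import uuid
from typing import Any, Dict, List


def make_sqs_event(payloads: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Wrap payload dicts in the record envelope Lambda receives from SQS."""
    return {
        "Records": [
            {
                "messageId": str(uuid.uuid4()),
                "receiptHandle": "local-test-handle",  # placeholder value
                "body": json.dumps(payload),  # SQS delivers the body as a string
                "attributes": {},
                "messageAttributes": {},
                "eventSource": "aws:sqs",
            }
            for payload in payloads
        ]
    }


event = make_sqs_event(
    [
        {"type": "user_created", "user_id": "123",
         "email": "user@example.com", "name": "John Doe"},
        {"type": "order_processed", "order_id": "ord-456", "amount": 99.99},
    ]
)
# With the Quick Start module imported, the event could be passed as:
# lambda_handler(event, None)
```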
Advanced Features
FIFO Queue Support
from fastsqs import FastSQS, QueueType

app = FastSQS(
    queue_type=QueueType.FIFO,
    debug=True
)

# Messages in the same messageGroupId are processed sequentially
# Different groups are processed in parallel
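The ordering rule in the comments above (sequential within a group, parallel across groups) can be sketched with plain asyncio. This is an illustration of the idea rather than FastSQS's scheduler; `group_by_message_group` and `process_fifo` are hypothetical names. In Lambda SQS records the group ID is found under `attributes["MessageGroupId"]`.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Record = Dict[str, Any]


def group_by_message_group(records: List[Record]) -> Dict[str, List[Record]]:
    """Bucket records by MessageGroupId, preserving arrival order per group."""
    groups: Dict[str, List[Record]] = defaultdict(list)
    for record in records:
        group_id = record.get("attributes", {}).get("MessageGroupId", "")
        groups[group_id].append(record)
    return dict(groups)


async def process_fifo(
    records: List[Record],
    handler: Callable[[Record], Awaitable[None]],
) -> None:
    """Process each group in order while different groups run concurrently."""

    async def run_group(group: List[Record]) -> None:
        for record in group:  # sequential within one message group
            await handler(record)

    groups = group_by_message_group(records)
    await asyncio.gather(*(run_group(g) for g in groups.values()))
```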
Flexible Field Matching
FastSQS automatically handles field name variations:
class UserEvent(SQSEvent):
    user_id: str     # Will match: user_id, userId, USER_ID, etc.
    first_name: str  # Will match: first_name, firstName, etc.

@app.route(UserEvent)
def handle_user(msg: UserEvent):
    print(f"User: {msg.user_id}, Name: {msg.first_name}")
SQS messages with different field formats work automatically:
{"type": "user_event", "userId": "123", "firstName": "John"}
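For intuition, key normalization of this kind can be done with a single regular expression. The sketch below is an assumption about the general approach, not the code in FastSQS's events module; `to_snake_case` and `normalize_keys` are hypothetical helpers.

```python
import re
from typing import Any, Dict

# An uppercase letter that follows a lowercase letter or digit marks a word boundary
_CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])([A-Z])")


def to_snake_case(name: str) -> str:
    """Convert camelCase or UPPER_CASE names to snake_case."""
    return _CAMEL_BOUNDARY.sub(r"_\1", name).lower()


def normalize_keys(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the payload with every top-level key in snake_case."""
    return {to_snake_case(key): value for key, value in payload.items()}


normalized = normalize_keys(
    {"type": "user_event", "userId": "123", "firstName": "John"}
)
# normalized == {"type": "user_event", "user_id": "123", "first_name": "John"}
```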
Middleware
from fastsqs.middleware import TimingMiddleware, LoggingMiddleware

# Built-in middleware
app.add_middleware(TimingMiddleware())
app.add_middleware(LoggingMiddleware())

# Custom middleware
from fastsqs.middleware import Middleware

class AuthMiddleware(Middleware):
    async def before(self, payload, record, context, ctx):
        if not payload.get("auth_token"):
            raise ValueError("Missing auth token")

    async def after(self, payload, record, context, ctx, error):
        if error:
            print(f"Handler failed: {error}")

app.add_middleware(AuthMiddleware())
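The `before` and `after` hooks suggest a wrapper around each handler call. The runner below is a standalone sketch of that pattern and does not use FastSQS: `RecordingMiddleware` and `run_with_middleware` are hypothetical, and running the `after` hooks in reverse registration order is an assumption, not documented behavior.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional


class RecordingMiddleware:
    """Stand-in middleware with the same before/after hook shape as above."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    async def before(self, payload, record, context, ctx) -> None:
        self.log.append(f"{self.name}:before")

    async def after(self, payload, record, context, ctx, error) -> None:
        status = "error" if error else "ok"
        self.log.append(f"{self.name}:after:{status}")


async def run_with_middleware(
    middlewares: List[Any],
    handler: Callable[[Dict[str, Any]], Awaitable[Any]],
    payload: Dict[str, Any],
    record: Any = None,
    context: Any = None,
) -> None:
    """Call before hooks, the handler, then after hooks with any error."""
    ctx: Dict[str, Any] = {}
    error: Optional[Exception] = None
    try:
        for mw in middlewares:
            await mw.before(payload, record, context, ctx)
        await handler(payload)
    except Exception as exc:  # capture so the after hooks can observe it
        error = exc
    for mw in reversed(middlewares):  # assumed order: last registered runs first
        await mw.after(payload, record, context, ctx, error)
    if error is not None:
        raise error  # re-raise so the message still counts as failed
```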
Complex Routing with QueueRouter
For complex nested routing scenarios, you can still use QueueRouter:
from fastsqs import FastSQS, QueueRouter

# Main FastSQS app
app = FastSQS()

# Complex router for nested routing
user_router = QueueRouter(key="action")

@user_router.route("create")
def create_user(msg, ctx):
    print(f"Creating user: {msg}")

@user_router.route("delete")
def delete_user(msg, ctx):
    print(f"Deleting user: {msg}")

# Include the router for complex scenarios
app.include_router(user_router)
Example payload for nested routing:
{
  "service": "users",
  "action": "create",
  "user_data": {...}
}
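Key-based routing like this amounts to a dictionary lookup on one payload field. The class below is a minimal sketch of that idea, written without FastSQS; `KeyDispatcher` and its methods are hypothetical and much simpler than the real QueueRouter.

```python
from typing import Any, Callable, Dict


class KeyDispatcher:
    """Hypothetical minimal router: picks a handler from one payload field."""

    def __init__(self, key: str) -> None:
        self.key = key
        self._routes: Dict[str, Callable[[Dict[str, Any]], Any]] = {}

    def route(self, value: str) -> Callable:
        """Decorator that registers a handler for one value of the key."""
        def decorator(func: Callable[[Dict[str, Any]], Any]) -> Callable:
            self._routes[value] = func
            return func
        return decorator

    def dispatch(self, payload: Dict[str, Any]) -> Any:
        """Look up the handler for payload[key] and call it."""
        value = payload.get(self.key)
        handler = self._routes.get(value)
        if handler is None:
            raise LookupError(f"no route for {self.key}={value!r}")
        return handler(payload)


actions = KeyDispatcher(key="action")


@actions.route("create")
def create(payload: Dict[str, Any]) -> str:
    return f"creating {payload['user_data']['name']}"
```

Calling `actions.dispatch({"service": "users", "action": "create", "user_data": {"name": "Ada"}})` selects `create` in a single dictionary lookup, regardless of how many routes are registered.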
Package Structure
The library is organized into clean, modular components:
fastsqs/
├── __init__.py          # Main exports (FastSQS, QueueRouter, etc.)
├── app.py               # Main FastSQS class
├── events.py            # SQSEvent base class with field normalization
├── types.py             # Type definitions
├── exceptions.py        # Custom exceptions
├── utils.py             # Utility functions
├── middleware/          # Middleware components
│   ├── __init__.py
│   ├── base.py          # Base middleware class
│   ├── timing.py        # Timing middleware
│   └── logging.py       # Logging middleware
└── routing/             # QueueRouter for complex routing
    ├── __init__.py
    ├── entry.py         # Route entry
    └── router.py        # Router implementation
How it Works
Message Processing Flow
- Message Parsing: JSON message body is parsed and validated
- Route Matching: Message type is extracted and matched to registered routes
- Model Validation: SQSEvent model validates and normalizes the message data
- Handler Execution: Your handler function is called with the validated model
- Error Handling: Any errors result in batch item failures for SQS retry
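The first four steps of this flow can be mimicked in a few lines. This sketch uses a dataclass in place of a Pydantic model and assumes, based on the examples in this README, that the message `type` is the snake_case form of the model class name; `type_name_for`, `register`, `process_body`, and `ROUTES` are hypothetical.

```python
import json
import re
from dataclasses import dataclass, fields
from typing import Any, Callable, Dict, Tuple


def type_name_for(model: type) -> str:
    """Derive a message type such as 'user_created' from 'UserCreated'."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", model.__name__).lower()


@dataclass
class UserCreated:
    user_id: str
    email: str
    name: str


ROUTES: Dict[str, Tuple[type, Callable[[Any], Any]]] = {}


def register(model: type, handler: Callable[[Any], Any]) -> None:
    """Map the derived type name to its model class and handler."""
    ROUTES[type_name_for(model)] = (model, handler)


def process_body(body: str) -> Any:
    payload = json.loads(body)                    # 1. message parsing
    model, handler = ROUTES[payload.pop("type")]  # 2. route matching
    wanted = {f.name for f in fields(model)}
    # 3. build the model; a missing required field raises TypeError here
    message = model(**{k: v for k, v in payload.items() if k in wanted})
    return handler(message)                       # 4. handler execution
```

A dataclass only checks that required fields are present; Pydantic additionally validates and coerces field types, which this sketch does not attempt.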
Key Concepts
- FastAPI-like Decorators: Use @app.route(SQSEventModel) to register handlers
- Automatic Type Inference: Handler signature determines what parameters to pass
- Field Normalization: camelCase ↔ snake_case conversion happens automatically
- Flexible Matching: Multiple field name variants are supported out of the box
- Standard Error Handling: All errors result in message failure and SQS retry/DLQ
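"Handler signature determines what parameters to pass" can be illustrated with `inspect.signature`. The sketch below matches parameters by name, which is an assumption made for the example; the rules FastSQS actually applies (name, annotation, or position) are not spelled out here, and `call_with_supported_args` is hypothetical.

```python
import inspect
from typing import Any, Callable, Dict


def call_with_supported_args(
    handler: Callable[..., Any], available: Dict[str, Any]
) -> Any:
    """Pass the handler only the arguments its signature declares."""
    params = inspect.signature(handler).parameters
    kwargs = {name: available[name] for name in params if name in available}
    return handler(**kwargs)


def handle_model(msg: dict) -> str:
    return f"msg only: {msg['name']}"


def handle_unknown(payload: dict, ctx: dict) -> str:
    return f"payload + ctx: {payload['name']} / {ctx['request_id']}"


# Everything the framework could offer; each handler takes what it asks for
available = {
    "msg": {"name": "Ada"},
    "payload": {"name": "Ada"},
    "ctx": {"request_id": "r-1"},
}
```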
Error Handling
FastSQS uses a simple, predictable error handling strategy:
| Error Type | Behavior | SQS Result |
|---|---|---|
| Invalid JSON | InvalidMessage exception | Message fails → retry/DLQ |
| Validation Error | InvalidMessage exception | Message fails → retry/DLQ |
| No Route Found | RouteNotFound exception | Message fails → retry/DLQ |
| Handler Exception | Exception propagates | Message fails → retry/DLQ |
All failures are added to batchItemFailures in the Lambda response, allowing SQS to handle retries and dead letter queues according to your queue configuration.
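The response shape that Lambda expects for partial batch failures is a `batchItemFailures` list of `itemIdentifier` entries, one per failed `messageId`. The loop below is a standalone sketch of how such a response can be assembled; `handle_batch` is a hypothetical helper, not FastSQS code.

```python
import json
from typing import Any, Callable, Dict, List


def handle_batch(
    event: Dict[str, Any], process: Callable[[Dict[str, Any]], None]
) -> Dict[str, List[Dict[str, str]]]:
    """Process every record and report only the failed message IDs."""
    failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        try:
            # Invalid JSON and handler errors are treated the same way
            process(json.loads(record["body"]))
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

For Lambda to honor this response, `ReportBatchItemFailures` must be enabled on the SQS event source mapping; otherwise any reported failure is ignored and the batch is treated as fully successful.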
Performance Features
- FastAPI-like Efficiency: Minimal overhead with fast dictionary-based routing
- Parallel Processing: Standard SQS messages are processed concurrently
- FIFO Ordering: FIFO messages maintain order within message groups
- Partial Batch Failures: Only failed messages are retried, not entire batches
- Type Safety: Full type checking and validation with Pydantic
- Memory Efficient: Minimal overhead per message with automatic cleanup
Documentation
- Examples: See the examples/ directory for complete working examples
- Type Safety: Full type hints throughout for excellent IDE support
- API Reference: All classes and methods include comprehensive docstrings
Contributing
Contributions, issues, and feature requests are welcome! Please open an issue or submit a pull request.
License
This project is licensed under the terms of the MIT license.
Ready to build type-safe, FastAPI-like SQS processors? Try FastSQS today!
File details
Details for the file fastsqs-0.3.0.tar.gz.
File metadata
- Download URL: fastsqs-0.3.0.tar.gz
- Upload date:
- Size: 24.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 5f03b7bca566c08573886786dfb7fbaf2366db5b88e3a2de25c0a365326ab6af |
| MD5 | 1b8d263e7543c92784457e2252810fd5 |
| BLAKE2b-256 | e94737244074da70f7f4f766993140fbaac72c61a220fd3cd1bd8ab126f70f45 |
File details
Details for the file fastsqs-0.3.0-py3-none-any.whl.
File metadata
- Download URL: fastsqs-0.3.0-py3-none-any.whl
- Upload date:
- Size: 26.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d34767e50ed7ce980d7af386f08c823b732b34f176d34e1661b25fb9571301ff |
| MD5 | b9cd64e1c636d9460571a4ee182212f6 |
| BLAKE2b-256 | c2369d2846d8922c1b9a90b72376ea94e637c9ce30ce70835a152e5198e3c005 |