High-performance SQS message consumer with worker-based concurrency, dependency injection, and async support for Python applications
Project description
Finalsa SQS Consumer
A Python package for creating SQS message consumers in FastAPI applications with built-in dependency injection, interceptors, and async support.
Features
- SQS Message Consumption: Simple, decorator-based SQS message handling
- Worker-based Concurrency: Concurrent message processing with configurable worker pools (like uvicorn)
- Dependency Injection: Built-in dependency injection system with
SqsDepends - Async Support: Full async/await support for message processing
- Interceptors: Pre/post-processing hooks for message handling
- Signal Handling: Graceful shutdown handling
- Testing Support: Built-in testing utilities
- Type Safety: Full type hints and validation
Installation
pip install finalsa-sqs-consumer
Quick Start
from finalsa.sqs.consumer import SqsApp, SqsDepends
# Create app instance with worker-based concurrency
app = SqsApp(
app_name="my-consumer",
queue_url="https://sqs.region.amazonaws.com/account/queue-name",
max_number_of_messages=10,
workers=8 # 8 concurrent workers for high throughput
)
# Define a simple handler
@app.handler("user.created")
async def handle_user_created(message: dict):
print(f"User created: {message}")
# Define handler with dependencies
@app.handler("order.created")
async def handle_order_created(
message: dict,
db_service: DatabaseService = SqsDepends(DatabaseService)
):
await db_service.process_order(message)
# Run the consumer with concurrent workers
if __name__ == "__main__":
app.run() # Starts 8 worker processes
Core Components
SqsApp
Main application class that manages message consumption and routing.
app = SqsApp(
app_name="my-app", # Application identifier
queue_url="...", # SQS queue URL
max_number_of_messages=10, # Max messages per batch
workers=8, # Number of concurrent workers (like uvicorn)
message_timeout=300.0, # Message processing timeout in seconds (default: 5 minutes)
interceptors=[] # List of interceptor classes
)
Worker-based Processing:
- Messages are distributed to a pool of concurrent workers
- Each worker processes messages independently
- Similar to uvicorn's worker model for high throughput
- Automatic load balancing across workers
- Graceful shutdown of all workers
Message Timeout:
- Configurable timeout for processing individual messages
- Prevents workers from being blocked by long-running handlers
- Timed-out messages are logged and returned to queue for retry
- Default timeout: 300 seconds (5 minutes)
Message Handlers
Register handlers for specific message topics:
@app.handler("topic.name")
async def my_handler(message: dict, context: dict = None):
# Process message
pass
### Dependency Injection
Use `SqsDepends` for dependency injection:
```python
class MyService:
def process(self, data): ...
@app.handler("topic")
async def handler(
message: dict,
service: MyService = SqsDepends(MyService)
):
service.process(message)
Interceptors
Create custom interceptors for cross-cutting concerns:
from finalsa.sqs.consumer import AsyncConsumerInterceptor
class LoggingInterceptor(AsyncConsumerInterceptor):
async def before_consume(self, topic: str, message: dict):
print(f"Processing {topic}: {message}")
async def after_consume(self, topic: str, result):
print(f"Completed {topic}")
app = SqsApp(interceptors=[LoggingInterceptor])
Message Timeout Configuration
Configure timeout limits for message processing to prevent workers from being blocked:
Basic Timeout Configuration
# Fast operations (API calls, simple DB operations)
fast_app = SqsApp(
app_name="fast-processor",
queue_url="...",
workers=5,
message_timeout=30.0 # 30 seconds
)
# Data processing operations
data_app = SqsApp(
app_name="data-processor",
queue_url="...",
workers=3,
message_timeout=300.0 # 5 minutes (default)
)
# Heavy computation operations
heavy_app = SqsApp(
app_name="heavy-processor",
queue_url="...",
workers=2,
message_timeout=1800.0 # 30 minutes
)
Timeout Behavior
When a message handler exceeds the timeout:
- The handler execution is cancelled
- An error is logged with timeout details
- The message is not deleted from SQS (remains available for retry)
- The worker becomes available for new messages immediately
Timeout Guidelines
- Fast operations (5-60 seconds): API calls, simple DB operations, cache updates
- Medium operations (1-10 minutes): File processing, image processing, data aggregation
- Heavy operations (10-60 minutes): Large file processing, ML inference, complex data analysis
Example Handler with Timeout
# This handler has a 2-minute timeout
app = SqsApp(message_timeout=120.0)
@app.handler("data.process")
async def process_data(message: dict):
# This operation must complete within 2 minutes
# or it will be cancelled and logged as timeout
await heavy_data_processing(message)
Testing
Use SqsAppTest for testing message handlers:
from finalsa.sqs.consumer import SqsAppTest
def test_user_handler():
test_app = SqsAppTest(app)
# Test handler
result = test_app.test_handler(
"user.created",
{"user_id": 123, "name": "John"}
)
assert result is not None
Error Handling
The library provides specific exceptions:
TopicNotFoundException: Handler not found for topicInvalidMessageException: Message format validation failedTopicAlreadyRegisteredException: Duplicate topic registration
Configuration
Environment Variables
AWS_REGION: AWS region for SQSAWS_ACCESS_KEY_ID: AWS access keyAWS_SECRET_ACCESS_KEY: AWS secret key
Message Format
Expected SQS message format:
{
"topic": "user.created",
"data": {
"user_id": 123,
"name": "John Doe"
},
"metadata": {
"correlation_id": "uuid",
"timestamp": "2024-01-01T00:00:00Z"
}
}
Advanced Usage
Custom Signal Handling
from finalsa.sqs.consumer import SignalHandler
signal_handler = SignalHandler(logger)
# Automatic graceful shutdown on SIGTERM/SIGINT
Concurrent Processing
Configure workers for high-throughput message processing:
# High throughput configuration
app = SqsApp(
app_name="high-throughput-service",
queue_url="...",
max_number_of_messages=10, # Receive multiple messages per batch
workers=16 # 16 concurrent workers
)
@app.handler("bulk.process")
async def process_bulk_data(message: dict):
# Each message processed by available worker
await process_large_dataset(message)
Multiple Workers
app = SqsApp(workers=10) # Process messages with 10 concurrent workers
Benefits of Worker-based Processing:
- Concurrent Execution: Multiple messages processed simultaneously
- Fault Isolation: Worker failures don't affect other workers
- Load Balancing: Messages automatically distributed to available workers
- Graceful Shutdown: All workers stop cleanly on termination signals
- Better Throughput: Ideal for I/O-bound operations like database calls
Batch Processing
app = SqsApp(max_number_of_messages=10) # Receive up to 10 messages per batch
Development
Running Tests
pytest
Linting
ruff check .
Coverage
coverage run -m pytest
coverage report
License
MIT License - see LICENSE.md for details.
Contributing
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Run linting and tests
- Submit a pull request
Requirements
- Python 3.10+
- AWS credentials configured
- SQS queue access
Related Packages
finalsa-common-models: Shared data modelsfinalsa-sqs-client: SQS client implementationfinalsa-sns-client: SNS client for notificationsfinalsa-dependency-injector: Dependency injection framework
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file finalsa_sqs_consumer-3.0.5.tar.gz.
File metadata
- Download URL: finalsa_sqs_consumer-3.0.5.tar.gz
- Upload date:
- Size: 14.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.8.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ad44aea6fbfaea54262783bde4542f37843aefecbf95348602af48219bef85ce
|
|
| MD5 |
dac236a01c9c67cc76a694aa0e1c56cc
|
|
| BLAKE2b-256 |
3264c89f059355802e2acacd94ec722f8fea902d65f53a6b08a0767e2f9d27c9
|
File details
Details for the file finalsa_sqs_consumer-3.0.5-py3-none-any.whl.
File metadata
- Download URL: finalsa_sqs_consumer-3.0.5-py3-none-any.whl
- Upload date:
- Size: 20.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.8.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b3fac450e2d8b6bc1c7287f1ca3918051b462b81afb2303eb86acdc096cf9301
|
|
| MD5 |
93ef0b709be3161bdfb6a6cffc7a8161
|
|
| BLAKE2b-256 |
cac343639816dc3d1e4b081ddf5af8cc955ca74e24104305c3124a655a6e5e95
|