AWS Python Helper Framework
Project description
AWS Python Framework
Mini-framework to create REST APIs, SQS Consumers, SNS Publishers, Fargate Tasks, and Standalone Lambdas with Python in AWS Lambda.
๐ Features
- Reusable single handler: A single handler for all your API routes
- Dynamic controller loading: Routing based on convention
- OOP structure: Object-oriented programming for your code
- Flexible MongoDB: Direct access to multiple databases without models
- External MongoDB: Connect to multiple MongoDB clusters simultaneously
- SQS Consumers: Same pattern to process SQS messages (single or batch mode)
- SNS Publishers: Same pattern to publish messages to SNS topics
- Fargate Tasks: Same pattern to run tasks in Fargate containers
- Standalone Lambdas: Create lambdas invocable directly with AWS SDK
- Authentication middleware: Built-in token-based authentication
- JSON utilities: Automatic serialization of MongoDB types
- Type hints: Modern Python with type annotations
- Async/await: Full support for asynchronous operations
๐ง Installation
pip install aws-python-helper
๐ฆ Quick Reference
All available classes and functions:
| Class / Function | Import | Purpose |
|---|---|---|
API |
aws_python_helper.api.base |
Base class for REST endpoints |
api_handler |
aws_python_helper.api.handler |
Generic handler for API Gateway |
SQSConsumer |
aws_python_helper.sqs.consumer_base |
Base class for SQS consumers |
sqs_handler |
aws_python_helper.sqs.handler |
Factory handler for SQS |
SNSPublisher |
aws_python_helper.sns.publisher |
Base class for SNS publishers |
Lambda |
aws_python_helper.lambda_standalone.base |
Base class for Standalone Lambdas |
lambda_handler |
aws_python_helper.lambda_standalone.handler |
Factory handler for Lambda |
FargateTask |
aws_python_helper.fargate.task_base |
Base class for Fargate tasks |
FargateExecutor |
aws_python_helper.fargate.executor |
Launches Fargate tasks from Lambda |
fargate_handler |
aws_python_helper.fargate.handler |
Entry point handler for Fargate |
MongoJSONEncoder |
aws_python_helper.utils.json_encoder |
JSON encoder for MongoDB types |
mongo_json_dumps |
aws_python_helper.utils.json_encoder |
Helper to serialize MongoDB types |
serialize_mongo_types |
aws_python_helper.utils.serializer |
Recursively serialize MongoDB types |
UnauthorizedError |
aws_python_helper.api.exceptions |
401 authentication exception |
ForbiddenError |
aws_python_helper.api.exceptions |
403 authorization exception |
๐ Project Structure
This framework follows a convention-based folder structure. Here's the recommended organization:
your-project/
โโโ src/
โโโ api/ # REST APIs
โ โโโ users/ # Resource folder (kebab-case)
โ โโโ get.py # GET /users/123 -> UserGetAPI
โ โโโ list.py # GET /users -> UserListAPI
โ โโโ post.py # POST /users -> UserPostAPI
โ โโโ put.py # PUT /users/123 -> UserPutAPI
โ โโโ delete.py # DELETE /users/123 -> UserDeleteAPI
โ
โโโ consumer/ # SQS Consumers (direct files)
โ โโโ user_created.py # user-created -> UserCreatedConsumer
โ โโโ title_indexed.py # title-indexed -> TitleIndexedConsumer
โ โโโ order_processed.py # order-processed -> OrderProcessedConsumer
โ
โโโ lambda/ # Standalone Lambdas (folders)
โ โโโ generate-route/ # generate-route -> GenerateRouteLambda
โ โ โโโ main.py
โ โโโ sync-carrier/ # sync-carrier -> SyncCarrierLambda
โ โ โโโ main.py
โ โโโ process-payment/ # process-payment -> ProcessPaymentLambda
โ โโโ main.py
โ
โโโ task/ # Fargate Tasks (folders)
โ โโโ search-tax-by-town/ # search-tax-by-town -> SearchTaxByTownTask
โ โ โโโ main.py # Entry point
โ โ โโโ task.py # Task class
โ โโโ process-data/ # process-data -> ProcessDataTask
โ โโโ main.py
โ โโโ task.py
โ
โโโ topic/ # SNS Publishers
โโโ order_created.py # OrderCreatedTopic
Naming Conventions
The framework uses automatic class name detection based on your folder/file structure:
| Type | Handler Name | File Path | Class Name |
|---|---|---|---|
| API | N/A | src/api/users/list.py |
UsersListAPI |
| Consumer | user-created |
src/consumer/user_created.py |
UserCreatedConsumer |
| Lambda | generate-route |
src/lambda/generate-route/main.py |
GenerateRouteLambda |
| Task | search-tax-by-town |
src/task/search-tax-by-town/task.py |
SearchTaxByTownTask |
Rules:
- Handler names use kebab-case (e.g.,
user-created,generate-route) - Consumer files use snake_case (e.g.,
user_created.py) - Lambda folders use kebab-case (e.g.,
generate-route/) - Task folders use kebab-case (e.g.,
search-tax-by-town/) - Class names always use PascalCase with suffix (e.g.,
UserCreatedConsumer)
๐ Basic Usage
Create an Endpoint
1. Create your API class in src/api/constitutions/list.py:
from aws_python_helper.api.base import API
class ConstitutionListAPI(API):
async def process(self):
# Direct access to MongoDB
constitutions = await self.db.constitution_db.constitutions.find().to_list(100)
self.set_body(constitutions)
2. The routing is automatic:
GET /constitutionsโsrc/api/constitutions/list.pyGET /constitutions/123โsrc/api/constitutions/get.pyPOST /constitutionsโsrc/api/constitutions/post.py
3. Configure the generic handler (src/handlers/api_handler.py):
from aws_python_helper.api.handler import api_handler
handler = api_handler
Create an SQS Consumer
1. Create your consumer in src/consumer/title_indexed.py:
from aws_python_helper.sqs.consumer_base import SQSConsumer
class TitleIndexedConsumer(SQSConsumer):
async def process_record(self, record):
body = self.extract_content_message(record)
# Your logic here
await self.db.constitution_db.titles.insert_one(body)
2. Configure the handler in src/handlers/sqs_handler.py:
from aws_python_helper.sqs.handler import sqs_handler
# Create a handler for each consumer and export it
title_indexed_handler = sqs_handler('title-indexed')
__all__ = ['title_indexed_handler']
Create a Standalone Lambda
Standalone lambdas are functions that can be invoked directly using the AWS SDK, without an HTTP endpoint. They're perfect for internal operations, integrations, and background processing tasks.
Differences with APIs:
- No API Gateway - invoked directly with AWS SDK
- No HTTP methods or routing
- Can be called from other lambdas, Step Functions, or any AWS service
- Perfect for internal microservices communication
1. Create your lambda class in src/lambda/generate-route/main.py:
from aws_python_helper.lambda_standalone.base import Lambda
from datetime import datetime
class GenerateRouteLambda(Lambda):
async def validate(self):
# Validate input data
if 'shipping_id' not in self.data:
raise ValueError("shipping_id is required")
if not isinstance(self.data['shipping_id'], str):
raise TypeError("shipping_id must be a string")
async def process(self):
# Your business logic here
shipping_id = self.data['shipping_id']
# Access to MongoDB
shipping = await self.db.deliveries.shippings.find_one(
{'_id': shipping_id}
)
if not shipping:
raise ValueError(f"Shipping {shipping_id} not found")
# Create route
route = {
'shipping_id': shipping_id,
'carrier_id': shipping.get('carrier_id'),
'status': 'pending',
'created_at': datetime.utcnow()
}
result = await self.db.deliveries.routes.insert_one(route)
self.logger.info(f"Route created: {result.inserted_id}")
# Return result
return {
'route_id': str(result.inserted_id),
'shipping_id': shipping_id
}
2. Configure the handler in src/handlers/lambda_handler.py:
from aws_python_helper.lambda_standalone.handler import lambda_handler
# Create a handler for each lambda and export it
generate_route_handler = lambda_handler('generate-route')
sync_carrier_handler = lambda_handler('sync-carrier')
process_payment_handler = lambda_handler('process-payment')
__all__ = [
'generate_route_handler',
'sync_carrier_handler',
'process_payment_handler'
]
Note: The handler name 'generate-route' (kebab-case) will automatically look for:
- Folder:
src/lambda/generate-route/(kebab-case) - File:
main.py - Class:
GenerateRouteLambda
3. Invoke from another Lambda or API using boto3:
import boto3
import json
lambda_client = boto3.client('lambda')
# Invoke synchronously (RequestResponse)
response = lambda_client.invoke(
FunctionName='GenerateRouteLambda',
InvocationType='RequestResponse',
Payload=json.dumps({
'data': {
'shipping_id': '507f1f77bcf86cd799439011'
}
})
)
result = json.loads(response['Payload'].read())
# {'success': True, 'data': {'route_id': '...', 'shipping_id': '...'}}
if result['success']:
print(f"Route created: {result['data']['route_id']}")
else:
print(f"Error: {result['error']}")
4. Invoke asynchronously (fire and forget):
# Invoke asynchronously (Event)
lambda_client.invoke(
FunctionName='GenerateRouteLambda',
InvocationType='Event', # Asynchronous
Payload=json.dumps({
'data': {
'shipping_id': '507f1f77bcf86cd799439011'
}
})
)
# Returns immediately without waiting for the result
Naming Convention:
| Lambda Name (kebab-case) | Folder | File | Class |
|---|---|---|---|
generate-route |
src/lambda/generate-route/ |
main.py |
GenerateRouteLambda |
sync-carrier |
src/lambda/sync-carrier/ |
main.py |
SyncCarrierLambda |
process-payment |
src/lambda/process-payment/ |
main.py |
ProcessPaymentLambda |
send-notification |
src/lambda/send-notification/ |
main.py |
SendNotificationLambda |
Common Use Cases:
- Internal microservices communication
- Background data processing
- Integration with external services
- Scheduled tasks (with EventBridge)
- Step Functions workflows
- Cross-service operations
Publish to SNS
1. Create your topic in src/topic/title_indexed.py:
from aws_python_helper.sns.publisher import SNSPublisher
import os
class TitleIndexedTopic(SNSPublisher):
def __init__(self):
super().__init__(
topic_arn=os.getenv('TITLE_INDEXED_SNS_TOPIC_ARN')
)
def build_message(self, constitution_id, title, event_type='title_indexed'):
return {
'content': {
'constitution_id': constitution_id,
'title': title,
'event_type': event_type
},
'attributes': {
'event_type': event_type # Used for SNS subscription filtering
}
}
2. Use the topic from anywhere:
from src.topic.title_indexed import TitleIndexedTopic
# In a consumer, API or task
topic = TitleIndexedTopic()
# Publish a single message
await topic.publish(topic.build_message('123', 'My Constitution'))
# Publish multiple messages in batch
messages = [
topic.build_message('id1', 'Constitution A'),
topic.build_message('id2', 'Constitution B'),
]
await topic.publish(messages)
Message format โ every message must have a content key:
{
'content': {...}, # Required: message body (any dict)
'attributes': {...}, # Optional: SNS message attributes for filtering
'subject': 'Optional subject' # Optional: message subject
}
Run a Fargate Task
1. Create your task in src/task/search-tax-by-town/task.py:
from aws_python_helper.fargate.task_base import FargateTask
class SearchTaxByTownTask(FargateTask):
async def execute(self):
town = self.require_env('TOWN')
self.logger.info(f"Processing town: {town}")
# Access to DB
docs = await self.db.smart_data.address.find({'town': town}).to_list()
# Your logic here
for doc in docs:
# Process document
pass
2. Create the entry point in src/task/search-tax-by-town/main.py:
from aws_python_helper.fargate.handler import fargate_handler
import sys
if __name__ == '__main__':
exit_code = fargate_handler('search-tax-by-town')
sys.exit(exit_code)
3. Create the Dockerfile in src/task/search-tax-by-town/Dockerfile:
FROM python:3.10.12-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt /app/framework_requirements.txt
COPY src/task/search-tax-by-town/requirements.txt /app/task_requirements.txt
RUN pip install -r /app/framework_requirements.txt && \
pip install -r /app/task_requirements.txt
# Copy code
COPY aws_python_helper /app/aws_python_helper
COPY config.py /app/config.py
COPY task /app/task
COPY task/search-tax-by-town/main.py /app/main.py
ENV PYTHONUNBUFFERED=1
CMD ["python", "main.py"]
4. Invoke from Lambda:
from aws_python_helper.fargate.executor import FargateExecutor
def handler(event, context):
executor = FargateExecutor()
task_arn = executor.run_task(
'search-tax-by-town',
envs={'TOWN': 'Norwalk', 'ONLY_TAX': 'true'}
)
return {'taskArn': task_arn}
๐๏ธ Access to MongoDB
The framework provides flexible access to multiple databases:
class MyAPI(API):
async def process(self):
# Access to different databases on the same cluster
user = await self.db.users_db.users.find_one({'_id': user_id})
# Another database
await self.db.analytics_db.logs.insert_one({'action': 'view'})
# Multiple collections
titles = await self.db.constitution_db.titles.find().to_list(100)
articles = await self.db.constitution_db.articles.find().to_list(100)
The pattern is always: self.db.<database_name>.<collection_name>.<motor_operation>()
External MongoDB Clusters
Connect to additional MongoDB clusters using EXTERNAL_MONGODB_CONNECTIONS:
EXTERNAL_MONGODB_CONNECTIONS='[
{"name": "ClusterDockets", "connection_string": "mongodb+srv://cluster.mongodb.net"},
{"name": "ClusterAnalytics", "connection_string": "mongodb+srv://analytics.mongodb.net"}
]'
The credentials from MONGO_DB_USER / MONGO_DB_PASSWORD are automatically injected into the connection strings.
Access external clusters via self.external_db:
class AddressAPI(API):
async def process(self):
# Access external cluster: self.external_db.<ClusterName>.<database>.<collection>
addresses = await self.external_db.ClusterDockets.smart_data.addresses.find(
{'town': self.data['town']}
).to_list(100)
self.set_body({'addresses': addresses})
self.external_db is available in API, SQSConsumer, Lambda, and FargateTask.
๐ Routing Convention
The framework uses convention over configuration for the routing:
| Request | Loaded file |
|---|---|
GET /users |
api/users/list.py |
GET /users/123 |
api/users/get.py |
POST /users |
api/users/post.py |
PUT /users/123 |
api/users/put.py |
DELETE /users/123 |
api/users/delete.py |
GET /users/123/posts |
api/users/posts/list.py |
GET /users/123/posts/456 |
api/users/posts/get.py |
Logic:
- The parts with even indices (0,2,4...) are directories
- The parts with odd indices (1,3,5...) are path parameters
GETwith odd number of parts โ list methodGETwith even number of parts โ get method- Other methods use their name directly
๐งฉ API Class Reference
All properties and methods available inside an API subclass:
Request Properties
| Property | Type | Description |
|---|---|---|
self.data |
dict |
Request body (POST/PUT) or query params (GET) |
self.headers |
dict |
HTTP request headers |
self.path_parameters |
dict |
URL path parameters (e.g. /users/123 โ {'id': '123'}) |
self.query_parameters |
dict |
Query string parameters |
self.db |
DatabaseProxy |
Access to main MongoDB cluster |
self.external_db |
ExternalDatabaseProxy |
Access to external MongoDB clusters |
self.current_user |
dict | None |
Authenticated user document (requires REQUIRE_AUTH=true) |
self.is_authenticated |
bool |
Whether the request is authenticated |
self.auth_data |
dict | None |
Full authentication data |
Response Methods
| Method | Description |
|---|---|
self.set_code(code: int) |
Set HTTP response status code |
self.set_body(body: Any) |
Set response body (auto-serialized to JSON) |
self.set_header(key: str, value: str) |
Add a single response header |
self.set_headers(headers: dict) |
Set multiple response headers at once |
Methods to Override
| Method | Required | Description |
|---|---|---|
async validate() |
Optional | Validate request data, raise exceptions to reject |
async process() |
Required | Main business logic |
class UserGetAPI(API):
async def validate(self):
# Access path params: /users/123 โ self.path_parameters = {'id': '123'}
if not self.path_parameters.get('id'):
raise ValueError("User ID is required")
async def process(self):
user_id = self.path_parameters['id']
user = await self.db.users_db.users.find_one({'_id': user_id})
if not user:
self.set_code(404)
self.set_body({'error': 'User not found'})
return
self.set_code(200)
self.set_body({'data': user})
self.set_header('X-Resource-Id', user_id)
๐ Authentication
The framework includes a built-in token-based authentication middleware.
Configuration
REQUIRE_AUTH=true # Enable authentication (default: false)
AUTH_DB_NAME=my_database # MongoDB database where tokens are stored
AUTH_BYPASS_TOKEN=secret123 # Master token to bypass auth (for internal use)
Using the authenticated user
When REQUIRE_AUTH=true, every request must include a valid Authorization: Bearer <token> header. The authenticated user is available via self.current_user:
class OrderListAPI(API):
async def process(self):
# self.current_user contains the user document from MongoDB
user_id = self.current_user['_id']
orders = await self.db.orders_db.orders.find(
{'user_id': user_id}
).to_list(100)
self.set_body({'data': orders})
Auth exceptions
Use these exceptions in your validate() or process() methods:
from aws_python_helper.api.exceptions import UnauthorizedError, ForbiddenError
class AdminOnlyAPI(API):
async def validate(self):
if not self.is_authenticated:
raise UnauthorizedError("Authentication required") # Returns 401
if self.current_user.get('role') != 'admin':
raise ForbiddenError("Admin access required") # Returns 403
๐ฏ Complete Example
# src/api/constitutions/list.py
from aws_python_helper.api.base import API
class ConstitutionListAPI(API):
async def validate(self):
if 'limit' in self.data:
limit = int(self.data['limit'])
if limit > 1000:
raise ValueError("Limit cannot exceed 1000")
async def process(self):
# Build filters
filters = {}
if 'country' in self.data:
filters['country'] = self.data['country']
# Query MongoDB
limit = int(self.data.get('limit', 100))
results = await self.db.constitution_db.constitutions.find(
filters
).limit(limit).to_list(limit)
# Count total
total = await self.db.constitution_db.constitutions.count_documents(filters)
# Register in analytics
await self.db.analytics_db.searches.insert_one({
'filters': filters,
'result_count': len(results)
})
# Response
self.set_body({
'data': results,
'total': total
})
self.set_header('X-Total-Count', str(total))
๐ Integration Example: API + Standalone Lambda
Here's a complete example showing how an API can invoke a standalone lambda:
Scenario: An API endpoint that creates a shipping and then asynchronously generates its route using a standalone lambda.
1. The API endpoint (src/api/shippings/post.py):
from aws_python_helper.api.base import API
import boto3
import json
class ShippingPostAPI(API):
async def validate(self):
required_fields = ['customer_id', 'address', 'items']
for field in required_fields:
if field not in self.data:
raise ValueError(f"{field} is required")
async def process(self):
# Create shipping in database
shipping = {
'customer_id': self.data['customer_id'],
'address': self.data['address'],
'items': self.data['items'],
'status': 'pending',
'route_pending': True
}
result = await self.db.deliveries.shippings.insert_one(shipping)
shipping_id = str(result.inserted_id)
# Invoke standalone lambda asynchronously to generate route
lambda_client = boto3.client('lambda')
lambda_client.invoke(
FunctionName='GenerateRouteLambda',
InvocationType='Event', # Asynchronous
Payload=json.dumps({
'data': {'shipping_id': shipping_id}
})
)
self.set_code(201)
self.set_body({
'shipping_id': shipping_id,
'status': 'pending',
'message': 'Shipping created, route generation in progress'
})
2. The standalone lambda (src/lambda/generate-route/main.py):
from aws_python_helper.lambda_standalone.base import Lambda
class GenerateRouteLambda(Lambda):
async def validate(self):
if 'shipping_id' not in self.data:
raise ValueError("shipping_id is required")
async def process(self):
shipping_id = self.data['shipping_id']
# Get shipping details
shipping = await self.db.deliveries.shippings.find_one(
{'_id': shipping_id}
)
if not shipping:
raise ValueError(f"Shipping {shipping_id} not found")
# Generate optimal route
route = await self.calculate_optimal_route(shipping)
# Save route
route_result = await self.db.deliveries.routes.insert_one(route)
# Update shipping
await self.db.deliveries.shippings.update_one(
{'_id': shipping_id},
{'$set': {
'route_id': route_result.inserted_id,
'route_pending': False,
'status': 'scheduled'
}}
)
return {
'route_id': str(route_result.inserted_id),
'shipping_id': shipping_id
}
async def calculate_optimal_route(self, shipping):
# Your route calculation logic here
return {
'shipping_id': shipping['_id'],
'carrier_id': shipping.get('carrier_id'),
'estimated_duration': 60,
'status': 'pending'
}
3. Configure handlers (src/handlers/lambda_handler.py):
from aws_python_helper.lambda_standalone.handler import lambda_handler
generate_route_handler = lambda_handler('generate-route')
__all__ = ['generate_route_handler']
Benefits of this pattern:
- API responds immediately (better UX)
- Route generation happens in the background
- Decoupled services (easier to maintain)
- Can retry lambda independently if it fails
- Scalable architecture
๐๏ธ Architecture Overview
Typical flow for event-driven architectures using this framework:
โโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Client โโโโโโถโ API Gateway โโโโโโถโ Lambda: api_handler โ
โโโโโโโโโโโโ โโโโโโโโโโโโโโโ โ (src/api/resource/post.py) โ
โ โ validates, queries MongoDB, โ
โ publishes to SNS โ
โโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโ
โ
โผ
โโโโโโโโโโโโโโโโโโโ
โ SNS Topic โ
โ (fanout/filter) โ
โโโโโโโโโโฌโโโโโโโโโ
โโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโโ
โผ โผ โผ
โโโโโโโโโโโโโโ โโโโโโโโโโโโโโ โโโโโโโโโโโโโโ
โ SQS Queue โ โ SQS Queue โ โ SQS Queue โ
โ Platform Aโ โ Platform Bโ โ Platform Cโ
โโโโโโโฌโโโโโโโ โโโโโโโฌโโโโโโโ โโโโโโโฌโโโโโโโ
โ โ โ
โผ โผ โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Lambda: sqs_handler โ
โ (src/consumer/platform_consumer.py) โ
โ โ groups messages, acquires sessions, โ
โ launches Fargate tasks โ
โโโโโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ FargateExecutor.run_task()
โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Fargate Task: fargate_handler โ
โ (src/task/my-task/task.py) โ
โ โ scrapes/processes data, โ
โ writes results to MongoDB โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
๐ Environment Variables
MongoDB Configuration
The framework supports two ways to configure MongoDB:
Option 1: Full Connection String
# Full URI with embedded credentials
MONGODB_URI=mongodb+srv://user:password@cluster.mongodb.net/dbname?retryWrites=true&w=majority
# or
MONGO_DB_URI=mongodb+srv://user:password@cluster.mongodb.net/dbname
Option 2: Separate Components (Recommended for Terraform)
# Host without credentials
MONGO_DB_HOST=mongodb+srv://cluster.mongodb.net
# Credentials (more secure)
MONGO_DB_USER=admin
MONGO_DB_PASSWORD=my-secure-password
# Optional
MONGO_DB_NAME=my_database
MONGO_DB_OPTIONS=retryWrites=true&w=majority
Benefits of separate components:
- โ Better security: credentials separate from host
- โ Easy integration with Terraform/AWS Secrets Manager
- โ Passwords with special characters are handled automatically
- โ More flexible for different environments
The framework automatically:
- URL-encodes the password (handles
@,:,/, etc.) - Builds the full URI
- Initializes the connection
Terraform Example
environment_variables = {
MONGO_DB_HOST = module.mongodb.connection_string
MONGO_DB_USER = module.mongodb.database_user
MONGO_DB_PASSWORD = module.mongodb.database_password
}
All Environment Variables
| Variable | Required | Description |
|---|---|---|
MONGODB_URI or MONGO_DB_URI |
One of these or components below | Full MongoDB connection string |
MONGO_DB_HOST |
Alt. to URI | MongoDB host (e.g. mongodb+srv://cluster.net) |
MONGO_DB_USER |
Alt. to URI | MongoDB username |
MONGO_DB_PASSWORD |
Alt. to URI | MongoDB password |
MONGO_DB_NAME |
Optional | Default database name |
MONGO_DB_OPTIONS |
Optional | Connection options (e.g. retryWrites=true&w=majority) |
EXTERNAL_MONGODB_CONNECTIONS |
Optional | JSON array of external cluster configurations |
REQUIRE_AUTH |
Optional | Enable authentication middleware (true/false) |
AUTH_DB_NAME |
If REQUIRE_AUTH=true |
MongoDB database for token validation |
AUTH_BYPASS_TOKEN |
Optional | Master token to bypass authentication |
ECS_CLUSTER |
Fargate only | ECS cluster name for FargateExecutor |
ECS_SUBNETS |
Fargate only | Comma-separated subnet IDs for Fargate tasks |
AWS_REGION |
Fargate/SNS/SQS | AWS region |
AWS_ACCOUNT_ID |
SQS get_queue_url |
AWS account ID |
SERVICE_NAME |
SQS get_queue_url |
Service name prefix for queue name |
QUEUE_NAME |
SQS get_queue_url |
Queue name segment |
ENV |
SQS get_queue_url |
Environment suffix (e.g. prod, dev) |
๐ Advanced Features
SQS Consumer - Batch Mode
By default, consumers process messages one by one ("single" mode). Use "batch" mode when you need to group or bulk-process messages:
from aws_python_helper.sqs.consumer_base import SQSConsumer
class OrderConsumer(SQSConsumer):
@property
def processing_mode(self) -> str:
return "batch"
async def process_batch(self, records):
# Group records by some key before processing
grouped = {}
for record in records:
message_id = record.get('messageId')
body = self.extract_content_message(record)
key = body.get('region', 'default')
grouped.setdefault(key, []).append((message_id, body))
for region, messages in grouped.items():
try:
# Bulk operation for the whole group
docs = [msg[1] for msg in messages]
await self.db.orders_db.orders.insert_many(docs)
except Exception as e:
# Mark individual messages as failed
for message_id, _ in messages:
self.add_message_failed(message_id, str(e))
Key methods in SQSConsumer:
| Method / Property | Description |
|---|---|
self.extract_content_message(record) |
Parse message body (handles SNS โ SQS wrapping automatically) |
self.parse_body(record) |
Alias for extract_content_message |
self.add_message_failed(message_id, error) |
Mark a message for retry (batch mode) |
self.get_queue_url() |
Get the SQS queue URL (uses AWS_REGION, AWS_ACCOUNT_ID, SERVICE_NAME, QUEUE_NAME, ENV) |
self.db |
Access to main MongoDB cluster |
self.external_db |
Access to external MongoDB clusters |
Retry behavior:
- Messages marked with
add_message_failed()are reported viareportBatchItemFailures - AWS SQS retries only the failed messages, not the whole batch
- Successful messages in the same batch are not retried
SNS Publisher - Batch Publishing
topic = TitleIndexedTopic()
# Publish multiple messages in a single call
await topic.publish([
{'content': {'id': 'id1', 'title': 'Title 1'}, 'attributes': {'type': 'created'}},
{'content': {'id': 'id2', 'title': 'Title 2'}, 'attributes': {'type': 'updated'}},
{'content': {'id': 'id3', 'title': 'Title 3'}}, # attributes are optional
])
SNS - Message Attributes for Filtering
Use attributes to filter which SQS subscriptions receive each message:
class EventTopic(SNSPublisher):
def __init__(self):
super().__init__(topic_arn=os.getenv('EVENTS_SNS_TOPIC_ARN'))
def build_message(self, payload, event_type, priority='normal'):
return {
'content': payload,
'attributes': {
'event_type': event_type, # SQS subscriptions can filter on this
'priority': priority
}
}
# Usage
topic = EventTopic()
await topic.publish(topic.build_message(
payload={'order_id': '123', 'amount': 99.99},
event_type='order_created',
priority='high'
))
Fargate - Run multiple tasks
executor = FargateExecutor()
task_arns = executor.run_task_batch(
'search-tax-by-town',
[
{'TOWN': 'Norwalk'},
{'TOWN': 'Stamford'},
{'TOWN': 'Bridgeport'}
]
)
Fargate - Check task status
executor = FargateExecutor()
task_arn = executor.run_task('my-task', {'PARAM': 'value'})
# Check task status
status = executor.get_task_status(task_arn)
print(f"Status: {status['status']}")
print(f"Started at: {status['started_at']}")
JSON Utilities for MongoDB Types
When returning MongoDB documents in API responses or exporting data, use the built-in serializers to handle ObjectId, datetime, Decimal128, and other BSON types:
import json
from aws_python_helper.utils.json_encoder import MongoJSONEncoder, mongo_json_dumps
from aws_python_helper.utils.serializer import serialize_mongo_types
# Use as json.dumps cls parameter
json_str = json.dumps(my_mongo_doc, cls=MongoJSONEncoder)
# Helper function
json_str = mongo_json_dumps(my_mongo_doc)
# Convert a document in-place (dict โ JSON-serializable dict)
clean_doc = serialize_mongo_types(my_mongo_doc)
Types automatically converted:
| MongoDB Type | Converts to |
|---|---|
ObjectId |
str |
datetime |
ISO 8601 string |
date |
ISO 8601 string |
Decimal128 |
float |
Decimal |
float |
Binary |
base64 str |
UUID |
str |
bytes |
base64 str |
set |
list |
Common use case โ exporting query results to JSON files:
from aws_python_helper.utils.json_encoder import MongoJSONEncoder
class ExportResultsAPI(API):
async def process(self):
records = await self.db.orders_db.orders.find({}).to_list(1000)
# Write to file with MongoJSONEncoder
with open('/tmp/export.json', 'w') as f:
json.dump(records, f, cls=MongoJSONEncoder, ensure_ascii=False, indent=2)
๐ค Contributing
If you find bugs or want to add features, please create a PR!
๐ License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file aws_python_helper-0.30.1.tar.gz.
File metadata
- Download URL: aws_python_helper-0.30.1.tar.gz
- Upload date:
- Size: 61.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
399136742f245f8baaf91abf19ce1486c6d388b09d964db07cf310e97dee16ff
|
|
| MD5 |
aa6bbf45c59884ceb48a432c0dbb2857
|
|
| BLAKE2b-256 |
25fc04088427369f2f2206f211bd013dcedac1bae786455dfd27831445394e61
|
File details
Details for the file aws_python_helper-0.30.1-py3-none-any.whl.
File metadata
- Download URL: aws_python_helper-0.30.1-py3-none-any.whl
- Upload date:
- Size: 56.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0547e947e93a864281117bbfdc03e30704d8c81534ca9f617787e3e6452bd57d
|
|
| MD5 |
317d5c316f471e9ddfe261e7c45cdb58
|
|
| BLAKE2b-256 |
63e49ad546c7eed37cf2b190e921429f9d2dbbe86ff224fd50d4b2e40fa10696
|