AWS Python Helper Framework
AWS Python Framework
Mini-framework for building REST APIs, SQS Consumers, SNS Publishers, Fargate Tasks, and Standalone Lambdas with Python on AWS (Lambda and Fargate).
Features
- Reusable single handler: A single handler for all your API routes
- Dynamic controller loading: Routing based on convention
- OOP structure: Object-oriented programming for your code
- Flexible MongoDB: Direct access to multiple databases without models
- External MongoDB: Connect to multiple MongoDB clusters simultaneously
- Session propagation: Automatic Session (state + user) propagation across the entire call chain for per-state database routing
- SQS Consumers: Same pattern to process SQS messages (single or batch mode)
- SNS Publishers: Same pattern to publish messages to SNS topics
- Fargate Tasks: Same pattern to run tasks in Fargate containers
- Standalone Lambdas: Create lambdas invocable directly with AWS SDK
- Authentication middleware: Built-in token-based authentication
- Pydantic schema validation: Declare a schema property on any API or Lambda and the framework validates the input automatically before validate() (an API returns 400 on failure)
- LambdaInvoker: Invoke other Lambda functions (sync or async) with built-in error handling
- ApiClient: HTTP client for inter-service communication; resolves service URLs and the auth token from environment variables automatically
- ModelQueryLambda: Base class for MongoDB query-proxy Lambdas; declare an allowed_collections whitelist and the framework handles find and aggregate operations generically
- ModelIndexSyncLambda: Base class for index-synchronization Lambdas; declare your repositories and the framework creates every declared index across the core database and all active state databases, idempotently, in a single run (run once per deploy)
- JSON utilities: Automatic serialization of MongoDB types
- Type hints: Modern Python with type annotations
- Async/await: Full support for asynchronous operations
Installation
pip install aws-python-helper
Quick Reference
All available classes and functions:
| Class / Function | Import | Purpose |
|---|---|---|
| API | aws_python_helper.api.base | Base class for REST endpoints |
| api_handler | aws_python_helper.api.handler | Generic handler for API Gateway |
| SQSConsumer | aws_python_helper.sqs.consumer_base | Base class for SQS consumers |
| sqs_handler | aws_python_helper.sqs.handler | Factory handler for SQS |
| SNSPublisher | aws_python_helper.sns.publisher | Base class for SNS publishers |
| Lambda | aws_python_helper.lambda_standalone.base | Base class for Standalone Lambdas |
| lambda_handler | aws_python_helper.lambda_standalone.handler | Factory handler for Lambda |
| FargateTask | aws_python_helper.fargate.task_base | Base class for Fargate tasks |
| FargateExecutor | aws_python_helper.fargate.executor | Launches Fargate tasks from Lambda |
| fargate_handler | aws_python_helper.fargate.handler | Entry point handler for Fargate |
| Repository | aws_python_helper.repository.base | Base class for MongoDB repositories |
| ModelQueryLambda | aws_python_helper | Generic MongoDB query-proxy Lambda base class |
| ModelIndexSyncLambda | aws_python_helper | Index-sync Lambda base class (creates indexes across core + all state DBs) |
| LambdaInvoker | aws_python_helper | Invoke Lambda functions (sync/async) with error handling |
| ApiClient | aws_python_helper | HTTP client for inter-service communication |
| LambdaInvocationError | aws_python_helper | Raised when boto3 fails to invoke a Lambda |
| LambdaResponseError | aws_python_helper | Raised when a Lambda returns a FunctionError |
| ServiceNotConfiguredError | aws_python_helper | Raised when a service name is not in MICROSERVICE_URLS |
| ApiClientError | aws_python_helper | Raised when an HTTP request fails (network/timeout) |
| ApiResponseError | aws_python_helper | Raised when a remote API returns 4xx/5xx |
| Session | aws_python_helper | Request-scoped session object (state + user) |
| get_session | aws_python_helper | Read the current Session from async context |
| set_session | aws_python_helper | Set the current Session in async context |
| StateValidator | aws_python_helper | Validates a state against core.states (cached, active states only) |
| InvalidStateError | aws_python_helper | Raised when a state is not found or not active |
| MongoJSONEncoder | aws_python_helper.utils.json_encoder | JSON encoder for MongoDB types |
| mongo_json_dumps | aws_python_helper.utils.json_encoder | Helper to serialize MongoDB types |
| serialize_mongo_types | aws_python_helper.utils.serializer | Recursively serialize MongoDB types |
| UnauthorizedError | aws_python_helper.api.exceptions | 401 authentication exception |
| ForbiddenError | aws_python_helper.api.exceptions | 403 authorization exception |
Project Structure
This framework follows a convention-based folder structure. Here's the recommended organization:
your-project/
└── src/
    ├── api/                          # REST APIs
    │   └── users/                    # Resource folder (kebab-case)
    │       ├── get.py                # GET /users/123 -> UserGetAPI
    │       ├── list.py               # GET /users -> UserListAPI
    │       ├── post.py               # POST /users -> UserPostAPI
    │       ├── put.py                # PUT /users/123 -> UserPutAPI
    │       └── delete.py             # DELETE /users/123 -> UserDeleteAPI
    │
    ├── consumer/                     # SQS Consumers (direct files)
    │   ├── user_created.py           # user-created -> UserCreatedConsumer
    │   ├── title_indexed.py          # title-indexed -> TitleIndexedConsumer
    │   └── order_processed.py        # order-processed -> OrderProcessedConsumer
    │
    ├── lambda/                       # Standalone Lambdas (folders)
    │   ├── generate-route/           # generate-route -> GenerateRouteLambda
    │   │   └── main.py
    │   ├── sync-carrier/             # sync-carrier -> SyncCarrierLambda
    │   │   └── main.py
    │   └── process-payment/          # process-payment -> ProcessPaymentLambda
    │       └── main.py
    │
    ├── task/                         # Fargate Tasks (folders)
    │   ├── search-tax-by-town/       # search-tax-by-town -> SearchTaxByTownTask
    │   │   ├── main.py               # Entry point
    │   │   └── task.py               # Task class
    │   └── process-data/             # process-data -> ProcessDataTask
    │       ├── main.py
    │       └── task.py
    │
    └── topic/                        # SNS Publishers
        └── order_created.py          # OrderCreatedTopic
Naming Conventions
The framework uses automatic class name detection based on your folder/file structure:
| Type | Handler Name | File Path | Class Name |
|---|---|---|---|
| API | N/A | src/api/users/list.py | UserListAPI |
| Consumer | user-created | src/consumer/user_created.py | UserCreatedConsumer |
| Lambda | generate-route | src/lambda/generate-route/main.py | GenerateRouteLambda |
| Task | search-tax-by-town | src/task/search-tax-by-town/task.py | SearchTaxByTownTask |
Rules:
- Handler names use kebab-case (e.g., user-created, generate-route)
- Consumer files use snake_case (e.g., user_created.py)
- Lambda folders use kebab-case (e.g., generate-route/)
- Task folders use kebab-case (e.g., search-tax-by-town/)
- Class names always use PascalCase with a type suffix (e.g., UserCreatedConsumer)
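The mapping in the table above can be expressed as a small function. This is an illustrative sketch only, not the framework's own loader; to_pascal and expected_location are hypothetical names, and it covers just the Consumer, Lambda, and Task rows.

```python
def to_pascal(name: str) -> str:
    """Convert a kebab-case or snake_case name to PascalCase."""
    parts = name.replace("_", "-").split("-")
    return "".join(part.capitalize() for part in parts if part)


def expected_location(kind: str, handler_name: str) -> tuple:
    """Return (file path, class name) for a kebab-case handler name."""
    class_name = to_pascal(handler_name) + kind
    if kind == "Consumer":
        # Consumer files are snake_case and live directly under src/consumer/
        path = f"src/consumer/{handler_name.replace('-', '_')}.py"
    elif kind == "Lambda":
        path = f"src/lambda/{handler_name}/main.py"
    elif kind == "Task":
        path = f"src/task/{handler_name}/task.py"
    else:
        raise ValueError(f"unsupported kind: {kind}")
    return path, class_name


print(expected_location("Task", "search-tax-by-town"))
```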
Basic Usage
Create an Endpoint
1. Create your API class in src/api/constitutions/list.py:
from aws_python_helper.api.base import API

class ConstitutionListAPI(API):
    async def process(self):
        # Direct access to MongoDB
        constitutions = await self.db.constitution_db.constitutions.find().to_list(100)
        self.set_body(constitutions)
2. The routing is automatic:
- GET /constitutions → src/api/constitutions/list.py
- GET /constitutions/123 → src/api/constitutions/get.py
- POST /constitutions → src/api/constitutions/post.py
3. Configure the generic handler (src/handlers/api_handler.py):
from aws_python_helper.api.handler import api_handler
handler = api_handler
Create an SQS Consumer
1. Create your consumer in src/consumer/title_indexed.py:
from aws_python_helper.sqs.consumer_base import SQSConsumer

class TitleIndexedConsumer(SQSConsumer):
    async def process_record(self, record):
        body = self.extract_content_message(record)
        # Your logic here
        await self.db.constitution_db.titles.insert_one(body)
2. Configure the handler in src/handlers/sqs_handler.py:
from aws_python_helper.sqs.handler import sqs_handler
# Create a handler for each consumer and export it
title_indexed_handler = sqs_handler('title-indexed')
__all__ = ['title_indexed_handler']
Create a Standalone Lambda
Standalone lambdas are functions that can be invoked directly using the AWS SDK, without an HTTP endpoint. They're perfect for internal operations, integrations, and background processing tasks.
Differences with APIs:
- No API Gateway - invoked directly with AWS SDK
- No HTTP methods or routing
- Can be called from other lambdas, Step Functions, or any AWS service
- Perfect for internal microservices communication
1. Create your lambda class in src/lambda/generate-route/main.py:
from datetime import datetime, timezone

from bson import ObjectId
from aws_python_helper.lambda_standalone.base import Lambda

class GenerateRouteLambda(Lambda):
    async def validate(self):
        # Validate input data
        if 'shipping_id' not in self.data:
            raise ValueError("shipping_id is required")
        if not isinstance(self.data['shipping_id'], str):
            raise TypeError("shipping_id must be a string")
        if not ObjectId.is_valid(self.data['shipping_id']):
            raise ValueError("shipping_id must be a valid ObjectId")

    async def process(self):
        # Your business logic here (assumes shippings._id is stored as an ObjectId)
        shipping_id = ObjectId(self.data['shipping_id'])
        # Access to MongoDB
        shipping = await self.db.deliveries.shippings.find_one({'_id': shipping_id})
        if not shipping:
            raise ValueError(f"Shipping {shipping_id} not found")
        # Create route
        route = {
            'shipping_id': shipping_id,
            'carrier_id': shipping.get('carrier_id'),
            'status': 'pending',
            'created_at': datetime.now(timezone.utc)
        }
        result = await self.db.deliveries.routes.insert_one(route)
        self.logger.info(f"Route created: {result.inserted_id}")
        # Return result
        return {
            'route_id': str(result.inserted_id),
            'shipping_id': str(shipping_id)
        }
requires_state (session requirement): By default (requires_state = True) the framework requires the event to carry a session with a valid state and validates it against core.states before running. Override requires_state to return False for Lambdas that operate across all states and must not be tied to a single one (e.g. index synchronization); the session/state then becomes optional. See ModelIndexSyncLambda.
class SyncAllStatesLambda(Lambda):
    @property
    def requires_state(self) -> bool:
        return False  # no session.state required; runs across every state
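The event-side rule can be sketched as a plain function. This is a minimal sketch assuming the event shape shown in the invocation examples ({'session': {...}, 'data': {...}}); extract_session is a hypothetical name, and the framework's extra lookup against core.states is deliberately left out.

```python
from typing import Any, Dict


def extract_session(event: Dict[str, Any], requires_state: bool = True) -> Dict[str, Any]:
    """Return the session dict from a Lambda event, enforcing the state rule."""
    session = event.get("session") or {}
    if not isinstance(session, dict):
        raise ValueError("session must be an object")
    if requires_state and not session.get("state"):
        # Default behaviour: a Lambda without a state in its session is rejected
        raise ValueError("session.state is required")
    return session


# A Lambda that opted out (requires_state = False) accepts an event without a session
print(extract_session({"data": {}}, requires_state=False))
```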
2. Configure the handler in src/handlers/lambda_handler.py:
from aws_python_helper.lambda_standalone.handler import lambda_handler
# Create a handler for each lambda and export it
generate_route_handler = lambda_handler('generate-route')
sync_carrier_handler = lambda_handler('sync-carrier')
process_payment_handler = lambda_handler('process-payment')
__all__ = [
'generate_route_handler',
'sync_carrier_handler',
'process_payment_handler'
]
Note: The handler name 'generate-route' (kebab-case) will automatically look for:
- Folder: src/lambda/generate-route/ (kebab-case)
- File: main.py
- Class: GenerateRouteLambda
3. Invoke from another Lambda or API using boto3:
import boto3
import json

lambda_client = boto3.client('lambda')

# Invoke synchronously (RequestResponse)
response = lambda_client.invoke(
    FunctionName='GenerateRouteLambda',
    InvocationType='RequestResponse',
    Payload=json.dumps({
        'session': {'state': 'connecticut'},  # Required
        'data': {
            'shipping_id': '507f1f77bcf86cd799439011'
        }
    })
)
result = json.loads(response['Payload'].read())
# {'success': True, 'data': {'route_id': '...', 'shipping_id': '...'}}
if result['success']:
    print(f"Route created: {result['data']['route_id']}")
else:
    print(f"Error: {result['error']}")
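The success/error envelope shown above can be unwrapped in one place instead of at every call site. A minimal sketch: unwrap_result and InvocationFailed are hypothetical (the framework's own LambdaInvoker may behave differently), and io.BytesIO stands in for the StreamingBody that boto3 returns in response['Payload']. The FunctionError key is what boto3 sets when the function fails with an unhandled error.

```python
import io
import json
from typing import Any, Dict


class InvocationFailed(Exception):
    """Hypothetical error type used only in this sketch."""


def unwrap_result(response: Dict[str, Any]) -> Any:
    """Return the 'data' field of a boto3 invoke() response, or raise."""
    body = json.loads(response["Payload"].read())
    # boto3 sets 'FunctionError' when the function failed with an unhandled error
    if "FunctionError" in response:
        raise InvocationFailed(f"{response['FunctionError']}: {body}")
    # Handled failures come back inside the {'success': ..., 'error': ...} envelope
    if not body.get("success"):
        raise InvocationFailed(body.get("error", "unknown error"))
    return body.get("data")


# Usage with a stand-in for the real StreamingBody
fake = {"StatusCode": 200,
        "Payload": io.BytesIO(b'{"success": true, "data": {"route_id": "r1"}}')}
route = unwrap_result(fake)
```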
4. Invoke asynchronously (fire and forget):
# Invoke asynchronously (Event)
lambda_client.invoke(
    FunctionName='GenerateRouteLambda',
    InvocationType='Event',  # Asynchronous
    Payload=json.dumps({
        'session': {'state': 'connecticut'},  # Required
        'data': {
            'shipping_id': '507f1f77bcf86cd799439011'
        }
    })
)
# Returns immediately without waiting for the result
Naming Convention:
| Lambda Name (kebab-case) | Folder | File | Class |
|---|---|---|---|
| generate-route | src/lambda/generate-route/ | main.py | GenerateRouteLambda |
| sync-carrier | src/lambda/sync-carrier/ | main.py | SyncCarrierLambda |
| process-payment | src/lambda/process-payment/ | main.py | ProcessPaymentLambda |
| send-notification | src/lambda/send-notification/ | main.py | SendNotificationLambda |
Common Use Cases:
- Internal microservices communication
- Background data processing
- Integration with external services
- Scheduled tasks (with EventBridge)
- Step Functions workflows
- Cross-service operations
Publish to SNS
1. Create your topic in src/topic/title_indexed.py:
from aws_python_helper.sns.publisher import SNSPublisher
import os

class TitleIndexedTopic(SNSPublisher):
    def __init__(self):
        super().__init__(
            topic_arn=os.getenv('TITLE_INDEXED_SNS_TOPIC_ARN')
        )

    def build_message(self, constitution_id, title, event_type='title_indexed'):
        return {
            'content': {
                'constitution_id': constitution_id,
                'title': title,
                'event_type': event_type
            },
            'attributes': {
                'event_type': event_type  # Used for SNS subscription filtering
            }
        }
2. Use the topic from anywhere:
from src.topic.title_indexed import TitleIndexedTopic
# In a consumer, API or task
topic = TitleIndexedTopic()
# Publish a single message
await topic.publish(topic.build_message('123', 'My Constitution'))
# Publish multiple messages in batch
messages = [
topic.build_message('id1', 'Constitution A'),
topic.build_message('id2', 'Constitution B'),
]
await topic.publish(messages)
Message format: every message must have a content key:
{
'content': {...}, # Required: message body (any dict)
'attributes': {...}, # Optional: SNS message attributes for filtering
'subject': 'Optional subject' # Optional: message subject
}
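To see how this format could line up with the SNS API, here is a sketch that maps a message onto the keyword arguments of boto3's sns.publish() (TopicArn, Message, Subject, MessageAttributes). It is not SNSPublisher's actual code: to_publish_kwargs is hypothetical, and serializing content as the JSON Message body and sending the session as a String attribute holding Base64 JSON are assumptions based on the descriptions in this README.

```python
import base64
import json
from typing import Any, Dict, Optional


def to_publish_kwargs(topic_arn: str, message: Dict[str, Any],
                      session: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Map the documented message format onto sns.publish() arguments."""
    if "content" not in message:
        raise ValueError("message must have a 'content' key")
    kwargs: Dict[str, Any] = {
        "TopicArn": topic_arn,
        "Message": json.dumps(message["content"]),
    }
    if message.get("subject"):
        kwargs["Subject"] = message["subject"]
    # SNS attributes need an explicit DataType; strings are enough for filtering
    attributes = {
        name: {"DataType": "String", "StringValue": str(value)}
        for name, value in message.get("attributes", {}).items()
    }
    if session is not None:
        # Assumption: the session travels as Base64-encoded JSON in a String attribute
        encoded = base64.b64encode(json.dumps(session).encode()).decode()
        attributes["session"] = {"DataType": "String", "StringValue": encoded}
    if attributes:
        kwargs["MessageAttributes"] = attributes
    return kwargs
```

With a real client the result would be passed as boto3.client('sns').publish(**kwargs).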
Run a Fargate Task
1. Create your task in src/task/search-tax-by-town/task.py:
from aws_python_helper.fargate.task_base import FargateTask

class SearchTaxByTownTask(FargateTask):
    async def execute(self):
        town = self.require_env('TOWN')
        self.logger.info(f"Processing town: {town}")
        # Access to DB (length=None reads the whole cursor)
        docs = await self.db.smart_data.address.find({'town': town}).to_list(length=None)
        # Your logic here
        for doc in docs:
            # Process document
            pass
2. Create the entry point in src/task/search-tax-by-town/main.py:
from aws_python_helper.fargate.handler import fargate_handler
import sys

if __name__ == '__main__':
    exit_code = fargate_handler('search-tax-by-town')
    sys.exit(exit_code)
3. Create the Dockerfile in src/task/search-tax-by-town/Dockerfile:
FROM python:3.10.12-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt /app/framework_requirements.txt
COPY src/task/search-tax-by-town/requirements.txt /app/task_requirements.txt
RUN pip install -r /app/framework_requirements.txt && \
pip install -r /app/task_requirements.txt
# Copy code
COPY aws_python_helper /app/aws_python_helper
COPY config.py /app/config.py
COPY task /app/task
COPY task/search-tax-by-town/main.py /app/main.py
ENV PYTHONUNBUFFERED=1
CMD ["python", "main.py"]
4. Invoke from Lambda:
from aws_python_helper.fargate.executor import FargateExecutor

def handler(event, context):
    executor = FargateExecutor()
    # session is auto-propagated as SESSION env var (JSON) in the container
    task_arn = executor.run_task(
        'search-tax-by-town',
        envs={'TOWN': 'Norwalk', 'ONLY_TAX': 'true'}
    )
    return {'taskArn': task_arn}
Access to MongoDB
The framework provides flexible access to multiple databases:
class MyAPI(API):
    async def process(self):
        user_id = self.path_parameters['id']
        # Access to different databases on the same cluster
        user = await self.db.users_db.users.find_one({'_id': user_id})
        # Another database
        await self.db.analytics_db.logs.insert_one({'action': 'view'})
        # Multiple collections
        titles = await self.db.constitution_db.titles.find().to_list(100)
        articles = await self.db.constitution_db.articles.find().to_list(100)
The pattern is always: self.db.<database_name>.<collection_name>.<motor_operation>()
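One way such attribute chaining can work is a small proxy that collects two attribute names and then hands them to a collection factory. This is a sketch of the idea, not the framework's DatabaseProxy; DatabaseProxySketch is hypothetical, and with Motor the factory would typically be lambda database, collection: client[database][collection].

```python
from typing import Any, Callable, Tuple


class DatabaseProxySketch:
    """db.<database>.<collection>: two attribute hops reach a collection."""

    def __init__(self, get_collection: Callable[[str, str], Any],
                 path: Tuple[str, ...] = ()):
        self._get_collection = get_collection
        self._path = path

    def __getattr__(self, name: str) -> Any:
        # Only called for names not found normally; private names are never routed
        if name.startswith("_"):
            raise AttributeError(name)
        path = self._path + (name,)
        if len(path) == 2:
            return self._get_collection(*path)
        return DatabaseProxySketch(self._get_collection, path)


# Stand-in factory that just reports which collection was requested
db = DatabaseProxySketch(lambda database, collection: f"{database}.{collection}")
print(db.users_db.users)
```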
External MongoDB Clusters
Connect to additional MongoDB clusters using EXTERNAL_MONGODB_CONNECTIONS:
EXTERNAL_MONGODB_CONNECTIONS='[
{"name": "ClusterDockets", "connection_string": "mongodb+srv://cluster.mongodb.net"},
{"name": "ClusterAnalytics", "connection_string": "mongodb+srv://analytics.mongodb.net"}
]'
The credentials from MONGO_DB_USER / MONGO_DB_PASSWORD are automatically injected into the connection strings.
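A sketch of what that injection involves, assuming the connection strings carry no credentials of their own. build_connections is hypothetical; the important detail is that the MongoDB URI format requires special characters in the username and password (such as @ or :) to be percent-encoded, which urllib.parse.quote_plus handles.

```python
import json
from urllib.parse import quote_plus


def build_connections(raw: str, user: str, password: str) -> dict:
    """Parse EXTERNAL_MONGODB_CONNECTIONS and inject URL-encoded credentials."""
    result = {}
    for entry in json.loads(raw):
        scheme, sep, rest = entry["connection_string"].partition("://")
        if not sep:
            raise ValueError(f"invalid connection string for {entry['name']}")
        # Percent-encode credentials so '@' or ':' cannot break the URI
        creds = f"{quote_plus(user)}:{quote_plus(password)}@"
        result[entry["name"]] = f"{scheme}://{creds}{rest}"
    return result


raw = '[{"name": "ClusterDockets", "connection_string": "mongodb+srv://cluster.mongodb.net"}]'
connections = build_connections(raw, "app", "p@ss")
```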
Access external clusters via self.external_db:
class AddressAPI(API):
    async def process(self):
        # Access external cluster: self.external_db.<ClusterName>.<database>.<collection>
        addresses = await self.external_db.ClusterDockets.smart_data.addresses.find(
            {'town': self.data['town']}
        ).to_list(100)
        self.set_body({'addresses': addresses})
self.external_db is available in API, SQSConsumer, Lambda, and FargateTask.
Repository Pattern
The framework provides a Repository base class that eliminates repetitive boilerplate in data access layers. Each repository only declares what collection it uses, whether it belongs to an external cluster, and what indexes it needs. The base class handles the MongoDB connection automatically.
Index creation is not triggered at request time. Declaring indexes only describes the indexes; it does not create them on collection access (this avoids per-request overhead and half-finished index builds in short-lived Lambda/API runtimes). Indexes are created by the ModelIndexSyncLambda (run once per deploy) or by calling await repo.ensure_indexes() explicitly.
Properties to override
| Property | Type | Default | Required |
|---|---|---|---|
| collection_name | str | (none) | Yes |
| database_key | Optional[str] | None | No; if None, uses session.state from context |
| is_external | bool | False | No |
| cluster_name | str | None | Only if is_external=True |
| indexes | list | [] | No |
database_key controls how the database is resolved:
- database_key = "core" (or any string): always connects to that specific database.
- database_key = None (default): reads session.state from context automatically. This makes the repository state-scoped: it connects to "connecticut", "new_jersey", etc. depending on the current request.
Collections are cached per (database_name, collection_name) key, so state-scoped repositories correctly isolate state between concurrent requests.
Index format
@property
def indexes(self):
    return [
        {"key": [("field", 1)]},                                 # simple ASC
        {"key": [("field", -1)]},                                # simple DESC
        {"key": [("f1", 1), ("f2", -1)], "unique": True},        # compound + unique
        {"key": [("expires_at", 1)], "expireAfterSeconds": 0},   # TTL index
    ]
These definitions are consumed by ensure_indexes(), which the ModelIndexSyncLambda runs across the relevant databases at deploy time. ensure_indexes() is idempotent (safe to re-run) and accepts an explicit database_name so the same state-scoped repository can be materialized in every state database:
# Create this repository's indexes in a specific database (what the index-sync Lambda does):
await repo.ensure_indexes(database_name="connecticut")
# Or, with no argument, in the repository's resolved database (database_key or session.state):
await repo.ensure_indexes()
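A sketch of how one index definition could be turned into a create_index call: the "key" entry becomes the key list and every other entry becomes a keyword option (unique, expireAfterSeconds, ...). This is not the framework's ensure_indexes, only an illustration against Motor's awaitable collection.create_index(keys, **kwargs); FakeCollection is a hypothetical stand-in that names indexes the way MongoDB does by default. Creating an index whose key pattern and options match an existing one is a no-op in MongoDB, which is what makes a re-run safe.

```python
import asyncio
from typing import Any, Dict, List


async def ensure_indexes(collection: Any, indexes: List[Dict[str, Any]]) -> List[str]:
    """Create every declared index and return the index names."""
    names = []
    for spec in indexes:
        # Everything except "key" is passed through as an index option
        options = {k: v for k, v in spec.items() if k != "key"}
        names.append(await collection.create_index(spec["key"], **options))
    return names


class FakeCollection:
    """Records calls instead of talking to MongoDB."""

    def __init__(self):
        self.calls = []

    async def create_index(self, keys, **kwargs):
        self.calls.append((keys, kwargs))
        return "_".join(f"{field}_{direction}" for field, direction in keys)


fake = FakeCollection()
names = asyncio.run(ensure_indexes(fake, [
    {"key": [("name", 1)]},
    {"key": [("f1", 1), ("f2", -1)], "unique": True},
]))
```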
Repository with a fixed database
from aws_python_helper import Repository

class TownsRepository(Repository):
    @property
    def collection_name(self):
        return "towns"

    @property
    def database_key(self):
        return "core"  # always connects to the "core" database

    @property
    def indexes(self):
        return [
            {"key": [("name", 1)]},
            {"key": [("platform", 1)]},
        ]

    async def get_available(self, platforms):
        return await self.collection.find(
            {"platform": {"$in": platforms}},
            {"name": 1, "platform": 1}
        ).to_list(length=None)

    async def find_by_name(self, name):
        return await self.collection.find_one({"name": name})
State-scoped repository (no database_key)
When database_key is not set, the repository reads session.state from context and uses it as the database name. The same repository instance connects to "connecticut" for one request and to "new_jersey" for another, automatically.
from aws_python_helper import Repository

class LandRecordsRepository(Repository):
    @property
    def collection_name(self):
        return "records"

    # No database_key: uses session.state automatically
    # If session.state = "connecticut" -> connects to DB "connecticut"
    # If session.state = "new_jersey" -> connects to DB "new_jersey"

    @property
    def indexes(self):
        return [
            {"key": [("unique_id", 1)]},
            {"key": [("owner", 1), ("town", 1)]},
        ]

    async def bulk_upsert(self, records):
        from pymongo import UpdateOne
        operations = [
            UpdateOne({"unique_id": r["unique_id"]}, {"$set": r}, upsert=True)
            for r in records
        ]
        if not operations:
            # bulk_write rejects an empty operation list
            return {"upserted": 0, "modified": 0}
        result = await self.collection.bulk_write(operations)
        return {"upserted": result.upserted_count, "modified": result.modified_count}
Note: A ValueError is raised at runtime if database_key is None and session.state has not been set. The framework prevents this at every entry point (API, Lambda, SQS, Fargate) by populating session.state before your code runs, unless a Lambda has opted out with requires_state = False.
Repository on an external cluster
from aws_python_helper import Repository

class AddressRepository(Repository):
    @property
    def database_key(self):
        return "smart_data"

    @property
    def collection_name(self):
        return "address"

    @property
    def is_external(self):
        return True

    @property
    def cluster_name(self):
        return "ClusterDockets"  # Must match a name in EXTERNAL_MONGODB_CONNECTIONS

    async def find_by_query(self, query, limit=None):
        cursor = self.collection.find(query)
        if limit:
            cursor = cursor.limit(limit)
        return await cursor.to_list(length=None)
Instantiation: no db argument needed
class MyAPI(API):
    @property
    def towns_repository(self):
        # Lazily create the repository once per handler instance
        if getattr(self, "_towns_repository", None) is None:
            self._towns_repository = TownsRepository()  # no args!
        return self._towns_repository

    async def process(self):
        towns = await self.towns_repository.get_available(["platform_a", "platform_b"])
        self.set_body({"towns": towns})
The repository connects itself using the already-initialized MongoManager singleton (the same one used by self.db). There is no need to pass self.db or any connection object.
Session Context
The framework propagates a Session object automatically across the entire async call chain using Python's contextvars.ContextVar. The session holds state (for multi-state DB routing) and user (authenticated user from the auth middleware).
How the framework injects it at each entry point
| Entry point | How the session is read |
|---|---|
| API Gateway | constitution-state header → session.state (when AUTHORIZATION includes state); auth middleware → session.user (when AUTHORIZATION includes user). Returns 400 if a required header is missing |
| Standalone Lambda | session dict in the event payload: required by default (must include state), raises ValueError if missing. A Lambda can opt out via requires_state = False (e.g. ModelIndexSyncLambda), making the session optional |
| SQS Consumer (single mode) | Per-record: reads session from SNS MessageAttributes (Base64-encoded JSON) |
| SQS Consumer (batch mode) | Groups records by session.state; calls process_batch() once per group with the correct session in context |
| Fargate Task | SESSION env var (JSON), auto-injected by FargateExecutor |
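Batch mode can be pictured as decoding each record's session and bucketing records by state. A sketch under stated assumptions: the SQS queue is subscribed to SNS without raw message delivery, so record["body"] holds the SNS envelope whose MessageAttributes entries carry "Type" and "Value" (with raw delivery the attributes would instead appear under record["messageAttributes"]). session_from_record and group_by_state are hypothetical names, not the SQSConsumer internals.

```python
import base64
import json
from collections import defaultdict
from typing import Any, Dict, List, Optional


def session_from_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Decode the Base64 JSON 'session' attribute from an SNS-wrapped SQS record."""
    envelope = json.loads(record["body"])
    attr = envelope.get("MessageAttributes", {}).get("session")
    if attr is None:
        return {}
    return json.loads(base64.b64decode(attr["Value"]))


def group_by_state(records: List[Dict[str, Any]]) -> Dict[Optional[str], List[Dict[str, Any]]]:
    """Bucket records so each group can be processed under a single session.state."""
    groups: Dict[Optional[str], List[Dict[str, Any]]] = defaultdict(list)
    for record in records:
        groups[session_from_record(record).get("state")].append(record)
    return dict(groups)
```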
How the framework propagates it to downstream services
| Downstream service | Propagation mechanism |
|---|---|
| SNS Publisher | Auto-injects the full session as a session MessageAttribute (Base64-encoded JSON) on every published message |
| FargateExecutor | Auto-injects SESSION as a JSON env var when launching Fargate containers |
This means that an API call with constitution-state: connecticut will automatically carry the full session (state + user) through SNS → SQS → Fargate without any code changes in your consumers or tasks.
Accessing the session in handlers
All handler base classes expose a self.session property:
class MyAPI(API):
    async def process(self):
        state = self.session.state  # e.g. "connecticut"
        user = self.session.user    # authenticated user dict, or None
Available in API, SQSConsumer, Lambda, and FargateTask.
State-scoped repositories
Repositories with no database_key (default) read session.state from context to resolve the target database automatically. See the Repository Pattern section for details.
Manual access
If you need to read or set the session manually (e.g., in tests or utility code):
from aws_python_helper import Session, get_session, set_session
session = get_session() # returns current Session (creates empty one if not set)
session.state # e.g. "connecticut", or None if not set
session.user # authenticated user dict, or None
set_session(Session(state="new_jersey")) # set manually (the framework does this automatically)
API example: constitution-state header
GET /constitutions HTTP/1.1
constitution-state: connecticut
Authorization: Bearer <token>
Lambda invocation example: session in event
import boto3, json
lambda_client = boto3.client('lambda')
lambda_client.invoke(
FunctionName='MyLambdaFunction',
InvocationType='RequestResponse',
Payload=json.dumps({
'session': {'state': 'connecticut'}, # Required
'data': {'key': 'value'}
})
)
Routing Convention
The framework uses convention over configuration for the routing:
| Request | Loaded file |
|---|---|
| GET /users | api/users/list.py |
| GET /users/123 | api/users/get.py |
| POST /users | api/users/post.py |
| PUT /users/123 | api/users/put.py |
| DELETE /users/123 | api/users/delete.py |
| GET /users/123/posts | api/users/posts/list.py |
| GET /users/123/posts/456 | api/users/posts/get.py |
Logic:
- The parts with even indices (0, 2, 4, ...) are directories
- The parts with odd indices (1, 3, 5, ...) are path parameters
- GET with an odd number of parts → list method
- GET with an even number of parts → get method
- Other methods use their name directly
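The rules above fit in a few lines. This sketch is illustrative, not the framework's router: resolve_route is a hypothetical name, and it returns the path parameter values as a list because this README only shows how the single-parameter case is named ({'id': '123'}).

```python
def resolve_route(method: str, path: str) -> tuple:
    """Map an HTTP method and path to (module file, path parameter values)."""
    parts = [p for p in path.strip("/").split("/") if p]
    if not parts:
        raise ValueError("empty path")
    directories = parts[0::2]  # even indices: resource folders
    params = parts[1::2]       # odd indices: path parameter values
    method = method.upper()
    if method == "GET":
        action = "list" if len(parts) % 2 == 1 else "get"
    else:
        action = method.lower()
    return "api/" + "/".join(directories) + f"/{action}.py", params


print(resolve_route("GET", "/users/123/posts"))
```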
API Class Reference
All properties and methods available inside an API subclass:
Request Properties
| Property | Type | Description |
|---|---|---|
| self.data | dict | Request body (POST/PUT) or query params (GET) |
| self.headers | dict | HTTP request headers |
| self.path_parameters | dict | URL path parameters (e.g. /users/123 → {'id': '123'}) |
| self.query_parameters | dict | Query string parameters |
| self.db | DatabaseProxy | Access to main MongoDB cluster |
| self.external_db | ExternalDatabaseProxy | Access to external MongoDB clusters |
| self.session | Session | Request-scoped session (session.state, session.user) |
| self.current_user | Optional[dict] | Authenticated user document (requires AUTHORIZATION=user or full) |
| self.is_authenticated | bool | Whether the request is authenticated |
| self.auth_data | Optional[dict] | Full authentication data |
Response Methods
| Method | Description |
|---|---|
| self.set_code(code: int) | Set HTTP response status code |
| self.set_body(body: Any) | Set response body (auto-serialized to JSON) |
| self.set_header(key: str, value: str) | Add a single response header |
| self.set_headers(headers: dict) | Set multiple response headers at once |
Methods to Override
| Method | Required | Description |
|---|---|---|
| async validate() | Optional | Validate request data, raise exceptions to reject |
| async process() | Required | Main business logic |
class UserGetAPI(API):
    async def validate(self):
        # Access path params: /users/123 → self.path_parameters = {'id': '123'}
        if not self.path_parameters.get('id'):
            raise ValueError("User ID is required")

    async def process(self):
        user_id = self.path_parameters['id']
        user = await self.db.users_db.users.find_one({'_id': user_id})
        if not user:
            self.set_code(404)
            self.set_body({'error': 'User not found'})
            return
        self.set_code(200)
        self.set_body({'data': user})
        self.set_header('X-Resource-Id', user_id)
Authentication
The framework includes a built-in token-based authentication middleware.
Configuration
AUTHORIZATION=full # Authorization mode: 'user', 'state', or 'full' (default: empty/disabled)
AUTH_DB_NAME=my_database # MongoDB database where tokens are stored
AUTH_BYPASS_TOKEN=secret123 # Master token to bypass auth (for internal use)
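Read together with the Session Context table, the AUTHORIZATION value switches two independent checks on or off. This sketch is an interpretation, not the middleware's code: AuthMode and parse_authorization are hypothetical, "full" is assumed to mean both checks, and rejecting unknown values is an assumption.

```python
from typing import NamedTuple, Optional


class AuthMode(NamedTuple):
    requires_user: bool   # Authorization: Bearer <token> must be valid (fills session.user)
    requires_state: bool  # constitution-state header must be present (fills session.state)


_MODES = {
    "": AuthMode(requires_user=False, requires_state=False),  # disabled (default)
    "user": AuthMode(requires_user=True, requires_state=False),
    "state": AuthMode(requires_user=False, requires_state=True),
    "full": AuthMode(requires_user=True, requires_state=True),
}


def parse_authorization(value: Optional[str]) -> AuthMode:
    """Interpret the AUTHORIZATION env var; unknown values are rejected."""
    mode = (value or "").strip().lower()
    if mode not in _MODES:
        raise ValueError(f"unsupported AUTHORIZATION mode: {value!r}")
    return _MODES[mode]
```

In a handler this would be called as parse_authorization(os.environ.get("AUTHORIZATION")).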
Using the authenticated user
When AUTHORIZATION is user or full, every request must include a valid Authorization: Bearer <token> header. The authenticated user is available via self.current_user:
class OrderListAPI(API):
    async def process(self):
        # self.current_user contains the user document from MongoDB
        user_id = self.current_user['_id']
        orders = await self.db.orders_db.orders.find(
            {'user_id': user_id}
        ).to_list(100)
        self.set_body({'data': orders})
Auth exceptions
Use these exceptions in your validate() or process() methods:
from aws_python_helper.api.exceptions import UnauthorizedError, ForbiddenError

class AdminOnlyAPI(API):
    async def validate(self):
        if not self.is_authenticated:
            raise UnauthorizedError("Authentication required")  # Returns 401
        if self.current_user.get('role') != 'admin':
            raise ForbiddenError("Admin access required")  # Returns 403
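The request lifecycle behind these exceptions can be sketched as: validate() runs first, then process(), and an exception from either step is turned into an error response. UnauthorizedSketch, ForbiddenSketch, status_for, and run_endpoint are hypothetical stand-ins; the 401 and 403 codes come from the comments above, while falling back to 500 for anything unmapped is an assumption (the framework may map other exceptions, such as ValueError, differently).

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple


class UnauthorizedSketch(Exception):
    """Stand-in for UnauthorizedError (401)."""


class ForbiddenSketch(Exception):
    """Stand-in for ForbiddenError (403)."""


STATUS_BY_EXCEPTION = [(UnauthorizedSketch, 401), (ForbiddenSketch, 403)]


def status_for(exc: Exception) -> int:
    for exc_type, status in STATUS_BY_EXCEPTION:
        if isinstance(exc, exc_type):
            return status
    return 500  # assumption: anything unmapped is treated as a server error


async def run_endpoint(validate: Callable[[], Awaitable[None]],
                       process: Callable[[], Awaitable[Any]]) -> Tuple[int, Any]:
    """validate() runs first; an exception in either step short-circuits."""
    try:
        await validate()
        body = await process()
    except Exception as exc:
        return status_for(exc), {"error": str(exc)}
    return 200, body


async def require_admin() -> None:
    raise ForbiddenSketch("Admin access required")


async def load() -> dict:
    return {"ok": True}


denied = asyncio.run(run_endpoint(require_admin, load))
```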
Complete Example
# src/api/constitutions/list.py
from aws_python_helper.api.base import API

class ConstitutionListAPI(API):
    async def validate(self):
        if 'limit' in self.data:
            limit = int(self.data['limit'])
            if limit > 1000:
                raise ValueError("Limit cannot exceed 1000")

    async def process(self):
        # Build filters
        filters = {}
        if 'country' in self.data:
            filters['country'] = self.data['country']
        # Query MongoDB
        limit = int(self.data.get('limit', 100))
        results = await self.db.constitution_db.constitutions.find(
            filters
        ).limit(limit).to_list(limit)
        # Count total
        total = await self.db.constitution_db.constitutions.count_documents(filters)
        # Register in analytics
        await self.db.analytics_db.searches.insert_one({
            'filters': filters,
            'result_count': len(results)
        })
        # Response
        self.set_body({
            'data': results,
            'total': total
        })
        self.set_header('X-Total-Count', str(total))
Integration Example: API + Standalone Lambda
Here's a complete example showing how an API can invoke a standalone lambda:
Scenario: An API endpoint that creates a shipping and then asynchronously generates its route using a standalone lambda.
1. The API endpoint (src/api/shippings/post.py):
from pydantic import BaseModel
from aws_python_helper import API, LambdaInvoker

class ShippingSchema(BaseModel):
    customer_id: str
    address: str
    items: list[str]

class ShippingPostAPI(API):
    @property
    def schema(self):
        return ShippingSchema  # automatic validation, 400 on failure

    async def process(self):
        shipping = {
            'customer_id': self.data['customer_id'],
            'address': self.data['address'],
            'items': self.data['items'],
            'status': 'pending',
            'route_pending': True
        }
        result = await self.db.deliveries.shippings.insert_one(shipping)
        shipping_id = str(result.inserted_id)
        # Invoke standalone lambda asynchronously (fire-and-forget).
        # The target Lambda requires session.state by default, so pass it along.
        invoker = LambdaInvoker()
        invoker.invoke_async('GenerateRouteLambda', payload={
            'session': {'state': self.session.state},
            'data': {'shipping_id': shipping_id}
        })
        self.set_code(201)
        self.set_body({
            'shipping_id': shipping_id,
            'status': 'pending',
            'message': 'Shipping created, route generation in progress'
        })
2. The standalone lambda (src/lambda/generate-route/main.py):
from bson import ObjectId
from aws_python_helper.lambda_standalone.base import Lambda

class GenerateRouteLambda(Lambda):
    async def validate(self):
        if 'shipping_id' not in self.data:
            raise ValueError("shipping_id is required")
        if not ObjectId.is_valid(self.data['shipping_id']):
            raise ValueError("shipping_id must be a valid ObjectId")

    async def process(self):
        # The API sent str(inserted_id); convert back so the _id lookup matches
        shipping_id = ObjectId(self.data['shipping_id'])
        # Get shipping details
        shipping = await self.db.deliveries.shippings.find_one(
            {'_id': shipping_id}
        )
        if not shipping:
            raise ValueError(f"Shipping {shipping_id} not found")
        # Generate optimal route
        route = await self.calculate_optimal_route(shipping)
        # Save route
        route_result = await self.db.deliveries.routes.insert_one(route)
        # Update shipping
        await self.db.deliveries.shippings.update_one(
            {'_id': shipping_id},
            {'$set': {
                'route_id': route_result.inserted_id,
                'route_pending': False,
                'status': 'scheduled'
            }}
        )
        return {
            'route_id': str(route_result.inserted_id),
            'shipping_id': str(shipping_id)
        }

    async def calculate_optimal_route(self, shipping):
        # Your route calculation logic here
        return {
            'shipping_id': shipping['_id'],
            'carrier_id': shipping.get('carrier_id'),
            'estimated_duration': 60,
            'status': 'pending'
        }
3. Configure handlers (src/handlers/lambda_handler.py):
from aws_python_helper.lambda_standalone.handler import lambda_handler
generate_route_handler = lambda_handler('generate-route')
__all__ = ['generate_route_handler']
Benefits of this pattern:
- API responds immediately (better UX)
- Route generation happens in the background
- Decoupled services (easier to maintain)
- Can retry lambda independently if it fails
- Scalable architecture
Pydantic Schema Validation
The framework supports automatic request validation via Pydantic v2. Override the schema property on any API or Lambda subclass to declare the expected shape of self.data. The framework validates before calling validate() and replaces self.data with the coerced output, so your code keeps reading from self.data.
In an API endpoint
For POST/PUT, self.data is the request body. For GET/LIST, it is the query parameters. Both are validated the same way.
from pydantic import BaseModel
from aws_python_helper import API
class SearchSchema(BaseModel):
keys: list[str]
state: str
limit: int = 100
class SearchPostAPI(API):
@property
def schema(self):
return SearchSchema  # that's all
async def process(self):
keys = self.data["keys"] # self.data is validated and coerced
limit = self.data["limit"] # default applied by Pydantic
...
On validation failure the framework automatically returns HTTP 400; no extra code is needed.
In a Standalone Lambda
Same property, same behavior. On failure a ValueError is raised, which the handler returns as {"success": False, "error": "..."}.
from pydantic import BaseModel
from aws_python_helper import Lambda
class SyncSchema(BaseModel):
shipping_id: str
carrier_id: str
class SyncCarrierLambda(Lambda):
@property
def schema(self):
return SyncSchema
async def process(self):
shipping_id = self.data["shipping_id"] # validated
...
Without a schema
If schema is not overridden (it returns None by default), no schema validation runs and the behavior is unchanged.
Inter-Service Communication
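The ordering described above (schema first, then validate(), then process()) can be sketched without Pydantic or the framework. This is a stdlib-only illustration under stated assumptions: the dataclass stands in for a Pydantic model, and coerce and run_request are hypothetical helpers, not the library's internals. The exact shape of the 400 response is also assumed.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Optional


@dataclass
class SearchSchema:
    # Stand-in for the Pydantic model above: two required fields, one default
    keys: list
    state: str
    limit: int = 100


def coerce(schema: type, data: dict) -> dict:
    """Hypothetical coercion step: cast declared fields and fill defaults."""
    out: dict = {}
    for f in fields(schema):
        if f.name in data:
            out[f.name] = f.type(data[f.name])  # e.g. "50" -> 50 for an int field
        elif f.default is not MISSING:
            out[f.name] = f.default
        else:
            raise ValueError(f"{f.name}: field required")
    return out


def run_request(schema: Optional[type], data: dict, kind: str) -> dict:
    """Validate before validate()/process(); kind is "api" or "lambda"."""
    if schema is not None:
        try:
            data = coerce(schema, data)  # replaces self.data under the same name
        except (ValueError, TypeError) as exc:
            if kind == "api":
                return {"statusCode": 400, "error": str(exc)}  # assumed shape
            return {"success": False, "error": str(exc)}
    # validate() and process() would run here, reading the coerced data
    return {"success": True, "data": data}
```

With schema set to None, the data passes through untouched. This matches the "Without a schema" case.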
The framework provides two utilities for calling other services: LambdaInvoker for direct Lambda invocations and ApiClient for HTTP API calls.
LambdaInvoker
Wraps boto3 Lambda invocations with proper error handling. It never returns None; it raises typed exceptions instead.
from aws_python_helper import LambdaInvoker, LambdaInvocationError, LambdaResponseError
invoker = LambdaInvoker()
# Synchronous: waits for the result
result = invoker.invoke("my-function-name", payload={"key": "value"})
# result is the parsed dict returned by the Lambda
# Asynchronous fire-and-forget
invoker.invoke_async("my-function-name", payload={"key": "value"})
Exceptions:
| Exception | When |
|---|---|
| LambdaInvocationError | boto3 could not reach the function (network, permissions, etc.) |
| LambdaResponseError | The function itself raised an unhandled exception (FunctionError in the response) |
Note: boto3 is part of the AWS Lambda Python runtime, so no installation is needed there. For Fargate containers, add boto3 to your requirements.txt.
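The two exception types follow from how boto3's invoke() reports failures. A failed call raises from the client. A function that crashed still returns a response, but with a FunctionError key. The sketch below shows that mapping under stated assumptions. The exception classes are local stand-ins for the framework's, and FakeLambdaClient is a hypothetical test double, so the code runs offline. It is not the library's implementation.

```python
import io
import json
from typing import Any


class LambdaInvocationError(Exception):
    """Stand-in: the call could not be made (network, permissions, ...)."""


class LambdaResponseError(Exception):
    """Stand-in: the target function raised an unhandled exception."""


def build_invoke_kwargs(function_name: str, payload: dict, wait: bool) -> dict:
    """Keyword arguments for boto3's lambda client invoke()."""
    return {
        "FunctionName": function_name,
        # RequestResponse waits for the result; Event is fire-and-forget
        "InvocationType": "RequestResponse" if wait else "Event",
        "Payload": json.dumps(payload).encode("utf-8"),
    }


def invoke(client: Any, function_name: str, payload: dict) -> Any:
    """Synchronous invoke that never returns None: a value or a typed exception."""
    try:
        response = client.invoke(**build_invoke_kwargs(function_name, payload, wait=True))
    except Exception as exc:  # e.g. botocore ClientError, connection errors
        raise LambdaInvocationError(str(exc)) from exc
    raw = response["Payload"].read()
    if "FunctionError" in response:
        # In this case the runtime puts errorMessage/errorType in the payload
        detail = json.loads(raw or b"{}")
        raise LambdaResponseError(detail.get("errorMessage", "unhandled error"))
    return json.loads(raw) if raw else {}


class FakeLambdaClient:
    """Hypothetical test double mimicking the shape of boto3's invoke() response."""

    def __init__(self, body: bytes, function_error: bool = False):
        self.body, self.function_error = body, function_error

    def invoke(self, **kwargs: Any) -> dict:
        response = {"StatusCode": 200, "Payload": io.BytesIO(self.body)}
        if self.function_error:
            response["FunctionError"] = "Unhandled"
        return response
```

In real code, the client would be boto3.client("lambda"). Keeping it as a parameter makes the error paths easy to exercise in tests.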
ApiClient
HTTP client for service-to-service calls. It resolves the target URL and auth token automatically from environment variables, so no URLs or tokens are hardcoded in code.
from aws_python_helper import ApiClient
client = ApiClient("dockets") # service name only
result = await client.post("/search", body={"keys": [...]})
result = await client.get("/dockets/123")
result = await client.list("/dockets", params={"state": "CT"})
result = await client.put("/dockets/123", body={"status": "active"})
result = await client.patch("/dockets/123", body={"reviewed": True})
result = await client.delete("/dockets/123")
# Extra headers merged with the defaults (token is always injected automatically)
client = ApiClient("dockets", headers={"constitution-state": "CT"})
How URL and token are resolved:
| Env var | Purpose |
|---|---|
| MICROSERVICE_URLS | JSON map: {"dockets": "https://api.example.com/", "title-search": "https://..."} |
| INTER_SERVICE_TOKEN | Bearer token injected as Authorization: Bearer <token> |
| AUTH_BYPASS_TOKEN | Fallback if INTER_SERVICE_TOKEN is not set |
Exceptions:
| Exception | When |
|---|---|
| ServiceNotConfiguredError | Service name not found in MICROSERVICE_URLS |
| ApiClientError | Network error or timeout |
| ApiResponseError | Remote API returned 4xx or 5xx (has .status_code and .response_body attributes) |
Terraform configuration: set these once per environment in Secrets Manager under the keys inter_service_token and microservice_urls. The Lambda and Fargate modules pick them up from the infrastructure outputs, so resource files only need to pass them through:
# In AWS Secrets Manager secret (JSON):
# {
# "inter_service_token": "my-secret-token",
# "microservice_urls": "{\"dockets\": \"https://api-dockets.execute-api.us-east-2.amazonaws.com/\"}",
# ...
# }
# In Terraform module call (already handled by the modules):
module "my_lambda" {
source = "../modules/lambda"
inter_service_token = data.terraform_remote_state.infrastructure.outputs.inter_service_token
microservice_urls = data.terraform_remote_state.infrastructure.outputs.microservice_urls
...
}
ModelQueryLambda
ModelQueryLambda is a ready-to-use base class for creating MongoDB query-proxy Lambdas. Instead of writing a custom Lambda every time another microservice needs to read data from your MongoDB, you subclass ModelQueryLambda, declare an allowed_collections whitelist, and the framework exposes both find and aggregate operations automatically.
Use case: microservice A needs to query data owned by microservice B. Microservice B deploys one model-query Lambda; microservice A calls it via LambdaInvoker passing the collection name and query parameters.
Creating a model-query Lambda
Create src/lambda/model-query/main.py in the microservice that owns the data:
from aws_python_helper import ModelQueryLambda
class OrdersModelQueryLambda(ModelQueryLambda):
@property
def allowed_collections(self) -> list:
return ["orders", "customers"]  # whitelist: unlisted collections are rejected
Class discovery: name your subclass distinctly from the imported base (e.g. OrdersModelQueryLambda, not ModelQueryLambda). The framework loads the class defined in the file, not the imported base, so subclassing an imported concrete base (ModelQueryLambda, ModelIndexSyncLambda) works without name collisions.
Register the handler in src/handlers/lambda_handler.py:
from aws_python_helper.lambda_standalone.handler import lambda_handler
model_query_handler = lambda_handler('model-query')
__all__ = ["model_query_handler"]
Query modes
Aggregate pipeline
Pass a pipeline list to run a MongoDB aggregation:
invoker.invoke("ServiceModelQuery-dev", payload={
"session": self.session.to_dict(),
"collection": "orders",
"pipeline": [
{"$match": {"status": "pending"}},
{"$sort": {"created_at": -1}},
{"$limit": 50},
],
})
Find with filter
Pass filter, fields, limit, and/or skip to run a find:
invoker.invoke("ServiceModelQuery-dev", payload={
"session": self.session.to_dict(),
"collection": "orders",
"filter": {"customer_id": "abc123", "status": "active"},
"fields": {"_id": 1, "total": 1, "status": 1}, # projection
"limit": 20,
"skip": 0,
})
Note: pipeline and fields are mutually exclusive; the framework rejects payloads that include both.
Querying by _id
ObjectId hex strings in _id filters are automatically coerced to ObjectId so callers can serialize queries as plain JSON:
# Plain value
invoker.invoke("ServiceModelQuery-dev", payload={
"session": self.session.to_dict(),
"collection": "orders",
"filter": {"_id": "65f1a2b3c4d5e6f7a8b9c0d1"},
})
# Operator with list
invoker.invoke("ServiceModelQuery-dev", payload={
"session": self.session.to_dict(),
"collection": "orders",
"filter": {"_id": {"$in": ["65f1...", "65f2...", "65f3..."]}},
})
# 'id' is accepted as an alias for '_id'
invoker.invoke("ServiceModelQuery-dev", payload={
"session": self.session.to_dict(),
"collection": "orders",
"filter": {"id": "65f1a2b3c4d5e6f7a8b9c0d1"},
})
Supported operators on _id: scalar ($eq, $ne, $gt, $gte, $lt, $lte) and list ($in, $nin). The same coercion is applied to $match stages inside a pipeline. If the string is not a valid 24-char ObjectId hex, the Lambda raises a clear validation error.
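The coercion rules above can be sketched as a pure function. This is an illustration under stated assumptions, not the framework's code. coerce_id_filter and coerce_pipeline are hypothetical names. The ObjectId constructor is passed in as make_oid (in practice bson.ObjectId) so the sketch runs without pymongo installed. Non-string values are assumed to pass through unchanged.

```python
import re
from typing import Any, Callable

HEX24 = re.compile(r"[0-9a-fA-F]{24}")
SCALAR_OPS = {"$eq", "$ne", "$gt", "$gte", "$lt", "$lte"}
LIST_OPS = {"$in", "$nin"}


def _oid(value: Any, make_oid: Callable[[str], Any]) -> Any:
    if not isinstance(value, str):
        return value  # already an ObjectId (or another type): leave it alone
    if not HEX24.fullmatch(value):
        raise ValueError(f"Invalid ObjectId for _id: {value!r}")
    return make_oid(value)


def coerce_id_filter(flt: dict, make_oid: Callable[[str], Any]) -> dict:
    out = dict(flt)
    if "id" in out and "_id" not in out:
        out["_id"] = out.pop("id")  # 'id' is accepted as an alias for '_id'
    if "_id" not in out:
        return out
    cond = out["_id"]
    if isinstance(cond, dict):
        out["_id"] = {
            op: [_oid(v, make_oid) for v in val] if op in LIST_OPS
            else _oid(val, make_oid) if op in SCALAR_OPS
            else val
            for op, val in cond.items()
        }
    else:
        out["_id"] = _oid(cond, make_oid)
    return out


def coerce_pipeline(pipeline: list, make_oid: Callable[[str], Any]) -> list:
    # Only $match stages carry filters that need the same treatment
    return [
        {"$match": coerce_id_filter(stage["$match"], make_oid)} if "$match" in stage else stage
        for stage in pipeline
    ]
```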
Payload reference
| Field | Type | Required | Description |
|---|---|---|---|
| session | dict | Yes | Session dict from self.session.to_dict(); drives state-scoped DB routing |
| collection | str | Yes | Collection name (must be in allowed_collections) |
| pipeline | list | One of pipeline / filter | MongoDB aggregation pipeline |
| filter | dict | One of pipeline / filter | MongoDB filter document (defaults to {}). _id/id string values are auto-coerced to ObjectId |
| fields | dict | No | MongoDB projection (only valid with filter mode) |
| limit | int | No | Max documents to return |
| skip | int | No | Documents to skip (default 0) |
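The payload rules in the table can be sketched as a validator that feeds the standard envelope. This is a hypothetical sketch, not ModelQueryLambda's source. The error text for an unknown collection copies the example under "Response format". The other messages are assumptions.

```python
def validate_query_payload(payload: dict, allowed_collections: list) -> str:
    """Return "aggregate" or "find"; raise ValueError for a bad payload."""
    if not isinstance(payload.get("session"), dict):
        raise ValueError("session is required")
    collection = payload.get("collection")
    if collection not in allowed_collections:
        raise ValueError(
            f"Collection '{collection}' is not allowed. "
            f"Allowed: {', '.join(allowed_collections)}"
        )
    for key in ("limit", "skip"):
        if key in payload and not isinstance(payload[key], int):
            raise ValueError(f"{key} must be an integer")
    if "pipeline" in payload:
        if "fields" in payload:
            raise ValueError("pipeline and fields are mutually exclusive")
        return "aggregate"
    return "find"  # filter defaults to {} when omitted


def handle(payload: dict, allowed_collections: list) -> dict:
    """Wrap validation in the framework's success/error envelope."""
    try:
        validate_query_payload(payload, allowed_collections)
    except ValueError as exc:
        return {"success": False, "error": str(exc)}
    documents: list = []  # placeholder for the find()/aggregate() results
    return {"success": True, "data": documents}
```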
Response format
The Lambda always returns the standard framework envelope:
{
"success": true,
"data": [ ... ]
}
On validation error (unknown collection, invalid payload):
{
"success": false,
"error": "Collection 'unknown' is not allowed. Allowed: orders, customers"
}
Calling it from another microservice
import os
from aws_python_helper import API, LambdaInvoker, LambdaInvocationError, LambdaResponseError
class OrdersPostAPI(API):
async def process(self):
try:
response = LambdaInvoker().invoke(
os.getenv("ORDERS_MODEL_QUERY_LAMBDA_NAME"),
payload={
"session": self.session.to_dict(),
"collection": "orders",
"filter": {"status": "pending"},
"limit": 100,
},
)
except (LambdaInvocationError, LambdaResponseError) as e:
self.logger.error(f"model-query invocation failed: {e}")
raise ValueError("Could not retrieve orders. Please try again later.")
if not response.get("success"):
raise ValueError("Could not retrieve orders. Please try again later.")
orders = response.get("data", [])
self.set_body({"orders": orders})
Session and database routing
The session field in the payload is required. The ModelQueryLambda uses session.state to resolve the target database, using the same state-scoped routing as all framework components. This means:
- {"session": {"state": "connecticut"}, ...} → queries the connecticut database
- {"session": {"state": "new_jersey"}, ...} → queries the new_jersey database
Always pass self.session.to_dict() when calling from an API or Lambda to ensure state propagates correctly.
ModelIndexSyncLambda
ModelIndexSyncLambda is a ready-to-use base class for creating all of a service's MongoDB indexes deterministically, in one run. Because index creation is no longer triggered at request time (see the Repository Pattern note), this Lambda is the single source of truth for materializing indexes. It is typically invoked once per deploy.
Why: in short-lived Lambda/API runtimes, indexes created lazily on first access are often never finished (the function returns its response and is frozen). This Lambda creates every declared index up front and awaits each one. Creation is idempotent and covers every active state database, not just the one for the current request.
Creating an index-sync Lambda
Create src/lambda/model-index-sync/main.py and declare which repositories to sync:
import aws_python_helper
from repositories.parcels import ParcelsRepository # state-scoped (database_key = None)
from repositories.users import UsersRepository # core (database_key = "core")
from repositories.tokens import TokensRepository # core
class OrdersModelIndexSyncLambda(aws_python_helper.ModelIndexSyncLambda):
@property
def repositories(self) -> list:
return [ParcelsRepository, UsersRepository, TokensRepository]
Register the handler in src/handlers/lambda_handler.py:
from aws_python_helper.lambda_standalone.handler import lambda_handler
index_sync_handler = lambda_handler('model-index-sync')
__all__ = ["index_sync_handler"]
Declare repositories explicitly (don't auto-discover them) so the synced set is intentional. List your own collections; external repositories (is_external = True) belong to another cluster/service and are typically excluded.
How it routes each repository
For every repository in the list, the Lambda decides where to create its indexes:
| Repository kind | database_key | Target databases |
|---|---|---|
| Core / fixed | "core" (or any string) | That single database |
| State-scoped | None | Every active state, read from core.states where is_active = true (e.g. connecticut, new_jersey, …) |
A state-scoped collection such as parcels therefore gets its indexes created in connecticut, new_jersey, and any other active state โ automatically. Creation is idempotent, so re-running it is safe.
This Lambda sets requires_state = False: it is not tied to a single state and does not require a session in the event.
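The routing table reduces to a simple expansion from repositories to (database, collection) targets. The sketch below shows that expansion under stated assumptions. The repository classes are hypothetical, and collection_name is an assumed attribute name. active_states is passed in as a list, although the Lambda reads it from core.states where is_active = true. The output order mirrors the response example that follows.

```python
from typing import Optional


class ParcelsRepository:        # hypothetical state-scoped repository
    database_key: Optional[str] = None
    collection_name = "parcels"


class UsersRepository:          # hypothetical core repository
    database_key: Optional[str] = "core"
    collection_name = "users"


def plan_index_targets(repositories: list, active_states: list) -> list:
    """Expand repositories into (repository, database, collection) targets."""
    if not repositories:
        raise ValueError("repositories must not be empty")
    plan = []
    for repo in repositories:
        # None means state-scoped: fan out to every active state database
        databases = active_states if repo.database_key is None else [repo.database_key]
        for database in databases:
            plan.append((repo.__name__, database, repo.collection_name))
    return plan
```

Because index creation is idempotent, re-running the same plan is safe. A newly activated state simply appears as an extra target on the next run.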
Response format
{
"success": true,
"data": [
{"repository": "ParcelsRepository", "database": "connecticut", "collection": "parcels", "indexes": ["parcel_id_1", "..."]},
{"repository": "ParcelsRepository", "database": "new_jersey", "collection": "parcels", "indexes": ["parcel_id_1", "..."]},
{"repository": "UsersRepository", "database": "core", "collection": "users", "indexes": ["email_1"]}
]
}
Properties to override / configure
| Property | Default | Purpose |
|---|---|---|
| repositories | [] | Required. List of Repository subclasses (the classes, not instances) to sync. An empty list raises a validation error |
| states_database | "core" | Database holding the states collection |
| states_collection | "states" | Collection listing the states (filtered on the field is_active = true) |
| requires_state | False | Inherited override: the Lambda runs across all states, so no session is required |
Running it once per deploy (Terraform)
Since deploys are applied manually with Terraform, attach an aws_lambda_invocation that runs on every apply, right after the function code is updated. timestamp() changes on every plan, so it always re-runs (creation is idempotent and also picks up any newly-activated state). The postcondition fails the apply if the sync reports success = false (the framework handler returns HTTP 200 with success = false on internal errors, so this check is required to surface failures):
module "lambda" {
source = "../../../../../modules/lambda"
function_name = "${...service_name}${var.lambda_name}-${...env}"
source_handler = "handlers.lambda_handler.index_sync_handler"
timeout = 300
memory_size = 512
# ... mongo + vpc config (same as any other Lambda)
}
resource "aws_lambda_invocation" "sync_on_deploy" {
function_name = module.lambda.lambda_function_name
input = jsonencode({ _invoked_at = timestamp() }) # always changes, so it always re-runs
depends_on = [module.lambda] # run after the code is updated
lifecycle {
postcondition {
condition = try(jsondecode(self.result).success, false) == true
error_message = "Index sync failed during apply: ${self.result}"
}
}
}
You can also invoke it manually at any time (e.g. after activating a new state without a code deploy):
aws lambda invoke --function-name MyServiceModelIndexSync-dev out.json
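When you invoke it manually or from a deploy script, the CLI writes the function's response to out.json. The check below mirrors the Terraform postcondition: anything other than success == true counts as a failure. It is a hypothetical helper, not part of the framework. It assumes the response shape shown under "Response format".

```python
import json
import sys


def check_sync_output(path: str) -> int:
    """Return 0 if the sync response at `path` reports success, else 1."""
    with open(path, encoding="utf-8") as f:
        result = json.load(f)
    if isinstance(result, dict) and result.get("success") is True:
        for item in result.get("data", []):
            print(f"{item['database']}.{item['collection']}: {', '.join(item['indexes'])}")
        return 0
    # Same rule as the Terraform postcondition: only success == true passes
    print(f"Index sync failed: {json.dumps(result)}", file=sys.stderr)
    return 1
```

In a script, call sys.exit(check_sync_output("out.json")) right after the aws lambda invoke command. The shell step then fails whenever the sync reports success = false.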
Architecture Overview
Typical flow for event-driven architectures using this framework:
1. Client → API Gateway → Lambda: api_handler (src/api/resource/post.py): validates, queries MongoDB, publishes to SNS
2. SNS Topic (fanout/filter) → one SQS queue per platform (Platform A, Platform B, Platform C)
3. Each queue → Lambda: sqs_handler (src/consumer/platform_consumer.py): groups messages, acquires sessions, launches Fargate tasks via FargateExecutor.run_task()
4. Fargate Task: fargate_handler (src/task/my-task/task.py): scrapes/processes data, writes results to MongoDB
Environment Variables
MongoDB Configuration
The framework supports two ways to configure MongoDB:
Option 1: Full Connection String
# Full URI with embedded credentials
MONGODB_URI=mongodb+srv://user:password@cluster.mongodb.net/dbname?retryWrites=true&w=majority
# or
MONGO_DB_URI=mongodb+srv://user:password@cluster.mongodb.net/dbname
Option 2: Separate Components (Recommended for Terraform)
# Host without credentials
MONGO_DB_HOST=mongodb+srv://cluster.mongodb.net
# Credentials (more secure)
MONGO_DB_USER=admin
MONGO_DB_PASSWORD=my-secure-password
# Optional
MONGO_DB_NAME=my_database
MONGO_DB_OPTIONS=retryWrites=true&w=majority
Benefits of separate components:
- Better security: credentials are kept separate from the host
- Easy integration with Terraform/AWS Secrets Manager
- Passwords with special characters are handled automatically
- More flexible across environments
The framework automatically:
- URL-encodes the password (handles @, :, /, etc.)
- Builds the full URI
- Initializes the connection
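The URI assembly described in the list above can be sketched with urllib.parse.quote_plus, the escaping the PyMongo documentation recommends for credentials. build_mongo_uri is a hypothetical helper, not the framework's function. It assumes the host value includes its scheme, as in the MONGO_DB_HOST example.

```python
from typing import Optional
from urllib.parse import quote_plus


def build_mongo_uri(host: str, user: str, password: str,
                    name: Optional[str] = None, options: Optional[str] = None) -> str:
    """Assemble a MongoDB URI from MONGO_DB_* style components."""
    scheme, sep, rest = host.partition("://")
    if not sep:
        raise ValueError("MONGO_DB_HOST must include a scheme, e.g. mongodb+srv://")
    # quote_plus escapes @ : / and other reserved characters in credentials
    creds = f"{quote_plus(user)}:{quote_plus(password)}"
    uri = f"{scheme}://{creds}@{rest.rstrip('/')}/{name or ''}"
    return f"{uri}?{options}" if options else uri
```

For example, the password p@ss:w/rd becomes p%40ss%3Aw%2Frd. Without this escaping, the @ and : would be parsed as URI delimiters.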
Terraform Example
environment_variables = {
MONGO_DB_HOST = module.mongodb.connection_string
MONGO_DB_USER = module.mongodb.database_user
MONGO_DB_PASSWORD = module.mongodb.database_password
}
All Environment Variables
| Variable | Required | Description |
|---|---|---|
| MONGODB_URI or MONGO_DB_URI | One of these or the components below | Full MongoDB connection string |
| MONGO_DB_HOST | Alt. to URI | MongoDB host (e.g. mongodb+srv://cluster.net) |
| MONGO_DB_USER | Alt. to URI | MongoDB username |
| MONGO_DB_PASSWORD | Alt. to URI | MongoDB password |
| MONGO_DB_NAME | Optional | Default database name |
| MONGO_DB_OPTIONS | Optional | Connection options (e.g. retryWrites=true&w=majority) |
| EXTERNAL_MONGODB_CONNECTIONS | Optional | JSON array of external cluster configurations |
| REQUIRE_AUTH | Optional | Enable authentication middleware (true/false) |
| AUTH_DB_NAME | If REQUIRE_AUTH=true | MongoDB database for token validation |
| AUTH_BYPASS_TOKEN | Optional | Master token to bypass authentication |
| INTER_SERVICE_TOKEN | Optional | Bearer token for ApiClient service-to-service calls |
| MICROSERVICE_URLS | Optional | JSON map of service name → base URL used by ApiClient |
| ECS_CLUSTER | Fargate only | ECS cluster name for FargateExecutor |
| ECS_SUBNETS | Fargate only | Comma-separated subnet IDs for Fargate tasks |
| CONSTITUTION_STATE | Fargate only (auto) | State injected automatically by FargateExecutor; do not set manually |
| AWS_REGION | Fargate/SNS/SQS | AWS region |
| AWS_ACCOUNT_ID | SQS get_queue_url | AWS account ID |
| SERVICE_NAME | SQS get_queue_url | Service name prefix for queue name |
| QUEUE_NAME | SQS get_queue_url | Queue name segment |
| ENV | SQS get_queue_url | Environment suffix (e.g. prod, dev) |
Advanced Features
SQS Consumer - Batch Mode
By default, consumers process messages one by one ("single" mode). Use "batch" mode when you need to group or bulk-process messages.
Constitution-state handling in SQS:
- Single mode: the framework extracts the session from each record automatically (from the SNS MessageAttributes, Base64-decoded) and sets it in context before calling process_record(). You do not need to extract it yourself.
- Batch mode: the framework groups the incoming records by constitution-state and calls process_batch() once per group, with the correct state in context for each group. This ensures that state-scoped repositories resolve to the right database even when a batch contains records from different states.
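The per-state grouping in batch mode can be sketched as follows. With SNS → SQS delivery (raw message delivery off), each SQS record body is the SNS notification JSON, and its MessageAttributes entries have the form {"Type": ..., "Value": ...}. The rest is assumption. The attribute name constitution-state and its encoding (Base64 of a JSON session with a state key) are hypothetical, as is the choice to group records without the attribute under None. This is not the framework's code.

```python
import base64
import json
from collections import defaultdict
from typing import Optional

STATE_ATTRIBUTE = "constitution-state"  # assumed attribute name


def record_state(record: dict) -> Optional[str]:
    """Read the state from one SNS -> SQS record (assumed encoding)."""
    envelope = json.loads(record["body"])  # SNS notification wrapped by SQS
    attribute = envelope.get("MessageAttributes", {}).get(STATE_ATTRIBUTE)
    if attribute is None:
        return None  # hypothetical: records without a state form their own group
    decoded = base64.b64decode(attribute["Value"]).decode("utf-8")
    return json.loads(decoded)["state"]  # assumed: Base64 of the session JSON


def group_by_state(records: list) -> dict:
    """Group records so each group can be processed with one state in context."""
    groups = defaultdict(list)
    for record in records:
        groups[record_state(record)].append(record)
    return dict(groups)
```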
from aws_python_helper.sqs.consumer_base import SQSConsumer
class OrderConsumer(SQSConsumer):
@property
def processing_mode(self) -> str:
return "batch"
async def process_batch(self, records):
# Group records by some key before processing
grouped = {}
for record in records:
message_id = record.get('messageId')
body = self.extract_content_message(record)
key = body.get('region', 'default')
grouped.setdefault(key, []).append((message_id, body))
for region, messages in grouped.items():
try:
# Bulk operation for the whole group
docs = [msg[1] for msg in messages]
await self.db.orders_db.orders.insert_many(docs)
except Exception as e:
# Mark individual messages as failed
for message_id, _ in messages:
self.add_message_failed(message_id, str(e))
Key methods in SQSConsumer:
| Method / Property | Description |
|---|---|
| self.extract_content_message(record) | Parse message body (handles SNS → SQS wrapping automatically) |
| self.parse_body(record) | Alias for extract_content_message |
| self.add_message_failed(message_id, error) | Mark a message for retry (batch mode) |
| self.get_queue_url() | Get the SQS queue URL (uses AWS_REGION, AWS_ACCOUNT_ID, SERVICE_NAME, QUEUE_NAME, ENV) |
| self.db | Access to main MongoDB cluster |
| self.external_db | Access to external MongoDB clusters |
Retry behavior:
- Messages marked with add_message_failed() are reported back to SQS as partial batch failures (ReportBatchItemFailures)
- AWS SQS retries only the failed messages, not the whole batch
- Successful messages in the same batch are not retried
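The partial-failure report uses the response shape AWS Lambda expects for SQS partial batch responses: a batchItemFailures list of itemIdentifier entries. The sketch below shows how failed message IDs map to that shape. BatchTracker is a hypothetical stand-in that mirrors the add_message_failed name. The failure condition in process is purely illustrative.

```python
from typing import Dict, List


class BatchTracker:
    """Hypothetical stand-in for the failure bookkeeping in batch mode."""

    def __init__(self) -> None:
        self.failed: Dict[str, str] = {}

    def add_message_failed(self, message_id: str, error: str) -> None:
        self.failed[message_id] = error  # error text is for logs only

    def response(self) -> Dict[str, List[Dict[str, str]]]:
        # Shape Lambda expects when ReportBatchItemFailures is enabled;
        # an empty list tells SQS the whole batch succeeded
        return {"batchItemFailures": [{"itemIdentifier": mid} for mid in self.failed]}


def process(records: list) -> dict:
    tracker = BatchTracker()
    for record in records:
        if "bad" in record["body"]:  # illustrative failure condition
            tracker.add_message_failed(record["messageId"], "could not insert")
    return tracker.response()
```

SQS honors this response only if the event source mapping enables it. In Terraform, that means function_response_types = ["ReportBatchItemFailures"] on the aws_lambda_event_source_mapping. Without it, any failure retries the whole batch.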
SNS Publisher - Batch Publishing
The SNSPublisher automatically injects the current session as a Base64-encoded MessageAttribute on every published message. Base64 encoding is used to avoid SNS filter policy issues with raw JSON string values in attributes. SQS consumers built with this framework decode it automatically, so the session flows end-to-end through the SNS → SQS chain without any manual code.
topic = TitleIndexedTopic()
# Publish multiple messages in a single call
# constitution-state is auto-injected as a MessageAttribute on each message
await topic.publish([
{'content': {'id': 'id1', 'title': 'Title 1'}, 'attributes': {'type': 'created'}},
{'content': {'id': 'id2', 'title': 'Title 2'}, 'attributes': {'type': 'updated'}},
{'content': {'id': 'id3', 'title': 'Title 3'}}, # attributes are optional
])
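At the boto3 level, a batch publish with an injected Base64 session attribute might look like the sketch below. Two parts come from the SNS API: the entry and attribute shapes accepted by sns.publish_batch(PublishBatchRequestEntries=...) and its limit of 10 entries per request. The rest is assumption. The attribute name constitution-state, the encoding of the whole session as Base64 JSON, and the use of publish_batch are hypothetical here, not a description of SNSPublisher's internals.

```python
import base64
import json

STATE_ATTRIBUTE = "constitution-state"  # assumed attribute name


def build_publish_entries(messages: list, session: dict) -> list:
    """Build entries for boto3's sns.publish_batch(PublishBatchRequestEntries=...)."""
    encoded = base64.b64encode(json.dumps(session).encode("utf-8")).decode("ascii")
    entries = []
    for index, message in enumerate(messages):
        attributes = {
            name: {"DataType": "String", "StringValue": str(value)}
            for name, value in message.get("attributes", {}).items()
        }
        # Base64 keeps the JSON session opaque to subscription filter policies
        attributes[STATE_ATTRIBUTE] = {"DataType": "String", "StringValue": encoded}
        entries.append({
            "Id": str(index),  # must be unique within a single batch request
            "Message": json.dumps(message["content"]),
            "MessageAttributes": attributes,
        })
    return entries


def chunks(entries: list, size: int = 10) -> list:
    """SNS PublishBatch accepts at most 10 entries per request."""
    return [entries[i:i + size] for i in range(0, len(entries), size)]
```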
SNS - Message Attributes for Filtering
Use attributes to filter which SQS subscriptions receive each message:
import os
class EventTopic(SNSPublisher):
def __init__(self):
super().__init__(topic_arn=os.getenv('EVENTS_SNS_TOPIC_ARN'))
def build_message(self, payload, event_type, priority='normal'):
return {
'content': payload,
'attributes': {
'event_type': event_type, # SQS subscriptions can filter on this
'priority': priority
}
}
# Usage
topic = EventTopic()
await topic.publish(topic.build_message(
payload={'order_id': '123', 'amount': 99.99},
event_type='order_created',
priority='high'
))
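On the receiving side, each SQS subscription carries a filter policy over these attributes. The sketch below is a deliberately simplified model of SNS filter policy matching. It covers only exact string matches: AND across keys, OR within each key's list. It ignores prefix, numeric, anything-but, and exists operators. The policy itself is a hypothetical example. The JSON string is what you would set as filter_policy on an aws_sns_topic_subscription in Terraform.

```python
import json


def matches_filter_policy(policy: dict, attributes: dict) -> bool:
    """Exact-string subset of SNS filter policies: AND across keys, OR within a list."""
    return all(
        key in attributes and attributes[key] in allowed
        for key, allowed in policy.items()
    )


# Hypothetical subscription: only high-priority order events reach this queue
HIGH_PRIORITY_ORDERS = {"event_type": ["order_created", "order_updated"], "priority": ["high"]}

# The JSON string you would attach to the SNS subscription as its filter policy
FILTER_POLICY_JSON = json.dumps(HIGH_PRIORITY_ORDERS)
```

With this policy, the order_created/high message from the usage example is delivered. The same event published with priority='normal' is filtered out.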
Fargate - Run multiple tasks
executor = FargateExecutor()
task_arns = executor.run_task_batch(
'search-tax-by-town',
[
{'TOWN': 'Norwalk'},
{'TOWN': 'Stamford'},
{'TOWN': 'Bridgeport'}
]
)
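Each dict in the batch above becomes a set of environment variables for one task. At the ECS API level, that corresponds to the overrides argument of run_task. The sketch below builds those structures under stated assumptions. build_overrides, network_config, and batch_overrides are hypothetical helpers, not FargateExecutor's code. The container name must match the one in your task definition. assignPublicIp = "DISABLED" assumes private subnets with outbound access.

```python
from typing import Optional


def build_overrides(container_name: str, env: dict, state: Optional[str] = None) -> dict:
    """ECS RunTask `overrides` that inject per-task environment variables."""
    environment = [{"name": key, "value": str(value)} for key, value in env.items()]
    if state is not None:
        # Mirrors the documented CONSTITUTION_STATE injection (hypothetical helper)
        environment.append({"name": "CONSTITUTION_STATE", "value": state})
    return {"containerOverrides": [{"name": container_name, "environment": environment}]}


def network_config(subnets_csv: str) -> dict:
    """awsvpc network configuration from a comma-separated ECS_SUBNETS value."""
    subnets = [s.strip() for s in subnets_csv.split(",") if s.strip()]
    # Assumption: private subnets; switch to "ENABLED" for public subnets
    return {"awsvpcConfiguration": {"subnets": subnets, "assignPublicIp": "DISABLED"}}


def batch_overrides(container_name: str, envs: list, state: Optional[str] = None) -> list:
    """One overrides dict per task, as a batch launcher would need."""
    return [build_overrides(container_name, env, state) for env in envs]
```

Each result would be passed to boto3's ecs.run_task(cluster=..., taskDefinition=..., launchType="FARGATE", networkConfiguration=..., overrides=...), with one call per task.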
Fargate - Check task status
executor = FargateExecutor()
task_arn = executor.run_task('my-task', {'PARAM': 'value'})
# Check task status
status = executor.get_task_status(task_arn)
print(f"Status: {status['status']}")
print(f"Started at: {status['started_at']}")
JSON Utilities for MongoDB Types
When returning MongoDB documents in API responses or exporting data, use the built-in serializers to handle ObjectId, datetime, Decimal128, and other BSON types:
import json
from aws_python_helper.utils.json_encoder import MongoJSONEncoder, mongo_json_dumps
from aws_python_helper.utils.serializer import serialize_mongo_types
# Use as json.dumps cls parameter
json_str = json.dumps(my_mongo_doc, cls=MongoJSONEncoder)
# Helper function
json_str = mongo_json_dumps(my_mongo_doc)
# Convert a document in-place (dict → JSON-serializable dict)
clean_doc = serialize_mongo_types(my_mongo_doc)
Types automatically converted:
| MongoDB Type | Converts to |
|---|---|
| ObjectId | str |
| datetime | ISO 8601 string |
| date | ISO 8601 string |
| Decimal128 | float |
| Decimal | float |
| Binary | base64 str |
| UUID | str |
| bytes | base64 str |
| set | list |
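The conversions in the table follow the standard json.JSONEncoder.default() pattern. The sketch below is a stdlib-only illustration, not the library's MongoJSONEncoder. The bson types are handled by duck typing so the code runs without pymongo installed. bson.Binary is a bytes subclass, so the bytes branch already covers it. Converting Decimal values to float can lose precision, which is the trade-off the table implies.

```python
import base64
import json
from datetime import date, datetime
from decimal import Decimal
from uuid import UUID


class SketchMongoEncoder(json.JSONEncoder):
    """Illustrative encoder following the conversion table above."""

    def default(self, o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()  # ISO 8601 string
        if isinstance(o, Decimal):
            return float(o)
        if isinstance(o, UUID):
            return str(o)
        if isinstance(o, (bytes, bytearray)):  # also covers bson.Binary
            return base64.b64encode(bytes(o)).decode("ascii")
        if isinstance(o, (set, frozenset)):
            return list(o)
        if type(o).__name__ == "ObjectId":  # duck-typed bson.ObjectId
            return str(o)
        if hasattr(o, "to_decimal"):  # duck-typed bson.Decimal128
            return float(o.to_decimal())
        return super().default(o)  # raises TypeError for anything else
```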
Common use case โ exporting query results to JSON files:
import json
from aws_python_helper import API
from aws_python_helper.utils.json_encoder import MongoJSONEncoder
class ExportResultsAPI(API):
async def process(self):
records = await self.db.orders_db.orders.find({}).to_list(1000)
# Write to file with MongoJSONEncoder
with open('/tmp/export.json', 'w') as f:
json.dump(records, f, cls=MongoJSONEncoder, ensure_ascii=False, indent=2)
Contributing
If you find bugs or want to add features, please create a PR!
License
MIT
Download files
Source Distribution
Built Distribution
File details
Details for the file aws_python_helper-1.0.2.tar.gz.
File metadata
- Download URL: aws_python_helper-1.0.2.tar.gz
- Upload date:
- Size: 103.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 54ed2b28ad13b46cfab7069d8146bf2ccb2223fb221b1062c031e9a5bd4210f9 |
| MD5 | a4fcd34aff5ef3e1b295d39909bd7e3e |
| BLAKE2b-256 | 9fd6f4302321545ebbcd1e1b410debc8db18b522f41d9732e243469d15e9ec59 |
File details
Details for the file aws_python_helper-1.0.2-py3-none-any.whl.
File metadata
- Download URL: aws_python_helper-1.0.2-py3-none-any.whl
- Upload date:
- Size: 83.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7950cc5ac22bfad0f799b835566e2144fb508769b8f49ce410f14ce76495636e |
| MD5 | 716d10d06fc2b97dd412784793f59c89 |
| BLAKE2b-256 | c0aa8b57c25d6530f2130a2d6e599eadfd8e240f9d36e758852ff215624cf93c |