AWS Python Helper Framework

AWS Python Framework

A mini-framework for building REST APIs, SQS consumers, SNS publishers, Fargate tasks, and standalone Lambdas in Python on AWS.

🚀 Features

  • Reusable single handler: A single handler for all your API routes
  • Dynamic controller loading: Routing based on convention
  • OOP structure: Object-oriented programming for your code
  • Flexible MongoDB: Direct access to multiple databases without models
  • External MongoDB: Connect to multiple MongoDB clusters simultaneously
  • Session propagation: Automatic Session (state + user) propagation across the entire call chain for per-state database routing
  • SQS Consumers: Same pattern to process SQS messages (single or batch mode)
  • SNS Publishers: Same pattern to publish messages to SNS topics
  • Fargate Tasks: Same pattern to run tasks in Fargate containers
  • Standalone Lambdas: Create lambdas invocable directly with AWS SDK
  • Authentication middleware: Built-in token-based authentication
  • Pydantic schema validation: Declare a schema property on any API or Lambda and the framework validates the input automatically before validate(), returning 400 on failure
  • LambdaInvoker: Invoke other Lambda functions (sync or async) with built-in error handling
  • ApiClient: HTTP client for inter-service communication; resolves service URLs and the auth token from environment variables automatically
  • ModelQueryLambda: Base class for MongoDB query-proxy Lambdas; declare an allowed_collections whitelist and the framework handles find and aggregate operations generically
  • JSON utilities: Automatic serialization of MongoDB types
  • Type hints: Modern Python with type annotations
  • Async/await: Full support for asynchronous operations

🔧 Installation

pip install aws-python-helper

📦 Quick Reference

All available classes and functions:

Class / Function           Import                                       Purpose
API                        aws_python_helper.api.base                   Base class for REST endpoints
api_handler                aws_python_helper.api.handler                Generic handler for API Gateway
SQSConsumer                aws_python_helper.sqs.consumer_base          Base class for SQS consumers
sqs_handler                aws_python_helper.sqs.handler                Factory handler for SQS
SNSPublisher               aws_python_helper.sns.publisher              Base class for SNS publishers
Lambda                     aws_python_helper.lambda_standalone.base     Base class for Standalone Lambdas
lambda_handler             aws_python_helper.lambda_standalone.handler  Factory handler for Lambda
FargateTask                aws_python_helper.fargate.task_base          Base class for Fargate tasks
FargateExecutor            aws_python_helper.fargate.executor           Launches Fargate tasks from Lambda
fargate_handler            aws_python_helper.fargate.handler            Entry point handler for Fargate
Repository                 aws_python_helper.repository.base            Base class for MongoDB repositories
ModelQueryLambda           aws_python_helper                            Generic MongoDB query-proxy Lambda base class
LambdaInvoker              aws_python_helper                            Invoke Lambda functions (sync/async) with error handling
ApiClient                  aws_python_helper                            HTTP client for inter-service communication
LambdaInvocationError      aws_python_helper                            Raised when boto3 fails to invoke a Lambda
LambdaResponseError        aws_python_helper                            Raised when a Lambda returns a FunctionError
ServiceNotConfiguredError  aws_python_helper                            Raised when a service name is not in MICROSERVICE_URLS
ApiClientError             aws_python_helper                            Raised when an HTTP request fails (network/timeout)
ApiResponseError           aws_python_helper                            Raised when a remote API returns 4xx/5xx
Session                    aws_python_helper                            Request-scoped session object (state + user)
get_session                aws_python_helper                            Read the current Session from async context
set_session                aws_python_helper                            Set the current Session in async context
MongoJSONEncoder           aws_python_helper.utils.json_encoder         JSON encoder for MongoDB types
mongo_json_dumps           aws_python_helper.utils.json_encoder         Helper to serialize MongoDB types
serialize_mongo_types      aws_python_helper.utils.serializer           Recursively serialize MongoDB types
UnauthorizedError          aws_python_helper.api.exceptions             401 authentication exception
ForbiddenError             aws_python_helper.api.exceptions             403 authorization exception

📂 Project Structure

This framework follows a convention-based folder structure. Here's the recommended organization:

your-project/
└── src/
    ├── api/                           # REST APIs
    │   └── users/                     # Resource folder (kebab-case)
    │       ├── get.py                 # GET /users/123 -> UserGetAPI
    │       ├── list.py                # GET /users -> UserListAPI
    │       ├── post.py                # POST /users -> UserPostAPI
    │       ├── put.py                 # PUT /users/123 -> UserPutAPI
    │       └── delete.py              # DELETE /users/123 -> UserDeleteAPI
    │
    ├── consumer/                      # SQS Consumers (direct files)
    │   ├── user_created.py            # user-created -> UserCreatedConsumer
    │   ├── title_indexed.py           # title-indexed -> TitleIndexedConsumer
    │   └── order_processed.py         # order-processed -> OrderProcessedConsumer
    │
    ├── lambda/                        # Standalone Lambdas (folders)
    │   ├── generate-route/            # generate-route -> GenerateRouteLambda
    │   │   └── main.py
    │   ├── sync-carrier/              # sync-carrier -> SyncCarrierLambda
    │   │   └── main.py
    │   └── process-payment/           # process-payment -> ProcessPaymentLambda
    │       └── main.py
    │
    ├── task/                          # Fargate Tasks (folders)
    │   ├── search-tax-by-town/        # search-tax-by-town -> SearchTaxByTownTask
    │   │   ├── main.py                # Entry point
    │   │   └── task.py                # Task class
    │   └── process-data/              # process-data -> ProcessDataTask
    │       ├── main.py
    │       └── task.py
    │
    └── topic/                         # SNS Publishers
        └── order_created.py           # OrderCreatedTopic

Naming Conventions

The framework uses automatic class name detection based on your folder/file structure:

Type      Handler Name        File Path                            Class Name
API       N/A                 src/api/users/list.py                UserListAPI
Consumer  user-created        src/consumer/user_created.py         UserCreatedConsumer
Lambda    generate-route      src/lambda/generate-route/main.py    GenerateRouteLambda
Task      search-tax-by-town  src/task/search-tax-by-town/task.py  SearchTaxByTownTask

Rules:

  • Handler names use kebab-case (e.g., user-created, generate-route)
  • Consumer files use snake_case (e.g., user_created.py)
  • Lambda folders use kebab-case (e.g., generate-route/)
  • Task folders use kebab-case (e.g., search-tax-by-town/)
  • Class names always use PascalCase with suffix (e.g., UserCreatedConsumer)
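
The rules above can be sketched in a few lines. This is only an illustration of the convention, not the framework's actual loader; the helper names class_name_for and consumer_file_for are hypothetical.

```python
def class_name_for(handler_name: str, suffix: str) -> str:
    """Turn a kebab-case handler name into a PascalCase class name plus suffix."""
    words = [word for word in handler_name.split("-") if word]
    return "".join(word.capitalize() for word in words) + suffix


def consumer_file_for(handler_name: str) -> str:
    """Consumer files use snake_case, so dashes become underscores."""
    return handler_name.replace("-", "_") + ".py"


if __name__ == "__main__":
    print(class_name_for("user-created", "Consumer"))
    print(class_name_for("search-tax-by-town", "Task"))
    print(consumer_file_for("user-created"))
```

Keeping the handler name as the single source of truth means the folder, file, and class can all be derived from one string.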

๐Ÿ“ Basic Usage

Create an Endpoint

1. Create your API class in src/api/constitutions/list.py:

from aws_python_helper.api.base import API

class ConstitutionListAPI(API):
    async def process(self):
        # Direct access to MongoDB
        constitutions = await self.db.constitution_db.constitutions.find().to_list(100)
        self.set_body(constitutions)

2. The routing is automatic:

  • GET /constitutions → src/api/constitutions/list.py
  • GET /constitutions/123 → src/api/constitutions/get.py
  • POST /constitutions → src/api/constitutions/post.py

3. Configure the generic handler (src/handlers/api_handler.py):

from aws_python_helper.api.handler import api_handler
handler = api_handler

Create an SQS Consumer

1. Create your consumer in src/consumer/title_indexed.py:

from aws_python_helper.sqs.consumer_base import SQSConsumer

class TitleIndexedConsumer(SQSConsumer):
    async def process_record(self, record):
        body = self.extract_content_message(record)
        # Your logic here
        await self.db.constitution_db.titles.insert_one(body)

2. Configure the handler in src/handlers/sqs_handler.py:

from aws_python_helper.sqs.handler import sqs_handler

# Create a handler for each consumer and export it
title_indexed_handler = sqs_handler('title-indexed')

__all__ = ['title_indexed_handler']

Create a Standalone Lambda

Standalone lambdas are functions that can be invoked directly using the AWS SDK, without an HTTP endpoint. They're perfect for internal operations, integrations, and background processing tasks.

Differences from APIs:

  • No API Gateway - invoked directly with AWS SDK
  • No HTTP methods or routing
  • Can be called from other lambdas, Step Functions, or any AWS service
  • Perfect for internal microservices communication

1. Create your lambda class in src/lambda/generate-route/main.py:

from aws_python_helper.lambda_standalone.base import Lambda
from datetime import datetime

class GenerateRouteLambda(Lambda):
    async def validate(self):
        # Validate input data
        if 'shipping_id' not in self.data:
            raise ValueError("shipping_id is required")

        if not isinstance(self.data['shipping_id'], str):
            raise TypeError("shipping_id must be a string")

    async def process(self):
        # Your business logic here
        shipping_id = self.data['shipping_id']

        # Access to MongoDB
        shipping = await self.db.deliveries.shippings.find_one(
            {'_id': shipping_id}
        )

        if not shipping:
            raise ValueError(f"Shipping {shipping_id} not found")

        # Create route
        route = {
            'shipping_id': shipping_id,
            'carrier_id': shipping.get('carrier_id'),
            'status': 'pending',
            'created_at': datetime.utcnow()
        }

        result = await self.db.deliveries.routes.insert_one(route)

        self.logger.info(f"Route created: {result.inserted_id}")

        # Return result
        return {
            'route_id': str(result.inserted_id),
            'shipping_id': shipping_id
        }

2. Configure the handler in src/handlers/lambda_handler.py:

from aws_python_helper.lambda_standalone.handler import lambda_handler

# Create a handler for each lambda and export it
generate_route_handler = lambda_handler('generate-route')
sync_carrier_handler = lambda_handler('sync-carrier')
process_payment_handler = lambda_handler('process-payment')

__all__ = [
    'generate_route_handler',
    'sync_carrier_handler',
    'process_payment_handler'
]

Note: The handler name 'generate-route' (kebab-case) will automatically look for:

  • Folder: src/lambda/generate-route/ (kebab-case)
  • File: main.py
  • Class: GenerateRouteLambda

3. Invoke from another Lambda or API using boto3:

import boto3
import json

lambda_client = boto3.client('lambda')

# Invoke synchronously (RequestResponse)
response = lambda_client.invoke(
    FunctionName='GenerateRouteLambda',
    InvocationType='RequestResponse',
    Payload=json.dumps({
        'session': {'state': 'connecticut'},  # Required
        'data': {
            'shipping_id': '507f1f77bcf86cd799439011'
        }
    })
)

result = json.loads(response['Payload'].read())
# {'success': True, 'data': {'route_id': '...', 'shipping_id': '...'}}

if result['success']:
    print(f"Route created: {result['data']['route_id']}")
else:
    print(f"Error: {result['error']}")
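
If several callers need the same success/error check, it can be wrapped in a small helper. The sketch below assumes only the response envelope shown above ('success', 'data', 'error'); the function unwrap_result and the exception RouteLambdaFailed are hypothetical names for this example, not part of the framework.

```python
import json


class RouteLambdaFailed(Exception):
    """Hypothetical exception for this sketch; not part of the framework."""


def unwrap_result(raw_payload: bytes) -> dict:
    """Decode the Lambda response payload and return its 'data' on success."""
    result = json.loads(raw_payload)
    if result.get("success"):
        return result.get("data", {})
    # On failure the envelope is expected to carry an 'error' entry
    raise RouteLambdaFailed(result.get("error", "unknown error"))


if __name__ == "__main__":
    ok = b'{"success": true, "data": {"route_id": "r1", "shipping_id": "s1"}}'
    print(unwrap_result(ok)["route_id"])
```

With boto3, the bytes would come from response['Payload'].read().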

4. Invoke asynchronously (fire and forget):

# Invoke asynchronously (Event)
lambda_client.invoke(
    FunctionName='GenerateRouteLambda',
    InvocationType='Event',  # Asynchronous
    Payload=json.dumps({
        'session': {'state': 'connecticut'},  # Required
        'data': {
            'shipping_id': '507f1f77bcf86cd799439011'
        }
    })
)
# Returns immediately without waiting for the result

Naming Convention:

Lambda Name (kebab-case)  Folder                          File     Class
generate-route            src/lambda/generate-route/      main.py  GenerateRouteLambda
sync-carrier              src/lambda/sync-carrier/        main.py  SyncCarrierLambda
process-payment           src/lambda/process-payment/     main.py  ProcessPaymentLambda
send-notification         src/lambda/send-notification/   main.py  SendNotificationLambda

Common Use Cases:

  • Internal microservices communication
  • Background data processing
  • Integration with external services
  • Scheduled tasks (with EventBridge)
  • Step Functions workflows
  • Cross-service operations

Publish to SNS

1. Create your topic in src/topic/title_indexed.py:

from aws_python_helper.sns.publisher import SNSPublisher
import os

class TitleIndexedTopic(SNSPublisher):
    def __init__(self):
        super().__init__(
            topic_arn=os.getenv('TITLE_INDEXED_SNS_TOPIC_ARN')
        )

    def build_message(self, constitution_id, title, event_type='title_indexed'):
        return {
            'content': {
                'constitution_id': constitution_id,
                'title': title,
                'event_type': event_type
            },
            'attributes': {
                'event_type': event_type   # Used for SNS subscription filtering
            }
        }

2. Use the topic from anywhere:

from src.topic.title_indexed import TitleIndexedTopic

# In a consumer, API or task
topic = TitleIndexedTopic()

# Publish a single message
await topic.publish(topic.build_message('123', 'My Constitution'))

# Publish multiple messages in batch
messages = [
    topic.build_message('id1', 'Constitution A'),
    topic.build_message('id2', 'Constitution B'),
]
await topic.publish(messages)

Message format (every message must have a content key):

{
    'content': {...},              # Required: message body (any dict)
    'attributes': {...},           # Optional: SNS message attributes for filtering
    'subject': 'Optional subject'  # Optional: message subject
}
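
A quick way to catch malformed messages before publishing is to check the shape up front. The following is a minimal sketch based only on the format described above; normalize_messages is a hypothetical helper, and the framework may already perform a similar check internally.

```python
def normalize_messages(messages):
    """Accept one message dict or an iterable of them; return a validated list."""
    batch = [messages] if isinstance(messages, dict) else list(messages)
    for index, message in enumerate(batch):
        if not isinstance(message, dict) or "content" not in message:
            raise ValueError(f"message {index} is missing the required 'content' key")
    return batch


if __name__ == "__main__":
    single = {"content": {"title": "My Constitution"}}
    print(len(normalize_messages(single)))
    print(len(normalize_messages([single, single])))
```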

Run a Fargate Task

1. Create your task in src/task/search-tax-by-town/task.py:

from aws_python_helper.fargate.task_base import FargateTask

class SearchTaxByTownTask(FargateTask):

    async def execute(self):
        town = self.require_env('TOWN')
        self.logger.info(f"Processing town: {town}")

        # Access to DB
        docs = await self.db.smart_data.address.find({'town': town}).to_list(length=None)

        # Your logic here
        for doc in docs:
            # Process document
            pass

2. Create the entry point in src/task/search-tax-by-town/main.py:

from aws_python_helper.fargate.handler import fargate_handler
import sys

if __name__ == '__main__':
    exit_code = fargate_handler('search-tax-by-town')
    sys.exit(exit_code)

3. Create the Dockerfile in src/task/search-tax-by-town/Dockerfile:

FROM python:3.10.12-slim
WORKDIR /app

# Install dependencies
COPY requirements.txt /app/framework_requirements.txt
COPY src/task/search-tax-by-town/requirements.txt /app/task_requirements.txt
RUN pip install -r /app/framework_requirements.txt && \
    pip install -r /app/task_requirements.txt

# Copy code
COPY aws_python_helper /app/aws_python_helper
COPY config.py /app/config.py
COPY task /app/task
COPY task/search-tax-by-town/main.py /app/main.py

ENV PYTHONUNBUFFERED=1
CMD ["python", "main.py"]

4. Invoke from Lambda:

from aws_python_helper.fargate.executor import FargateExecutor

def handler(event, context):
    executor = FargateExecutor()
    # session is auto-propagated as SESSION env var (JSON) in the container
    task_arn = executor.run_task(
        'search-tax-by-town',
        envs={'TOWN': 'Norwalk', 'ONLY_TAX': 'true'}
    )
    return {'taskArn': task_arn}
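
To make the SESSION hand-off concrete, here is a rough sketch of how custom variables and the session could be merged into the name/value list that ECS container overrides expect, and how a container could read the session back. The function names are hypothetical and FargateExecutor's real internals may differ.

```python
import json


def build_environment(envs: dict, session: dict) -> list:
    """Merge custom env vars with SESSION (JSON) into ECS name/value pairs."""
    merged = {**envs, "SESSION": json.dumps(session)}
    return [{"name": name, "value": str(value)} for name, value in merged.items()]


def read_session(environ: dict) -> dict:
    """Inside the container: parse SESSION back into a dict (empty if absent)."""
    return json.loads(environ.get("SESSION", "{}"))


if __name__ == "__main__":
    environment = build_environment({"TOWN": "Norwalk"}, {"state": "connecticut"})
    container_env = {item["name"]: item["value"] for item in environment}
    print(read_session(container_env))
```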

๐Ÿ—„๏ธ Access to MongoDB

The framework provides flexible access to multiple databases:

class MyAPI(API):
    async def process(self):
        # Access to different databases on the same cluster
        user_id = self.path_parameters['id']  # assumes a route such as /users/123
        user = await self.db.users_db.users.find_one({'_id': user_id})

        # Another database
        await self.db.analytics_db.logs.insert_one({'action': 'view'})

        # Multiple collections
        titles = await self.db.constitution_db.titles.find().to_list(100)
        articles = await self.db.constitution_db.articles.find().to_list(100)

The pattern is always: self.db.<database_name>.<collection_name>.<motor_operation>()

External MongoDB Clusters

Connect to additional MongoDB clusters using EXTERNAL_MONGODB_CONNECTIONS:

EXTERNAL_MONGODB_CONNECTIONS='[
    {"name": "ClusterDockets", "connection_string": "mongodb+srv://cluster.mongodb.net"},
    {"name": "ClusterAnalytics", "connection_string": "mongodb+srv://analytics.mongodb.net"}
]'

The credentials from MONGO_DB_USER / MONGO_DB_PASSWORD are automatically injected into the connection strings.
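
For illustration, credential injection into a connection string can look like the sketch below. It is an assumption about the general technique (percent-encoding the user and password, then placing them before the host), not the framework's exact code; inject_credentials is a hypothetical name.

```python
from urllib.parse import quote_plus


def inject_credentials(connection_string: str, user: str, password: str) -> str:
    """Insert percent-encoded credentials after the scheme of a MongoDB URI."""
    scheme, separator, rest = connection_string.partition("://")
    if not separator:
        raise ValueError("connection string must include a scheme such as mongodb+srv://")
    if "@" in rest.split("/", 1)[0]:
        return connection_string  # credentials are already present
    return f"{scheme}://{quote_plus(user)}:{quote_plus(password)}@{rest}"


if __name__ == "__main__":
    print(inject_credentials("mongodb+srv://cluster.mongodb.net", "app", "p@ss/word"))
```

Encoding matters because characters such as @ or / in a password would otherwise break URI parsing.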

Access external clusters via self.external_db:

class AddressAPI(API):
    async def process(self):
        # Access external cluster: self.external_db.<ClusterName>.<database>.<collection>
        addresses = await self.external_db.ClusterDockets.smart_data.addresses.find(
            {'town': self.data['town']}
        ).to_list(100)

        self.set_body({'addresses': addresses})

self.external_db is available in API, SQSConsumer, Lambda, and FargateTask.

๐Ÿ—‚๏ธ Repository Pattern

The framework provides a Repository base class that eliminates repetitive boilerplate in data access layers. Each repository only declares what collection it uses, whether it belongs to an external cluster, and what indexes to create. The base class handles the MongoDB connection and index creation automatically.

Properties to override

Property         Type        Default  Required
collection_name  str         -        Yes
database_key     str | None  None     No; if None, uses session.state from context
is_external      bool        False    No
cluster_name     str         None     Only if is_external=True
indexes          list        []       No

database_key controls how the database is resolved:

  • database_key = "core" (or any string) → always connects to that specific database.
  • database_key = None (default) → reads session.state from context automatically. This makes the repository state-scoped: it connects to "connecticut", "new_jersey", etc. depending on the current request.

Collections are cached per (database_name, collection_name) key, so state-scoped repositories correctly isolate state between concurrent requests.
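
The resolution and caching behaviour described here can be pictured with a small stand-alone sketch. The names resolve_database_name and CollectionCache are hypothetical, and the factory callable stands in for whatever the framework uses to obtain a real collection handle.

```python
from typing import Callable, Optional


def resolve_database_name(database_key: Optional[str], session_state: Optional[str]) -> str:
    """A fixed database_key wins; otherwise fall back to the session state."""
    if database_key is not None:
        return database_key
    if not session_state:
        raise ValueError("database_key is None and session.state is not set")
    return session_state


class CollectionCache:
    """Cache collection handles per (database_name, collection_name) key."""

    def __init__(self, factory: Callable[[str, str], object]):
        self._factory = factory
        self._cache = {}

    def get(self, database_name: str, collection_name: str):
        key = (database_name, collection_name)
        if key not in self._cache:
            self._cache[key] = self._factory(database_name, collection_name)
        return self._cache[key]


if __name__ == "__main__":
    cache = CollectionCache(lambda db, coll: f"{db}.{coll}")
    print(cache.get(resolve_database_name(None, "connecticut"), "records"))
    print(cache.get(resolve_database_name("core", "connecticut"), "towns"))
```

Because the database name is part of the cache key, two requests with different states never share a handle.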

Index format

@property
def indexes(self):
    return [
        {"key": [("field", 1)]},                               # simple ASC
        {"key": [("field", -1)]},                              # simple DESC
        {"key": [("f1", 1), ("f2", -1)], "unique": True},     # compound + unique
        {"key": [("expires_at", 1)], "expireAfterSeconds": 0}, # TTL index
    ]

Indexes are created automatically in the background on first collection access, so there is no need to call any initialization method.
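
One plausible way to consume this format is to split each spec into its key list and its remaining options, which matches the shape of PyMongo/Motor's create_index(keys, **options). The helper below is a hypothetical sketch, not the framework's implementation.

```python
def split_index_spec(spec: dict):
    """Split one index spec into (keys, options)."""
    if "key" not in spec:
        raise ValueError("index spec requires a 'key' entry")
    keys = [(field, direction) for field, direction in spec["key"]]
    options = {name: value for name, value in spec.items() if name != "key"}
    return keys, options


if __name__ == "__main__":
    specs = [
        {"key": [("f1", 1), ("f2", -1)], "unique": True},
        {"key": [("expires_at", 1)], "expireAfterSeconds": 0},
    ]
    for spec in specs:
        keys, options = split_index_spec(spec)
        # With a real collection: await collection.create_index(keys, **options)
        print(keys, options)
```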

Repository with a fixed database

from aws_python_helper import Repository

class TownsRepository(Repository):

    @property
    def collection_name(self):
        return "towns"

    @property
    def database_key(self):
        return "core"  # always connects to the "core" database

    @property
    def indexes(self):
        return [
            {"key": [("name", 1)]},
            {"key": [("platform", 1)]},
        ]

    async def get_available(self, platforms):
        return await self.collection.find(
            {"platform": {"$in": platforms}},
            {"name": 1, "platform": 1}
        ).to_list(length=None)

    async def find_by_name(self, name):
        return await self.collection.find_one({"name": name})

State-scoped repository (no database_key)

When database_key is not set, the repository reads session.state from context and uses it as the database name. The same repository instance automatically connects to "connecticut" for one request and to "new_jersey" for another.

from aws_python_helper import Repository

class LandRecordsRepository(Repository):

    @property
    def collection_name(self):
        return "records"

    # No database_key → uses session.state automatically
    # If session.state = "connecticut" → connects to DB "connecticut"
    # If session.state = "new_jersey"  → connects to DB "new_jersey"

    @property
    def indexes(self):
        return [
            {"key": [("unique_id", 1)]},
            {"key": [("owner", 1), ("town", 1)]},
        ]

    async def bulk_upsert(self, records):
        from pymongo import UpdateOne
        operations = [
            UpdateOne({"unique_id": r["unique_id"]}, {"$set": r}, upsert=True)
            for r in records
        ]
        result = await self.collection.bulk_write(operations)
        return {"upserted": result.upserted_count, "modified": result.modified_count}

Note: A ValueError is raised at runtime if database_key is None and session.state has not been set. This is prevented automatically by the framework at every entry point (API, Lambda, SQS, Fargate).

Repository on an external cluster

from aws_python_helper import Repository

class AddressRepository(Repository):

    @property
    def database_key(self):
        return "smart_data"

    @property
    def collection_name(self):
        return "address"

    @property
    def is_external(self):
        return True

    @property
    def cluster_name(self):
        return "ClusterDockets"  # Must match a name in EXTERNAL_MONGODB_CONNECTIONS

    async def find_by_query(self, query, limit=None):
        cursor = self.collection.find(query)
        if limit:
            cursor = cursor.limit(limit)
        return await cursor.to_list(length=None)

Instantiation: no db argument needed

class MyAPI(API):

    @property
    def towns_repository(self):
        if not getattr(self, '_towns_repository', None):  # attribute may not exist yet
            self._towns_repository = TownsRepository()  # no args!
        return self._towns_repository

    async def process(self):
        towns = await self.towns_repository.get_available(["platform_a", "platform_b"])
        self.set_body({"towns": towns})

The repository connects itself using the already-initialized MongoManager singleton, the same one used by self.db. No need to pass self.db or any connection object.

๐ŸŒ Session Context

The framework propagates a Session object automatically across the entire async call chain using Python's contextvars.ContextVar. The session holds state (for multi-state DB routing) and user (authenticated user from the auth middleware).

How the framework injects it at each entry point

Entry point                 How the session is read
API Gateway                 constitution-state header → session.state (when AUTHORIZATION includes state); auth middleware → session.user (when AUTHORIZATION includes user). Returns 400 if a required header is missing
Standalone Lambda           session dict in the event payload; required (must include state), raises ValueError if missing
SQS Consumer (single mode)  Per-record: reads session from SNS MessageAttributes (Base64-encoded JSON)
SQS Consumer (batch mode)   Groups records by session.state; calls process_batch() once per group with the correct session in context
Fargate Task                SESSION env var (JSON), auto-injected by FargateExecutor
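
The batch-mode grouping can be pictured as follows. This is a simplified sketch: the record shape and the read_state callable are assumptions made for the example, and the real consumer extracts the session from the message attributes.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List


def group_by_state(records: Iterable[dict], read_state: Callable[[dict], str]) -> Dict[str, List[dict]]:
    """Group records so each batch can be processed under a single session state."""
    groups = defaultdict(list)
    for record in records:
        groups[read_state(record)].append(record)
    return dict(groups)


if __name__ == "__main__":
    # Hypothetical, already-decoded records for illustration only
    records = [
        {"id": 1, "state": "connecticut"},
        {"id": 2, "state": "new_jersey"},
        {"id": 3, "state": "connecticut"},
    ]
    for state, batch in group_by_state(records, lambda r: r["state"]).items():
        print(state, [record["id"] for record in batch])
```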

How the framework propagates it to downstream services

Downstream service  Propagation mechanism
SNS Publisher       Auto-injects the full session as a session MessageAttribute (Base64-encoded JSON) on every published message
FargateExecutor     Auto-injects SESSION as a JSON env var when launching Fargate containers

This means that an API call with constitution-state: connecticut will automatically carry the full session (state + user) through SNS → SQS → Fargate without any code changes in your consumers or tasks.
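
Base64-encoded JSON is simple to produce and consume with the standard library. The sketch below shows the round trip; encode_session and decode_session are hypothetical names, and the exact attribute layout the framework uses is not shown here.

```python
import base64
import json


def encode_session(session: dict) -> str:
    """Serialize the session to JSON, then Base64-encode it as ASCII text."""
    return base64.b64encode(json.dumps(session).encode("utf-8")).decode("ascii")


def decode_session(value: str) -> dict:
    """Reverse of encode_session: Base64-decode, then parse the JSON."""
    return json.loads(base64.b64decode(value).decode("utf-8"))


if __name__ == "__main__":
    encoded = encode_session({"state": "connecticut", "user": {"_id": "u1"}})
    print(decode_session(encoded))
```

Base64 keeps the attribute value plain ASCII even when the session contains quotes or non-ASCII characters.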

Accessing the session in handlers

All handler base classes expose a self.session property:

class MyAPI(API):
    async def process(self):
        state = self.session.state   # e.g. "connecticut"
        user  = self.session.user    # authenticated user dict, or None

Available in API, SQSConsumer, Lambda, and FargateTask.

State-scoped repositories

Repositories with no database_key (default) read session.state from context to resolve the target database automatically. See the Repository Pattern section for details.

Manual access

If you need to read or set the session manually (e.g., in tests or utility code):

from aws_python_helper import Session, get_session, set_session

session = get_session()         # returns current Session (creates empty one if not set)
session.state                   # e.g. "connecticut", or None if not set
session.user                    # authenticated user dict, or None

set_session(Session(state="new_jersey"))  # set manually (the framework does this automatically)

API example: constitution-state header

GET /constitutions HTTP/1.1
constitution-state: connecticut
Authorization: Bearer <token>

Lambda invocation example: session in event

import boto3, json

lambda_client = boto3.client('lambda')
lambda_client.invoke(
    FunctionName='MyLambdaFunction',
    InvocationType='RequestResponse',
    Payload=json.dumps({
        'session': {'state': 'connecticut'},  # Required
        'data': {'key': 'value'}
    })
)

🔄 Routing Convention

The framework uses convention over configuration for routing:

Request                   Loaded file
GET /users                api/users/list.py
GET /users/123            api/users/get.py
POST /users               api/users/post.py
PUT /users/123            api/users/put.py
DELETE /users/123         api/users/delete.py
GET /users/123/posts      api/users/posts/list.py
GET /users/123/posts/456  api/users/posts/get.py

Logic:

  • The parts with even indices (0,2,4...) are directories
  • The parts with odd indices (1,3,5...) are path parameters
  • GET with an odd number of parts → list method
  • GET with an even number of parts → get method
  • Other methods use their name directly
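
The logic above can be expressed as a short function. This is a sketch of the convention as described, not the framework's router; resolve_route is a hypothetical name and the returned path is relative to src/.

```python
from typing import List, Tuple


def resolve_route(method: str, path: str) -> Tuple[str, List[str]]:
    """Map an HTTP method and path to (file to load, path parameter values)."""
    parts = [part for part in path.strip("/").split("/") if part]
    if not parts:
        raise ValueError("path must contain at least one resource segment")
    directories = parts[0::2]  # even indices are directories
    path_params = parts[1::2]  # odd indices are path parameters
    if method.upper() == "GET":
        action = "list" if len(parts) % 2 == 1 else "get"
    else:
        action = method.lower()
    return "api/" + "/".join(directories) + f"/{action}.py", path_params


if __name__ == "__main__":
    print(resolve_route("GET", "/users"))
    print(resolve_route("GET", "/users/123/posts/456"))
    print(resolve_route("DELETE", "/users/123"))
```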

🧩 API Class Reference

All properties and methods available inside an API subclass:

Request Properties

Property               Type                   Description
self.data              dict                   Request body (POST/PUT) or query params (GET)
self.headers           dict                   HTTP request headers
self.path_parameters   dict                   URL path parameters (e.g. /users/123 → {'id': '123'})
self.query_parameters  dict                   Query string parameters
self.db                DatabaseProxy          Access to main MongoDB cluster
self.external_db       ExternalDatabaseProxy  Access to external MongoDB clusters
self.session           Session                Request-scoped session (session.state, session.user)
self.current_user      dict | None            Authenticated user document (requires REQUIRE_AUTH=true)
self.is_authenticated  bool                   Whether the request is authenticated
self.auth_data         dict | None            Full authentication data

Response Methods

Method                                 Description
self.set_code(code: int)               Set HTTP response status code
self.set_body(body: Any)               Set response body (auto-serialized to JSON)
self.set_header(key: str, value: str)  Add a single response header
self.set_headers(headers: dict)        Set multiple response headers at once

Methods to Override

Method            Required  Description
async validate()  Optional  Validate request data; raise exceptions to reject
async process()   Required  Main business logic

class UserGetAPI(API):
    async def validate(self):
        # Access path params: /users/123 → self.path_parameters = {'id': '123'}
        if not self.path_parameters.get('id'):
            raise ValueError("User ID is required")

    async def process(self):
        user_id = self.path_parameters['id']
        user = await self.db.users_db.users.find_one({'_id': user_id})

        if not user:
            self.set_code(404)
            self.set_body({'error': 'User not found'})
            return

        self.set_code(200)
        self.set_body({'data': user})
        self.set_header('X-Resource-Id', user_id)

๐Ÿ” Authentication

The framework includes a built-in token-based authentication middleware.

Configuration

AUTHORIZATION=full           # Authorization mode: 'user', 'state', or 'full' (default: empty/disabled)
AUTH_DB_NAME=my_database     # MongoDB database where tokens are stored
AUTH_BYPASS_TOKEN=secret123  # Master token to bypass auth (for internal use)

Using the authenticated user

When AUTHORIZATION is user or full, every request must include a valid Authorization: Bearer <token> header. The authenticated user is available via self.current_user:

class OrderListAPI(API):
    async def process(self):
        # self.current_user contains the user document from MongoDB
        user_id = self.current_user['_id']

        orders = await self.db.orders_db.orders.find(
            {'user_id': user_id}
        ).to_list(100)

        self.set_body({'data': orders})
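
For reference, pulling the token out of an Authorization: Bearer <token> header usually looks like the sketch below. The middleware already does this for you; extract_bearer_token is a hypothetical helper shown only to clarify the expected header format.

```python
from typing import Mapping, Optional


def extract_bearer_token(headers: Mapping[str, str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header, if any."""
    for name, value in headers.items():
        if name.lower() == "authorization":  # header names are case-insensitive
            scheme, _, token = value.strip().partition(" ")
            if scheme.lower() == "bearer" and token.strip():
                return token.strip()
            return None
    return None


if __name__ == "__main__":
    print(extract_bearer_token({"Authorization": "Bearer abc123"}))
    print(extract_bearer_token({"authorization": "Basic dXNlcjpwYXNz"}))
```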

Auth exceptions

Use these exceptions in your validate() or process() methods:

from aws_python_helper.api.exceptions import UnauthorizedError, ForbiddenError

class AdminOnlyAPI(API):
    async def validate(self):
        if not self.is_authenticated:
            raise UnauthorizedError("Authentication required")  # Returns 401

        if self.current_user.get('role') != 'admin':
            raise ForbiddenError("Admin access required")       # Returns 403

🎯 Complete Example

# src/api/constitutions/list.py
from aws_python_helper.api.base import API

class ConstitutionListAPI(API):
    async def validate(self):
        if 'limit' in self.data:
            limit = int(self.data['limit'])
            if limit > 1000:
                raise ValueError("Limit cannot exceed 1000")

    async def process(self):
        # Build filters
        filters = {}
        if 'country' in self.data:
            filters['country'] = self.data['country']

        # Query MongoDB
        limit = int(self.data.get('limit', 100))
        results = await self.db.constitution_db.constitutions.find(
            filters
        ).limit(limit).to_list(limit)

        # Count total
        total = await self.db.constitution_db.constitutions.count_documents(filters)

        # Register in analytics
        await self.db.analytics_db.searches.insert_one({
            'filters': filters,
            'result_count': len(results)
        })

        # Response
        self.set_body({
            'data': results,
            'total': total
        })
        self.set_header('X-Total-Count', str(total))

🔗 Integration Example: API + Standalone Lambda

Here's a complete example showing how an API can invoke a standalone lambda:

Scenario: An API endpoint that creates a shipping and then asynchronously generates its route using a standalone lambda.

1. The API endpoint (src/api/shippings/post.py):

from pydantic import BaseModel
from aws_python_helper import API, LambdaInvoker

class ShippingSchema(BaseModel):
    customer_id: str
    address: str
    items: list[str]

class ShippingPostAPI(API):

    @property
    def schema(self):
        return ShippingSchema  # automatic validation → 400 on failure

    async def process(self):
        shipping = {
            'customer_id': self.data['customer_id'],
            'address': self.data['address'],
            'items': self.data['items'],
            'status': 'pending',
            'route_pending': True
        }

        result = await self.db.deliveries.shippings.insert_one(shipping)
        shipping_id = str(result.inserted_id)

        # Invoke standalone lambda asynchronously (fire-and-forget)
        invoker = LambdaInvoker()
        invoker.invoke_async('GenerateRouteLambda', payload={
            'data': {'shipping_id': shipping_id}
        })

        self.set_code(201)
        self.set_body({
            'shipping_id': shipping_id,
            'status': 'pending',
            'message': 'Shipping created, route generation in progress'
        })

2. The standalone lambda (src/lambda/generate-route/main.py):

from bson import ObjectId
from aws_python_helper.lambda_standalone.base import Lambda

class GenerateRouteLambda(Lambda):
    async def validate(self):
        if 'shipping_id' not in self.data:
            raise ValueError("shipping_id is required")

    async def process(self):
        shipping_id = self.data['shipping_id']
        # The API sends str(inserted_id); convert it back so it matches _id
        shipping_oid = ObjectId(shipping_id)

        # Get shipping details
        shipping = await self.db.deliveries.shippings.find_one(
            {'_id': shipping_oid}
        )

        if not shipping:
            raise ValueError(f"Shipping {shipping_id} not found")

        # Generate optimal route
        route = await self.calculate_optimal_route(shipping)

        # Save route
        route_result = await self.db.deliveries.routes.insert_one(route)

        # Update shipping
        await self.db.deliveries.shippings.update_one(
            {'_id': shipping_oid},
            {'$set': {
                'route_id': route_result.inserted_id,
                'route_pending': False,
                'status': 'scheduled'
            }}
        )

        return {
            'route_id': str(route_result.inserted_id),
            'shipping_id': shipping_id
        }

    async def calculate_optimal_route(self, shipping):
        # Your route calculation logic here
        return {
            'shipping_id': shipping['_id'],
            'carrier_id': shipping.get('carrier_id'),
            'estimated_duration': 60,
            'status': 'pending'
        }

3. Configure handlers (src/handlers/lambda_handler.py):

from aws_python_helper.lambda_standalone.handler import lambda_handler

generate_route_handler = lambda_handler('generate-route')

__all__ = ['generate_route_handler']

Benefits of this pattern:

  • API responds immediately (better UX)
  • Route generation happens in the background
  • Decoupled services (easier to maintain)
  • Can retry lambda independently if it fails
  • Scalable architecture

✅ Pydantic Schema Validation

The framework supports automatic request validation via Pydantic v2. Override the schema property on any API or Lambda subclass to declare the expected shape of self.data. The framework validates before calling validate() and replaces self.data with the coerced output; the property name stays the same.

In an API endpoint

For POST/PUT, self.data is the request body. For GET/LIST, it is the query parameters. Both are validated the same way.

from pydantic import BaseModel
from aws_python_helper import API

class SearchSchema(BaseModel):
    keys: list[str]
    state: str
    limit: int = 100

class SearchPostAPI(API):

    @property
    def schema(self):
        return SearchSchema  # that's all

    async def process(self):
        keys  = self.data["keys"]   # self.data is validated and coerced
        limit = self.data["limit"]  # default applied by Pydantic
        ...

On validation failure the framework automatically returns HTTP 400; no extra code is needed.
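To make the validate-and-coerce step concrete, here is a minimal sketch of what could happen to self.data, assuming plain Pydantic v2 calls (model_validate and model_dump). The helper name validate_payload and its (status, result) return shape are hypothetical and chosen only for illustration; the framework's internals may differ.

```python
from pydantic import BaseModel, ValidationError


class SearchSchema(BaseModel):
    keys: list[str]
    state: str
    limit: int = 100


def validate_payload(schema, raw):
    """Hypothetical helper: return (status_code, data_or_failed_fields)."""
    try:
        model = schema.model_validate(raw)
    except ValidationError as exc:
        # One entry per failing field, given as its location tuple
        return 400, [err["loc"] for err in exc.errors()]
    # model_dump() yields a plain dict with defaults applied and types coerced
    return 200, model.model_dump()


ok_status, ok_data = validate_payload(
    SearchSchema, {"keys": ["a"], "state": "CT", "limit": "5"}
)
bad_status, bad_fields = validate_payload(SearchSchema, {"keys": ["a"]})
```

In the first call the string "5" is coerced to the integer 5; in the second, the missing state field produces the 400 branch.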

In a Standalone Lambda

Same property, same behavior. On failure a ValueError is raised, which the handler returns as {"success": False, "error": "..."}.

from pydantic import BaseModel
from aws_python_helper import Lambda

class SyncSchema(BaseModel):
    shipping_id: str
    carrier_id: str

class SyncCarrierLambda(Lambda):

    @property
    def schema(self):
        return SyncSchema

    async def process(self):
        shipping_id = self.data["shipping_id"]  # validated
        ...

Without a schema

If schema is not overridden (it returns None by default), the framework skips schema validation and behaves exactly as it did before this feature existed.


🔗 Inter-Service Communication

The framework provides two utilities for calling other services: LambdaInvoker for direct Lambda invocations and ApiClient for HTTP API calls.

LambdaInvoker

Wraps boto3 Lambda invocations with proper error handling. It never returns None; it raises typed exceptions instead.

from aws_python_helper import LambdaInvoker, LambdaInvocationError, LambdaResponseError

invoker = LambdaInvoker()

# Synchronous: waits for the result
result = invoker.invoke("my-function-name", payload={"key": "value"})
# result is the parsed dict returned by the Lambda

# Asynchronous fire-and-forget
invoker.invoke_async("my-function-name", payload={"key": "value"})

Exceptions:

| Exception | When |
| --- | --- |
| LambdaInvocationError | boto3 could not reach the function (network, permissions, etc.) |
| LambdaResponseError | The function itself raised an unhandled exception (FunctionError in the response) |

Note: boto3 is part of the AWS Lambda Python runtime, so no installation is needed there. For Fargate containers, add boto3 to your requirements.txt.
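For readers curious what such a wrapper does underneath, the following is a rough sketch of a synchronous invocation using the boto3 Lambda client's invoke call. It is not the framework's implementation: invoke_sync and FakeLambdaClient are hypothetical names, RuntimeError stands in for the typed exceptions listed above, and the fake client exists only so the sketch runs without AWS credentials.

```python
import io
import json


def invoke_sync(client, function_name, payload):
    """Call a Lambda and return its parsed result, raising on failure."""
    response = client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # "Event" would be fire-and-forget
        Payload=json.dumps(payload).encode("utf-8"),
    )
    body = json.loads(response["Payload"].read())
    if "FunctionError" in response:
        # The function ran but raised; the payload carries the error details
        raise RuntimeError(f"{function_name} failed: {body.get('errorMessage')}")
    return body


class FakeLambdaClient:
    """Stand-in for boto3.client('lambda') so the sketch runs offline."""

    def invoke(self, FunctionName, InvocationType, Payload):
        request = json.loads(Payload)
        result = {"success": True, "data": request}
        return {"StatusCode": 200, "Payload": io.BytesIO(json.dumps(result).encode())}


result = invoke_sync(FakeLambdaClient(), "my-function-name", {"key": "value"})
```

With real AWS access you would pass boto3.client("lambda") instead of the fake client.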

ApiClient

HTTP client for service-to-service calls. It resolves the target URL and auth token automatically from environment variables, so no URLs or tokens are hardcoded.

from aws_python_helper import ApiClient

client = ApiClient("dockets")                               # service name only
result = await client.post("/search", body={"keys": [...]})
result = await client.get("/dockets/123")
result = await client.list("/dockets", params={"state": "CT"})
result = await client.put("/dockets/123", body={"status": "active"})
result = await client.patch("/dockets/123", body={"reviewed": True})
result = await client.delete("/dockets/123")

# Extra headers merged with the defaults (token is always injected automatically)
client = ApiClient("dockets", headers={"constitution-state": "CT"})

How URL and token are resolved:

| Env var | Purpose |
| --- | --- |
| MICROSERVICE_URLS | JSON map: {"dockets": "https://api.example.com/", "title-search": "https://..."} |
| INTER_SERVICE_TOKEN | Bearer token injected as Authorization: Bearer <token> |
| AUTH_BYPASS_TOKEN | Fallback if INTER_SERVICE_TOKEN is not set |
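The resolution rules in the table can be sketched as a small function. This is an illustration under stated assumptions rather than the ApiClient source: resolve_service is a hypothetical name, KeyError stands in for ServiceNotConfiguredError, and stripping the trailing slash from the base URL is a choice made here for tidy path joining.

```python
import json


def resolve_service(service, env):
    """Return (base_url, headers) for a service, using the documented env vars."""
    urls = json.loads(env.get("MICROSERVICE_URLS", "{}"))
    if service not in urls:
        raise KeyError(f"Service '{service}' not found in MICROSERVICE_URLS")
    # INTER_SERVICE_TOKEN wins; AUTH_BYPASS_TOKEN is only a fallback
    token = env.get("INTER_SERVICE_TOKEN") or env.get("AUTH_BYPASS_TOKEN")
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    return urls[service].rstrip("/"), headers


env = {
    "MICROSERVICE_URLS": '{"dockets": "https://api.example.com/"}',
    "AUTH_BYPASS_TOKEN": "fallback-token",
}
base_url, headers = resolve_service("dockets", env)
```

In a deployed function you would pass os.environ as env.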

Exceptions:

| Exception | When |
| --- | --- |
| ServiceNotConfiguredError | Service name not found in MICROSERVICE_URLS |
| ApiClientError | Network error or timeout |
| ApiResponseError | Remote API returned 4xx or 5xx (has .status_code and .response_body attributes) |

Terraform configuration: set these values once per environment in Secrets Manager under the keys inter_service_token and microservice_urls. The Lambda and Fargate modules pick them up automatically from infrastructure outputs, so resource files need no extra variables beyond passing them through:

# In AWS Secrets Manager secret (JSON):
# {
#   "inter_service_token": "my-secret-token",
#   "microservice_urls": "{\"dockets\": \"https://api-dockets.execute-api.us-east-2.amazonaws.com/\"}",
#   ...
# }

# In Terraform module call (already handled by the modules):
module "my_lambda" {
  source              = "../modules/lambda"
  inter_service_token = data.terraform_remote_state.infrastructure.outputs.inter_service_token
  microservice_urls   = data.terraform_remote_state.infrastructure.outputs.microservice_urls
  ...
}

๐Ÿ” ModelQueryLambda

ModelQueryLambda is a ready-to-use base class for creating MongoDB query-proxy Lambdas. Instead of writing a custom Lambda every time another microservice needs to read data from your MongoDB, you subclass ModelQueryLambda, declare an allowed_collections whitelist, and the framework exposes both find and aggregate operations automatically.

Use case: microservice A needs to query data owned by microservice B. Microservice B deploys one model-query Lambda; microservice A calls it via LambdaInvoker passing the collection name and query parameters.

Creating a model-query Lambda

Create src/lambda/model-query/main.py in the microservice that owns the data:

# Alias the base class so the subclass can keep the conventional name
from aws_python_helper import ModelQueryLambda as BaseModelQueryLambda

class ModelQueryLambda(BaseModelQueryLambda):

    @property
    def allowed_collections(self) -> list:
        return ["orders", "customers"]  # whitelist: unlisted collections are rejected

Register the handler in src/handlers/lambda_handler.py:

from aws_python_helper.lambda_standalone.handler import lambda_handler

model_query_handler = lambda_handler('model-query')

__all__ = ["model_query_handler"]

Query modes

Aggregate pipeline

Pass a pipeline list to run a MongoDB aggregation:

invoker.invoke("ServiceModelQuery-dev", payload={
    "session": self.session.to_dict(),
    "collection": "orders",
    "pipeline": [
        {"$match": {"status": "pending"}},
        {"$sort": {"created_at": -1}},
        {"$limit": 50},
    ],
})

Find with filter

Pass filter, fields, limit, and/or skip to run a find:

invoker.invoke("ServiceModelQuery-dev", payload={
    "session": self.session.to_dict(),
    "collection": "orders",
    "filter": {"customer_id": "abc123", "status": "active"},
    "fields": {"_id": 1, "total": 1, "status": 1},  # projection
    "limit": 20,
    "skip": 0,
})

Note: pipeline and fields are mutually exclusive; the framework rejects payloads that include both.

Querying by _id

ObjectId hex strings in _id filters are automatically coerced to ObjectId so callers can serialize queries as plain JSON:

# Plain value
invoker.invoke("ServiceModelQuery-dev", payload={
    "session": self.session.to_dict(),
    "collection": "orders",
    "filter": {"_id": "65f1a2b3c4d5e6f7a8b9c0d1"},
})

# Operator with list
invoker.invoke("ServiceModelQuery-dev", payload={
    "session": self.session.to_dict(),
    "collection": "orders",
    "filter": {"_id": {"$in": ["65f1...", "65f2...", "65f3..."]}},
})

# 'id' is accepted as an alias for '_id'
invoker.invoke("ServiceModelQuery-dev", payload={
    "session": self.session.to_dict(),
    "collection": "orders",
    "filter": {"id": "65f1a2b3c4d5e6f7a8b9c0d1"},
})

Supported operators on _id: scalar ($eq, $ne, $gt, $gte, $lt, $lte) and list ($in, $nin). The same coercion is applied to $match stages inside a pipeline. If the string is not a valid 24-char ObjectId hex, the Lambda raises a clear validation error.
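The coercion rules described above can be sketched as follows. This is a simplified illustration, not the library's code: coerce_id_filter and to_object_id are hypothetical names, the converter is injected so the sketch runs without pymongo (in practice you would pass bson.ObjectId), and rejecting unsupported operators is an assumption made here.

```python
import re

HEX_24 = re.compile(r"^[0-9a-fA-F]{24}$")
SCALAR_OPS = {"$eq", "$ne", "$gt", "$gte", "$lt", "$lte"}
LIST_OPS = {"$in", "$nin"}


def to_object_id(value, convert):
    """Validate a 24-char hex string, then hand it to the converter."""
    if not isinstance(value, str) or not HEX_24.match(value):
        raise ValueError(f"'{value}' is not a valid 24-char ObjectId hex string")
    return convert(value)


def coerce_id_filter(query, convert):
    """Return a copy of query with '_id' (or its alias 'id') values converted."""
    out = dict(query)
    if "id" in out:
        out["_id"] = out.pop("id")
    if "_id" not in out:
        return out
    value = out["_id"]
    if isinstance(value, dict):
        coerced = {}
        for op, operand in value.items():
            if op in SCALAR_OPS:
                coerced[op] = to_object_id(operand, convert)
            elif op in LIST_OPS:
                coerced[op] = [to_object_id(v, convert) for v in operand]
            else:
                # Assumption: anything outside the documented operators is rejected
                raise ValueError(f"Unsupported operator on _id: {op}")
        out["_id"] = coerced
    else:
        out["_id"] = to_object_id(value, convert)
    return out


def fake_object_id(hex_string):
    """Stand-in converter; with pymongo installed, pass bson.ObjectId instead."""
    return ("ObjectId", hex_string.lower())


coerced = coerce_id_filter({"id": "65F1A2B3C4D5E6F7A8B9C0D1"}, fake_object_id)
```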

Payload reference

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| session | dict | Yes | Session dict from self.session.to_dict(); drives state-scoped DB routing |
| collection | str | Yes | Collection name (must be in allowed_collections) |
| pipeline | list | One of | MongoDB aggregation pipeline |
| filter | dict | One of | MongoDB filter document (defaults to {}). _id/id string values are auto-coerced to ObjectId |
| fields | dict | No | MongoDB projection (only valid with filter mode) |
| limit | int | No | Max documents to return |
| skip | int | No | Documents to skip (default 0) |

Response format

The Lambda always returns the standard framework envelope:

{
  "success": true,
  "data": [ ... ]
}

On validation error (unknown collection, invalid payload):

{
  "success": false,
  "error": "Collection 'unknown' is not allowed. Allowed: orders, customers"
}
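As a rough illustration of how the whitelist check and the envelope fit together, the sketch below validates a payload and wraps the outcome. The function names are hypothetical, the error wording for the session and exclusivity checks is invented for the example, and the query itself is replaced by an empty list because no database is involved.

```python
def validate_query_payload(payload, allowed_collections):
    """Raise ValueError with a readable message if the payload is invalid."""
    if not isinstance(payload.get("session"), dict):
        raise ValueError("session is required")
    collection = payload.get("collection")
    if collection not in allowed_collections:
        allowed = ", ".join(allowed_collections)
        raise ValueError(f"Collection '{collection}' is not allowed. Allowed: {allowed}")
    if "pipeline" in payload and "fields" in payload:
        raise ValueError("pipeline and fields are mutually exclusive")
    # A pipeline selects aggregate mode; everything else is a find
    return "aggregate" if "pipeline" in payload else "find"


def run(payload, allowed_collections):
    """Wrap the outcome in the success/error envelope."""
    try:
        validate_query_payload(payload, allowed_collections)
    except ValueError as exc:
        return {"success": False, "error": str(exc)}
    # Placeholder: a real Lambda would execute the find or aggregate here
    return {"success": True, "data": []}


rejected = run(
    {"session": {"state": "connecticut"}, "collection": "unknown"},
    ["orders", "customers"],
)
```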

Calling it from another microservice

import os
from aws_python_helper import API, LambdaInvoker, LambdaInvocationError

class OrdersPostAPI(API):

    async def process(self):
        try:
            response = LambdaInvoker().invoke(
                os.getenv("ORDERS_MODEL_QUERY_LAMBDA_NAME"),
                payload={
                    "session": self.session.to_dict(),
                    "collection": "orders",
                    "filter": {"status": "pending"},
                    "limit": 100,
                },
            )
        except LambdaInvocationError as e:
            self.logger.error(f"model-query invocation failed: {e}")
            raise ValueError("Could not retrieve orders. Please try again later.")

        if not response.get("success"):
            raise ValueError("Could not retrieve orders. Please try again later.")

        orders = response.get("data", [])
        self.set_body({"orders": orders})

Session and database routing

The session field in the payload is required. The ModelQueryLambda uses session.state to resolve the target database, the same state-scoped routing used by all framework components. This means:

  • {"session": {"state": "connecticut"}, ...} → queries the connecticut database
  • {"session": {"state": "new_jersey"}, ...} → queries the new_jersey database

Always pass self.session.to_dict() when calling from an API or Lambda to ensure state propagates correctly.


๐Ÿ—๏ธ Architecture Overview

Typical flow for event-driven architectures using this framework:

┌──────────┐     ┌─────────────┐     ┌──────────────────────────────────────┐
│  Client  │────▶│ API Gateway │────▶│  Lambda: api_handler                 │
└──────────┘     └─────────────┘     │  (src/api/resource/post.py)          │
                                     │  → validates, queries MongoDB,       │
                                     │    publishes to SNS                  │
                                     └────────────────┬─────────────────────┘
                                                      │
                                                      ▼
                                             ┌─────────────────┐
                                             │   SNS Topic     │
                                             │ (fanout/filter) │
                                             └────────┬────────┘
                                      ┌───────────────┼───────────────┐
                                      ▼               ▼               ▼
                               ┌────────────┐  ┌────────────┐  ┌────────────┐
                               │  SQS Queue │  │  SQS Queue │  │  SQS Queue │
                               │  Platform A│  │  Platform B│  │  Platform C│
                               └──────┬─────┘  └──────┬─────┘  └──────┬─────┘
                                      │               │               │
                                      ▼               ▼               ▼
                               ┌──────────────────────────────────────────────┐
                               │  Lambda: sqs_handler                         │
                               │  (src/consumer/platform_consumer.py)         │
                               │  → groups messages, acquires sessions,       │
                               │    launches Fargate tasks                    │
                               └───────────────────┬──────────────────────────┘
                                                   │  FargateExecutor.run_task()
                                                   ▼
                               ┌──────────────────────────────────────────────┐
                               │  Fargate Task: fargate_handler               │
                               │  (src/task/my-task/task.py)                  │
                               │  → scrapes/processes data,                   │
                               │    writes results to MongoDB                 │
                               └──────────────────────────────────────────────┘

๐Ÿ” Environment Variables

MongoDB Configuration

The framework supports two ways to configure MongoDB:

Option 1: Full Connection String

# Full URI with embedded credentials
MONGODB_URI=mongodb+srv://user:password@cluster.mongodb.net/dbname?retryWrites=true&w=majority
# or
MONGO_DB_URI=mongodb+srv://user:password@cluster.mongodb.net/dbname

Option 2: Separate Components (Recommended for Terraform)

# Host without credentials
MONGO_DB_HOST=mongodb+srv://cluster.mongodb.net

# Credentials (more secure)
MONGO_DB_USER=admin
MONGO_DB_PASSWORD=my-secure-password

# Optional
MONGO_DB_NAME=my_database
MONGO_DB_OPTIONS=retryWrites=true&w=majority

Benefits of separate components:

  • ✅ Better security: credentials separate from host
  • ✅ Easy integration with Terraform/AWS Secrets Manager
  • ✅ Passwords with special characters are handled automatically
  • ✅ More flexible for different environments

The framework automatically:

  1. URL-encodes the password (handles @, :, /, etc.)
  2. Builds the full URI
  3. Initializes the connection
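The first two steps can be sketched with the standard library alone. This is an assumption-laden illustration: build_mongo_uri is a hypothetical name, the host is assumed to include its scheme (as in the MONGO_DB_HOST example above), and the framework may assemble the URI differently.

```python
from urllib.parse import quote_plus


def build_mongo_uri(host, user, password, name=None, options=None):
    """Combine the MONGO_DB_* components into a single connection string."""
    # Assumes host looks like "mongodb+srv://cluster.mongodb.net"
    scheme, _, rest = host.partition("://")
    # quote_plus escapes reserved characters such as @ : / in credentials
    uri = f"{scheme}://{quote_plus(user)}:{quote_plus(password)}@{rest.rstrip('/')}"
    uri += f"/{name}" if name else "/"
    if options:
        uri += f"?{options}"
    return uri


uri = build_mongo_uri(
    "mongodb+srv://cluster.mongodb.net",
    "admin",
    "p@ss:w/rd",
    name="my_database",
    options="retryWrites=true&w=majority",
)
```

Escaping matters because an unescaped @ or / in the password would otherwise be parsed as part of the host or path.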

Terraform Example

environment_variables = {
  MONGO_DB_HOST     = module.mongodb.connection_string
  MONGO_DB_USER     = module.mongodb.database_user
  MONGO_DB_PASSWORD = module.mongodb.database_password
}

All Environment Variables

| Variable | Required | Description |
| --- | --- | --- |
| MONGODB_URI or MONGO_DB_URI | One of these or components below | Full MongoDB connection string |
| MONGO_DB_HOST | Alt. to URI | MongoDB host (e.g. mongodb+srv://cluster.net) |
| MONGO_DB_USER | Alt. to URI | MongoDB username |
| MONGO_DB_PASSWORD | Alt. to URI | MongoDB password |
| MONGO_DB_NAME | Optional | Default database name |
| MONGO_DB_OPTIONS | Optional | Connection options (e.g. retryWrites=true&w=majority) |
| EXTERNAL_MONGODB_CONNECTIONS | Optional | JSON array of external cluster configurations |
| REQUIRE_AUTH | Optional | Enable authentication middleware (true/false) |
| AUTH_DB_NAME | If REQUIRE_AUTH=true | MongoDB database for token validation |
| AUTH_BYPASS_TOKEN | Optional | Master token to bypass authentication |
| INTER_SERVICE_TOKEN | Optional | Bearer token for ApiClient service-to-service calls |
| MICROSERVICE_URLS | Optional | JSON map of service name → base URL used by ApiClient |
| ECS_CLUSTER | Fargate only | ECS cluster name for FargateExecutor |
| ECS_SUBNETS | Fargate only | Comma-separated subnet IDs for Fargate tasks |
| CONSTITUTION_STATE | Fargate only (auto) | State injected automatically by FargateExecutor; do not set manually |
| AWS_REGION | Fargate/SNS/SQS | AWS region |
| AWS_ACCOUNT_ID | SQS get_queue_url | AWS account ID |
| SERVICE_NAME | SQS get_queue_url | Service name prefix for queue name |
| QUEUE_NAME | SQS get_queue_url | Queue name segment |
| ENV | SQS get_queue_url | Environment suffix (e.g. prod, dev) |

📊 Advanced Features

SQS Consumer - Batch Mode

By default, consumers process messages one by one ("single" mode). Use "batch" mode when you need to group or bulk-process messages.

Constitution-state handling in SQS:

  • Single mode: the framework extracts the session from each record automatically (from SNS MessageAttributes, Base64-decoded) and sets it in context before calling process_record(). You do not need to extract it yourself.
  • Batch mode: the framework groups the incoming records by constitution-state and calls process_batch() once per group, with the correct state in context for each group. This ensures that state-scoped repositories resolve to the right database even when a batch contains records from different states.

from aws_python_helper.sqs.consumer_base import SQSConsumer

class OrderConsumer(SQSConsumer):

    @property
    def processing_mode(self) -> str:
        return "batch"

    async def process_batch(self, records):
        # Group records by some key before processing
        grouped = {}
        for record in records:
            message_id = record.get('messageId')
            body = self.extract_content_message(record)
            key = body.get('region', 'default')
            grouped.setdefault(key, []).append((message_id, body))

        for region, messages in grouped.items():
            try:
                # Bulk operation for the whole group
                docs = [msg[1] for msg in messages]
                await self.db.orders_db.orders.insert_many(docs)
            except Exception as e:
                # Mark individual messages as failed
                for message_id, _ in messages:
                    self.add_message_failed(message_id, str(e))

Key methods in SQSConsumer:

| Method / Property | Description |
| --- | --- |
| self.extract_content_message(record) | Parse message body (handles SNS → SQS wrapping automatically) |
| self.parse_body(record) | Alias for extract_content_message |
| self.add_message_failed(message_id, error) | Mark a message for retry (batch mode) |
| self.get_queue_url() | Get the SQS queue URL (uses AWS_REGION, AWS_ACCOUNT_ID, SERVICE_NAME, QUEUE_NAME, ENV) |
| self.db | Access to main MongoDB cluster |
| self.external_db | Access to external MongoDB clusters |
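To show what handling the SNS-to-SQS wrapping involves, here is a minimal sketch. It assumes the standard SNS notification envelope (a JSON body with "Type": "Notification" and a string "Message" field) and JSON payloads; extract_content is a hypothetical stand-in, not the framework's extract_content_message.

```python
import json


def extract_content(record):
    """Return the application payload from an SQS record.

    If the body is an SNS notification envelope, unwrap its "Message"
    field; otherwise treat the body itself as the payload.
    """
    body = json.loads(record["body"])
    if isinstance(body, dict) and body.get("Type") == "Notification" and "Message" in body:
        return json.loads(body["Message"])
    return body


direct = {"messageId": "m1", "body": json.dumps({"region": "east"})}
wrapped = {
    "messageId": "m2",
    "body": json.dumps({"Type": "Notification", "Message": json.dumps({"region": "west"})}),
}
payloads = [extract_content(direct), extract_content(wrapped)]
```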

Retry behavior:

  • Messages marked with add_message_failed() are reported via reportBatchItemFailures
  • AWS SQS retries only the failed messages, not the whole batch
  • Successful messages in the same batch are not retried
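For reference, the partial-batch response that Lambda expects for SQS event sources has the shape sketched below. The helper name build_batch_response is hypothetical; the batchItemFailures and itemIdentifier keys are the ones AWS defines, and they take effect only when ReportBatchItemFailures is enabled on the event source mapping.

```python
def build_batch_response(failed_message_ids):
    """Build the partial-batch response a Lambda handler returns to SQS."""
    return {
        "batchItemFailures": [
            {"itemIdentifier": message_id} for message_id in failed_message_ids
        ]
    }


# Two of three messages failed; only these two will be redelivered
response = build_batch_response(["id-1", "id-3"])
```

An empty batchItemFailures list tells SQS that every message in the batch succeeded.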

SNS Publisher - Batch Publishing

The SNSPublisher automatically injects the current session as a Base64-encoded MessageAttribute on every published message. Base64 encoding is used to avoid SNS filter policy issues with raw JSON string values in attributes. SQS consumers built with this framework will decode it automatically, ensuring the session flows end-to-end through the SNS → SQS chain without any manual code.

topic = TitleIndexedTopic()

# Publish multiple messages in a single call
# constitution-state is auto-injected as a MessageAttribute on each message
await topic.publish([
    {'content': {'id': 'id1', 'title': 'Title 1'}, 'attributes': {'type': 'created'}},
    {'content': {'id': 'id2', 'title': 'Title 2'}, 'attributes': {'type': 'updated'}},
    {'content': {'id': 'id3', 'title': 'Title 3'}},  # attributes are optional
])
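The Base64 round trip for the session attribute can be sketched as below. The function names and the sample session keys are hypothetical; only the DataType/StringValue structure of an SNS message attribute is standard, and the framework's actual attribute name and encoding details are not shown in this document.

```python
import base64
import json


def encode_session(session):
    """Serialize a session dict into an SNS MessageAttribute value."""
    raw = json.dumps(session, sort_keys=True).encode("utf-8")
    return {
        "DataType": "String",
        "StringValue": base64.b64encode(raw).decode("ascii"),
    }


def decode_session(string_value):
    """Reverse encode_session given the attribute's Base64 string."""
    return json.loads(base64.b64decode(string_value))


session = {"state": "connecticut", "user": "u-123"}  # illustrative keys only
attribute = encode_session(session)
restored = decode_session(attribute["StringValue"])
```

Because the attribute value is opaque Base64 rather than raw JSON, it does not interfere with string matching in subscription filter policies.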

SNS - Message Attributes for Filtering

Use attributes to filter which SQS subscriptions receive each message:

import os

class EventTopic(SNSPublisher):
    def __init__(self):
        super().__init__(topic_arn=os.getenv('EVENTS_SNS_TOPIC_ARN'))

    def build_message(self, payload, event_type, priority='normal'):
        return {
            'content': payload,
            'attributes': {
                'event_type': event_type,   # SQS subscriptions can filter on this
                'priority': priority
            }
        }

# Usage
topic = EventTopic()
await topic.publish(topic.build_message(
    payload={'order_id': '123', 'amount': 99.99},
    event_type='order_created',
    priority='high'
))

Fargate - Run multiple tasks

executor = FargateExecutor()
task_arns = executor.run_task_batch(
    'search-tax-by-town',
    [
        {'TOWN': 'Norwalk'},
        {'TOWN': 'Stamford'},
        {'TOWN': 'Bridgeport'}
    ]
)

Fargate - Check task status

executor = FargateExecutor()
task_arn = executor.run_task('my-task', {'PARAM': 'value'})

# Check task status
status = executor.get_task_status(task_arn)
print(f"Status: {status['status']}")
print(f"Started at: {status['started_at']}")

JSON Utilities for MongoDB Types

When returning MongoDB documents in API responses or exporting data, use the built-in serializers to handle ObjectId, datetime, Decimal128, and other BSON types:

import json
from aws_python_helper.utils.json_encoder import MongoJSONEncoder, mongo_json_dumps
from aws_python_helper.utils.serializer import serialize_mongo_types

# Use as json.dumps cls parameter
json_str = json.dumps(my_mongo_doc, cls=MongoJSONEncoder)

# Helper function
json_str = mongo_json_dumps(my_mongo_doc)

# Convert a document in-place (dict → JSON-serializable dict)
clean_doc = serialize_mongo_types(my_mongo_doc)

Types automatically converted:

| MongoDB Type | Converts to |
| --- | --- |
| ObjectId | str |
| datetime | ISO 8601 string |
| date | ISO 8601 string |
| Decimal128 | float |
| Decimal | float |
| Binary | base64 str |
| UUID | str |
| bytes | base64 str |
| set | list |
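The conversions in the table follow the usual json.JSONEncoder.default pattern. The sketch below covers only the standard-library types so that it runs without pymongo; SketchEncoder is a hypothetical name, and the BSON types (ObjectId, Decimal128, Binary) handled by MongoJSONEncoder are left out.

```python
import base64
import json
import uuid
from datetime import date, datetime
from decimal import Decimal


class SketchEncoder(json.JSONEncoder):
    """Illustrative encoder for the non-BSON rows of the table."""

    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, uuid.UUID):
            return str(obj)
        if isinstance(obj, bytes):
            return base64.b64encode(obj).decode("ascii")
        if isinstance(obj, set):
            return list(obj)
        # Anything else falls through to the standard TypeError
        return super().default(obj)


doc = {
    "when": datetime(2024, 1, 2, 3, 4, 5),
    "amount": Decimal("9.5"),
    "blob": b"hi",
    "tags": {"a"},
}
json_str = json.dumps(doc, cls=SketchEncoder, sort_keys=True)
```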

A common use case is exporting query results to JSON files:

import json

from aws_python_helper import API
from aws_python_helper.utils.json_encoder import MongoJSONEncoder

class ExportResultsAPI(API):
    async def process(self):
        records = await self.db.orders_db.orders.find({}).to_list(1000)

        # Write to file with MongoJSONEncoder
        with open('/tmp/export.json', 'w') as f:
            json.dump(records, f, cls=MongoJSONEncoder, ensure_ascii=False, indent=2)

๐Ÿค Contributing

If you find bugs or want to add features, please create a PR!

📄 License

MIT
