A framework for creating AWS Lambda Async Workflows. - Unstable Branch
async-lambda
Async-lambda is a Python framework for building scalable, event-driven AWS Lambda applications with first-class support for asynchronous invocation via SQS queues. It provides a high-level abstraction for orchestrating Lambda functions, managing event triggers, and handling complex workflows with minimal boilerplate.
async-lambda converts your application into a Serverless Application Model (SAM)
template, which can be deployed with the SAM CLI or via CloudFormation.
Features
- Async Task Abstraction: Define tasks as Lambda functions triggered by SQS, API Gateway, DynamoDB Streams, or scheduled events.
- Automatic SAM Template Generation: Converts your application into a Serverless Application Model (SAM) template for deployment.
- Lane-based Parallelism: Scale async tasks horizontally using multiple SQS queues (lanes) and control concurrency per lane.
- Middleware Support: Register middleware to wrap task execution, enabling cross-cutting concerns (logging, auth, etc.).
- Large Payload Handling: Offload large payloads to S3 automatically when exceeding SQS size limits.
- Dead Letter Queues (DLQ): Robust error handling and message redrive for failed async invocations.
- Configurable via Code and JSON: Set configuration at the app, stage, and task level using Python or config files.
- Task Initialization: Run custom logic during Lambda INIT phase for caching, setup, or resource allocation.
Getting Started
Installation
Add async-lambda to your project (see requirements in pyproject.toml).
Basic Usage
from async_lambda import AsyncLambdaController, config_set_name, ScheduledEvent, ManagedSQSEvent

app = AsyncLambdaController()
config_set_name("project-name")
lambda_handler = app.async_lambda_handler  # Required Lambda export

@app.scheduled_task('ScheduledTask1', schedule_expression="rate(15 minutes)")
def scheduled_task_1(event: ScheduledEvent):
    app.async_invoke("AsyncTask1", payload={"foo": "bar"})

@app.async_task('AsyncTask1')
def async_task_2(event: ManagedSQSEvent):
    print(event.payload)  # {"foo": "bar"}
Packaging & Deployment
Use the async-lambda CLI to build and package your app:
async-lambda build app --stage <stage-name>
This generates a SAM template (template.json) and deployment bundle (deployment.zip). Deploy using AWS SAM CLI or CloudFormation.
Core Concepts
Controller & Tasks
- AsyncLambdaController: Central orchestrator for registering tasks, managing middleware, and handling invocations.
- Task: Each task is a Lambda function with a unique task_id and a trigger type (SQS, API, DynamoDB, schedule).
Task Decorators
All task decorators accept common configuration arguments:
- memory: Memory allocation (MB)
- timeout: Timeout (seconds)
- ephemeral_storage: Ephemeral storage (MB)
- maximum_concurrency: Max concurrency for SQS triggers (int or list per lane)
- lane_count: Number of parallel lanes (for async tasks)
- init_tasks: Functions to run during Lambda INIT phase
Example: Async Task
@app.async_task("TaskID")
def async_task(event: ManagedSQSEvent):
    print(event.payload)
It is quite easy to get into infinite looping situations when utilizing async-lambda and care should be taken.
INFINITE LOOP EXAMPLE
# If task_1 were ever invoked, it would start an infinite loop:
# task 1 invokes task 2, task 2 invokes task 1, and so on.
@app.async_task("Task1")
def task_1(event: ManagedSQSEvent):
    app.async_invoke("Task2", {})

@app.async_task("Task2")
def task_2(event: ManagedSQSEvent):
    app.async_invoke("Task1", {})
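One possible guard against this kind of loop is to carry a hop counter in the payload and stop re-invoking once a limit is reached. The sketch below is plain Python and makes no async-lambda calls; `next_payload`, the `hops` key, and `MAX_HOPS` are hypothetical names chosen for illustration, not framework features.

```python
from typing import Optional

MAX_HOPS = 5  # hypothetical limit; choose one that fits the workflow


def next_payload(payload: dict, max_hops: int = MAX_HOPS) -> Optional[dict]:
    """Return the payload for the next invocation, or None once the limit is hit."""
    hops = payload.get("hops", 0)
    if hops >= max_hops:
        return None  # stop the chain instead of invoking again
    return {**payload, "hops": hops + 1}


# Simulate Task1 -> Task2 -> Task1 ... without any AWS calls.
payload: Optional[dict] = {"foo": "bar"}
invocations = 0
while payload is not None:
    payload = next_payload(payload)
    if payload is not None:
        invocations += 1
```

Inside a task, the returned payload would be passed to app.async_invoke only when it is not None, so the chain ends after a bounded number of hops.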
Example: Scheduled Task
@app.scheduled_task("TaskID", schedule_expression='rate(15 minutes)')
def scheduled_task(event: ScheduledEvent):
    ...
Example: API Task
@app.api_task("TaskID", path='/test', method='get')
def api_task(event: APIEvent):
    print(event.headers)
    print(event.querystring_params)
    print(event.body)
Example: Unmanaged SQS Task
@app.sqs_task("TaskID", queue_arn='queue-arn')
def sqs_task(event: UnmanagedSQSEvent):
    print(event.body)
Lanes & Concurrency
Async tasks can be scaled horizontally using lanes. Each lane is a separate SQS queue and can have its own concurrency limit. Lane assignment can be controlled at the controller, sub-controller, or task level.
app = AsyncLambdaController(lane_count=2)

@app.async_task("SwitchBoard")
def switch_board(event: ManagedSQSEvent):
    lane = 1 if event.payload['value'] > 50000 else 0
    app.async_invoke("ProcessingTask", event.payload, lane=lane)

@app.async_task("ProcessingTask", maximum_concurrency=[10, 2])
def processing_task(event: ManagedSQSEvent):
    ...
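The example above picks a lane from a value in the payload. When there is no natural threshold, one alternative is to derive the lane from a stable hash of some key so that work spreads across all lanes. This is a sketch only: `lane_for_key` is a hypothetical helper, not part of async-lambda, and its result would be passed as the `lane` argument of app.async_invoke.

```python
import hashlib


def lane_for_key(key: str, lane_count: int) -> int:
    """Map a key to a lane index in [0, lane_count) in a stable way."""
    if lane_count < 1:
        raise ValueError("lane_count must be at least 1")
    # hashlib is used instead of hash() because hash() of a str is
    # randomized per process, which would move keys between lanes.
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % lane_count


lanes = [lane_for_key(f"customer-{i}", 2) for i in range(100)]
```

Because the mapping depends only on the key, messages for the same key always land in the same lane.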
Middleware
Middleware functions wrap task execution and can be used for logging, authentication, or modifying events/responses.
def async_lambda_middleware(event, call_next):
    print(f"Invocation Payload: {event}")
    result = call_next(event)
    print(f"Invocation Result: {result}")
    return result

controller = AsyncLambdaController(middleware=[([BaseEvent], async_lambda_middleware)])
If there are multiple middleware functions, then call_next calls the next middleware function in the stack.
For example, if middleware functions A and B are registered in that order, the execution order is:
A(Pre) -> B(Pre) -> Task -> B(Post) -> A(Post)
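The ordering can be reproduced with a small stand-alone simulation. This is not async-lambda's implementation; `build_chain` and `make_middleware` are hypothetical helpers that only mimic the `(event, call_next)` signature used above.

```python
from typing import Any, Callable, List

Handler = Callable[[Any], Any]
Middleware = Callable[[Any, Handler], Any]


def build_chain(middleware: List[Middleware], task: Handler) -> Handler:
    """Wrap task so that the first middleware in the list runs outermost."""
    handler = task
    for mw in reversed(middleware):
        # Bind mw and the current handler as defaults to avoid late binding.
        def wrapped(event: Any, _mw: Middleware = mw, _next: Handler = handler) -> Any:
            return _mw(event, _next)
        handler = wrapped
    return handler


trace: List[str] = []


def make_middleware(name: str) -> Middleware:
    def mw(event: Any, call_next: Handler) -> Any:
        trace.append(f"{name}(Pre)")
        result = call_next(event)
        trace.append(f"{name}(Post)")
        return result
    return mw


def task(event: Any) -> Any:
    trace.append("Task")
    return event


result = build_chain([make_middleware("A"), make_middleware("B")], task)({"foo": "bar"})
print(" -> ".join(trace))
```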
Large Payloads
If a payload exceeds SQS size limits, async-lambda automatically stores it in S3 and passes a reference key to the Lambda function.
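The general offload pattern can be sketched as follows. Everything here is an assumption made for illustration: the size limit, the `payload` and `s3_payload_key` field names, and the in-memory dictionary standing in for an S3 bucket. It does not describe async-lambda's actual message format.

```python
import json
import uuid
from typing import Dict

# Assumed size limit in bytes; check the current SQS quota for your account.
SIZE_LIMIT_BYTES = 256 * 1024

# In-memory stand-in for an S3 bucket so the sketch runs without AWS.
fake_bucket: Dict[str, bytes] = {}


def encode_message(payload: dict, limit: int = SIZE_LIMIT_BYTES) -> dict:
    """Return the message body, offloading the payload when it is too large."""
    raw = json.dumps(payload).encode("utf-8")
    if len(raw) <= limit:
        return {"payload": payload}
    key = f"payloads/{uuid.uuid4()}.json"
    fake_bucket[key] = raw
    return {"s3_payload_key": key}


def decode_message(message: dict) -> dict:
    """Resolve the payload, fetching it from the bucket when only a key was sent."""
    if "s3_payload_key" in message:
        return json.loads(fake_bucket[message["s3_payload_key"]])
    return message["payload"]


small = encode_message({"foo": "bar"})
large_payload = {"data": "x" * (300 * 1024)}
large = encode_message(large_payload)
```

The receiving side sees the same payload either way, which is why the task code does not need to know whether the offload happened.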
Dead Letter Queues (DLQ)
All async tasks share a DLQ for failed messages. You can configure custom DLQ tasks for advanced error handling.
async-lambda config
Configuration options can be set with the .async_lambda/config.json file.
Configuration options can be set at the app, stage, and task level. An option set at one level
applies unless it is overridden at a more specific level (app -> stage -> task -> task stage).
The override logic attempts to be non-destructive: if layers is ['layer_1'] at the app level
and ['layer_2'] at the stage level, then the resolved value is ['layer_1', 'layer_2'].
Config file levels schema
{
    # APP LEVEL
    "stages": {
        "stage_name": {
            # STAGE LEVEL
        }
    },
    "tasks": {
        "task_id": {
            # TASK LEVEL
            "stages": {
                "stage_name": {
                    # TASK STAGE LEVEL
                }
            }
        }
    }
}
Any configuration option can be set at any of these levels, with the exception of method_settings, domain_name, tls_version, certificate_arn, and hosted_zone_id, which cannot be set at the task level.
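The level ordering and the non-destructive merge can be illustrated with a short sketch. It is not the framework's resolver: `merge_value` and `resolve` are hypothetical helpers, list concatenation follows the layers example in the text, and the dictionary merge rule (keys merged, more specific level wins) is an assumption.

```python
from typing import Any, Dict


def merge_value(base: Any, override: Any) -> Any:
    """Merge two config values without discarding the less specific one."""
    if isinstance(base, list) and isinstance(override, list):
        return base + override  # e.g. layers are concatenated
    if isinstance(base, dict) and isinstance(override, dict):
        return {**base, **override}  # assumption: keys merge, override wins
    return override  # scalars: the more specific level wins


LEVEL_KEYS = {"stages", "tasks"}  # structural keys, not options


def resolve(config: Dict[str, Any], task_id: str, stage: str) -> Dict[str, Any]:
    """Apply levels in order: app -> stage -> task -> task stage."""
    task = config.get("tasks", {}).get(task_id, {})
    levels = [
        config,
        config.get("stages", {}).get(stage, {}),
        task,
        task.get("stages", {}).get(stage, {}),
    ]
    resolved: Dict[str, Any] = {}
    for level in levels:
        for key, value in level.items():
            if key in LEVEL_KEYS:
                continue
            resolved[key] = merge_value(resolved[key], value) if key in resolved else value
    return resolved


config = {
    "layers": ["layer_1"],
    "environment_variables": {"LOG": "info"},
    "stages": {"prod": {"layers": ["layer_2"]}},
    "tasks": {
        "TaskID": {
            "environment_variables": {"LOG": "debug"},
            "stages": {"prod": {"layers": ["layer_3"]}},
        }
    },
}
resolved = resolve(config, "TaskID", "prod")
```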
environment_variables
{
    "ENV_VAR_NAME": "ENV_VAR_VALUE"
}
This config value will set environment variables for the function execution. These environment variables will also be available during build time.
The value is passed to the Environment property on AWS::Serverless::Function.
policies
[
    'IAM_POLICY_ARN' | STATEMENT
]
Use this config option to attach arbitrary policies to the Lambda function's execution role.
The value is passed to the Policies property on AWS::Serverless::Function, in addition to the inline policies created by async-lambda.
layers
[
    "LAYER_ARN"
]
Use this config option to add arbitrary Lambda layers to the Lambda functions. Ordering matters, and merging is done through concatenation.
The value is passed to the Layers property on AWS::Serverless::Function.
subnet_ids
[
    "SUBNET_ID"
]
Use this config option to place the Lambda function in a VPC subnet.
The value is passed into the SubnetIds field of the VpcConfig property on AWS::Serverless::Function.
security_group_ids
[
    "SECURITY_GROUP_ID"
]
Use this config option to attach a security group to the Lambda function.
The value is passed into the SecurityGroupIds field of the VpcConfig property on AWS::Serverless::Function.
managed_queue_extras
[
    {
        # CloudFormation resource
    }
]
Use this config option to add extra resources for managed SQS queues (async_task tasks).
For example, this might be used to attach alarms to these queues.
Each item in the list should be a complete CloudFormation resource. async-lambda provides a few custom substitutions
so that you can reference the extras and the associated managed SQS resource by LogicalId.
- $QUEUEID will be replaced with the LogicalId of the associated managed SQS queue.
- $EXTRA<index> will be replaced with the LogicalId of the extra at the specified index.
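As an illustration, the sketch below builds a managed_queue_extras value with an SNS topic and a CloudWatch alarm on queue depth, then prints it as JSON for the config file. The resource types and properties are standard CloudFormation, but the threshold, period, and the assumption that the substitutions are applied inside nested values such as Ref and Fn::GetAtt are illustrative and should be checked against the generated template.

```python
import json

managed_queue_extras = [
    {
        # Index 0: referenced by the alarm below as $EXTRA0.
        "Type": "AWS::SNS::Topic",
        "Properties": {},
    },
    {
        "Type": "AWS::CloudWatch::Alarm",
        "Properties": {
            "Namespace": "AWS/SQS",
            "MetricName": "ApproximateNumberOfMessagesVisible",
            "Dimensions": [
                {
                    "Name": "QueueName",
                    # $QUEUEID stands for the managed queue's LogicalId.
                    "Value": {"Fn::GetAtt": ["$QUEUEID", "QueueName"]},
                }
            ],
            "Statistic": "Maximum",
            "Period": 300,  # seconds; illustrative value
            "EvaluationPeriods": 1,
            "Threshold": 100,  # illustrative value
            "ComparisonOperator": "GreaterThanThreshold",
            "AlarmActions": [{"Ref": "$EXTRA0"}],
        },
    },
]

config_fragment = json.dumps({"managed_queue_extras": managed_queue_extras}, indent=2)
print(config_fragment)
```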
method_settings
This config value can only be set at the app or stage level.
[
    {...}
]
If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.
The value is passed into the MethodSettings property of the AWS::Serverless::Api. The MethodSetting spec is documented under the AWS::ApiGateway::Stage resource in the CloudFormation reference.
domain_name
This config value can only be set at the app or stage level.
"domain_name"
If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.
This config value will set the DomainName field of the Domain property.
tls_version
This config value can only be set at the app or stage level.
"tls_version"
If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.
This config value will set the SecurityPolicy field of the Domain property.
Possible values are TLS_1_0 and TLS_1_2.
certificate_arn
This config value can only be set at the app or stage level.
"certificate_arn"
If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.
This config value will set the CertificateArn field of the Domain property.
hosted_zone_id
This config value can only be set at the app or stage level.
"hosted_zone_id"
If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.
This config value will set the HostedZoneId field of the Route53 property of the Domain property.
This will create a DNS record in the given hosted zone for the API Gateway endpoint created by the SAM deployment.
tags
{
    "TAG_NAME": "TAG_VALUE"
}
This config value will set the Tags field of all resources created by async-lambda. This will not set the field on managed_queue_extras resources.
The keys framework and framework-version will always be set, and the system values will override any values set by the user.
For managed queues, the tag async-lambda-queue-type will be set to dlq, dlq-task, or managed depending on the queue type.
For async_task queues (non dlq-task), the tag async-lambda-lane will be set.
logging_config
{
    "ApplicationLogLevel": "TRACE" | "DEBUG" | "INFO" | "WARN" | "ERROR" | "FATAL",
    "LogFormat": "Text" | "JSON",
    "LogGroup": "",
    "SystemLogLevel": "DEBUG" | "INFO" | "WARN"
}
The value is passed directly to the LoggingConfig CloudFormation property of the Lambda function(s).
See above for full details on configuration schema and merging behavior.
Advanced Usage
Task Initialization
Use the init_tasks argument to run setup logic during Lambda INIT phase. This is useful for caching, resource allocation, or one-time setup.
def setup_cache(task_id):
    ...

@app.async_task("TaskID", init_tasks=[setup_cache])
def async_task(event: ManagedSQSEvent):
    ...
Defer Utility
The Defer class allows you to cache values during INIT and only execute a function when its value is requested.
cache = Defer(get_a_value, 10, 100)

@app.async_task("Task", init_tasks=[cache.execute])
def task(event: ManagedSQSEvent):
    for i in range(cache.value):
        ...
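To make the behavior concrete, here is a minimal stand-in that mirrors the usage above: the function runs at most once, either when execute is called during INIT or on first access to value. `LazyValue` is a hypothetical class written for this illustration and is not the library's Defer implementation; ignoring the extra argument passed to execute is an assumption based on init tasks receiving a task id.

```python
from typing import Any, Callable, List, Tuple


class LazyValue:
    """Hypothetical stand-in that mirrors the Defer usage shown above."""

    _UNSET = object()

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._value: Any = self._UNSET

    def execute(self, *_: Any) -> None:
        """Compute and cache the value; extra args (e.g. a task id) are ignored."""
        if self._value is self._UNSET:
            self._value = self._func(*self._args, **self._kwargs)

    @property
    def value(self) -> Any:
        self.execute()  # no-op if the value was already computed
        return self._value


calls: List[Tuple[int, int]] = []


def get_a_value(low: int, high: int) -> int:
    calls.append((low, high))  # record each real computation
    return high - low


cache = LazyValue(get_a_value, 10, 100)
cache.execute("TaskID")  # what an init task would trigger
total = sum(1 for _ in range(cache.value))
```

Reading cache.value after the init call reuses the cached result, so get_a_value is not run again inside the handler.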
Known Limitations
- Not all Lambda configuration options are supported (see code for extension points)
- Payloads must be JSON serializable
- Infinite loops are possible if tasks invoke each other recursively
Project Structure
- async_lambda/: Core framework code
  - controller.py: Main controller and orchestration logic
  - models/: Event, response, and task models
  - middleware.py: Middleware registration and execution
  - client.py, env.py, config.py: AWS clients and configuration
  - defer.py: Defer utility for caching
  - payload_encoder.py, util.py: Utilities
- example/: Example usage and sample app
- scripts/: Linting and testing scripts
- test/: Unit tests
License
See LICENSE file for details.