A framework for creating AWS Lambda Async Workflows. - Unstable Branch
Project description
async-lambda
async-lambda
is a framework for creating AWS Lambda
applications with built
in support for asynchronous invocation via a SQS Queue. This is useful if you
have workloads you need to split up into separate execution contexts.
async-lambda
converts your application into a Serverless Application Model (SAM)
template which can be deployed with the SAM
cli tool.
An async-lambda
application is comprised of a controller
object and tasks
.
import json
from async_lambda import AsyncLambdaController, ScheduledEvent, ManagedSQSEvent, config_set_name
app = AsyncLambdaController()
config_set_name("project-name")
lambda_handler = app.async_lambda_handler # This "export" is required for lambda.
@app.scheduled_task('ScheduledTask1', schedule_expression="rate(15 minutes)")
def scheduled_task_1(event: ScheduledEvent):
app.async_invoke("AsyncTask1", payload={"foo": "bar"}) # Payload must be JSON serializable and < 2560Kb
@app.async_task('AsyncTask1')
def async_task_2(event: ManagedSQSEvent):
print(event.payload) #{"foo": "bar"}
When the app is packaged for lambda, only the main module, and the vendor
and src
directories are included.
Tasks
The core abstraction in async-lambda
is a task
. Each task will result in a separate lambda function.
Tasks have a trigger_type
which determines what event triggers them. A task is identified by its unique task_id
.
All task decorators share common arguments for configuring the underlying lambda function:
memory: int = 128
Sets the memory allocation for the function.timeout: int = 60
Sets the timeout for the function (max 900 seconds).ephemeral_storage: int = 512
Sets the ephemeral storage allocation for the function.maximum_concurrency: Optional[int] = None
Sets the maximum concurrency value for the SQS trigger for the function. (only applies toasync_task
andsqs_task
tasks.)
Async Task
All async tasks have a matching SQS queue which the lambda function consumes from (1 message per invocation).
All async task queues share a DLQ. Async tasks can be invoked from anywhere within the app by using the
AsyncLambdaController.async_invoke
method. Calling this method sends a message into the queue for the given task.
The task function should have a single parameter of the ManagedSQSEvent
type.
app = AsyncLambdaController()
@app.async_task("TaskID")
def async_task(event: ManagedSQSEvent):
event.payload # payload sent via the `async_invoke` method
event.source_task_id # the task_id where the event originated
It is quite easy to get into infinite looping situations when utilizing async-lambda
and care should be taken.
INFINITE LOOP EXAMPLE
# If task_1 where to ever get invoked, then it would start an infinite loop with
# task 1 invoking task 2, task 2 invoking task 1, and repeat...
@app.async_task("Task1")
def task_1(event: ManagedSQSEvent):
app.async_invoke("Task2", {})
@app.async_task("Task2")
def task_1(event: ManagedSQSEvent):
app.async_invoke("Task1", {})
Unmanaged SQS Task
Unmanaged SQS tasks consume from any arbitrary SQS queue (1 message per invocation).
The task function should have a single parameter of the UnmanagedSQSEvent
type.
app = AsyncLambdaController()
@app.sqs_task("TaskID", queue_arn='queue-arn')
def sqs_task(event: UnmanagedSQSEvent):
event.body # sqs event body
Scheduled Task
Scheduled tasks are triggered by an eventbridge schedule. The schedule expression can be
a cron expression
or a rate expression.
The task function should have a single parameter of the ScheduledEvent
type.
app = AsyncLambdaController()
@app.scheduled_task("TaskID", schedule_expression='rate(15 minutes)')
def scheduled_task(event: ScheduledEvent):
...
API Task
API tasks are triggered by a Web Request. async-lambda
creates an APIGateway endpoint matching the
method
and path
in the task definition. It is possible to configure a custom domain and certificate
for all API tasks within an async-lambda
app.
The task function should have a single parameter of the APIEvent
type.
app = AsyncLambdaController()
@app.api_task("TaskID", path='/test', method='get')
def api_task(event: APIEvent):
event.headers # request headers
event.querystring_params # request querystring params
event.body # request body
async-lambda
config
Configuration options can be set with the .async_lambda/config.json
file.
The configuration options can be set at the app, stage, and task level. A configuration option set
will apply unless overridden at a more specific level (app -> stage -> task -> stage).
The override logic attempts to non-destructive so if you have a layers
of ['layer_1']
at the app level,
and [layer_2]
at the stage level, then the value will be ['layer_1', 'layer_2']
.
Config file levels schema
{
# APP LEVEL
"stages": {
"stage_name": {
# STAGE LEVEL
}
},
"tasks": {
"task_id": {
# TASK LEVEL
"stages": {
"stage_name": {
# TASK STAGE LEVEL
}
}
}
}
}
At any of these levels
any of the configuration options can be set:
With the exception of domain_name
, tls_version
, and certificate_arn
which can not be set at the task level.
environment_variables
{
"ENV_VAR_NAME": "ENV_VAR_VALUE"
}
This config value will set environment variables for the function execution. These environment variables will also be available during build time.
The value is passed to the Environment
property on SAM::Serverless::Function
policies
[
'IAM_POLICY_ARN' | STATEMENT
]
Use this config option to attach any arbitrary policies to the lambda functions execution role.
The value is passed to the Policies
property on SAM::Serverless::Function
, in addition to the async-lambda
created inline policies.
layers
[
"LAYER_ARN"
]
Use this config option to add any arbitrary lambda layers to the lambda functions. Ordering matters, and merging is done thru concatenation.
The value is passed to the Layers
property on SAM::Serverless::Function
subnet_ids
[
"SUBNET_ID
]
Use this config option to put the lambda function into a vpc/subnet.
The value is passed into the SubnetIds
field of the VpcConfig
property on SAM::Serverless::Function
security_group_ids
[
"SECURITY_GROUP_ID"
]
Use this config option to attach a security group to the lambda function.
The value is passed into the SecurityGroupIds
field of the VpcConfig
property on SAM::Serverless::Function
managed_queue_extras
[
{
# Cloudformation resource
}
]
Use this config option to add extra resources for managed SQS queues (async_task
tasks.)
For example this might be used to attach alarms to these queues.
Each item in the list should be a complete cloudformation resource. async-lambda
provides a few custom substitutions
so that you can reference the extras and the associated managed sqs resource by LogicalId
.
$QUEUEID"
will be replaced with theLogicalId
of the associated Managed SQS queue.$EXTRA<index>
will be replaced with theLogicalId
of the extra at the specified index.
domain_name
This config value can only be set at the app or stage level.
"domain_name"
If your async-lambda
app contains any api_task
tasks, then a AWS::Serverless::Api
resource is created.
This config value will set the DomainName
field of the Domain
property
tls_version
This config value can only be set at the app or stage level.
"tls_version"
If your async-lambda
app contains any api_task
tasks, then a AWS::Serverless::Api
resource is created.
This config value will set the SecurityPolicy
field of the Domain
property
Possible values are TLS_1_0
and TLS_1_2
certificate_arn
This config value can only be set at the app or stage level.
"certificate_arn"
If your async-lambda
app contains any api_task
tasks, then a AWS::Serverless::Api
resource is created.
This config value will set the CertificateArn
field of the Domain
property
Building an async-lambda
app
When the app is packaged for lambda, only the main module, and the vendor
and src
directories are included.
From the project root directory, utilize the async-lambda
CLI tool to generate
a SAM template and function bundle. Optionally specify the stage
to use stage
specific configs.
# app.py contains the root AsyncLambdaController
async-lambda build app --stage <stage-name>
This will generate a SAM template template.json
as well as an deployment.zip
file.
This template and zip file can then be deployed or transformed into regular cloudformation
via the sam
or aws
cli tools.
Known Limitations
- Lambda Configuration - not all of the lambda configuration spec is present in
async-lambda
. It is relatively trivial to add in configuration options. Make an issue if there is a feature you would like to see implemented. - The
async_invoke
payload must beJSON
serializable withjson.dumps
. - It is possible to get into infinite loops quite easily. (Task A invokes Task B, Task B invokes Task A)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for async-lambda-unstable-0.2.15.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 26b0b46f841f2d958919f9bcc213d77027d8588f174989f8a6c1222df998cd4a |
|
MD5 | 34c561b68a83eecc5817ef60855a0f33 |
|
BLAKE2b-256 | 577341bdc1a3835e9a0d2bb04a838c3d689ebdb308e36dea1a98d86d6980a88b |
Hashes for async_lambda_unstable-0.2.15-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 9860407af1a8679e4ab27f61c7a9b2949a1235cc35209014cca62caa616c6692 |
|
MD5 | 8f3aaf3682e803914310918dbb55a3a9 |
|
BLAKE2b-256 | cb94fa398b205616952c4bbdb1d3f5d40a6b10f669ba10f692a1b8a2dd2eae38 |