A framework for creating AWS Lambda Async Workflows. - Unstable Branch
async-lambda
async-lambda is a framework for creating AWS Lambda applications with built-in support for asynchronous invocation via an SQS queue. This is useful if you have workloads you need to split up into separate execution contexts.
async-lambda converts your application into a Serverless Application Model (SAM) template, which can be deployed with the SAM CLI tool.
An async-lambda application consists of a controller object and tasks.
import json
from async_lambda import AsyncLambdaController, ScheduledEvent, ManagedSQSEvent, config_set_name

app = AsyncLambdaController()
config_set_name("project-name")
lambda_handler = app.async_lambda_handler  # This "export" is required for lambda.

@app.scheduled_task('ScheduledTask1', schedule_expression="rate(15 minutes)")
def scheduled_task_1(event: ScheduledEvent):
    app.async_invoke("AsyncTask1", payload={"foo": "bar"})  # Payload must be JSON serializable and < 2560Kb

@app.async_task('AsyncTask1')
def async_task_1(event: ManagedSQSEvent):
    print(event.payload)  # {"foo": "bar"}
When the app is packaged for Lambda, only the main module and the vendor and src directories are included.
Tasks
The core abstraction in async-lambda is a task. Each task results in a separate Lambda function. Tasks have a trigger_type which determines what event triggers them. A task is identified by its unique task_id.
All task decorators share common arguments for configuring the underlying Lambda function:
- memory: int = 128
  Sets the memory allocation for the function (in MB).
- timeout: int = 60
  Sets the timeout for the function in seconds (max 900 seconds).
- ephemeral_storage: int = 512
  Sets the ephemeral storage allocation for the function (in MB).
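As a rough illustration of how these three arguments relate to each other, the sketch below collects them in a small dataclass and rejects values outside the AWS Lambda service limits. `TaskConfig` is a hypothetical helper, not part of async-lambda, and the bounds are the general Lambda limits rather than checks the framework is known to perform.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskConfig:
    """Hypothetical container mirroring the shared decorator arguments."""

    memory: int = 128  # MB
    timeout: int = 60  # seconds
    ephemeral_storage: int = 512  # MB

    def __post_init__(self) -> None:
        # These bounds are AWS Lambda service limits, not checks that
        # async-lambda is known to perform.
        if not 128 <= self.memory <= 10240:
            raise ValueError(f"memory must be 128-10240 MB, got {self.memory}")
        if not 1 <= self.timeout <= 900:
            raise ValueError(f"timeout must be 1-900 seconds, got {self.timeout}")
        if not 512 <= self.ephemeral_storage <= 10240:
            raise ValueError(
                f"ephemeral_storage must be 512-10240 MB, got {self.ephemeral_storage}"
            )


# Defaults match the values listed above; overrides are validated on creation.
config = TaskConfig(memory=256, timeout=120)
```

Validated values like these could then be passed to a task decorator as keyword arguments, for example `memory=config.memory` and `timeout=config.timeout`.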
Async Task
All async tasks have a matching SQS queue which the Lambda function consumes from (1 message per invocation). All async task queues share a dead-letter queue (DLQ). Async tasks can be invoked from anywhere within the app by using the AsyncLambdaController.async_invoke method. Calling this method sends a message into the queue for the given task.
The task function should have a single parameter of the ManagedSQSEvent type.
app = AsyncLambdaController()

@app.async_task("TaskID")
def async_task(event: ManagedSQSEvent):
    event.payload  # payload sent via the `async_invoke` method
    event.source_task_id  # the task_id where the event originated
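To make the invoke-then-consume flow more concrete, here is a minimal in-memory sketch of the idea: a payload is serialized, placed on the destination task's queue, and later handed to that task one message at a time. Every name in it (`queues`, `handlers`, `register`, `invoke`, `drain`) and the message layout are assumptions made for illustration; this is not how async-lambda is implemented and no SQS call is made.

```python
import json
from collections import deque
from typing import Any, Callable, Deque, Dict

# Hypothetical in-memory stand-ins for the per-task SQS queues and handlers.
queues: Dict[str, Deque[str]] = {}
handlers: Dict[str, Callable[[Dict[str, Any]], None]] = {}


def register(task_id: str, handler: Callable[[Dict[str, Any]], None]) -> None:
    """Give the task its own queue, mirroring 'one queue per async task'."""
    handlers[task_id] = handler
    queues[task_id] = deque()


def invoke(source_task_id: str, destination_task_id: str, payload: Any) -> None:
    """Serialize the payload and enqueue it on the destination task's queue."""
    body = json.dumps({"source_task_id": source_task_id, "payload": payload})
    queues[destination_task_id].append(body)


def drain(task_id: str) -> int:
    """Deliver queued messages one at a time (one message per invocation)."""
    delivered = 0
    while queues[task_id]:
        handlers[task_id](json.loads(queues[task_id].popleft()))
        delivered += 1
    return delivered


received = []
register("AsyncTask1", received.append)
invoke("ScheduledTask1", "AsyncTask1", {"foo": "bar"})
delivered = drain("AsyncTask1")
```

The round trip through `json.dumps` and `json.loads` is the reason payloads need to be JSON serializable.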
It is easy to create an infinite loop when tasks invoke each other with async-lambda, so take care when wiring tasks together.
INFINITE LOOP EXAMPLE
# If task_1 were ever invoked, it would start an infinite loop:
# task 1 invokes task 2, task 2 invokes task 1, and so on.
@app.async_task("Task1")
def task_1(event: ManagedSQSEvent):
    app.async_invoke("Task2", {})

@app.async_task("Task2")
def task_2(event: ManagedSQSEvent):
    app.async_invoke("Task1", {})
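One possible way to guard against this kind of loop is to carry a hop counter in the payload and stop re-invoking once it reaches a limit. The sketch below simulates that with a plain in-memory list instead of SQS; `MAX_HOPS`, `guarded_invoke`, `pending`, and the `hops` payload key are all hypothetical names, and async-lambda is not known to provide such a guard itself.

```python
from typing import Any, Callable, Dict, List, Tuple

MAX_HOPS = 5  # hypothetical budget for how long a chain of invocations may get

# Hypothetical stand-in for the task queues: (task_id, payload) pairs.
pending: List[Tuple[str, Dict[str, Any]]] = []


def guarded_invoke(task_id: str, payload: Dict[str, Any], hops: int) -> bool:
    """Enqueue the next task unless the chain has used up its hop budget."""
    if hops >= MAX_HOPS:
        return False
    pending.append((task_id, {**payload, "hops": hops + 1}))
    return True


def task_1(payload: Dict[str, Any]) -> None:
    guarded_invoke("Task2", {}, payload.get("hops", 0))


def task_2(payload: Dict[str, Any]) -> None:
    guarded_invoke("Task1", {}, payload.get("hops", 0))


TASKS: Dict[str, Callable[[Dict[str, Any]], None]] = {"Task1": task_1, "Task2": task_2}


def run(first_task: str, payload: Dict[str, Any]) -> int:
    """Process queued invocations until none remain; return how many ran."""
    pending.append((first_task, payload))
    invocations = 0
    while pending:
        task_id, body = pending.pop(0)
        TASKS[task_id](body)
        invocations += 1
    return invocations


total = run("Task1", {})
```

In a real app the same counter could travel inside the payload passed to `app.async_invoke` and be read back from `event.payload` in the receiving task.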
Unmanaged SQS Task
Unmanaged SQS tasks consume from any arbitrary SQS queue (1 message per invocation).
The task function should have a single parameter of the UnmanagedSQSEvent type.
app = AsyncLambdaController()

@app.sqs_task("TaskID", queue_arn='queue-arn')
def sqs_task(event: UnmanagedSQSEvent):
    event.body  # sqs event body
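Because an unmanaged queue can carry messages from any producer, the body may or may not be JSON. A small sketch of a tolerant parser follows; `parse_body` is a hypothetical helper, and it assumes `event.body` is the raw message body as a string.

```python
import json
from typing import Any


def parse_body(body: str) -> Any:
    """Return the decoded JSON value, or the raw text if it is not JSON."""
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return body
```

Inside the task function this could be called as `parse_body(event.body)`, so that one handler copes with both JSON and plain-text producers.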
Scheduled Task
Scheduled tasks are triggered by an EventBridge schedule. The schedule expression can be a cron expression or a rate expression.
The task function should have a single parameter of the ScheduledEvent type.
app = AsyncLambdaController()

@app.scheduled_task("TaskID", schedule_expression='rate(15 minutes)')
def scheduled_task(event: ScheduledEvent):
    ...
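A malformed schedule expression is usually only rejected at deploy time, so a quick local check can save a round trip. The sketch below validates the rate form only, following the EventBridge convention of a positive integer plus a unit that is singular for 1 and plural otherwise. `is_valid_rate` is a hypothetical helper and not part of async-lambda.

```python
import re

# rate(<value> <unit>), e.g. "rate(15 minutes)" or "rate(1 hour)".
# A cron expression has six fields instead, e.g. "cron(0 12 * * ? *)".
_RATE = re.compile(r"^rate\((\d+) (minute|minutes|hour|hours|day|days)\)$")


def is_valid_rate(expression: str) -> bool:
    """Check a rate expression: positive value, unit plural unless value is 1."""
    match = _RATE.match(expression)
    if match is None:
        return False
    value, unit = int(match.group(1)), match.group(2)
    if value < 1:
        return False
    return unit.endswith("s") == (value != 1)
```

For example, `is_valid_rate("rate(15 minutes)")` passes, while `"rate(1 minutes)"` is rejected because the unit must be singular when the value is 1.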
API Task
API tasks are triggered by a web request. async-lambda creates an API Gateway endpoint matching the method and path in the task definition. It is possible to configure a custom domain and certificate for all API tasks within an async-lambda app.
The task function should have a single parameter of the APIEvent type.
app = AsyncLambdaController()

@app.api_task("TaskID", path='/test', method='get')
def api_task(event: APIEvent):
    event.headers  # request headers
    event.querystring_params  # request querystring params
    event.body  # request body
Building an async-lambda app
When the app is packaged for Lambda, only the main module and the vendor and src directories are included.
From the project root directory, use the async-lambda CLI tool to generate a SAM template and function bundle.
# app.py contains the root AsyncLambdaController
async-lambda build app
This will generate a SAM template, template.json, as well as a deployment.zip file.
This template and zip file can then be deployed or transformed into regular CloudFormation via the sam or aws CLI tools.
Known Limitations
- Lambda configuration: not all of the Lambda configuration spec is present in async-lambda. It is relatively trivial to add configuration options. Open an issue if there is a feature you would like to see implemented.
- The async_invoke payload must be JSON serializable with json.dumps.
- It is possible to get into infinite loops quite easily (Task A invokes Task B, Task B invokes Task A).
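The JSON limitation can be checked before invoking a task. A minimal sketch, where `is_json_serializable` is a hypothetical helper name:

```python
import json
from datetime import datetime
from typing import Any


def is_json_serializable(payload: Any) -> bool:
    """Return True if json.dumps accepts the payload."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError):
        # TypeError: unsupported type (set, datetime, custom object).
        # ValueError: circular reference.
        return False
    return True


ok = is_json_serializable({"foo": "bar"})
not_ok = is_json_serializable({"when": datetime(2024, 1, 1)})
```

Values such as datetime objects or sets need to be converted first, for example with `isoformat()` or `list()`.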