
A framework for creating AWS Lambda Async Workflows. - Unstable Branch


async-lambda

Async-lambda is a Python framework for building scalable, event-driven AWS Lambda applications with first-class support for asynchronous invocation via SQS queues. It provides a high-level abstraction for orchestrating Lambda functions, managing event triggers, and handling complex workflows with minimal boilerplate.

async-lambda converts your application into a Serverless Application Model (SAM) template, which can be deployed with the SAM CLI or via CloudFormation.

Features

  • Async Task Abstraction: Define tasks as Lambda functions triggered by SQS, API Gateway, DynamoDB Streams, or scheduled events.
  • Automatic SAM Template Generation: Converts your application into a Serverless Application Model (SAM) template for deployment.
  • Lane-based Parallelism: Scale async tasks horizontally using multiple SQS queues (lanes) and control concurrency per lane.
  • Middleware Support: Register middleware to wrap task execution, enabling cross-cutting concerns (logging, auth, etc.).
  • Large Payload Handling: Offload large payloads to S3 automatically when exceeding SQS size limits.
  • Dead Letter Queues (DLQ): Robust error handling and message redrive for failed async invocations.
  • Configurable via Code and JSON: Set configuration at the app, stage, and task level using Python or config files.
  • Task Initialization: Run custom logic during Lambda INIT phase for caching, setup, or resource allocation.

Getting Started

Installation

Add async-lambda to your project (see requirements in pyproject.toml).

Basic Usage

from async_lambda import AsyncLambdaController, config_set_name, ScheduledEvent, ManagedSQSEvent

app = AsyncLambdaController()
config_set_name("project-name")
lambda_handler = app.async_lambda_handler  # Required Lambda export

@app.scheduled_task('ScheduledTask1', schedule_expression="rate(15 minutes)")
def scheduled_task_1(event: ScheduledEvent):
    app.async_invoke("AsyncTask1", payload={"foo": "bar"})

@app.async_task('AsyncTask1')
def async_task_1(event: ManagedSQSEvent):
    print(event.payload)  # {"foo": "bar"}

Packaging & Deployment

Use the async-lambda CLI to build and package your app:

async-lambda build app --stage <stage-name>

This generates a SAM template (template.json) and deployment bundle (deployment.zip). Deploy using AWS SAM CLI or CloudFormation.

Core Concepts

Controller & Tasks

  • AsyncLambdaController: Central orchestrator for registering tasks, managing middleware, and handling invocations.
  • Task: Each task is a Lambda function with a unique task_id and a trigger type (SQS, API, DynamoDB, schedule).

Task Decorators

All task decorators accept common configuration arguments:

  • memory: Memory allocation (MB)
  • timeout: Timeout (seconds)
  • ephemeral_storage: Ephemeral storage (MB)
  • maximum_concurrency: Max concurrency for SQS triggers (int or list per lane)
  • lane_count: Number of parallel lanes (for async tasks)
  • init_tasks: Functions to run during Lambda INIT phase

Example: Async Task

@app.async_task("TaskID")
def async_task(event: ManagedSQSEvent):
    print(event.payload)

It is easy to create infinite loops with async-lambda, so take care whenever tasks invoke each other.

INFINITE LOOP EXAMPLE

# If task_1 were ever invoked, it would start an infinite loop:
# task_1 invokes task_2, task_2 invokes task_1, and so on.

@app.async_task("Task1")
def task_1(event: ManagedSQSEvent):
    app.async_invoke("Task2", {})

@app.async_task("Task2")
def task_2(event: ManagedSQSEvent):
    app.async_invoke("Task1", {})
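
One way to guard against such loops is to carry a hop counter in the payload and stop invoking once it reaches a limit. The sketch below simulates the idea in plain Python, with an in-memory queue standing in for SQS. The names `MAX_HOPS`, `invoke`, and the `hops` payload field are hypothetical and are not part of async-lambda.

```python
from collections import deque

MAX_HOPS = 5  # hypothetical limit on chained invocations
queue = deque()  # in-memory stand-in for the managed SQS queues
handled = []  # records (task_id, hops) for each handled message


def invoke(task_id, payload):
    """Enqueue a message unless the hop limit has been reached."""
    hops = payload.get("hops", 0)
    if hops >= MAX_HOPS:
        return False  # drop the message instead of looping forever
    queue.append((task_id, {**payload, "hops": hops + 1}))
    return True


def task_1(payload):
    invoke("Task2", payload)


def task_2(payload):
    invoke("Task1", payload)


TASKS = {"Task1": task_1, "Task2": task_2}


def run(first_task):
    """Drain the queue, dispatching each message to its task."""
    invoke(first_task, {})
    while queue:
        task_id, payload = queue.popleft()
        handled.append((task_id, payload["hops"]))
        TASKS[task_id](payload)


run("Task1")
print(len(handled))
```

With the guard in place the chain stops after `MAX_HOPS` messages instead of running indefinitely.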

Example: Scheduled Task

@app.scheduled_task("TaskID", schedule_expression='rate(15 minutes)')
def scheduled_task(event: ScheduledEvent):
    ...

Example: API Task

@app.api_task("TaskID", path='/test', method='get')
def api_task(event: APIEvent):
    print(event.headers)
    print(event.querystring_params)
    print(event.body)

Example: Unmanaged SQS Task

@app.sqs_task("TaskID", queue_arn='queue-arn')
def sqs_task(event: UnmanagedSQSEvent):
    print(event.body)

Lanes & Concurrency

Async tasks can be scaled horizontally using lanes. Each lane is a separate SQS queue and can have its own concurrency limit. Lane assignment can be controlled at the controller, sub-controller, or task level.

app = AsyncLambdaController(lane_count=2)

@app.async_task("SwitchBoard")
def switch_board(event: ManagedSQSEvent):
    lane = 1 if event.payload['value'] > 50000 else 0
    app.async_invoke("ProcessingTask", event.payload, lane=lane)

@app.async_task("ProcessingTask", maximum_concurrency=[10, 2])
def processing_task(event: ManagedSQSEvent):
    ...
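
Since maximum_concurrency accepts either an int or a list with one entry per lane, the mapping from lane to limit can be sketched as follows. This is plain Python for illustration only: `resolve_lane_concurrency` and `pick_lane` are hypothetical helpers, and the rule that a single int applies to every lane is an assumption, not confirmed library behavior.

```python
from typing import List, Optional, Union


def resolve_lane_concurrency(
    maximum_concurrency: Optional[Union[int, List[int]]], lane_count: int
) -> List[Optional[int]]:
    """Return one concurrency limit per lane (None means no limit)."""
    if maximum_concurrency is None or isinstance(maximum_concurrency, int):
        # Assumption: a single value applies to every lane.
        return [maximum_concurrency] * lane_count
    if len(maximum_concurrency) != lane_count:
        raise ValueError(
            f"expected {lane_count} limits, got {len(maximum_concurrency)}"
        )
    return list(maximum_concurrency)


def pick_lane(value: int, threshold: int = 50000) -> int:
    """Mirror the SwitchBoard rule above: large values go to lane 1."""
    return 1 if value > threshold else 0


limits = resolve_lane_concurrency([10, 2], lane_count=2)
lane = pick_lane(75000)
print(f"lane {lane} is limited to {limits[lane]} concurrent executions")
```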

Middleware

Middleware functions wrap task execution and can be used for logging, authentication, or modifying events/responses.

def async_lambda_middleware(event, call_next):
    print(f"Invocation Payload: {event}")
    result = call_next(event)
    print(f"Invocation Result: {result}")
    return result

controller = AsyncLambdaController(middleware=[([BaseEvent], async_lambda_middleware)])

If multiple middleware functions are registered, call_next calls the next middleware function in the stack rather than the task itself.

For example, if middleware functions A and B are registered in that order, the execution order is:

A(Pre) -> B(Pre) -> Task -> B(Post) -> A(Post)
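
The ordering above can be reproduced with a small chain builder. This is a minimal sketch in plain Python of how such a stack could be composed; `build_chain` and `make_middleware` are hypothetical names, and this is not async-lambda's actual implementation.

```python
from typing import Any, Callable, List

Handler = Callable[[Any], Any]
Middleware = Callable[[Any, Handler], Any]

calls: List[str] = []  # records the order in which each step runs


def make_middleware(name: str) -> Middleware:
    """Build a middleware that logs before and after calling call_next."""

    def middleware(event: Any, call_next: Handler) -> Any:
        calls.append(f"{name}(Pre)")
        result = call_next(event)
        calls.append(f"{name}(Post)")
        return result

    return middleware


def task(event: Any) -> str:
    calls.append("Task")
    return "done"


def build_chain(middleware: List[Middleware], handler: Handler) -> Handler:
    """Wrap the handler so the first registered middleware runs outermost."""
    for mw in reversed(middleware):
        # Bind mw and the current handler as defaults to avoid late binding.
        handler = lambda event, mw=mw, nxt=handler: mw(event, nxt)
    return handler


chain = build_chain([make_middleware("A"), make_middleware("B")], task)
result = chain({"foo": "bar"})
print(" -> ".join(calls))
```

Wrapping in reverse registration order is what makes the first registered middleware the outermost one.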

Large Payloads

If a payload exceeds SQS size limits, async-lambda automatically stores it in S3 and passes a reference key to the Lambda function.
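
The offloading pattern can be sketched in plain Python as below. A dict stands in for the S3 bucket, and the threshold is deliberately small for the demo. `SIZE_LIMIT_BYTES`, `encode_message`, `decode_message`, and the `s3_payload_key` envelope field are all hypothetical and do not reflect async-lambda's actual message format.

```python
import json
import uuid

SIZE_LIMIT_BYTES = 1024  # hypothetical threshold, kept small for the demo
fake_s3 = {}  # in-memory dict standing in for an S3 bucket


def encode_message(payload):
    """Return the message body, offloading oversized payloads to the store."""
    body = json.dumps(payload)
    if len(body.encode("utf-8")) <= SIZE_LIMIT_BYTES:
        return json.dumps({"payload": payload})
    key = f"payloads/{uuid.uuid4()}.json"
    fake_s3[key] = body
    return json.dumps({"s3_payload_key": key})


def decode_message(message):
    """Recover the original payload, fetching it from the store if needed."""
    envelope = json.loads(message)
    if "s3_payload_key" in envelope:
        return json.loads(fake_s3[envelope["s3_payload_key"]])
    return envelope["payload"]


small = {"foo": "bar"}
large = {"data": "x" * 5000}
small_message = encode_message(small)
large_message = encode_message(large)
```

The receiving side sees the same payload either way, so task code does not need to know whether the payload was offloaded.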

Dead Letter Queues (DLQ)

All async tasks share a DLQ for failed messages. You can configure custom DLQ tasks for advanced error handling.

async-lambda config

Configuration options can be set with the .async_lambda/config.json file at the app, stage, and task level. An option applies unless it is overridden at a more specific level (app -> stage -> task -> task stage). The override logic attempts to be non-destructive: if layers is ['layer_1'] at the app level and ['layer_2'] at the stage level, the resolved value is ['layer_1', 'layer_2'].

Config file levels schema

{
    # APP LEVEL
    "stages": {
        "stage_name": {
            # STAGE LEVEL
        }
    },
    "tasks": {
        "task_id": {
            # TASK LEVEL
            "stages": {
                "stage_name": {
                    # TASK STAGE LEVEL
                }
            }
        }
    }
}

Any configuration option can be set at any of these levels, except domain_name, tls_version, and certificate_arn, which cannot be set at the task level.
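
The level ordering and non-destructive merging can be sketched as follows. Only list concatenation (as in the layers example) is described by this document; merging dicts key by key and overriding scalars are assumptions made for illustration, and `merge_config` and `resolve` are hypothetical helpers rather than library functions.

```python
def merge_config(base, override):
    """Merge two config levels without discarding values from the base."""
    merged = dict(base)
    for key, value in override.items():
        current = merged.get(key)
        if isinstance(current, list) and isinstance(value, list):
            merged[key] = current + value  # lists are concatenated
        elif isinstance(current, dict) and isinstance(value, dict):
            merged[key] = {**current, **value}  # assumption: specific keys win
        else:
            merged[key] = value  # assumption: scalars are overridden
    return merged


def resolve(config, stage, task_id):
    """Apply the levels in order: app -> stage -> task -> task stage."""
    levels = [
        {k: v for k, v in config.items() if k not in ("stages", "tasks")},
        config.get("stages", {}).get(stage, {}),
    ]
    task = config.get("tasks", {}).get(task_id, {})
    levels.append({k: v for k, v in task.items() if k != "stages"})
    levels.append(task.get("stages", {}).get(stage, {}))
    result = {}
    for level in levels:
        result = merge_config(result, level)
    return result


config = {
    "layers": ["layer_1"],
    "environment_variables": {"LOG_LEVEL": "INFO"},
    "stages": {"prod": {"layers": ["layer_2"]}},
    "tasks": {
        "TaskID": {
            "environment_variables": {"LOG_LEVEL": "DEBUG"},
            "stages": {"prod": {"layers": ["layer_3"]}},
        }
    },
}
resolved = resolve(config, stage="prod", task_id="TaskID")
print(resolved["layers"])
```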

environment_variables

{
    "ENV_VAR_NAME": "ENV_VAR_VALUE"
}

This config value will set environment variables for the function execution. These environment variables will also be available during build time.

The value is passed to the Environment property on AWS::Serverless::Function.

policies

[
    'IAM_POLICY_ARN' | STATEMENT
]

Use this config option to attach arbitrary policies to the Lambda function's execution role.

The value is passed to the Policies property on AWS::Serverless::Function, in addition to the inline policies created by async-lambda.

layers

[
    "LAYER_ARN"
]

Use this config option to add arbitrary Lambda layers to the Lambda functions. Ordering matters, and merging is done through concatenation.

The value is passed to the Layers property on AWS::Serverless::Function.

subnet_ids

[
    "SUBNET_ID"
]

Use this config option to place the Lambda function in a VPC subnet.

The value is passed into the SubnetIds field of the VpcConfig property on AWS::Serverless::Function.

security_group_ids

[
    "SECURITY_GROUP_ID"
]

Use this config option to attach a security group to the Lambda function.

The value is passed into the SecurityGroupIds field of the VpcConfig property on AWS::Serverless::Function.

managed_queue_extras

[
    {
        # Cloudformation resource
    }
]

Use this config option to add extra resources for managed SQS queues (async_task tasks).

For example, this might be used to attach alarms to these queues.

Each item in the list should be a complete CloudFormation resource. async-lambda provides a few custom substitutions so that you can reference the extras and the associated managed SQS resource by LogicalId.

  • $QUEUEID will be replaced with the LogicalId of the associated Managed SQS queue.
  • $EXTRA<index> will be replaced with the LogicalId of the extra at the specified index.
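
To illustrate how these placeholders could be expanded, here is a sketch in plain Python. It assumes `$EXTRA<index>` is written with a literal number, such as `$EXTRA0`. The `substitute` helper, the example alarm, and the LogicalIds `TaskIDQueue` and `TaskIDQueueExtra0` are hypothetical; this is not async-lambda's own substitution code.

```python
import re


def substitute(value, queue_id, extra_ids):
    """Recursively replace $QUEUEID and $EXTRA<index> in a resource."""
    if isinstance(value, dict):
        return {k: substitute(v, queue_id, extra_ids) for k, v in value.items()}
    if isinstance(value, list):
        return [substitute(v, queue_id, extra_ids) for v in value]
    if isinstance(value, str):
        value = value.replace("$QUEUEID", queue_id)
        # Match the full index so $EXTRA10 is not read as $EXTRA1 plus "0".
        return re.sub(r"\$EXTRA(\d+)", lambda m: extra_ids[int(m.group(1))], value)
    return value


# Hypothetical extra: a CloudWatch alarm on the queue's backlog.
alarm = {
    "Type": "AWS::CloudWatch::Alarm",
    "Properties": {
        "Namespace": "AWS/SQS",
        "MetricName": "ApproximateNumberOfMessagesVisible",
        "Dimensions": [
            {"Name": "QueueName", "Value": {"Fn::GetAtt": ["$QUEUEID", "QueueName"]}}
        ],
        "ComparisonOperator": "GreaterThanThreshold",
        "Threshold": 100,
        "EvaluationPeriods": 1,
        "Period": 300,
        "Statistic": "Maximum",
    },
}

resolved = substitute(alarm, "TaskIDQueue", ["TaskIDQueueExtra0"])
print(resolved["Properties"]["Dimensions"][0]["Value"])
```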

method_settings

This config value can only be set at the app or stage level.

[
    {...}
]

If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.

The value is passed into the MethodSettings property of the AWS::Serverless::Api. See the AWS documentation for the MethodSetting spec.

domain_name

This config value can only be set at the app or stage level.

"domain_name"

If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.

This config value will set the DomainName field of the Domain property.

tls_version

This config value can only be set at the app or stage level.

"tls_version"

If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.

This config value will set the SecurityPolicy field of the Domain property.

Possible values are TLS_1_0 and TLS_1_2.

certificate_arn

This config value can only be set at the app or stage level.

"certificate_arn"

If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.

This config value will set the CertificateArn field of the Domain property.

hosted_zone_id

This config value can only be set at the app or stage level.

"hosted_zone_id"

If your async-lambda app contains any api_task tasks, then an AWS::Serverless::Api resource is created.

This config value will set the HostedZoneId field of the Route53 property of the Domain property.

This will create a DNS record in the given hosted zone for the API Gateway endpoint created by the SAM deployment.

tags

{
    "TAG_NAME": "TAG_VALUE"
}

This config value will set the Tags field of all resources created by async-lambda. This will not set the field on managed_queue_extras resources.

The keys framework and framework-version will always be set, and the system values will override any values set by the user.

For managed queues, the tag async-lambda-queue-type will be set to dlq, dlq-task, or managed depending on the queue type.

For async_task queues (non dlq-task), the async-lambda-lane tag will be set.

logging_config

{
    "ApplicationLogLevel": "TRACE" | "DEBUG" | "INFO" | "WARN" | "ERROR" | "FATAL",
    "LogFormat": "Text" | "JSON",
    "LogGroup": "",
    "SystemLogLevel": "DEBUG" | "INFO" | "WARN"
}

The value is passed directly to the LoggingConfig CloudFormation property of the Lambda function(s).

See above for full details on configuration schema and merging behavior.

Advanced Usage

Task Initialization

Use the init_tasks argument to run setup logic during Lambda INIT phase. This is useful for caching, resource allocation, or one-time setup.

def setup_cache(task_id):
    ...

@app.async_task("TaskID", init_tasks=[setup_cache])
def async_task(event: ManagedSQSEvent):
    ...

Defer Utility

The Defer class allows you to cache values during INIT and only execute a function when its value is requested.

cache = Defer(get_a_value, 10, 100)

@app.async_task("Task", init_tasks=[cache.execute])
def task(event: ManagedSQSEvent):
    for i in range(cache.value):
        ...
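
The behavior described above can be pictured with a small stand-in class. This is only an illustrative sketch of lazy evaluation with caching, assuming that `execute` may receive a task_id when used as an init task. `LazyValue` is a hypothetical name and is not the real Defer implementation.

```python
class LazyValue:
    """Illustrative stand-in for Defer: runs a function once, on demand."""

    _UNSET = object()

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._value = self._UNSET

    def execute(self, *_ignored):
        """Compute and cache the value; extra arguments are ignored."""
        if self._value is self._UNSET:
            self._value = self._func(*self._args, **self._kwargs)

    @property
    def value(self):
        self.execute()  # compute on first access if INIT did not run it
        return self._value


function_calls = []  # records each time the wrapped function runs


def get_a_value(low, high):
    function_calls.append((low, high))
    return high - low


cache = LazyValue(get_a_value, 10, 100)
cache.execute("TaskID")  # what an init task would do, given a task_id
total = sum(1 for _ in range(cache.value))
print(total, len(function_calls))
```

Reading `cache.value` repeatedly reuses the cached result, so the wrapped function runs only once per execution environment.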

Known Limitations

  • Not all Lambda configuration options are supported (see code for extension points)
  • Payloads must be JSON serializable
  • Infinite loops are possible if tasks invoke each other recursively

Project Structure

  • async_lambda/: Core framework code
    • controller.py: Main controller and orchestration logic
    • models/: Event, response, and task models
    • middleware.py: Middleware registration and execution
    • client.py, env.py, config.py: AWS clients and configuration
    • defer.py: Defer utility for caching
    • payload_encoder.py, util.py: Utilities
  • example/: Example usage and sample app
  • scripts/: Linting and testing scripts
  • test/: Unit tests

License

See LICENSE file for details.
