Create stream processors with AWS Lambda functions
aws-lambda-stream
aws-lambda-stream helps you build AWS Lambda functions that react to events and coordinate
work across AWS services.
Use it when a Lambda receives a batch of records from Kinesis, DynamoDB Streams, S3, SNS, SQS, or EventBridge, and you need to:
- decide which business rules should run for each event;
- execute tasks;
- publish new domain events;
- store events for later correlation;
- update DynamoDB materialized views;
- write objects to S3 or messages to SNS/SQS;
- keep processing the rest of the batch when one record fails.
Instead of writing one large Lambda handler with nested loops, filters, try/except blocks, AWS calls, and batch handling, you define small rules and run them through a stream pipeline.
The library uses ReactiveX for Python under the hood and is inspired by jgilbert01/aws-lambda-stream.
What Problem Does It Solve?
In an event-driven system, one incoming event often triggers several independent reactions.
For example, when an order-created event arrives, you may want to:
- save the event in DynamoDB;
- correlate it with payment-authorized;
- emit order-ready only when both events exist;
- materialize an order read model;
- write a JSON snapshot to S3;
- notify another system through SNS.
With aws-lambda-stream, each reaction is a rule:
rules = [
collect_order_events,
correlate_order,
evaluate_order_ready,
materialize_order,
write_order_snapshot,
notify_order_ready,
]
The Lambda handler stays small:
StreamPipeline(
initialize_from(rules),
options,
).assemble(
from_kinesis(event),
)
The framework handles event normalization, rule matching, RxPY execution, batching helpers, fault collection, and AWS connector glue.
Installation
With pip:
pip install aws-lambda-stream
With Poetry:
poetry add aws-lambda-stream
Quick Start
from aws_lambda_stream.pipelines import StreamPipeline, initialize_from
from aws_lambda_stream.events.kinesis import from_kinesis
from aws_lambda_stream.flavors.task import task
from aws_lambda_stream.utils.opt import DEFAULT_OPTIONS
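def execute(uow, rule):
    # Business logic for one matching event; the return value is stored in uow["result"].
    return {
        "handled": True,
        "event_id": uow["event"]["id"],
    }

rules = [
    {
        "id": "handle-thing-created",
        "flavor": task,
        "event_type": "thing-created",
        "execute": execute,
    },
]

def handler(event, _context):
    # Normalize the Kinesis records into UOWs and run them through the rules.
    StreamPipeline(
        initialize_from(rules),
        DEFAULT_OPTIONS,
    ).assemble(
        from_kinesis(event),
    )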
Each incoming record becomes a unit of work (uow) shaped roughly like:
{
"pipeline": "handle-thing-created",
"record": {...},
"event": {
"id": "...",
"type": "thing-created",
"timestamp": 1548967022000,
},
}
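The source helpers do this normalization for you. To make the shape above concrete, here is a plain-Python sketch of how a Kinesis trigger event could be decoded into UOWs. kinesis_event_to_uows is a hypothetical name, not part of the library, and the real from_kinesis may add or derive more fields (for example, pipeline is added later by StreamPipeline for each rule).

```python
import base64
import json


def kinesis_event_to_uows(event):
    """Hypothetical sketch (not the library's from_kinesis): Kinesis event -> UOWs."""
    uows = []
    for record in event.get("Records", []):
        # Kinesis delivers each payload base64-encoded under record["kinesis"]["data"].
        payload = base64.b64decode(record["kinesis"]["data"])
        uows.append({
            "record": record,              # keep the raw record for fault inspection
            "event": json.loads(payload),  # the domain event used by rules
        })
    return uows


sample = {
    "Records": [
        {
            "kinesis": {
                "data": base64.b64encode(json.dumps({
                    "id": "evt-1",
                    "type": "thing-created",
                    "timestamp": 1548967022000,
                }).encode("utf-8")).decode("ascii"),
            },
        },
    ],
}

uows = kinesis_event_to_uows(sample)
print(uows[0]["event"]["type"])  # thing-created
```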
Lambda Handler Shape
In most Lambdas you do three things:
- Convert the AWS trigger event into UOWs.
- Initialize rules.
- Assemble and run the stream.
from aws_lambda_powertools import Logger
from aws_lambda_stream.pipelines import StreamPipeline, initialize_from
from aws_lambda_stream.events.dynamodb import from_dynamodb
from aws_lambda_stream.utils.opt import DEFAULT_OPTIONS
from .rules import rules
logger = Logger()
def handler(event, context):
    options = {
        **DEFAULT_OPTIONS,
        "logger": logger,
        "bus_name": "my-event-bus",
    }
    StreamPipeline(
        initialize_from(rules),
        options,
    ).assemble(
        from_dynamodb(event),
    )
DEFAULT_OPTIONS provides:
- logger: the standard Python root logger.
- bus_name: BUS_NAME from the environment.
- publish: an EventBridge publisher.
You can override any of them per Lambda.
Rules And Flavors
Rules are dictionaries. initialize_from(rules) turns them into executable pipelines.
Common rule fields:
- id: unique rule/pipeline id.
- flavor: a callable that receives the rule and returns an RxPY operator.
- event_type: string, list, regex, or callable used to match events.
- filters: optional list of content predicates.
- publish: publisher function, usually from DEFAULT_OPTIONS.
- logger: optional logger. If omitted, a standard pipeline logger is configured.
Built-in flavors include:
- task: execute business logic and optionally emit an event.
- collect: store events for later correlation.
- correlate: build correlation records.
- evaluate: evaluate collected/correlated events and emit higher-order events.
- materialize: update materialized views.
- cdc: publish change-data-capture events.
- s3, sns, elasticsearch: write to external destinations.
- expired: emit expiration events from DynamoDB TTL removals.
- update: query/get/update DynamoDB records.
Filtering Events
Most flavors start by checking event_type and then optional content filters:
{
"id": "collect-thing-created",
"flavor": collect,
"event_type": "thing-created",
"filters": [
lambda uow, rule: uow["event"]["thing"]["status"] == "active",
],
}
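event_type can take any of these forms:
import re  # needed for the regex form
"thing-created"  # a single type
["thing-created", "thing-updated"]  # any of several types
re.compile(r"thing-(created|updated)$")  # a regular expression
lambda uow: uow["event"]["type"].startswith("thing-")  # a predicate on the UOW
Each filter receives (uow, rule), and a UOW passes only when every filter returns True.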
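The matching rules above can be sketched in plain Python. matches_event_type and passes_filters are hypothetical helpers written for illustration; in particular, using re.search for the regex form is an assumption, and the library's own on_event_type and on_content may behave differently at the edges.

```python
import re


def matches_event_type(event_type, uow):
    """Hypothetical sketch of the four event_type forms."""
    actual = uow["event"]["type"]
    if isinstance(event_type, str):
        return actual == event_type          # exact match
    if isinstance(event_type, (list, tuple)):
        return actual in event_type          # any listed type
    if isinstance(event_type, re.Pattern):
        return event_type.search(actual) is not None  # assumption: search, not match
    if callable(event_type):
        return bool(event_type(uow))         # predicate receives the whole UOW
    return False


def passes_filters(filters, uow, rule):
    # Every filter receives (uow, rule); all of them must return True.
    return all(f(uow, rule) for f in filters or [])


uow = {"event": {"type": "thing-updated", "thing": {"status": "active"}}}
rule = {"filters": [lambda uow, rule: uow["event"]["thing"]["status"] == "active"]}
```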
Task Flavor
Use task when you want to run business logic for matching events.
from aws_lambda_stream.flavors.task import task
def execute(uow, rule):
    return {
        "thing_id": uow["event"]["thing"]["id"],
        "processed": True,
    }
rules = [
{
"id": "process-thing",
"flavor": task,
"event_type": "thing-created",
"execute": execute,
},
]
The result is stored in uow["result"] by default. Use result_key to change it:
{
"id": "process-thing",
"flavor": task,
"event_type": "thing-created",
"execute": execute,
"result_key": "task_result",
}
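The result_key behavior can be pictured as a small function that calls execute and stores its return value on a copy of the UOW. run_task is a hypothetical name; this sketch leaves out event matching, emitting, and fault handling, which the real task flavor also performs.

```python
def run_task(uow, rule):
    """Hypothetical sketch: call rule["execute"] and store its result on the UOW."""
    key = rule.get("result_key", "result")  # "result" is the documented default
    # Return a new dict so the incoming UOW is not mutated.
    return {**uow, key: rule["execute"](uow, rule)}


def execute(uow, rule):
    return {"thing_id": uow["event"]["thing"]["id"], "processed": True}


rule = {
    "id": "process-thing",
    "event_type": "thing-created",
    "execute": execute,
    "result_key": "task_result",
}
uow = {"event": {"type": "thing-created", "thing": {"id": "thing-1"}}}
out = run_task(uow, rule)
```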
Emitting Events From A Task
Add emit to publish a follow-up event.
{
"id": "complete-task",
"flavor": task,
"event_type": "task-requested",
"execute": execute,
"emit": lambda uow, rule, template: {
**template,
"type": "task-completed",
"thing": uow["event"]["thing"],
},
}
template contains a generated id, timestamp, and partition_key.
Return a list from emit to publish multiple events:
"emit": lambda uow, rule, template: [
{
**template,
"type": "first-event",
},
{
**template,
"type": "second-event",
},
]
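A sketch of how a template might be built and how a single event or a list from emit could be normalized before publishing. make_template and emitted_events are hypothetical helpers, and copying partition_key from the triggering event is an assumption; the library may derive it differently.

```python
import time
import uuid


def make_template(uow):
    """Hypothetical sketch of the template passed to emit."""
    return {
        "id": str(uuid.uuid4()),
        "timestamp": int(time.time() * 1000),  # milliseconds, like event.timestamp
        # Assumption: reuse the triggering event's partition key.
        "partition_key": uow["event"].get("partition_key"),
    }


def emitted_events(uow, rule):
    # emit may return one event dict or a list of them; normalize to a list.
    result = rule["emit"](uow, rule, make_template(uow))
    return result if isinstance(result, list) else [result]


rule = {
    "emit": lambda uow, rule, template: [
        {**template, "type": "first-event"},
        {**template, "type": "second-event"},
    ],
}
uow = {"event": {"type": "task-requested", "partition_key": "thing-1"}}
events = emitted_events(uow, rule)
```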
Collect, Correlate, Evaluate
These flavors are useful for event-driven workflows where one event is not enough.
collect
collect stores raw events in DynamoDB so they can be queried later.
from aws_lambda_stream.flavors.collect import collect
{
"id": "collect-order-events",
"flavor": collect,
"event_type": ["order-created", "payment-authorized"],
"correlation_key": "order.id",
}
If correlation_key is a string, it is treated as a dotted path into the event, so "order.id" selects uow["event"]["order"]["id"]. If it is callable, it receives the UOW:
"correlation_key": lambda uow: uow["event"]["order"]["id"]
correlate
correlate consumes the DynamoDB stream of the table where collect stores events and writes correlation records.
from aws_lambda_stream.flavors.correlate import correlate
{
"id": "correlate-order",
"flavor": correlate,
"event_type": ["order-created", "payment-authorized"],
"correlation_key": "order.id",
}
Use correlation_key_suffix when different workflows share the same entity id:
{
"id": "correlate-payment",
"flavor": correlate,
"event_type": "payment-authorized",
"correlation_key": "order.id",
"correlation_key_suffix": "payment",
}
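Both collect and correlate accept a correlation_key as a dotted string or a callable. A plain-Python sketch of resolving either form, where resolve_correlation_key is a hypothetical helper and the dotted-path reading is an assumption based on the examples above; how correlation_key_suffix is combined with the key is not shown.

```python
from functools import reduce


def resolve_correlation_key(correlation_key, uow):
    """Hypothetical sketch: callables get the UOW; strings are dotted paths into uow["event"]."""
    if callable(correlation_key):
        return correlation_key(uow)
    # "order.id" -> uow["event"]["order"]["id"]
    return reduce(lambda value, part: value[part], correlation_key.split("."), uow["event"])


uow = {"event": {"type": "order-created", "order": {"id": "order-1"}}}
by_path = resolve_correlation_key("order.id", uow)
by_callable = resolve_correlation_key(lambda u: u["event"]["order"]["id"], uow)
```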
evaluate
evaluate checks whether enough correlated events exist and emits higher-order events.
from aws_lambda_stream.flavors.evaluate import evaluate
def has_order_and_payment(uow):
    types = [event["type"] for event in uow["correlated"]]
    return "order-created" in types and "payment-authorized" in types
{
"id": "order-ready",
"flavor": evaluate,
"event_type": ["order-created", "payment-authorized"],
"correlation_key_suffix": "payment",
"expression": has_order_and_payment,
"emit": "order-ready",
}
expression can return:
- True or False.
- An event dictionary.
- A list of event dictionaries.
When it returns True, the event currently being evaluated is used as the trigger for the event named in emit.
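One way to picture these return values is a function that turns whatever expression returns into a list of trigger events. expression_to_triggers is hypothetical, and treating False or an empty value as "emit nothing" is an assumption about the flavor's behavior.

```python
def expression_to_triggers(result, uow):
    """Hypothetical sketch: normalize an evaluate expression's return value."""
    if result is True:
        return [uow["event"]]  # True: the current event acts as the trigger
    if not result:
        return []              # False, None, or empty: nothing to emit (assumption)
    if isinstance(result, dict):
        return [result]        # a single event dictionary
    return list(result)        # a list of event dictionaries


def has_order_and_payment(uow):
    types = [event["type"] for event in uow["correlated"]]
    return "order-created" in types and "payment-authorized" in types


uow = {
    "event": {"type": "payment-authorized"},
    "correlated": [{"type": "order-created"}, {"type": "payment-authorized"}],
}
triggers = expression_to_triggers(has_order_and_payment(uow), uow)
```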
CDC Flavor
Use cdc to publish a new event from a DynamoDB stream record.
from aws_lambda_stream.flavors.cdc import cdc
{
"id": "thing-cdc",
"flavor": cdc,
"event_type": "THING-created",
"to_event": lambda uow: {
"type": "thing-created",
"thing": uow["event"]["raw"]["new"],
},
}
to_event receives the UOW and returns fields that are merged into uow["event"] before
publishing.
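The merge described above can be sketched as a shallow dictionary update on a copy of the event. apply_to_event is a hypothetical helper; the real cdc flavor also filters and publishes, which this sketch omits.

```python
def apply_to_event(uow, to_event):
    """Hypothetical sketch: merge to_event(uow) into uow["event"] without mutating the input."""
    return {**uow, "event": {**uow["event"], **to_event(uow)}}


def to_event(uow):
    return {"type": "thing-created", "thing": uow["event"]["raw"]["new"]}


uow = {
    "record": {},
    "event": {
        "id": "3",
        "type": "THING-created",
        "timestamp": 1548967022000,
        "raw": {"new": {"id": "thing-1", "name": "Thing One"}, "old": None},
    },
}
published = apply_to_event(uow, to_event)
```

Fields returned by to_event win over existing ones, so the DynamoDB-style type is replaced while id, timestamp, and raw are kept.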
Materialize Flavor
Use materialize to update a DynamoDB materialized view from an event.
from aws_lambda_stream.flavors.materialize import materialize
from aws_lambda_stream.utils.dynamodb import update_expression
{
"id": "materialize-thing",
"flavor": materialize,
"event_type": "thing-created",
"to_update_request": lambda uow: {
"Key": {
"pk": uow["event"]["thing"]["id"],
"sk": "THING",
},
**update_expression({
"name": uow["event"]["thing"]["name"],
"timestamp": uow["event"]["timestamp"],
}),
},
}
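A DynamoDB UpdateItem request (for example, boto3's Table.update_item) takes Key, UpdateExpression, ExpressionAttributeNames, and ExpressionAttributeValues. The exact output of the library's update_expression is not described here, so build_update_expression below is a hypothetical helper that only illustrates the general idea of turning a field dictionary into a SET expression.

```python
def build_update_expression(fields):
    """Hypothetical helper; not necessarily what update_expression returns."""
    names, values, clauses = {}, {}, []
    for i, (attr, value) in enumerate(fields.items()):
        # Placeholders avoid clashes with DynamoDB reserved words such as "name" and "timestamp".
        names[f"#a{i}"] = attr
        values[f":v{i}"] = value
        clauses.append(f"#a{i} = :v{i}")
    return {
        "UpdateExpression": "SET " + ", ".join(clauses),
        "ExpressionAttributeNames": names,
        "ExpressionAttributeValues": values,
    }


request = {
    "Key": {"pk": "thing-1", "sk": "THING"},
    **build_update_expression({"name": "Thing One", "timestamp": 1548967022000}),
}
```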
Use split_on when one event should update multiple records:
{
"id": "materialize-order-items",
"flavor": materialize,
"event_type": "order-created",
"split_on": "event.items",
"split_target_field": "item",
"to_update_request": lambda uow: {
"Key": {
"pk": uow["item"]["id"],
"sk": "ITEM",
},
**update_expression({
"order_id": uow["event"]["order"]["id"],
}),
},
}
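The split_on and split_target_field pair can be read as "fan one UOW out into one UOW per list element". A sketch with a hypothetical split_uow helper, assuming split_on is a dotted path from the root of the UOW (as in "event.items" here and "query_response" in the update flavor):

```python
from functools import reduce


def split_uow(uow, split_on, split_target_field):
    """Hypothetical sketch: one UOW per element of the list found at split_on."""
    items = reduce(lambda value, part: value[part], split_on.split("."), uow)
    # Each copy keeps the original fields and gets one element under split_target_field.
    return [{**uow, split_target_field: item} for item in items]


uow = {
    "event": {
        "type": "order-created",
        "order": {"id": "order-1"},
        "items": [{"id": "item-1"}, {"id": "item-2"}],
    },
}
parts = split_uow(uow, "event.items", "item")
```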
S3 And SNS Flavors
Use s3 to write derived objects.
import json
from aws_lambda_stream.flavors.s3 import s3
{
"id": "write-order-snapshot",
"flavor": s3,
"event_type": "order-ready",
"bucket_name": "my-snapshot-bucket",
"to_s3": lambda uow: {
"Key": "orders/{}.json".format(uow["event"]["order"]["id"]),
"Body": json.dumps(uow["event"]["order"]).encode("utf-8"),
"ContentType": "application/json",
},
}
Use sns to publish batches to an SNS topic.
import json
from aws_lambda_stream.flavors.sns import sns
{
"id": "notify-order-ready",
"flavor": sns,
"event_type": "order-ready",
"topic_arn": "arn:aws:sns:us-east-1:123456789012:orders",
"to_sns": lambda uow: [
{
"Message": json.dumps(uow["event"]),
},
],
}
Expired Flavor
Use expired with DynamoDB TTL stream removals. If an item has ttl and expire, the flavor
emits an expiration event.
from aws_lambda_stream.flavors.expired import expired
{
"id": "expired-events",
"flavor": expired,
}
If expire is True, the emitted type is derived from the original event:
- thing.created becomes thing.created.expired
- thing-created becomes thing-created-expired
If expire is a string, that string is used as the emitted type:
{
"ttl": 1548967022,
"expire": "thing-timeout",
"event": {
"id": "thing-1",
"type": "thing-created",
"timestamp": 1548967022000,
},
}
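The naming rules above can be sketched as a small function. expired_type is hypothetical, and choosing the separator by checking whether the original type contains a dot is an assumption that reproduces the two documented examples, not necessarily how the flavor decides.

```python
def expired_type(item):
    """Hypothetical sketch of the expiration event type naming."""
    if "ttl" not in item:
        return None  # not a TTL item: nothing is emitted
    expire = item.get("expire")
    if isinstance(expire, str):
        return expire  # an explicit string is used as-is
    if expire is not True:
        return None  # no expire flag: nothing is emitted
    original = item["event"]["type"]
    # Assumption: keep the separator style of the original type.
    separator = "." if "." in original else "-"
    return f"{original}{separator}expired"
```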
Update Flavor
Use update when an event needs to query records, optionally split results, and update each
target.
from aws_lambda_stream.flavors.update import update
from aws_lambda_stream.utils.dynamodb import update_expression
{
"id": "update-related-things",
"flavor": update,
"event_type": "thing-renamed",
"to_query_request": lambda uow, rule: {
"IndexName": "DataIndex",
"KeyConditionExpression": "#data = :data",
"ExpressionAttributeNames": {
"#data": "data",
},
"ExpressionAttributeValues": {
":data": uow["event"]["thing"]["id"],
},
},
"split_on": "query_response",
"split_target_field": "target",
"to_get_request": lambda uow, rule: {
"Keys": [
{
"pk": uow["target"]["pk"],
"sk": uow["target"]["sk"],
},
],
},
"to_update_request": lambda uow, rule: {
"Key": {
"pk": uow["target"]["pk"],
"sk": uow["target"]["sk"],
},
**update_expression({
"thing_name": uow["event"]["thing"]["name"],
}),
},
}
Use to_fallback_update_request when an update can legitimately return {} and you want to
try a second request.
Event Sources
Helpers normalize AWS event payloads:
from aws_lambda_stream.events.kinesis import from_kinesis
from aws_lambda_stream.events.dynamodb import from_dynamodb
from aws_lambda_stream.events.s3 import from_s3
from aws_lambda_stream.events.sns import from_sns
from aws_lambda_stream.events.sqs import from_sqs
Test helpers are also available, for example to_kinesis_records(...),
to_dynamodb_records(...), and to_s3_records(...).
Example local test input:
from aws_lambda_stream.events.kinesis import to_kinesis_records, from_kinesis
event = to_kinesis_records([
{
"type": "thing-created",
"timestamp": 1548967022000,
"partition_key": "thing-1",
"thing": {
"id": "thing-1",
},
},
])
uows = from_kinesis(event)
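If you want to see what such a test event looks like, here is a plain-Python stand-in for to_kinesis_records. make_kinesis_event is hypothetical and fills only a subset of the fields Lambda sends for Kinesis (eventSource, partitionKey, and base64-encoded data); the library's helper may add more, such as sequence numbers.

```python
import base64
import json


def make_kinesis_event(events):
    """Hypothetical stand-in for to_kinesis_records: wrap events as Kinesis records."""
    return {
        "Records": [
            {
                "eventSource": "aws:kinesis",
                "kinesis": {
                    "partitionKey": e.get("partition_key", ""),
                    # Kinesis payloads arrive base64-encoded.
                    "data": base64.b64encode(json.dumps(e).encode("utf-8")).decode("ascii"),
                },
            }
            for e in events
        ],
    }


event = make_kinesis_event([
    {"type": "thing-created", "timestamp": 1548967022000, "partition_key": "thing-1"},
])
```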
Concurrency
StreamPipeline runs pipelines concurrently by default:
StreamPipeline(pipelines, options, concurrency=True)
You can disable concurrency for deterministic local tests:
StreamPipeline(pipelines, options, concurrency=False)
Concurrent pipelines wait for each rule to complete before shutting down their scheduler, so
RxPY operators such as flat_map can safely emit nested observables.
Unit Of Work (UOW)
A Unit Of Work, usually called uow, is the dictionary that moves through the pipeline.
Every flavor receives a stream of UOWs and may return the same UOW enriched with new fields.
The base shape is intentionally small:
{
"record": {...},
"event": {
"id": "event-id",
"type": "thing-created",
"timestamp": 1548967022000,
"tags": {...},
},
}
Common fields:
- record: the original AWS record. It is kept so failures can be inspected or resubmitted.
- event: the normalized domain event used by filters and flavors.
- event.id: stable event id.
- event.type: event type used by event_type filters.
- event.timestamp: event timestamp in milliseconds.
- event.tags: optional metadata such as region, source, pipeline, or test skip tags.
Everything else depends on the source event or your domain payload.
For example, an event from an event hub may look like:
{
"record": {...},
"event": {
"id": "evt-1",
"type": "thing-created",
"timestamp": 1548967022000,
"tags": {...},
"thing": {
"id": "thing-1",
"name": "Thing One",
},
},
}
A UOW created from DynamoDB Streams includes source-specific data under event.raw:
{
"record": {...},
"event": {
"id": "3",
"type": "EVENT-created",
"timestamp": 1548967022000,
"tags": {
"region": "us-west-2",
},
"raw": {
"new": {...},
"old": None,
},
},
}
When StreamPipeline runs a rule, it also adds pipeline to the UOW so downstream operators
know which rule is processing it:
{
"pipeline": "rule-id",
"record": {...},
"event": {...},
}
Fault Handling
Use faulty(...) around functions that may fail for a single record:
from aws_lambda_stream.utils.faults import faulty
from aws_lambda_stream.utils.operators import rx_map
# source: an RxPY Observable of UOWs; do_work: your per-record function.
source.pipe(
    rx_map(faulty(lambda uow: do_work(uow))),
)
Expected per-record failures are collected as fault events and processing continues with the
next record. Unexpected RxPY on_error errors remain terminal.
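The idea behind faulty(...) can be shown without RxPY: catch an exception raised for one record, record a fault, and keep going. faulty_sketch, do_work, and the fault dictionary layout below are hypothetical; the library's fault events carry their own fields and are published rather than appended to a list.

```python
def faulty_sketch(fn, faults):
    """Hypothetical per-record fault wrapper, not the library's faulty()."""
    def wrapper(uow):
        try:
            return fn(uow)
        except Exception as err:  # an expected failure for this one record
            faults.append({
                "type": "fault",
                "err": {"name": type(err).__name__, "message": str(err)},
                "uow": uow,
            })
            return None  # the caller skips records that produced a fault
    return wrapper


def do_work(uow):
    if uow["event"]["id"] == "bad":
        raise ValueError("cannot process")
    return {**uow, "done": True}


faults = []
safe = faulty_sketch(do_work, faults)
uows = [{"event": {"id": "a"}}, {"event": {"id": "bad"}}, {"event": {"id": "c"}}]
results = [r for r in map(safe, uows) if r is not None]
```

The batch still completes: the bad record becomes a fault entry and the remaining records are processed.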
Writing A Custom Flavor
A flavor receives a rule and returns an RxPY operator.
from reactivex import Observable, operators as ops
from aws_lambda_stream.utils.filters import on_event_type, on_content
from aws_lambda_stream.utils.operators import rx_filter, rx_map
from aws_lambda_stream.utils.print import print_start_pipeline, print_end_pipeline
def my_flavor(rule):
    def wrapper(source: Observable):
        return source.pipe(
            rx_filter(on_event_type(rule)),
            ops.do_action(print_start_pipeline(rule)),
            rx_filter(on_content(rule)),
            rx_map(lambda uow: {
                **uow,
                "custom": True,
            }),
            ops.do_action(print_end_pipeline(rule)),
        )
    return wrapper
Use ops.do_action(...) for logging or side effects that should not transform the UOW.
Use rx_map(...) and rx_filter(...) when you want per-record failures to be collected as
faults and processing to continue.
Local Testing
You can run a pipeline directly in a unit test:
from aws_lambda_stream.pipelines import StreamPipeline, initialize_from
from aws_lambda_stream.events.kinesis import from_kinesis, to_kinesis_records
from aws_lambda_stream.utils.opt import DEFAULT_OPTIONS
def test_pipeline():
    # `rules` is the rule list under test, for example the Quick Start rules.
    collected = []
    event = to_kinesis_records([
        {
            "type": "thing-created",
            "timestamp": 1,
            "partition_key": "thing-1",
        },
    ])
    StreamPipeline(
        initialize_from(rules),
        DEFAULT_OPTIONS,
        concurrency=False,
    ).assemble(
        from_kinesis(event),
        on_next=lambda pipeline_id, uow: collected.append((pipeline_id, uow)),
    )
    assert len(collected) == 1
Use concurrency=False when you want stable ordering in tests.
Project Templates
The repository includes Serverless Framework templates:
- event-hub
- event-lake-s3
- event-fault-monitor
- rest-bff-service
- control-service
Create a project from a template:
sls create \
--template-url https://github.com/clandro89/aws-lambda-stream/tree/master/templates/event-hub \
--path myprefix-event-hub
Development
Run tests:
poetry run pytest
Build:
poetry build
Publish:
poetry publish
Credits
License
This library is licensed under the MIT License. See the LICENSE file.
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file aws_lambda_stream-1.4.0.tar.gz.
File metadata
- Download URL: aws_lambda_stream-1.4.0.tar.gz
- Upload date:
- Size: 31.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.3 CPython/3.13.3 Darwin/25.5.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 63b7d76bc969e9d80abe00bf49e7aeca7275e576d9263d24b4f36d1cec4e0cf4 |
| MD5 | 147e8aebaf26196f8ef0747a06df8739 |
| BLAKE2b-256 | 0b5ae297f5a5201b11d11a376703e0bd155f4d86fde4a7bfe299f2c503d27b65 |
File details
Details for the file aws_lambda_stream-1.4.0-py3-none-any.whl.
File metadata
- Download URL: aws_lambda_stream-1.4.0-py3-none-any.whl
- Upload date:
- Size: 48.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.3 CPython/3.13.3 Darwin/25.5.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 27feb5946de1aad240cef654d1ef03e7655dec489f5654ca1d7737f6d4a28391 |
| MD5 | 981de9e5d26c8376e8df79e67efeb5c9 |
| BLAKE2b-256 | 37314812dab0295749d9591ccdbd0c5813e8f0fb4724427c1d47894c1d6a2a00 |