Create stream processors with AWS Lambda functions

aws-lambda-stream

aws-lambda-stream helps you build AWS Lambda functions that react to events and coordinate work across AWS services.

Use it when a Lambda receives a batch of records from Kinesis, DynamoDB Streams, S3, SNS, SQS, or EventBridge, and you need to:

  • decide which business rules should run for each event;
  • execute tasks;
  • publish new domain events;
  • store events for later correlation;
  • update DynamoDB materialized views;
  • write objects to S3 or messages to SNS/SQS;
  • keep processing the rest of the batch when one record fails.

Instead of writing one large Lambda handler with nested loops, filters, try/except blocks, AWS calls, and batch handling, you define small rules and run them through a stream pipeline.

The library uses ReactiveX for Python under the hood and is inspired by jgilbert01/aws-lambda-stream.

What Problem Does It Solve?

In an event-driven system, one incoming event often triggers several independent reactions.

For example, when an order-created event arrives, you may want to:

  • save the event in DynamoDB;
  • correlate it with payment-authorized;
  • emit order-ready only when both events exist;
  • materialize an order read model;
  • write a JSON snapshot to S3;
  • notify another system through SNS.

With aws-lambda-stream, each reaction is a rule:

rules = [
    collect_order_events,
    correlate_order,
    evaluate_order_ready,
    materialize_order,
    write_order_snapshot,
    notify_order_ready,
]

The Lambda handler stays small:

StreamPipeline(
    initialize_from(rules),
    options,
).assemble(
    from_kinesis(event),
)

The framework handles event normalization, rule matching, RxPY execution, batching helpers, fault collection, and AWS connector glue.

Installation

With pip:

pip install aws-lambda-stream

With Poetry:

poetry add aws-lambda-stream

Quick Start

from aws_lambda_stream.pipelines import StreamPipeline, initialize_from
from aws_lambda_stream.events.kinesis import from_kinesis
from aws_lambda_stream.flavors.task import task
from aws_lambda_stream.utils.opt import DEFAULT_OPTIONS


def execute(uow, rule):
    return {
        "handled": True,
        "event_id": uow["event"]["id"],
    }


rules = [
    {
        "id": "handle-thing-created",
        "flavor": task,
        "event_type": "thing-created",
        "execute": execute,
    },
]


def handler(event, _context):
    StreamPipeline(
        initialize_from(rules),
        DEFAULT_OPTIONS,
    ).assemble(
        from_kinesis(event),
    )

Each incoming record becomes a unit of work (uow) shaped roughly like:

{
    "pipeline": "handle-thing-created",
    "record": {...},
    "event": {
        "id": "...",
        "type": "thing-created",
        "timestamp": 1548967022000,
    },
}

Lambda Handler Shape

In most Lambdas you do three things:

  1. Convert the AWS trigger event into UOWs.
  2. Initialize rules.
  3. Assemble and run the stream.

from aws_lambda_powertools import Logger
from aws_lambda_stream.pipelines import StreamPipeline, initialize_from
from aws_lambda_stream.events.dynamodb import from_dynamodb
from aws_lambda_stream.utils.opt import DEFAULT_OPTIONS
from .rules import rules


logger = Logger()


def handler(event, context):
    options = {
        **DEFAULT_OPTIONS,
        "logger": logger,
        "bus_name": "my-event-bus",
    }

    StreamPipeline(
        initialize_from(rules),
        options,
    ).assemble(
        from_dynamodb(event),
    )

DEFAULT_OPTIONS provides:

  • logger: standard Python root logger.
  • bus_name: BUS_NAME from the environment.
  • publish: EventBridge publisher.

You can override any of them per Lambda.

Rules And Flavors

Rules are dictionaries. initialize_from(rules) turns them into executable pipelines.

Common rule fields:

  • id: unique rule/pipeline id.
  • flavor: a callable that receives the rule and returns an RxPY operator.
  • event_type: string, list, regex, or callable used to match events.
  • filters: optional list of content predicates.
  • publish: publisher function, usually from DEFAULT_OPTIONS.
  • logger: optional logger. If omitted, a standard pipeline logger is configured.

Built-in flavors include:

  • task: execute business logic and optionally emit an event.
  • collect: store events for later correlation.
  • correlate: build correlation records.
  • evaluate: evaluate collected/correlated events and emit higher-order events.
  • materialize: update materialized views.
  • cdc: publish change-data-capture events.
  • s3, sns, elasticsearch: write to external destinations.
  • expired: emit expiration events from DynamoDB TTL removals.
  • update: query/get/update DynamoDB records.

Filtering Events

Most flavors start by checking event_type and then optional content filters:

{
    "id": "collect-thing-created",
    "flavor": collect,
    "event_type": "thing-created",
    "filters": [
        lambda uow, rule: uow["event"]["thing"]["status"] == "active",
    ],
}

event_type can be any of these:

"thing-created"
["thing-created", "thing-updated"]
re.compile(r"thing-(created|updated)$")
lambda uow: uow["event"]["type"].startswith("thing-")

filters receive (uow, rule) and all filters must return True.
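
The matching rules above can be pictured with a small standalone sketch. It is not the library's implementation: `matches_event_type` and `passes_filters` are hypothetical helpers, and using `search` for compiled patterns is an assumption.

```python
import re


def matches_event_type(event_type, uow):
    """Return True when the UOW's event type satisfies the rule's event_type."""
    actual = uow["event"]["type"]
    if callable(event_type):
        # Callable form: the predicate decides (assumed to receive the UOW).
        return bool(event_type(uow))
    if isinstance(event_type, re.Pattern):
        # Regex form: assumed to use search(), so anchors are up to the pattern.
        return event_type.search(actual) is not None
    if isinstance(event_type, (list, tuple, set)):
        # List form: any listed type matches.
        return actual in event_type
    # String form: exact match.
    return actual == event_type


def passes_filters(filters, uow, rule):
    """Every content filter must return a truthy value."""
    return all(content_filter(uow, rule) for content_filter in filters)


uow = {"event": {"type": "thing-updated", "thing": {"status": "active"}}}
is_match = matches_event_type(["thing-created", "thing-updated"], uow)
```

Here `is_match` is true because the list form accepts any listed type, while the plain string "thing-created" would not match this UOW.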

Task Flavor

Use task when you want to run business logic for matching events.

from aws_lambda_stream.flavors.task import task


def execute(uow, rule):
    return {
        "thing_id": uow["event"]["thing"]["id"],
        "processed": True,
    }


rules = [
    {
        "id": "process-thing",
        "flavor": task,
        "event_type": "thing-created",
        "execute": execute,
    },
]

The result is stored in uow["result"] by default. Use result_key to change it:

{
    "id": "process-thing",
    "flavor": task,
    "event_type": "thing-created",
    "execute": execute,
    "result_key": "task_result",
}
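
To make the effect of result_key concrete, here is a minimal standalone sketch of the behavior described above. `apply_task` is a hypothetical helper written for illustration, not part of the library.

```python
def apply_task(uow, rule):
    """Run rule["execute"] and store its return value on a copy of the UOW."""
    result = rule["execute"](uow, rule)
    # Fall back to "result" when the rule does not set result_key.
    key = rule.get("result_key", "result")
    return {**uow, key: result}


def execute(uow, rule):
    return {"thing_id": uow["event"]["thing"]["id"], "processed": True}


uow = {"event": {"type": "thing-created", "thing": {"id": "thing-1"}}}

default_uow = apply_task(uow, {"execute": execute})
custom_uow = apply_task(uow, {"execute": execute, "result_key": "task_result"})
```

`default_uow` carries the value under "result", while `custom_uow` carries it under "task_result".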

Emitting Events From A Task

Add emit to publish a follow-up event.

{
    "id": "complete-task",
    "flavor": task,
    "event_type": "task-requested",
    "execute": execute,
    "emit": lambda uow, rule, template: {
        **template,
        "type": "task-completed",
        "thing": uow["event"]["thing"],
    },
}

template contains a generated id, timestamp, and partition_key.

Return a list from emit to publish multiple events:

"emit": lambda uow, rule, template: [
    {
        **template,
        "type": "first-event",
    },
    {
        **template,
        "type": "second-event",
    },
]
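
Because emit is a plain function, you can try it outside the pipeline. The sketch below assumes a hand-built template with the three fields mentioned above. `call_emit` is a hypothetical helper that normalizes both return shapes to a list; it is not the library's code.

```python
def call_emit(emit, uow, rule, template):
    """Call an emit function and always return a list of events."""
    emitted = emit(uow, rule, template)
    return emitted if isinstance(emitted, list) else [emitted]


# Stand-in for the generated template; real values come from the framework.
template = {
    "id": "generated-id",
    "timestamp": 1548967022000,
    "partition_key": "thing-1",
}
uow = {"event": {"id": "evt-1", "type": "task-requested", "thing": {"id": "thing-1"}}}

single = call_emit(
    lambda uow, rule, template: {
        **template,
        "type": "task-completed",
        "thing": uow["event"]["thing"],
    },
    uow, {}, template,
)
many = call_emit(
    lambda uow, rule, template: [
        {**template, "type": "first-event"},
        {**template, "type": "second-event"},
    ],
    uow, {}, template,
)
```

Spreading `**template` first lets the keys you set afterwards, such as "type", take precedence over the generated fields.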

Collect, Correlate, Evaluate

These flavors are useful for event-driven workflows where one event is not enough.

collect

collect stores raw events in DynamoDB so they can be queried later.

from aws_lambda_stream.flavors.collect import collect


{
    "id": "collect-order-events",
    "flavor": collect,
    "event_type": ["order-created", "payment-authorized"],
    "correlation_key": "order.id",
}

If correlation_key is a string, it is read from event. If it is callable, it receives the UOW:

"correlation_key": lambda uow: uow["event"]["order"]["id"]

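The two forms of correlation_key can be compared with a small standalone sketch. `resolve_correlation_key` is a hypothetical helper, and treating the string as a dot-separated path into event is an assumption based on the "order.id" example.

```python
from functools import reduce


def resolve_correlation_key(correlation_key, uow):
    """Resolve a correlation key from a dotted path or a callable."""
    if callable(correlation_key):
        # Callable form receives the whole UOW.
        return correlation_key(uow)
    # String form: walk the dotted path starting at uow["event"].
    return reduce(
        lambda node, part: node[part],
        correlation_key.split("."),
        uow["event"],
    )


uow = {"event": {"type": "order-created", "order": {"id": "order-1"}}}

from_path = resolve_correlation_key("order.id", uow)
from_callable = resolve_correlation_key(lambda uow: uow["event"]["order"]["id"], uow)
```

Both forms resolve to the same value here; the callable form is useful when the key has to be computed or combined from several fields.
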
correlate

correlate reads stream records from the table of collected events and writes correlation records.

from aws_lambda_stream.flavors.correlate import correlate


{
    "id": "correlate-order",
    "flavor": correlate,
    "event_type": ["order-created", "payment-authorized"],
    "correlation_key": "order.id",
}

Use correlation_key_suffix when different workflows share the same entity id:

{
    "id": "correlate-payment",
    "flavor": correlate,
    "event_type": "payment-authorized",
    "correlation_key": "order.id",
    "correlation_key_suffix": "payment",
}

evaluate

evaluate checks whether enough correlated events exist and emits higher-order events.

from aws_lambda_stream.flavors.evaluate import evaluate


def has_order_and_payment(uow):
    types = [event["type"] for event in uow["correlated"]]
    return "order-created" in types and "payment-authorized" in types


{
    "id": "order-ready",
    "flavor": evaluate,
    "event_type": ["order-created", "payment-authorized"],
    "correlation_key_suffix": "payment",
    "expression": has_order_and_payment,
    "emit": "order-ready",
}

expression can return:

  • True or False.
  • An event dictionary.
  • A list of event dictionaries.

When it returns True, the event that triggered the evaluation is used as the trigger for the emitted event.
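
Since expression is an ordinary function, it can be exercised directly with hand-built UOWs. The sketch below reuses the has_order_and_payment example and assumes, as that example does, that correlated events are available under uow["correlated"]. The sample UOWs are made up for illustration.

```python
def has_order_and_payment(uow):
    types = [event["type"] for event in uow["correlated"]]
    return "order-created" in types and "payment-authorized" in types


# Only one of the two required events has been correlated so far.
partial = {
    "event": {"type": "order-created", "order": {"id": "order-1"}},
    "correlated": [{"type": "order-created"}],
}

# Both events are present, so the rule would emit "order-ready".
complete = {
    "event": {"type": "payment-authorized", "order": {"id": "order-1"}},
    "correlated": [{"type": "order-created"}, {"type": "payment-authorized"}],
}

partial_ready = has_order_and_payment(partial)
complete_ready = has_order_and_payment(complete)
```

Keeping expressions free of AWS calls makes them easy to cover with plain unit tests like this.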

CDC Flavor

Use cdc to publish a new event from a DynamoDB stream record.

from aws_lambda_stream.flavors.cdc import cdc


{
    "id": "thing-cdc",
    "flavor": cdc,
    "event_type": "THING-created",
    "to_event": lambda uow: {
        "type": "thing-created",
        "thing": uow["event"]["raw"]["new"],
    },
}

to_event receives the UOW and returns fields that are merged into uow["event"] before publishing.
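
The merge step can be sketched in plain Python. This is an illustration under the assumption that the merge is shallow. `apply_to_event` is a hypothetical helper, and the sample record is invented.

```python
def apply_to_event(uow, to_event):
    """Merge the fields returned by to_event into a copy of uow["event"]."""
    return {**uow, "event": {**uow["event"], **to_event(uow)}}


uow = {
    "event": {
        "id": "3",
        "type": "THING-created",
        "timestamp": 1548967022000,
        "raw": {"new": {"pk": "thing-1", "name": "Thing One"}, "old": None},
    },
}

published = apply_to_event(
    uow,
    lambda uow: {
        "type": "thing-created",
        "thing": uow["event"]["raw"]["new"],
    },
)
```

Under this assumption the returned "type" replaces the stream-derived one, while untouched fields such as id and timestamp are preserved.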

Materialize Flavor

Use materialize to update a DynamoDB materialized view from an event.

from aws_lambda_stream.flavors.materialize import materialize
from aws_lambda_stream.utils.dynamodb import update_expression


{
    "id": "materialize-thing",
    "flavor": materialize,
    "event_type": "thing-created",
    "to_update_request": lambda uow: {
        "Key": {
            "pk": uow["event"]["thing"]["id"],
            "sk": "THING",
        },
        **update_expression({
            "name": uow["event"]["thing"]["name"],
            "timestamp": uow["event"]["timestamp"],
        }),
    },
}

Use split_on when one event should update multiple records:

{
    "id": "materialize-order-items",
    "flavor": materialize,
    "event_type": "order-created",
    "split_on": "event.items",
    "split_target_field": "item",
    "to_update_request": lambda uow: {
        "Key": {
            "pk": uow["item"]["id"],
            "sk": "ITEM",
        },
        **update_expression({
            "order_id": uow["event"]["order"]["id"],
        }),
    },
}
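
Conceptually, split_on fans one UOW out into several, one per element of the list found at the given path. The standalone sketch below illustrates that idea. `split_uow` is a hypothetical helper, and reading the path from the UOW root is an assumption based on the "event.items" example.

```python
def split_uow(uow, split_on, target_field):
    """Return one UOW copy per element found at the dotted split_on path."""
    node = uow
    for part in split_on.split("."):
        node = node[part]
    # Each copy keeps the original fields and adds one element under target_field.
    return [{**uow, target_field: value} for value in node]


uow = {
    "event": {
        "type": "order-created",
        "order": {"id": "order-1"},
        "items": [{"id": "item-1"}, {"id": "item-2"}],
    },
}

parts = split_uow(uow, "event.items", "item")
keys = [{"pk": part["item"]["id"], "sk": "ITEM"} for part in parts]
```

Each resulting UOW still has access to the full event, which is why to_update_request can read both uow["item"] and uow["event"]["order"].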

S3 And SNS Flavors

Use s3 to write derived objects.

import json

from aws_lambda_stream.flavors.s3 import s3


{
    "id": "write-order-snapshot",
    "flavor": s3,
    "event_type": "order-ready",
    "bucket_name": "my-snapshot-bucket",
    "to_s3": lambda uow: {
        "Key": "orders/{}.json".format(uow["event"]["order"]["id"]),
        "Body": json.dumps(uow["event"]["order"]).encode("utf-8"),
        "ContentType": "application/json",
    },
}

Use sns to publish batches to an SNS topic.

import json

from aws_lambda_stream.flavors.sns import sns


{
    "id": "notify-order-ready",
    "flavor": sns,
    "event_type": "order-ready",
    "topic_arn": "arn:aws:sns:us-east-1:123456789012:orders",
    "to_sns": lambda uow: [
        {
            "Message": json.dumps(uow["event"]),
        },
    ],
}

Expired Flavor

Use expired with DynamoDB TTL stream removals. If an item has ttl and expire, the flavor emits an expiration event.

from aws_lambda_stream.flavors.expired import expired


{
    "id": "expired-events",
    "flavor": expired,
}

If expire is True, the emitted type is derived from the original event:

  • thing.created becomes thing.created.expired
  • thing-created becomes thing-created-expired

If expire is a string, that string is used as the emitted type:

{
    "ttl": 1548967022,
    "expire": "thing-timeout",
    "event": {
        "id": "thing-1",
        "type": "thing-created",
        "timestamp": 1548967022000,
    },
}
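
The naming rules above can be summarized in a short sketch. `expired_event_type` is a hypothetical helper, and choosing the separator by checking for a dot in the original type is an assumption inferred from the two examples.

```python
def expired_event_type(item):
    """Derive the emitted event type for an item removed by TTL."""
    expire = item["expire"]
    if isinstance(expire, str):
        # An explicit string wins over the derived name.
        return expire
    original = item["event"]["type"]
    # Assumed rule: reuse the separator style of the original type.
    separator = "." if "." in original else "-"
    return f"{original}{separator}expired"


dotted = expired_event_type({"expire": True, "event": {"type": "thing.created"}})
dashed = expired_event_type({"expire": True, "event": {"type": "thing-created"}})
explicit = expired_event_type({
    "ttl": 1548967022,
    "expire": "thing-timeout",
    "event": {"id": "thing-1", "type": "thing-created", "timestamp": 1548967022000},
})
```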

Update Flavor

Use update when an event needs to query records, optionally split results, and update each target.

from aws_lambda_stream.flavors.update import update
from aws_lambda_stream.utils.dynamodb import update_expression


{
    "id": "update-related-things",
    "flavor": update,
    "event_type": "thing-renamed",
    "to_query_request": lambda uow, rule: {
        "IndexName": "DataIndex",
        "KeyConditionExpression": "#data = :data",
        "ExpressionAttributeNames": {
            "#data": "data",
        },
        "ExpressionAttributeValues": {
            ":data": uow["event"]["thing"]["id"],
        },
    },
    "split_on": "query_response",
    "split_target_field": "target",
    "to_get_request": lambda uow, rule: {
        "Keys": [
            {
                "pk": uow["target"]["pk"],
                "sk": uow["target"]["sk"],
            },
        ],
    },
    "to_update_request": lambda uow, rule: {
        "Key": {
            "pk": uow["target"]["pk"],
            "sk": uow["target"]["sk"],
        },
        **update_expression({
            "thing_name": uow["event"]["thing"]["name"],
        }),
    },
}

Use to_fallback_update_request when an update can legitimately return {} and you want to try a second request.

Event Sources

Helpers normalize AWS event payloads:

from aws_lambda_stream.events.kinesis import from_kinesis
from aws_lambda_stream.events.dynamodb import from_dynamodb
from aws_lambda_stream.events.s3 import from_s3
from aws_lambda_stream.events.sns import from_sns
from aws_lambda_stream.events.sqs import from_sqs

Test helpers are also available, for example to_kinesis_records(...), to_dynamodb_records(...), and to_s3_records(...).

Example local test input:

from aws_lambda_stream.events.kinesis import to_kinesis_records, from_kinesis


event = to_kinesis_records([
    {
        "type": "thing-created",
        "timestamp": 1548967022000,
        "partition_key": "thing-1",
        "thing": {
            "id": "thing-1",
        },
    },
])

uows = from_kinesis(event)

Concurrency

StreamPipeline runs pipelines concurrently by default:

StreamPipeline(pipelines, options, concurrency=True)

You can disable concurrency for deterministic local tests:

StreamPipeline(pipelines, options, concurrency=False)

Concurrent pipelines wait for each rule to complete before shutting down their scheduler, so RxPY operators such as flat_map can safely emit nested observables.

Unit Of Work (UOW)

A Unit Of Work, usually called uow, is the dictionary that moves through the pipeline. Every flavor receives a stream of UOWs and may return the same UOW enriched with new fields.

The base shape is intentionally small:

{
    "record": {...},
    "event": {
        "id": "event-id",
        "type": "thing-created",
        "timestamp": 1548967022000,
        "tags": {...},
    },
}

Common fields:

  • record: the original AWS record. It is kept so failures can be inspected or resubmitted.
  • event: the normalized domain event used by filters and flavors.
  • event.id: stable event id.
  • event.type: event type used by event_type filters.
  • event.timestamp: event timestamp in milliseconds.
  • event.tags: optional metadata such as region, source, pipeline, or test skip tags.

Everything else depends on the source event or your domain payload.

For example, an event from an event hub may look like:

{
    "record": {...},
    "event": {
        "id": "evt-1",
        "type": "thing-created",
        "timestamp": 1548967022000,
        "tags": {...},
        "thing": {
            "id": "thing-1",
            "name": "Thing One",
        },
    },
}

A UOW created from DynamoDB Streams includes source-specific data under event.raw:

{
    "record": {...},
    "event": {
        "id": "3",
        "type": "EVENT-created",
        "timestamp": 1548967022000,
        "tags": {
            "region": "us-west-2",
        },
        "raw": {
            "new": {...},
            "old": None,
        },
    },
}

When StreamPipeline runs a rule, it also adds pipeline to the UOW so downstream operators know which rule is processing it:

{
    "pipeline": "rule-id",
    "record": {...},
    "event": {...},
}

Fault Handling

Use faulty(...) around functions that may fail for a single record:

from aws_lambda_stream.utils.faults import faulty
from aws_lambda_stream.utils.operators import rx_map


source.pipe(
    rx_map(faulty(lambda uow: do_work(uow))),
)

Expected per-record failures are collected as fault events and processing continues with the next record. Unexpected RxPY on_error errors remain terminal.
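
The idea of collecting per-record failures instead of aborting the batch can be shown without RxPY. The sketch below is a plain-Python illustration, not how faulty is implemented. `run_with_fault_collection` and the fault dictionary shape are hypothetical.

```python
def run_with_fault_collection(uows, work):
    """Apply work to each UOW; collect failures instead of stopping the batch."""
    results, faults = [], []
    for uow in uows:
        try:
            results.append(work(uow))
        except Exception as err:
            # Keep the failing UOW so it can be inspected or resubmitted later.
            faults.append({
                "type": "fault",
                "err": {"name": type(err).__name__, "message": str(err)},
                "uow": uow,
            })
    return results, faults


def do_work(uow):
    if uow["event"]["type"] == "bad-event":
        raise ValueError("cannot process bad-event")
    return {**uow, "handled": True}


batch = [
    {"event": {"id": "1", "type": "thing-created"}},
    {"event": {"id": "2", "type": "bad-event"}},
    {"event": {"id": "3", "type": "thing-created"}},
]

results, faults = run_with_fault_collection(batch, do_work)
```

The third record is still processed even though the second one raised, which is the behavior the fault handling is meant to provide.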

Writing A Custom Flavor

A flavor receives a rule and returns an RxPY operator.

from reactivex import Observable, operators as ops
from aws_lambda_stream.utils.filters import on_event_type, on_content
from aws_lambda_stream.utils.operators import rx_filter, rx_map
from aws_lambda_stream.utils.print import print_start_pipeline, print_end_pipeline


def my_flavor(rule):
    def wrapper(source: Observable):
        return source.pipe(
            rx_filter(on_event_type(rule)),
            ops.do_action(print_start_pipeline(rule)),
            rx_filter(on_content(rule)),
            rx_map(lambda uow: {
                **uow,
                "custom": True,
            }),
            ops.do_action(print_end_pipeline(rule)),
        )
    return wrapper

Use ops.do_action(...) for logging or side effects that should not transform the UOW. Use rx_map(...) and rx_filter(...) when you want per-record failures to be collected as faults and processing to continue.

Local Testing

You can run a pipeline directly in a unit test:

from aws_lambda_stream.pipelines import StreamPipeline, initialize_from
from aws_lambda_stream.events.kinesis import from_kinesis, to_kinesis_records
from aws_lambda_stream.utils.opt import DEFAULT_OPTIONS


def test_pipeline():
    collected = []

    event = to_kinesis_records([
        {
            "type": "thing-created",
            "timestamp": 1,
            "partition_key": "thing-1",
        },
    ])

    StreamPipeline(
        initialize_from(rules),
        DEFAULT_OPTIONS,
        concurrency=False,
    ).assemble(
        from_kinesis(event),
        on_next=lambda pipeline_id, uow: collected.append((pipeline_id, uow)),
    )

    assert len(collected) == 1

Use concurrency=False when you want stable ordering in tests.

Project Templates

The repository includes Serverless Framework templates:

  • event-hub
  • event-lake-s3
  • event-fault-monitor
  • rest-bff-service
  • control-service

Create a project from a template:

sls create \
  --template-url https://github.com/clandro89/aws-lambda-stream/tree/master/templates/event-hub \
  --path myprefix-event-hub

Development

Run tests:

poetry run pytest

Build:

poetry build

Publish:

poetry publish

Credits

License

This library is licensed under the MIT License. See the LICENSE file.
