Skip to main content

A framework for content-based routing of records in a Dynamodb Stream to the callable that should handle them

Project description

dynamodb-stream-router

WARNING - Version 0.0.6 is a breaking change from version 0.0.5. Please review the documentation before upgrading

Provies a framework for mapping records in a Dynamodb stream to callables based on the event name (MODIFY, INSERT, DELETE) and content.

Installation

pip install dynamodb-stream-router

Routing

  • Routes are determined by examining the eventName in the dynamodb section of the DynamoDB stream Record and through a condition which examines the contents of the Record.
  • Conditions can either be a callable that takes a RouteRecord and returns True or False, or it may be a string expression that will be parsed into a callable. See below for the expression language.
  • Routes have a priority, which is honored in an acending order. If multiple matching routes have the same priority, they will be executed in random order (concurrectly, if an executor is provided)
  • Routes are matched based on a RouteRecord, which is a helper class that (lazily) deserializers the DynamoDB item structure (used in Keys, NewImage and OldImage) into Python types, exactly in the same way that the boto3 dynamodb Table resource does.
  • Route handling functions take a RouteRecord and can return anything. The return value is not used by the framework.

Expressions

Keywords and types:

Type Description Example
VALUE A quoted string (single or double quote), integer, or float representing a literal value 'foo', 1, 3.8
$OLD A reference to RouteRecord.old_image $OLD.foo
$NEW A reference to RouteRecord.old_image $NEW.foo
PATH A path starting from a root of $OLD or $NEW. Can be specified using dot syntax or python style keys. When using dot reference paths must conform to python's restrictions. $OLD.foo, $NEW.foo.bar, $OLD["foo"]
INDEX An integer used as an index into a list or set $OLD.foo[0]

Operators:

Symbol Action
& Logical AND
| Logical OR
() Statement grouping
== Equality
!= Non equality
> Greater than
>= Greater than or equal to
< Less than
<= Less than or equal to
=~ Regex comparison PATH =~ 'regex' where 'regex' is a quoted VALUE

Comparison operators, except for regex comparison, can compare PATH to VALUE, PATH to PATH, or even VALUE to VALUE.

functions

Function Arguments Description
has_changed(VALUE, VALUE) VALUE - Comma separated list of quoted values Tests $OLD and $NEW. If value is in one and not the other, or in both and differs, the the function will return True. Returns True if any key meets conditions.
is_type(PATH, TYPE)
  • PATH - The path to test in the form of $OLD.foo.bar</li
  • TYPE - A Dynamodb type. Can be one of S, SS, B, BS, N, NS, L, M, or BOOL
Tests if PATH exists and the VALUE at PATH is of type TYPE.
attribute_exists(PATH) PATH - The path to test Returns True if the provided path exists
from_json(PATH) PATH - The path to decode Returns object decoded using simplejson.loads()

Examples

from dynamodb_stream_router import on_insert, on_modify, on_remove, on_operations, Operation, route_records, RouteRecord

@on_insert("$NEW.foo == 'bar'", 0)
def print_new_record(record: RouteRecord) -> None:
    print(record.new_image)

def test_old_foo(record: RouteRecord) -> bool:
    return record.old_image["foo"] == "bar2"

@on_remove(test_old_foo, 0)
def print_old_record(record: RouteRecord) -> None:
    print(record.old_record)

@on_modify("has_changed('foo') & attribute_exists($NEW.foo)", 1)
def print_changed_foo(record: RouteRecord) -> None:
    print(f'{record.old_image.get("foo")} -> {record.new_image.get("foo")}')

@on_operations({Operation.INSERT, Operation.MODIFY, Operation.REMOVE}, 1)
def hello_world(record: RouteRecord) -> str:
    return "Hello, DB STREAM"

def lambda_handler(event, context):
    route_records(event["Records"])

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dynamodb-stream-router-0.0.9.tar.gz (14.0 kB view details)

Uploaded Source

Built Distribution

dynamodb_stream_router-0.0.9-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file dynamodb-stream-router-0.0.9.tar.gz.

File metadata

  • Download URL: dynamodb-stream-router-0.0.9.tar.gz
  • Upload date:
  • Size: 14.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.0 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10

File hashes

Hashes for dynamodb-stream-router-0.0.9.tar.gz
Algorithm Hash digest
SHA256 2abc0de5aa9b8af2d2e585d95490f368644b3b3c8f02ded68b9d385d20e8a570
MD5 74532c294936a48801dded29de6e366d
BLAKE2b-256 fe7babfb992b4050e78bb698c00fadc2d954cd03935465ee6fb4c42adf548ee9

See more details on using hashes here.

File details

Details for the file dynamodb_stream_router-0.0.9-py3-none-any.whl.

File metadata

  • Download URL: dynamodb_stream_router-0.0.9-py3-none-any.whl
  • Upload date:
  • Size: 13.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/34.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.9 tqdm/4.63.0 importlib-metadata/4.11.3 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10

File hashes

Hashes for dynamodb_stream_router-0.0.9-py3-none-any.whl
Algorithm Hash digest
SHA256 426fac55229572a12aa2e5714cb6e8347ccdef2540e6d9a16a4eb4a06d82384a
MD5 6ccccfa375e1c6b4c7f333c9ef9fb965
BLAKE2b-256 dee98fe0462d167746e61c5f8949ee3b7ed50a8906424c1b03ae24006ea2f0cf

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page