A framework for content-based routing of records in a DynamoDB stream to the callable that should handle them
dynamodb-stream-router
WARNING - Version 0.0.6 is a breaking change from version 0.0.5. Please review the documentation before upgrading.
Provides a framework for mapping records in a DynamoDB stream to callables based on the event name (INSERT, MODIFY, REMOVE) and content.
Installation
pip install dynamodb-stream-router
Routing
- Routes are determined by examining the `eventName` of the DynamoDB stream `Record` and through a `condition` which examines the contents of the `Record`.
- Conditions can either be a callable that takes a `RouteRecord` and returns `True` or `False`, or a string expression that will be parsed into a callable. See below for the expression language.
- Routes have a priority, which is honored in ascending order. If multiple matching routes have the same priority, they will be executed in random order (concurrently, if an `executor` is provided).
- Routes are matched based on a `RouteRecord`, a helper class that lazily deserializes the DynamoDB item structure (used in `Keys`, `NewImage` and `OldImage`) into Python types, exactly as the boto3 DynamoDB Table resource does.
- Route handling functions take a `RouteRecord` and can return anything. The return value is not used by the framework.
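To make the deserialization step concrete, here is a minimal, stdlib-only sketch of converting the DynamoDB attribute-value format into Python types. It is not the library's implementation: `deserialize` is a hypothetical helper, and it assumes the mapping boto3 uses for the common types (numbers become `Decimal`, string and number sets become Python sets). Binary types are left out for brevity.

```python
from decimal import Decimal
from typing import Any


def deserialize(attr: dict) -> Any:
    """Convert one DynamoDB attribute value, e.g. {"S": "foo"}, to a Python value.

    Hypothetical helper for illustration only; not part of dynamodb-stream-router.
    """
    # Each attribute value is a single-entry dict of {type_tag: raw_value}.
    tag, raw = next(iter(attr.items()))
    if tag == "S":
        return raw
    if tag == "N":
        return Decimal(raw)  # numbers arrive as strings
    if tag == "BOOL":
        return bool(raw)
    if tag == "NULL":
        return None
    if tag == "L":
        return [deserialize(element) for element in raw]
    if tag == "M":
        return {key: deserialize(value) for key, value in raw.items()}
    if tag == "SS":
        return set(raw)
    if tag == "NS":
        return {Decimal(number) for number in raw}
    raise ValueError(f"Unsupported type tag in this sketch: {tag}")


# A NewImage as it might appear in a stream record (made-up data).
new_image = {
    "foo": {"S": "bar"},
    "count": {"N": "3"},
    "tags": {"L": [{"S": "a"}, {"S": "b"}]},
}
item = {key: deserialize(value) for key, value in new_image.items()}
```

A lazy variant would defer this conversion until `old_image` or `new_image` is first accessed, so records that fail an inexpensive check never pay for it.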
Expressions
Keywords and types:
Type | Description | Example |
---|---|---|
VALUE | A quoted string (single or double quote), integer, or float representing a literal value | 'foo', 1, 3.8 |
$OLD | A reference to RouteRecord.old_image | $OLD.foo |
$NEW | A reference to RouteRecord.new_image | $NEW.foo |
PATH | A path starting from a root of $OLD or $NEW. Can be specified using dot syntax or Python-style keys. When using dot syntax, path components must conform to Python's naming restrictions. | $OLD.foo, $NEW.foo.bar, $OLD["foo"] |
INDEX | An integer used as an index into a list or set | $OLD.foo[0] |
Operators:
Symbol | Action |
---|---|
& | Logical AND |
| | Logical OR |
() | Statement grouping |
== | Equality |
!= | Non equality |
> | Greater than |
>= | Greater than or equal to |
< | Less than |
<= | Less than or equal to |
=~ | Regex comparison PATH =~ 'regex' where 'regex' is a quoted VALUE |
Comparison operators, except for regex comparison, can compare PATH to VALUE, PATH to PATH, or even VALUE to VALUE.
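Since a condition may be either a string expression or a callable, it can help to see the two side by side. The sketch below writes plain-Python callables that are intended to mirror two expressions. `StandInRecord` is a hypothetical stand-in for `RouteRecord` that exposes only `old_image` and `new_image`, and the attribute names (`status`, `qty`, `priority`, `region`) are made up. How the expression parser treats a missing path is not covered here, so the callables simply use `dict.get`.

```python
from dataclasses import dataclass, field


@dataclass
class StandInRecord:
    """Hypothetical stand-in exposing the two RouteRecord attributes used below."""
    old_image: dict = field(default_factory=dict)
    new_image: dict = field(default_factory=dict)


# Expression form: "$NEW.status == 'shipped' & $OLD.status != 'shipped'"
def became_shipped(record) -> bool:
    new_status = record.new_image.get("status")
    old_status = record.old_image.get("status")
    return new_status == "shipped" and old_status != "shipped"


# Expression form: "($NEW.qty > 10 | $NEW.priority == 'high') & $NEW.region == 'eu'"
def needs_review(record) -> bool:
    new = record.new_image
    large_or_urgent = new.get("qty", 0) > 10 or new.get("priority") == "high"
    return large_or_urgent and new.get("region") == "eu"
```

The string form is compact for simple comparisons; a callable is the better fit once the test needs logic the expression language does not offer.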
Functions:
Function | Arguments | Description |
---|---|---|
has_changed(VALUE, VALUE) | VALUE - Comma separated list of quoted values | Tests $OLD and $NEW. If a value is in one and not the other, or in both and differs, the function returns True. Returns True if any key meets these conditions. |
is_type(PATH, TYPE) | | Tests if PATH exists and the VALUE at PATH is of type TYPE. |
attribute_exists(PATH) | PATH - The path to test | Returns True if the provided path exists |
from_json(PATH) | PATH - The path to decode | Returns object decoded using simplejson.loads() |
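The `has_changed` semantics described in the table can be restated in plain Python. This is a sketch of the documented behavior, not the library's code: the function below is a hypothetical re-implementation that takes the two images explicitly, whereas the expression function reads them from $OLD and $NEW.

```python
_MISSING = object()  # sentinel so "absent" differs from any stored value, including None


def has_changed(old_image: dict, new_image: dict, *keys: str) -> bool:
    """Return True if any key is present in only one image or differs between them."""
    return any(
        old_image.get(key, _MISSING) != new_image.get(key, _MISSING)
        for key in keys
    )


old = {"foo": 1, "baz": None}
new = {"foo": 1, "bar": 2}

unchanged = has_changed(old, new, "foo")        # same value in both images
added = has_changed(old, new, "foo", "bar")     # "bar" exists only in new
removed = has_changed(old, new, "baz")          # "baz" exists only in old
```

A key that is absent from both images counts as unchanged in this sketch.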
Examples
    from dynamodb_stream_router import on_insert, on_modify, on_remove, on_operations, Operation, route_records, RouteRecord


    # String condition: runs for INSERT events where the new item's foo is 'bar'
    @on_insert("$NEW.foo == 'bar'", 0)
    def print_new_record(record: RouteRecord) -> None:
        print(record.new_image)


    # Callable condition: receives the RouteRecord and returns a bool
    def test_old_foo(record: RouteRecord) -> bool:
        return record.old_image["foo"] == "bar2"


    @on_remove(test_old_foo, 0)
    def print_old_record(record: RouteRecord) -> None:
        print(record.old_image)


    # Priority 1: runs after the priority 0 routes above
    @on_modify("has_changed('foo') & attribute_exists($NEW.foo)", 1)
    def print_changed_foo(record: RouteRecord) -> None:
        print(f'{record.old_image.get("foo")} -> {record.new_image.get("foo")}')


    @on_operations({Operation.INSERT, Operation.MODIFY, Operation.REMOVE}, 1)
    def hello_world(record: RouteRecord) -> str:
        return "Hello, DB STREAM"


    def lambda_handler(event, context):
        route_records(event["Records"])
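Conceptually, routing a record means selecting the routes whose operation and condition both match, then calling their handlers in ascending priority order, as described in the Routing section. The following stdlib-only sketch illustrates that idea. `Route` and `dispatch` are hypothetical names invented for this example, not the library's internals, and the sketch works on raw record dicts rather than `RouteRecord` objects.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Route:
    """Hypothetical route entry: which events it handles, when, and how."""
    operations: frozenset
    condition: Callable[[dict], bool]
    handler: Callable[[dict], Any]
    priority: int = 0


def dispatch(record: dict, routes: List[Route]) -> List[str]:
    """Call matching handlers in ascending priority; return the handler names called."""
    matched = [
        route for route in routes
        if record["eventName"] in route.operations and route.condition(record)
    ]
    called = []
    # sorted() is stable, so equal priorities keep registration order in this sketch;
    # the framework itself promises no particular order for equal priorities.
    for route in sorted(matched, key=lambda route: route.priority):
        route.handler(record)  # the return value is ignored
        called.append(route.handler.__name__)
    return called


def audit(record: dict) -> None:
    pass


def log_insert(record: dict) -> None:
    pass


routes = [
    Route(frozenset({"INSERT", "MODIFY"}), lambda r: True, audit, priority=1),
    Route(frozenset({"INSERT"}), lambda r: "NewImage" in r["dynamodb"], log_insert, priority=0),
]
insert_record = {"eventName": "INSERT", "dynamodb": {"NewImage": {"foo": {"S": "bar"}}}}
order = dispatch(insert_record, routes)
```

Here `log_insert` runs before `audit` because its priority value is lower, even though it was registered second.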