A framework for content-based routing of records in a DynamoDB stream to the callables that should handle them
dynamodb-stream-router
WARNING - Version 0.0.6 is a breaking change from version 0.0.5. Please review the documentation before upgrading.
Provides a framework for mapping records in a DynamoDB stream to callables based on the event name (INSERT, MODIFY, REMOVE) and content.
Installation
pip install dynamodb-stream-router
Routing
- Routes are determined by examining the `eventName` of the DynamoDB stream `Record` and by a `condition` that examines the contents of the `Record`.
- A condition can either be a callable that takes a `RouteRecord` and returns `True` or `False`, or a string expression that will be parsed into a callable. See below for the expression language.
- Routes have a priority, which is honored in ascending order. If multiple matching routes have the same priority, they will be executed in random order (concurrently, if an `executor` is provided).
- Routes are matched based on a `RouteRecord`, a helper class that lazily deserializes the DynamoDB item structure (used in `Keys`, `NewImage` and `OldImage`) into Python types, in exactly the same way that the boto3 DynamoDB Table resource does.
- Route handling functions take a `RouteRecord` and can return anything. The return value is not used by the framework.
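The matching and priority rules above can be illustrated with a small stand-alone sketch. It is not the library's implementation: `Route` and `dispatch` are hypothetical names, the record is a plain dict rather than a `RouteRecord`, and routes with equal priority simply run in registration order here instead of in random order or concurrently.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

# Hypothetical stand-ins for illustration; these are NOT the library's classes.
Record = Dict[str, Any]


@dataclass
class Route:
    operation: str                       # "INSERT", "MODIFY" or "REMOVE"
    condition: Callable[[Record], bool]  # True when the route applies
    handler: Callable[[Record], Any]     # return value is ignored
    priority: int = 0


def dispatch(record: Record, routes: List[Route]) -> List[str]:
    """Call every matching handler, lowest priority number first.

    Returns the names of the handlers that ran, in call order.
    """
    matching = [
        r for r in routes
        if r.operation == record["eventName"] and r.condition(record)
    ]
    called = []
    # sorted() is stable, so equal priorities keep registration order (an
    # assumption of this sketch; the framework documents random order).
    for route in sorted(matching, key=lambda r: r.priority):
        route.handler(record)
        called.append(route.handler.__name__)
    return called


def audit(record: Record) -> None:
    pass


def notify(record: Record) -> None:
    pass


routes = [
    Route("INSERT", lambda r: True, notify, priority=1),
    Route("INSERT", lambda r: r["dynamodb"]["NewImage"]["foo"] == {"S": "bar"},
          audit, priority=0),
    Route("REMOVE", lambda r: True, notify, priority=0),
]
event_record = {
    "eventName": "INSERT",
    "dynamodb": {"NewImage": {"foo": {"S": "bar"}}},
}
order = dispatch(event_record, routes)  # audit (priority 0) runs before notify
```

The `REMOVE` route is skipped because the event name does not match, which mirrors the two-step matching (event name first, then condition) described in the list.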
Expressions
Keywords and types:
Type | Description | Example |
---|---|---|
VALUE | A quoted string (single or double quotes), integer, or float representing a literal value | 'foo', 1, 3.8 |
$OLD | A reference to RouteRecord.old_image | $OLD.foo |
$NEW | A reference to RouteRecord.new_image | $NEW.foo |
PATH | A path starting from a root of $OLD or $NEW. Can be specified using dot syntax or Python-style keys. When using dot syntax, path segments must conform to Python's naming restrictions. | $OLD.foo, $NEW.foo.bar, $OLD["foo"] |
INDEX | An integer used as an index into a list or set | $OLD.foo[0] |
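To make the PATH and INDEX forms concrete, here is a minimal sketch of how such a path could be resolved against two already-deserialized images. The function `resolve_path` is hypothetical and is not part of the library, which has its own expression parser; this sketch only handles list indexes, not sets.

```python
import re
from typing import Any, Dict

# One path segment: .name, ["key"], ['key'] or [0]
_SEGMENT = re.compile(r"""\.([A-Za-z_]\w*)|\[(?:"([^"]*)"|'([^']*)'|(\d+))\]""")


def resolve_path(path: str, old: Dict[str, Any], new: Dict[str, Any]) -> Any:
    """Resolve a path such as $NEW.foo.bar, $OLD["foo"] or $OLD.foo[0]."""
    if path.startswith("$OLD"):
        current: Any = old
    elif path.startswith("$NEW"):
        current = new
    else:
        raise ValueError(f"path must start with $OLD or $NEW: {path!r}")

    rest = path[4:]  # everything after the 4-character root
    pos = 0
    while pos < len(rest):
        match = _SEGMENT.match(rest, pos)
        if match is None:
            raise ValueError(f"cannot parse {rest[pos:]!r} in {path!r}")
        attr, dq_key, sq_key, index = match.groups()
        if index is not None:
            current = current[int(index)]  # INDEX into a list
        elif attr is not None:
            current = current[attr]        # dot syntax
        else:
            current = current[dq_key if dq_key is not None else sq_key]
        pos = match.end()
    return current


old_image = {"foo": ["a", "b"]}
new_image = {"foo": {"bar": 1}}
nested = resolve_path("$NEW.foo.bar", old_image, new_image)
keyed = resolve_path('$OLD["foo"]', old_image, new_image)
indexed = resolve_path("$OLD.foo[0]", old_image, new_image)
```

A missing key raises `KeyError` in this sketch; how the library treats a path that does not exist is not specified here.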
Operators:
Symbol | Action |
---|---|
& | Logical AND |
\| | Logical OR |
() | Statement grouping |
== | Equality |
!= | Inequality |
> | Greater than |
>= | Greater than or equal to |
< | Less than |
<= | Less than or equal to |
=~ | Regex comparison: PATH =~ 'regex', where 'regex' is a quoted VALUE |
Comparison operators, except for regex comparison, can compare `PATH` to `VALUE`, `PATH` to `PATH`, or even `VALUE` to `VALUE`.
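Since a condition may be either an expression string or a callable, it can help to see what an expression would look like when written by hand. The sketch below pairs two expressions with roughly equivalent callables. `FakeRecord` is a hypothetical stand-in for `RouteRecord`, and two details are assumptions rather than documented behavior: missing attributes are given defaults, and the regex is applied with `re.search`.

```python
import re
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeRecord:
    """Hypothetical stand-in exposing the two images a RouteRecord provides."""
    old_image: Dict[str, Any] = field(default_factory=dict)
    new_image: Dict[str, Any] = field(default_factory=dict)


# Roughly: "$NEW.status == 'active' & ($NEW.count >= 3 | $OLD.count < 3)"
def active_and_counted(record: FakeRecord) -> bool:
    new, old = record.new_image, record.old_image
    return new.get("status") == "active" and (
        new.get("count", 0) >= 3 or old.get("count", 0) < 3
    )


# Roughly: "$NEW.email =~ '@example\.com$'"
def example_email(record: FakeRecord) -> bool:
    value = record.new_image.get("email", "")
    return re.search(r"@example\.com$", value) is not None


rec = FakeRecord(
    old_image={"count": 5},
    new_image={"status": "active", "count": 3, "email": "a@example.com"},
)
matches_first = active_and_counted(rec)
matches_second = example_email(rec)
```

The parentheses in the first expression make the grouping explicit, so the result does not depend on the relative precedence of `&` and `|`.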
Functions:
Function | Arguments | Description |
---|---|---|
has_changed(VALUE, VALUE) | VALUE - Comma-separated list of quoted values | Tests $OLD and $NEW. If a value is in one and not the other, or is in both and differs, the function returns True. Returns True if any key meets these conditions. |
is_type(PATH, TYPE) | | Tests if PATH exists and the VALUE at PATH is of type TYPE. |
attribute_exists(PATH) | PATH - The path to test | Returns True if the provided path exists |
from_json(PATH) | PATH - The path to decode | Returns object decoded using simplejson.loads() |
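The `has_changed` description can be read as the following sketch. It is a plain-Python interpretation of the table entry, not the library's code; the explicit `old` and `new` parameters and the `_MISSING` sentinel are assumptions made for illustration.

```python
from typing import Any, Dict

_MISSING = object()  # sentinel so that a stored None still counts as present


def has_changed(old: Dict[str, Any], new: Dict[str, Any], *keys: str) -> bool:
    """Return True if any of the keys was added, removed, or changed value."""
    for key in keys:
        # Differs when the key is in only one image, or in both with
        # different values; equal when absent from both.
        if old.get(key, _MISSING) != new.get(key, _MISSING):
            return True
    return False


changed_value = has_changed({"foo": 1}, {"foo": 2}, "foo")
added_key = has_changed({}, {"foo": 1}, "foo")
untouched = has_changed({"foo": 1}, {"foo": 1, "bar": 2}, "foo")
any_of_several = has_changed({"foo": 1}, {"foo": 1, "bar": 2}, "foo", "bar")
```

In an expression the images are implicit, so the equivalent of the last call would be written as `has_changed('foo', 'bar')`.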
Examples
    from dynamodb_stream_router import on_insert, on_modify, on_remove, on_operations, Operation, route_records, RouteRecord

    # Condition given as an expression string
    @on_insert("$NEW.foo == 'bar'", 0)
    def print_new_record(record: RouteRecord) -> None:
        print(record.new_image)

    # Condition given as a callable that takes a RouteRecord and returns a bool
    def test_old_foo(record: RouteRecord) -> bool:
        return record.old_image["foo"] == "bar2"

    @on_remove(test_old_foo, 0)
    def print_old_record(record: RouteRecord) -> None:
        print(record.old_image)

    # Expression combining two functions with a logical AND
    @on_modify("has_changed('foo') & attribute_exists($NEW.foo)", 1)
    def print_changed_foo(record: RouteRecord) -> None:
        print(f'{record.old_image.get("foo")} -> {record.new_image.get("foo")}')

    @on_operations({Operation.INSERT, Operation.MODIFY, Operation.REMOVE}, 1)
    def hello_world(record: RouteRecord) -> str:
        return "Hello, DB STREAM"

    def lambda_handler(event, context):
        route_records(event["Records"])
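To exercise a handler like `lambda_handler` outside of Lambda, a hand-built event can be passed in. The sketch below shows a trimmed stream event (real events carry more fields) together with a stand-in `deserialize` function that illustrates what an image looks like once converted to Python types. The function is hypothetical and covers only some type tags; it is neither the library's nor boto3's deserializer, although it follows boto3's convention of returning numbers as `Decimal`.

```python
from decimal import Decimal
from typing import Any, Dict


def deserialize(attr: Dict[str, Any]) -> Any:
    """Convert one DynamoDB-typed value, e.g. {"S": "bar"}, to a Python value."""
    (type_tag, value), = attr.items()  # exactly one type tag per value
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # numbers arrive as strings
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [deserialize(v) for v in value]
    if type_tag == "M":
        return {k: deserialize(v) for k, v in value.items()}
    if type_tag == "SS":
        return set(value)
    if type_tag == "NS":
        return {Decimal(v) for v in value}
    # Binary types (B, BS) are left out of this sketch.
    raise TypeError(f"unsupported DynamoDB type tag: {type_tag}")


sample_event = {
    "Records": [
        {
            "eventName": "INSERT",
            "eventSource": "aws:dynamodb",
            "dynamodb": {
                "Keys": {"id": {"S": "123"}},
                "NewImage": {
                    "id": {"S": "123"},
                    "foo": {"S": "bar"},
                    "count": {"N": "3"},
                },
            },
        }
    ]
}

raw_image = sample_event["Records"][0]["dynamodb"]["NewImage"]
new_image = deserialize({"M": raw_image})
```

With the routes from the example above registered, calling `lambda_handler(sample_event, None)` would be one way to check locally which handlers fire for this record.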