A DRY DynamoDB normalization layer extracted from Trellis Python.
Project description
๐ daplug-ddb (daโขplug)
Schema-Driven DynamoDB Normalization & Event Publishing for Python
daplug-ddb is a lightweight package that provides schema-aware CRUD helpers, batch utilities, and optional SNS publishing so you can treat DynamoDB as a structured datastore without rewriting boilerplate for every project.
โจ Key Features
- Schema Mapping โ Convert inbound payloads into strongly typed DynamoDB items driven by your OpenAPI (or JSON schema) definitions.
- Idempotent CRUD โ Consistent
create,overwrite,update,delete, andreadoperations with optional optimistic locking via anidempotence_key. - Batch Helpers โ Simplified batch insert/delete flows that validate data and handle chunking for you.
- SNS Integration โ Optional event publishing for every write operation so downstream systems stay in sync.
๐ Quick Start
Installation
pip install daplug-ddb
# pipenv install daplug-ddb
# poetry add daplug-ddb
# uv pip install daplug-ddb
Basic Usage
import daplug_ddb
adapter = daplug_ddb.adapter(
table="example-table",
endpoint="https://dynamodb.us-east-2.amazonaws.com", # optional, will use AWS conventional env vars if using on lambda
schema_file="openapi.yml",
hash_key="record_id",
idempotence_key="modified",
)
item = adapter.create(
data={
"record_id": "abc123",
"object_key": {"string_key": "value"},
"array_number": [1, 2, 3],
"modified": "2024-01-01",
},
schema="ExampleModel",
)
print(item)
Because the adapter is configured with a schema_file, every call can opt into
mapping by supplying schema. Skip the schema argument when you want to write
the data exactly as provided.
๐ง Advanced Configuration
Selective Updates
# Merge partial updates while preserving existing attributes
adapter.update(
operation="get", # fetch original item via get; use "query" for indexes
query={
"Key": {"record_id": "abc123", "sort_key": "v1"}
},
data={
"record_id": "abc123",
"sort_key": "v1",
"array_number": [1, 2, 3, 4],
},
update_list_operation="replace",
)
Hash/Range Prefixing
adapter = daplug_ddb.adapter(
table="tenant-config",
endpoint="https://dynamodb.us-east-2.amazonaws.com",
schema_file="openapi.yml",
hash_key="tenant_id",
)
prefix_args = {
"hash_key": "tenant_id",
"hash_prefix": "tenant#",
"range_key": "sort_key",
"range_prefix": "config#",
}
item = adapter.create(
data={
"tenant_id": "abc",
"sort_key": "default",
"modified": "2024-01-01",
},
schema="TenantModel",
**prefix_args,
)
# DynamoDB stores tenant_id as "tenant#abc", but the adapter returns "abc"
When prefixes are provided, the adapter automatically applies them on the way
into DynamoDB (including batch operations and deletes) and removes them before
returning data or publishing SNS events. Pass the same prefix_args to reads
(get, query, scan) so query keys are expanded and responses are cleaned.
Codex workflow references:
Batched Writes
adapter.batch_insert(
data=[
{"record_id": str(idx), "sort_key": str(idx)}
for idx in range(100)
],
batch_size=25,
)
adapter.batch_delete(
data=[
{"record_id": str(idx), "sort_key": str(idx)}
for idx in range(100)
]
)
Idempotent Operations
adapter = daplug_ddb.adapter(
table="orders",
endpoint="https://dynamodb.us-east-2.amazonaws.com",
schema_file="openapi.yml",
hash_key="order_id",
idempotence_key="modified",
)
updated = adapter.update(
data={"order_id": "abc123", "modified": "2024-02-01"},
operation="get",
query={"Key": {"order_id": "abc123"}},
schema="OrderModel",
)
The adapter fetches the current item, merges the update, and executes a
conditional PutItem to ensure the stored modified value still matches
what was read. If another writer changes the record first, the operation
fails with a conditional check error rather than overwriting the data.
Set raise_idempotence_error=True if you prefer the adapter to raise a
ValueError instead of relying on DynamoDB's conditional failure. Leaving it
at the default (False) allows you to detect conflicts without breaking the
update flow.
adapter = daplug_ddb.adapter(
table="orders",
schema_file="openapi.yml",
hash_key="order_id",
idempotence_key="modified",
raise_idempotence_error=True,
)
Enable idempotence_use_latest=True when you want the adapter to keep the
most recent copy based on the timestamp stored in the idempotence key. Stale
updates are ignored automatically.
adapter = daplug_ddb.adapter(
table="orders",
schema_file="openapi.yml",
hash_key="order_id",
idempotence_key="modified",
idempotence_use_latest=True,
)
Stale updates are short-circuited before DynamoDB writes occur.
Client Update Request
โ
โผ
[Adapter.fetch]
โ (reads original item)
โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Original Item โ
โ modified = "2024-01-01" โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ merge + map
โผ
PutItem rejected โ original returned
Client Update Request
โ
โผ
[Adapter.fetch]
โ (reads original item)
โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Original Item โ
โ idempotence_key = "v1" โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ merge + map
โผ
PutItem(Item=โฆ, ConditionExpression=Attr(idempotence_key).eq("v1"))
โ
โโโโโโดโโโโโโโโ
โ โ
โผ โผ
Success ConditionalCheckFailed
(another writer changed key)
SNS Publishing
Per-call SNS Attributes
You can supply request-scoped SNS message attributes by passing 'sns_attributes' into any adapter operation (e.g. 'create', 'update', 'delete'). These merge with adapter defaults and schema-derived metadata.
adapter = daplug_ddb.adapter(
table="audit-table",
schema_file="openapi.yml",
hash_key="audit_id",
idempotence_key="version",
sns_arn="arn:aws:sns:us-east-2:123456789012:audit-events",
sns_endpoint="https://sns.us-east-2.amazonaws.com",
sns_attributes={"source": "daplug"},
)
adapter.create(
data=item,
schema="AuditModel",
sns_attributes={"source": "billing", "priority": "high"},
)
# => publishes a formatted SNS event with schema metadata
๐ Method Reference
Each adapter instance holds shared configuration such as schema_file, SNS
defaults, and optional key prefixes. Pass the schema name (and any
operation-specific overrides) when you invoke a method.
adapter = daplug_ddb.adapter(
table="orders",
schema_file="openapi.yml",
hash_key="order_id",
idempotence_key="modified",
)
create (wrapper around insert/overwrite)
# default: behaves like insert (requires hash_key)
adapter.create(data=payload, schema="OrderModel")
# explicit overwrite (upsert semantics)
adapter.create(
operation="overwrite",
data=payload,
schema="OrderModel",
)
insert
adapter.insert(data=payload, schema="OrderModel")
overwrite
adapter.overwrite(data=payload, schema="OrderModel")
get
adapter.get(
query={"Key": {"order_id": "abc123"}},
schema="OrderModel",
)
query
adapter.query(
query={
"IndexName": "test_query_id",
"KeyConditionExpression": "test_query_id = :id",
"ExpressionAttributeValues": {":id": "def345"},
},
schema="OrderModel",
)
scan
adapter.scan(schema="OrderModel")
# raw DynamoDB response
adapter.scan(raw_scan=True)
read
read delegates to get, query, or scan based on the
operation kwarg.
# single item
adapter.read(operation="get", query={"Key": {"order_id": "abc123"}}, schema="OrderModel")
# query
adapter.read(
operation="query",
query={
"KeyConditionExpression": "test_query_id = :id",
"ExpressionAttributeValues": {":id": "def345"},
},
schema="OrderModel",
)
update
adapter.update(
data={"order_id": "abc123", "modified": "2024-03-02"},
operation="get",
query={"Key": {"order_id": "abc123"}},
schema="OrderModel",
)
delete
adapter.delete(query={"Key": {"order_id": "abc123"}})
batch_insert
adapter.batch_insert(data=[{...} for _ in range(10)], schema="OrderModel", batch_size=25)
batch_delete
adapter.batch_delete(data=[{...} for _ in range(10)], batch_size=25)
Prefixing Helpers
Include per-call prefix overrides whenever you need to scope keys.
adapter.insert(
data=payload,
schema="OrderModel",
hash_key="order_id",
hash_prefix="tenant#",
)
๐งช Local Development
Prerequisites
- Python 3.9+
- Pipenv
- Docker (for running DynamoDB Local during tests)
Environment Setup
git clone https://github.com/paulcruse3/daplug-ddb.git
cd daplug-ddb
pipenv install --dev
Run Tests
# unit tests (no DynamoDB required)
pipenv run test
# integration tests (spins up local DynamoDB when available)
pipenv run integrations
Supplying an idempotence_key enables optimistic concurrency for updates and overwrites. The adapter reads the original item, captures the keyโs value, and issues a PutItem with a ConditionExpression asserting the value is unchanged. If another writer updates the record first, DynamoDB returns a conditional check failure instead of silently overwriting data.
Client Update Request
โ
โผ
[Adapter.fetch]
โ (reads original item)
โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Original Item โ
โ idempotence_key = "v1" โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ merge + map
โผ
PutItem(Item=โฆ, ConditionExpression=Attr(idempotence_key).eq("v1"))
โ
โโโโโโดโโโโโโโโ
โ โ
โผ โผ
Success ConditionalCheckFailed
(another writer changed key)
- Optional: Omit
idempotence_keyto mirror DynamoDBโs default โlast write winsโ behavior while still benefiting from schema normalization. - Safety: When the key is configured but missing on the fetched item, the adapter raises
ValueError, surfacing misconfigurations early. - Events: SNS notifications include the idempotence metadata so downstream services can reason about version changes.
Coverage & Linting
# generates HTML, XML, and JUnit reports under ./coverage/
pipenv run coverage
# pylint configuration aligned with the legacy project
pipenv run lint
๐ฆ Project Structure
daplug-ddb/
โโโ daplug_ddb/
โย ย โโโ adapter.py # DynamoDB adapter implementation
โย ย โโโ prefixer.py # DynamoDB prefixer implementation
โย ย โโโ common/ # Shared helpers (merging, schema loading, logging)
โย ย โโโ __init__.py # Public adapter factory & exports
โโโ tests/
โย ย โโโ integration/ # Integration suite against DynamoDB Local
โย ย โโโ unit/ # Isolated unit tests using mocks
โย ย โโโ openapi.yml # Sample schema used for mapping tests
โโโ Pipfile # Runtime and dev dependencies
โโโ setup.py # Packaging metadata
โโโ README.md
๐ค Contributing
Contributions are welcome! Open an issue or submit a pull request if youโd like to add new features, improve documentation, or expand test coverage.
git checkout -b feature/amazing-improvement
# make your changes
pipenv run lint
pipenv run test
pipenv run integrations
git commit -am "feat: amazing improvement"
git push origin feature/amazing-improvement
๐ License
Apache License 2.0 โ see LICENSE for full text.
Built to keep DynamoDB integrations DRY, predictable, and schema-driven.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file daplug_ddb-1.0.0b9.tar.gz.
File metadata
- Download URL: daplug_ddb-1.0.0b9.tar.gz
- Upload date:
- Size: 28.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0ffbbc1678c8ce5bb006b5ffb1a099b919da7141f8068fa1db7af793ac803615
|
|
| MD5 |
76d813b305bce12d95a94bd13314fa63
|
|
| BLAKE2b-256 |
19eea3c0a7472a6d0a379ca356f3d5db53f709ca7bfb46c086b7ff85e1b23665
|
File details
Details for the file daplug_ddb-1.0.0b9-py3-none-any.whl.
File metadata
- Download URL: daplug_ddb-1.0.0b9-py3-none-any.whl
- Upload date:
- Size: 29.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
59edd6b8378e12235ed48315597c5596739d6fd56ea7cdc10c9b1f69b046378e
|
|
| MD5 |
8fdc88e37de3b56475e13c6e582b7b2c
|
|
| BLAKE2b-256 |
52600b76c16e91c2c03f2831a01875cc1cfbdb047e5757279a64a64e1379f9ac
|