Skip to main content

A DRY DynamoDB normalization layer extracted from Trellis Python.

Project description

๐Ÿ”Œ daplug-ddb

Schema-Driven DynamoDB Normalization & Event Publishing for Python

CircleCI Quality Gate Status Bugs Coverage Python PyPI package License Contributions

daplug-ddb is a lightweight package that provides schema-aware CRUD helpers, batch utilities, and optional SNS publishing so you can treat DynamoDB as a structured datastore without rewriting boilerplate for every project.

โœจ Key Features

  • Schema Mapping โ€“ Convert inbound payloads into strongly typed DynamoDB items driven by your OpenAPI (or JSON schema) definitions.
  • Idempotent CRUD โ€“ Consistent create, overwrite, update, delete, and read operations with optional optimistic locking via an idempotence_key.
  • Batch Helpers โ€“ Simplified batch insert/delete flows that validate data and handle chunking for you.
  • SNS Integration โ€“ Optional event publishing for every write operation so downstream systems stay in sync.

๐Ÿš€ Quick Start

Installation

pip install daplug-ddb
# pipenv install daplug-ddb
# poetry add daplug-ddb
# uv pip install daplug-ddb

Basic Usage

import daplug_ddb

adapter = daplug_ddb.adapter(
    engine="dynamodb",
    table="example-table",
    endpoint="https://dynamodb.us-east-2.amazonaws.com", # optional, will use AWS conventional env vars if using on lambda
    schema="ExampleModel",
    schema_file="openapi.yml",
    identifier="record_id",
    idempotence_key="modified",
)

item = adapter.create(
    data={
        "record_id": "abc123",
        "object_key": {"string_key": "value"},
        "array_number": [1, 2, 3],
        "modified": "2024-01-01",
    }
)

print(item)

The adapter automatically maps the payload to your schema and publishes an SNS event if credentials are provided.

๐Ÿ”ง Advanced Configuration

Selective Updates

# Merge partial updates while preserving existing attributes
adapter.update(
    operation="get",  # fetch original item via get; use "query" for indexes
    query={
        "Key": {"record_id": "abc123", "sort_key": "v1"}
    },
    data={
        "record_id": "abc123",
        "sort_key": "v1",
        "array_number": [1, 2, 3, 4],
    },
    update_list_operation="replace",
)

Hash/Range Prefixing

adapter = daplug_ddb.adapter(
    engine="dynamodb",
    table="tenant-config",
    endpoint="https://dynamodb.us-east-2.amazonaws.com",
    schema="TenantModel",
    schema_file="openapi.yml",
    identifier="tenant_id",
    hash_key="tenant_id",
    hash_prefix="tenant#",
    range_key="sort_key",
    range_prefix="config#",
)

item = adapter.create(data={
    "tenant_id": "abc",
    "sort_key": "default",
    "modified": "2024-01-01",
})
# DynamoDB stores tenant_id as "tenant#abc", but the adapter returns "abc"

When prefixes are configured, the adapter automatically applies them on the way into DynamoDB (including batch operations and deletes) and removes them before returning data or publishing SNS events.

Batched Writes

adapter.batch_insert(
    data=[
        {"record_id": str(idx), "sort_key": str(idx)}
        for idx in range(100)
    ],
    batch_size=25,
)

adapter.batch_delete(
    data=[
        {"record_id": str(idx), "sort_key": str(idx)}
        for idx in range(100)
    ]
)

Idempotent Operations

adapter = daplug_ddb.adapter(
    engine="dynamodb",
    table="orders",
    endpoint="https://dynamodb.us-east-2.amazonaws.com",
    schema="OrderModel",
    schema_file="openapi.yml",
    identifier="order_id",
    idempotence_key="modified",
)

updated = adapter.update(
    data={"order_id": "abc123", "modified": "2024-02-01"},
    operation="get",
    query={"Key": {"order_id": "abc123"}},
)

The adapter fetches the current item, merges the update, and executes a conditional PutItem to ensure the stored modified value still matches what was read. If another writer changes the record first, the operation fails with a conditional check error rather than overwriting the data.

Client Update Request
        โ”‚
        โ–ผ
  [Adapter.fetch]
        โ”‚  (reads original item)
        โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Original Item            โ”‚
โ”‚ idempotence_key = "v1"   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
        โ”‚ merge + map
        โ–ผ
PutItem(Item=โ€ฆ, ConditionExpression=Attr(idempotence_key).eq("v1"))
        โ”‚
   โ”Œโ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
   โ”‚            โ”‚
   โ–ผ            โ–ผ
Success     ConditionalCheckFailed
          (another writer changed key)

SNS Publishing

adapter = daplug_ddb.adapter(
    engine="dynamodb",
    table="audit-table",
    schema="AuditModel",
    schema_file="openapi.yml",
    identifier="audit_id",
    idempotence_key="version",
    sns_arn="arn:aws:sns:us-east-2:123456789012:audit-events",
    sns_endpoint="https://sns.us-east-2.amazonaws.com",
    sns_attributes={"source": "daplug"},
)

adapter.delete(
    query={
        "Key": {"audit_id": "abc123", "version": "2024-01-01"}
    }
)
# => publishes a formatted SNS event with schema metadata

๐Ÿงช Local Development

Prerequisites

  • Python 3.9+
  • Pipenv
  • Docker (for running DynamoDB Local during tests)

Environment Setup

git clone https://github.com/paulcruse3/daplug-ddb.git
cd daplug-ddb
pipenv install --dev

Run Tests

# unit tests (no DynamoDB required)
pipenv run test

# integration tests (spins up local DynamoDB when available)
pipenv run integrations

Supplying an idempotence_key enables optimistic concurrency for updates and overwrites. The adapter reads the original item, captures the keyโ€™s value, and issues a PutItem with a ConditionExpression asserting the value is unchanged. If another writer updates the record first, DynamoDB returns a conditional check failure instead of silently overwriting data.

Client Update Request
        โ”‚
        โ–ผ
  [Adapter.fetch]
        โ”‚  (reads original item)
        โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Original Item            โ”‚
โ”‚ idempotence_key = "v1"   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
        โ”‚ merge + map
        โ–ผ
PutItem(Item=โ€ฆ, ConditionExpression=Attr(idempotence_key).eq("v1"))
        โ”‚
   โ”Œโ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
   โ”‚            โ”‚
   โ–ผ            โ–ผ
Success     ConditionalCheckFailed
          (another writer changed key)
  • Optional: Omit idempotence_key to mirror DynamoDBโ€™s default โ€œlast write winsโ€ behavior while still benefiting from schema normalization.
  • Safety: When the key is configured but missing on the fetched item, the adapter raises ValueError, surfacing misconfigurations early.
  • Events: SNS notifications include the idempotence metadata so downstream services can reason about version changes.

Coverage & Linting

# generates HTML, XML, and JUnit reports under ./coverage/
pipenv run coverage

# pylint configuration aligned with the legacy project
pipenv run lint

๐Ÿ“ฆ Project Structure

daplug-ddb/
โ”œโ”€โ”€ daplug_ddb/
โ”‚ย ย  โ”œโ”€โ”€ adapter.py           # DynamoDB adapter implementation
โ”‚ย ย  โ”œโ”€โ”€ prefixer.py          # DynamoDB prefixer implementation
โ”‚ย ย  โ”œโ”€โ”€ common/              # Shared helpers (merging, schema loading, logging)
โ”‚ย ย  โ””โ”€โ”€ __init__.py          # Public adapter factory & exports
โ”œโ”€โ”€ tests/
โ”‚ย ย  โ”œโ”€โ”€ integration/         # Integration suite against DynamoDB Local
โ”‚ย ย  โ”œโ”€โ”€ unit/                # Isolated unit tests using mocks
โ”‚ย ย  โ””โ”€โ”€ openapi.yml          # Sample schema used for mapping tests
โ”œโ”€โ”€ Pipfile                  # Runtime and dev dependencies
โ”œโ”€โ”€ setup.py                 # Packaging metadata
โ””โ”€โ”€ README.md

๐Ÿค Contributing

Contributions are welcome! Open an issue or submit a pull request if youโ€™d like to add new features, improve documentation, or expand test coverage.

git checkout -b feature/amazing-improvement
# make your changes
pipenv run lint
pipenv run test
pipenv run integrations
git commit -am "feat: amazing improvement"
git push origin feature/amazing-improvement

๐Ÿ“„ License

Apache License 2.0 โ€“ see LICENSE for full text.


Built to keep DynamoDB integrations DRY, predictable, and schema-driven.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

daplug_ddb-1.0.0b3.tar.gz (23.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

daplug_ddb-1.0.0b3-py3-none-any.whl (25.5 kB view details)

Uploaded Python 3

File details

Details for the file daplug_ddb-1.0.0b3.tar.gz.

File metadata

  • Download URL: daplug_ddb-1.0.0b3.tar.gz
  • Upload date:
  • Size: 23.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.17

File hashes

Hashes for daplug_ddb-1.0.0b3.tar.gz
Algorithm Hash digest
SHA256 c70d488f97c2d0576519d50fc1df1bbadfebbee7a78088ae11b0727cd94dd547
MD5 1d15f7f8e11d081fbe6b0e5a8e77e018
BLAKE2b-256 d5ce19ed43674e3dc23db5848fb9e61bb2ec92cc37926015743a88e0d3846362

See more details on using hashes here.

File details

Details for the file daplug_ddb-1.0.0b3-py3-none-any.whl.

File metadata

  • Download URL: daplug_ddb-1.0.0b3-py3-none-any.whl
  • Upload date:
  • Size: 25.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.17

File hashes

Hashes for daplug_ddb-1.0.0b3-py3-none-any.whl
Algorithm Hash digest
SHA256 28ad143a2d9f43858e306e23f6a5bc9e85aaf72018878be0c95aaa78776f7f23
MD5 c460a592da2d2cbbb667439c381e3034
BLAKE2b-256 e3bc2c950ad1dc5aadb98a0f842fdd42706100418b932142338406615ffe9d0b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page