
Shared schema, merge, and SNS helpers powering daplug adapters.


🧩 daplug-core (da•plug)

Shared schema + event plumbing for daplug-* adapters


daplug-core is the tiny layer of glue that daplug-ddb, daplug-cypher, and future daplug-* projects used to keep in their own common/ directories. It bundles a publisher, a logging shim, schema utilities, and merge helpers so the higher-level adapters can stay laser-focused on their respective datastores. This repository is not meant to be a fully fledged adapter on its own; it simply centralizes the primitives the adapters share.


🌈 Why this exists

  • Single source of truth – The DynamoDB and Cypher adapters used to carry duplicate copies of the same helpers. daplug-core keeps those modules in one place.
  • Batteries-included SNS publishing – The base publisher encapsulates SNS fan-out, FIFO metadata, and logging so consuming packages can just hand it messages.
  • Schema-first tooling – schema_loader and schema_mapper read OpenAPI/JSON schemas and project payloads into the shapes your adapters expect.
  • Deterministic merging – dict_merger deep-merges incoming updates into nested payloads using configurable per-call list/dict strategies (add, replace, remove, upsert), so optimistic writes stay tight and predictable.
  • Model-driven event catalog – event_registry + asyncapi_generator turn the events you already publish through daplug into a generated AsyncAPI spec. Because publishing flows through daplug, the catalog is derived from your models instead of being hand-written and drifting.

If you are migrating daplug-ddb or daplug-cypher, remove their legacy common/ folder and import from daplug_core instead. Nothing else changes.


📦 Installation

pip install daplug-core
# or
pipenv install daplug-core

Need unreleased changes? Install straight from the repo:

pip install git+https://github.com/paulcruse3/daplug-core.git

🔁 How consuming packages use the base

  1. Declare the dependency in the adapter package (e.g. daplug-ddb) via Pipfile/pyproject.
  2. Drop the duplicated modules (common/logger.py, common/publisher.py, etc.).
  3. Import from daplug_core wherever those utilities were previously referenced.
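# inside daplug-ddb
from daplug_core import dict_merger, json_helper, publisher, schema_mapper

original = {"id": "42", "tags": ["draft"]}                 # placeholder: item currently stored
incoming = {"tags": ["published"]}                          # placeholder: partial update from the caller
sns_arn = "arn:aws:sns:us-east-1:123456789012:documents"   # placeholder topic ARN

# deep-merge the update using the "replace" list strategy, then announce it on SNS
merged = dict_merger.merge(original, incoming, update_list_operation="replace")
publisher.publish(arn=sns_arn, data=merged, attributes={"event": "updated"})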

Because the API surface stayed the same, adapter code typically only needs import-path updates.
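The `update_list_operation="replace"` argument above chooses how lists are combined during the merge. The sketch below illustrates what a per-call list strategy means. It is not `dict_merger.merge`'s code. The `deep_merge` name, the `"add"` default, and the de-duplicating behavior of `add` are all assumptions, and `upsert` is omitted.

```python
import copy
from typing import Any, Dict


def deep_merge(original: Dict[str, Any], incoming: Dict[str, Any],
               update_list_operation: str = "add") -> Dict[str, Any]:
    """Hypothetical: return `incoming` deep-merged into a copy of `original` (inputs are not mutated)."""
    merged = copy.deepcopy(original)
    for key, new_value in incoming.items():
        old_value = merged.get(key)
        if isinstance(old_value, dict) and isinstance(new_value, dict):
            # Nested dicts merge recursively with the same list strategy.
            merged[key] = deep_merge(old_value, new_value, update_list_operation)
        elif isinstance(old_value, list) and isinstance(new_value, list):
            if update_list_operation == "replace":
                merged[key] = copy.deepcopy(new_value)
            elif update_list_operation == "remove":
                merged[key] = [item for item in old_value if item not in new_value]
            elif update_list_operation == "add":
                # De-duplicating append: keep existing order, add only unseen items.
                merged[key] = old_value + [copy.deepcopy(i) for i in new_value if i not in old_value]
            else:
                raise ValueError(f"unknown list operation: {update_list_operation!r}")
        else:
            # Scalars, new keys, and type changes: the incoming value wins.
            merged[key] = copy.deepcopy(new_value)
    return merged


original = {"id": "42", "tags": ["a", "b"], "meta": {"pages": 3, "lang": "en"}}
patch = {"tags": ["b", "c"], "meta": {"pages": 4}}
print(deep_merge(original, patch)["tags"])                                   # → ['a', 'b', 'c']
print(deep_merge(original, patch, update_list_operation="replace")["tags"])  # → ['b', 'c']
print(deep_merge(original, patch, update_list_operation="remove")["tags"])   # → ['a']
```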


🧱 Building blocks

  • base_adapter.BaseAdapter – Minimal SNS-aware adapter scaffold (used as a mixin by higher-level adapters).
  • publisher.publish – Thin wrapper over boto3 SNS clients with FIFO group/dedupe support and structured logging.
  • logger.log – Consistent JSON stdout logging that honors RUN_MODE=unittest.
  • json_helper – Best-effort try_encode_json / try_decode_json helpers used by loggers and publishers.
  • schema_loader.load_schema – Loads an OpenAPI/JSON schema and resolves $refs using jsonref.
  • schema_mapper.map_to_schema – Recursively projects payloads into schema-shaped dictionaries (supports allOf inheritance).
  • dict_merger.merge – Deep merge with per-call list/dict strategies (add, remove, replace, upsert).
  • event_registry.register_event – Declares an event name and the schema (schema_file + schema_key) that describes its payload.
  • asyncapi_generator – Builds an AsyncAPI 3.0 spec from the registered events, $ref-ing the same OpenAPI schemas your API already defines.

Mix and match these pieces inside datastore-specific adapters.
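The `logger.log` and `json_helper` entries describe behavior rather than signatures. The sketch below assumes a JSON-lines format and assumes that `RUN_MODE=unittest` suppresses output entirely. `log_json` is a hypothetical name, and the real module may format lines differently.

```python
import datetime
import json
import os
from typing import Any, Optional


def log_json(level: str, message: str, **details: Any) -> Optional[str]:
    """Hypothetical: print one JSON log line to stdout and return it; print nothing under RUN_MODE=unittest."""
    if os.environ.get("RUN_MODE") == "unittest":
        return None
    # default=str is the "best-effort" part: non-JSON values (dates, Decimals, ...) become strings.
    line = json.dumps({"level": level, "message": message, "details": details}, default=str)
    print(line)
    return line


log_json("INFO", "published", topic="documents", when=datetime.date(2024, 1, 1))
```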

🔕 Per-call publish controls

BaseAdapter.publish recognises two extra kwargs that adapters propagate through their create/update/delete method calls:

  • publish=False – Skip the SNS publish for this call only; the adapter's defaults are unchanged.
  • publish_data=<payload> – Publish this payload to SNS instead of the data the adapter just wrote. Useful when SNS consumers want a different shape than the stored row.

# adapter: any daplug adapter instance; row: the item being written
adapter.create(data=row, publish=False)             # write, do not notify
adapter.update(data=row, publish_data={"id": row["id"], "event": "updated"})  # write, notify with a custom payload

If both are passed, publish=False wins.
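The precedence rules above reduce to a small decision. In this sketch, `resolve_publish` is an illustrative name, not part of daplug_core. It assumes that `publish_data=None` means "not supplied".

```python
from typing import Any, Optional, Tuple


def resolve_publish(data: Any, publish: bool = True,
                    publish_data: Optional[Any] = None) -> Tuple[bool, Any]:
    """Hypothetical: return (should_publish, payload) for one adapter call."""
    if not publish:
        return False, None  # publish=False wins, even if publish_data was also passed
    if publish_data is not None:
        return True, publish_data  # SNS consumers get the custom shape
    return True, data  # default: publish what was written


row = {"id": "42", "title": "Spec"}
print(resolve_publish(row))                                            # → (True, {'id': '42', 'title': 'Spec'})
print(resolve_publish(row, publish=False, publish_data={"id": "42"}))  # → (False, None)
```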


📣 Event catalog generation (AsyncAPI)

Services that publish through daplug shouldn't hand-maintain a separate EVENTS.md. The payload of every event is just a model snapshot, and that model is already described in the service's openapi.yml under components/schemas. event_registry and asyncapi_generator turn that into a generated, verifiable contract.

Generate-only today. The registry is read at build time to emit the spec. It does not validate or reshape payloads on the publish hot path (a possible future follow-up).

1. Register each event next to where it is published

from daplug_core import event_registry

event_registry.register_event(
    "v1-documents-document-created",   # the SNS `event` attribute
    "api/v1/openapi.yml",              # schema_file: where the payload schema lives
    "Document",                        # schema_key: the components/schemas key
    "A document was created",          # optional human description
)

Registration is the single source that binds an event name to the model schema describing its payload. The same map can later gate CI (every published event name must be registered) and drive publish-time validation.

2. Generate the spec

python -m daplug_core.asyncapi_generator \
    --title documents \
    --version v1 \
    --channel documents \
    --bootstrap api.v1.persistence.events_registry \
    --output api/v1/asyncapi.yml

--bootstrap imports the module(s) that call register_event so the registry is populated before the spec is written; the flag can be repeated. The emitted asyncapi.yml is AsyncAPI 3.0: each event becomes a message whose payload $refs #/components/schemas/<schema_key>, pulled from your OpenAPI file via schema_loader (so REST and events share one schema definition).
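The sketch below builds the output described above as a minimal AsyncAPI 3.0 document in a Python dict. The `build_asyncapi` function, the operation name, and the empty `components/schemas` placeholder are assumptions for illustration. The real generator's layout may differ.

```python
from typing import Any, Dict, Tuple


def build_asyncapi(title: str, version: str, channel: str,
                   events: Dict[str, Tuple[str, str]]) -> Dict[str, Any]:
    """Hypothetical: AsyncAPI 3.0 skeleton from {event_name: (schema_file, schema_key)}."""
    messages = {
        name: {"name": name, "payload": {"$ref": f"#/components/schemas/{schema_key}"}}
        for name, (_schema_file, schema_key) in events.items()
    }
    return {
        "asyncapi": "3.0.0",
        "info": {"title": title, "version": version},
        "channels": {
            channel: {
                "address": channel,
                "messages": {name: {"$ref": f"#/components/messages/{name}"} for name in messages},
            }
        },
        "operations": {
            f"{channel}.publish": {
                "action": "send",
                "channel": {"$ref": f"#/channels/{channel}"},
                # AsyncAPI 3.0 operations reference messages through their channel.
                "messages": [{"$ref": f"#/channels/{channel}/messages/{name}"} for name in messages],
            }
        },
        # Placeholder: the real tool copies the referenced schemas in from the OpenAPI file.
        "components": {"messages": messages, "schemas": {}},
    }


spec = build_asyncapi("documents", "v1", "documents",
                      {"v1-documents-document-created": ("api/v1/openapi.yml", "Document")})
print(spec["components"]["messages"]["v1-documents-document-created"]["payload"])
# → {'$ref': '#/components/schemas/Document'}
```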

3. Verify and publish like OpenAPI

Treat the generated file exactly like openapi.yml: regenerate it in CI and diff it against the committed copy (the build fails on drift), then render it to Confluence. Because the spec is generated from the registry, a registered event can't be left out of the docs, and a stale payload schema can't survive the diff.
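The drift check described above amounts to comparing the regenerated text with the committed copy. The sketch below uses the standard library's `difflib`. `spec_drift` and the default path label are hypothetical.

```python
import difflib


def spec_drift(committed: str, generated: str, path: str = "api/v1/asyncapi.yml") -> str:
    """Hypothetical: unified diff of committed vs. regenerated spec text; '' means no drift."""
    diff = difflib.unified_diff(
        committed.splitlines(keepends=True),
        generated.splitlines(keepends=True),
        fromfile=f"{path} (committed)",
        tofile=f"{path} (regenerated)",
    )
    return "".join(diff)


committed = "asyncapi: 3.0.0\ninfo:\n  version: v1\n"
regenerated = "asyncapi: 3.0.0\ninfo:\n  version: v2\n"
drift = spec_drift(committed, regenerated)
print(drift or "no drift")
```

In CI, you would read the committed file (for example with `pathlib.Path.read_text`), pass in the freshly generated text, and exit non-zero when the result is non-empty. Running `git diff --exit-code` on the regenerated file achieves the same check without any Python.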

Programmatic use

from daplug_core import asyncapi_generator

spec = asyncapi_generator.generate(title="documents", version="v1", channel="documents")
asyncapi_generator.write_spec(spec, "api/v1/asyncapi.yml")
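On the CLI, `--bootstrap` populates the registry before the spec is generated. A programmatic call presumably needs the same step first. Assuming `generate()` reads whatever has been registered at call time, the hypothetical helper below performs those imports.

```python
import importlib
from types import ModuleType
from typing import Iterable, List


def bootstrap(module_paths: Iterable[str]) -> List[ModuleType]:
    """Hypothetical: import each dotted module path so any module-level register_event calls run."""
    # importlib caches modules in sys.modules, so importing one twice is harmless.
    return [importlib.import_module(path) for path in module_paths]


# e.g. bootstrap(["api.v1.persistence.events_registry"]) before asyncapi_generator.generate(...)
loaded = bootstrap(["json", "collections.abc"])
print([module.__name__ for module in loaded])  # → ['json', 'collections.abc']
```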

🧭 Example: refactoring daplug-ddb

# before (inside daplug_ddb/common/publisher.py)
from . import logger
import boto3

# after
from daplug_core import publisher

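# inside an adapter method: self.sns_arn, payload, fifo_group, and fifo_dedupe are the adapter's own values
publisher.publish(
    arn=self.sns_arn,
    data=payload,
    fifo_group_id=fifo_group,
    fifo_duplication_id=fifo_dedupe,
    attributes={"source": "daplug-ddb"},
)

# before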
from .common.dict_merger import merge

# after
from daplug_core import dict_merger

updated_item = dict_merger.merge(original, patch, update_list_operation="replace")

The same pattern applies inside daplug-cypher when merging node payloads or formatting SNS events.
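The sketch below shows how publisher-style arguments such as `fifo_group_id` and `fifo_duplication_id` can map onto the parameters of boto3's `SNS.Client.publish`: `TopicArn`, `Message`, `MessageAttributes`, `MessageGroupId`, and `MessageDeduplicationId`. `build_sns_publish_kwargs` is hypothetical and only assembles the request. It is not `publisher.publish`'s implementation.

```python
import json
from typing import Any, Dict, Optional


def build_sns_publish_kwargs(arn: str, data: Any,
                             attributes: Optional[Dict[str, Any]] = None,
                             fifo_group_id: Optional[str] = None,
                             fifo_duplication_id: Optional[str] = None) -> Dict[str, Any]:
    """Hypothetical: assemble keyword arguments for boto3's SNS Client.publish (no AWS call is made)."""
    kwargs: Dict[str, Any] = {"TopicArn": arn, "Message": json.dumps(data, default=str)}
    if attributes:
        # SNS message attributes are typed; this sketch sends every value as a String.
        kwargs["MessageAttributes"] = {
            key: {"DataType": "String", "StringValue": str(value)}
            for key, value in attributes.items()
        }
    # FIFO topics (names ending in ".fifo") require MessageGroupId; MessageDeduplicationId
    # is required unless content-based deduplication is enabled on the topic.
    if fifo_group_id is not None:
        kwargs["MessageGroupId"] = fifo_group_id
    if fifo_duplication_id is not None:
        kwargs["MessageDeduplicationId"] = fifo_duplication_id
    return kwargs


kwargs = build_sns_publish_kwargs(
    arn="arn:aws:sns:us-east-1:123456789012:documents.fifo",  # hypothetical FIFO topic
    data={"id": "42", "title": "Spec"},
    attributes={"source": "daplug-ddb"},
    fifo_group_id="doc-42",
    fifo_duplication_id="doc-42-v3",
)
# With AWS credentials configured: boto3.client("sns").publish(**kwargs)
print(sorted(kwargs))
# → ['Message', 'MessageAttributes', 'MessageDeduplicationId', 'MessageGroupId', 'TopicArn']
```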


⚙️ Local development

git clone https://github.com/paulcruse3/daplug-core.git
cd daplug-core
pipenv install --dev

Run tests & coverage

pipenv run test       # pytest tests/
pipenv run test-cov   # pytest --cov=daplug_core --cov-report=term-missing
pipenv run lint       # pylint --fail-under 10 daplug_core

Ship updates downstream

  1. Bump the version in setup.py (and setup.cfg if needed).
  2. Publish to PyPI or deliver a git tag.
  3. Update daplug-ddb and daplug-cypher to depend on the new version.
  4. Remove any residual common/ references in those repos and re-run their suites.

🤝 Contributing

Pull requests are welcome, especially improvements that make life easier for the DynamoDB and Cypher adapters. If you add a helper here, remember to wire it up in the consuming packages as well.

git checkout -b feat/better-schema-mapper
pipenv run test-cov
pipenv run lint
git commit -am "feat: better schema mapper"

📄 License

Apache 2.0 – see LICENSE.



Download files


Source Distribution

daplug_core-1.0.0b8.tar.gz (21.9 kB)

Uploaded Source

Built Distribution


daplug_core-1.0.0b8-py3-none-any.whl (16.2 kB)

Uploaded Python 3

File details

Details for the file daplug_core-1.0.0b8.tar.gz.

File metadata

  • Download URL: daplug_core-1.0.0b8.tar.gz
  • Upload date:
  • Size: 21.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.14

File hashes

Hashes for daplug_core-1.0.0b8.tar.gz
  • SHA256: 4e0a38d6eaeda888bd937dfc3e9aca505ee746dfc91aed7895e197ac77cbe7b7
  • MD5: 4b07787d9cf0b7b83044996237bb91bd
  • BLAKE2b-256: 3824a0466abba5808e8dc9d15af1714ce88dde5857b2ad5ef94991473fbe0d72


File details

Details for the file daplug_core-1.0.0b8-py3-none-any.whl.

File metadata

  • Download URL: daplug_core-1.0.0b8-py3-none-any.whl
  • Upload date:
  • Size: 16.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.14

File hashes

Hashes for daplug_core-1.0.0b8-py3-none-any.whl
  • SHA256: 70bbfc735ac9a1eee057844ab2170155c531a0fbb7c0105005dbed5e1bce75f1
  • MD5: 378091db0d39760f6b693d9a11fe4531
  • BLAKE2b-256: cdef80a0918b22072404b070760f7bc2bf03886cad8ee34ab7b0023f84f9ad2f
