Shared schema, merge, and SNS helpers powering daplug adapters.
🧩 daplug-core (da•plug)
Shared schema + event plumbing for daplug-* adapters
daplug-core is the thin layer of glue shared by daplug-ddb, daplug-cypher, and future daplug-* projects. It replaces the helpers each adapter used to keep in its own common/ directory. It bundles a publisher, logging shim, schema utilities, and merge helpers so the higher-level adapters can stay focused on their respective datastores. This repository is not meant to be a full adapter on its own; it simply centralizes the primitives the adapters share.
🌈 Why this exists
- Single source of truth – The DynamoDB and Cypher adapters used to carry duplicate copies of the same helpers. daplug-core keeps those modules in one place.
- Batteries-included SNS publishing – The base publisher encapsulates SNS fan-out, FIFO metadata, and logging so consuming packages can just hand it messages.
- Schema-first tooling – schema_loader and schema_mapper read OpenAPI/JSON schemas and project payloads to the shapes your adapters expect.
- Deterministic merging – dict_merger upgrades nested payloads with configurable list/dict strategies (add, replace, remove) so you can keep optimistic writes tight.
- Model-driven event catalog – event_registry + asyncapi_generator turn the events you already publish through daplug into a generated AsyncAPI spec. Because publishing flows through daplug, the catalog is derived from your models instead of hand-written and drifting.
If you are migrating daplug-ddb or daplug-cypher, remove their legacy common/ folder and import from daplug_core instead. Nothing else changes.
📦 Installation
pip install daplug-core
# or
pipenv install daplug-core
Not on PyPI yet? Until release, install straight from the repo:
pip install git+https://github.com/paulcruse3/daplug-core.git
🔁 How consuming packages use the base
- Declare the dependency in the adapter package (e.g. daplug-ddb) via Pipfile/pyproject.
- Drop the duplicated modules (common/logger.py, common/publisher.py, etc.).
- Import from daplug_core wherever those utilities were previously referenced.
# inside daplug-ddb
from daplug_core import dict_merger, json_helper, publisher, schema_mapper
merged = dict_merger.merge(original, incoming, update_list_operation="replace")
publisher.publish(arn=sns_arn, data=merged, attributes={"event": "updated"})
Because the API surface stayed the same, adapter code typically only needs import-path updates.
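To make the list strategies concrete, here is a minimal stand-alone sketch of a deep merge with add/replace/remove handling for lists. It is an illustration only, not daplug-core's implementation: `deep_merge` and its `list_strategy` parameter are hypothetical names, and the real `dict_merger.merge` may treat edge cases (duplicates, nested lists, dict strategies) differently.

```python
from copy import deepcopy


def deep_merge(original: dict, incoming: dict, list_strategy: str = "add") -> dict:
    """Hypothetical deep merge used for illustration; not daplug-core's code."""
    if list_strategy not in {"add", "replace", "remove"}:
        raise ValueError(f"unknown list strategy: {list_strategy}")
    merged = deepcopy(original)  # never mutate the caller's data
    for key, value in incoming.items():
        current = merged.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            # Nested dicts are merged recursively.
            merged[key] = deep_merge(current, value, list_strategy)
        elif isinstance(current, list) and isinstance(value, list):
            if list_strategy == "add":
                # Append only the items that are not already present.
                merged[key] = current + [v for v in value if v not in current]
            elif list_strategy == "remove":
                # Drop every item that appears in the incoming list.
                merged[key] = [v for v in current if v not in value]
            else:
                # "replace": the incoming list wins outright.
                merged[key] = deepcopy(value)
        else:
            # Scalars, new keys, and mismatched types: incoming wins.
            merged[key] = deepcopy(value)
    return merged


original = {"name": "doc", "tags": ["a", "b"], "meta": {"owner": "x", "size": 1}}
patch = {"tags": ["b", "c"], "meta": {"size": 2}}

for strategy in ("add", "replace", "remove"):
    print(strategy, deep_merge(original, patch, strategy))
```

Working on a deep copy keeps the merge free of side effects, which is what makes repeated merges of the same inputs produce the same result.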
🧱 Building blocks
| Module | Purpose |
|---|---|
| base_adapter.BaseAdapter | Minimal SNS-aware adapter scaffold (used as a mixin by higher-level adapters). |
| publisher.publish | Thin wrapper over boto3 SNS clients with FIFO group/dedupe support and structured logging. |
| logger.log | Consistent JSON stdout logging that honors RUN_MODE=unittest. |
| json_helper | Best-effort try_encode_json / try_decode_json helpers used by loggers and publishers. |
| schema_loader.load_schema | Loads an OpenAPI/JSON schema and resolves $refs using jsonref. |
| schema_mapper.map_to_schema | Recursively projects payloads into schema-shaped dictionaries (supports allOf inheritance). |
| dict_merger.merge | Deep merge with per-call list/dict strategies (add, remove, replace, upsert). |
| event_registry.register_event | Declares an event name and the schema (schema_file + schema_key) that describes its payload. |
| asyncapi_generator | Builds an AsyncAPI 3.0 spec from the registered events, $ref-ing the same OpenAPI schemas your API already defines. |
Mix and match these pieces inside datastore-specific adapters.
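As an illustration of what "projecting a payload into a schema-shaped dictionary" can mean, the sketch below keeps only the keys a schema declares and folds in properties inherited through allOf. It assumes every $ref has already been resolved, and `collect_properties` and `project` are hypothetical names; the real `schema_mapper.map_to_schema` may behave differently.

```python
def collect_properties(schema: dict) -> dict:
    """Gather declared properties, folding in any allOf parents first."""
    properties: dict = {}
    for parent in schema.get("allOf", []):
        properties.update(collect_properties(parent))
    properties.update(schema.get("properties", {}))
    return properties


def project(payload: dict, schema: dict) -> dict:
    """Hypothetical projection: drop keys the schema does not declare."""
    projected = {}
    for name, subschema in collect_properties(schema).items():
        if name not in payload:
            continue  # absent keys are simply skipped in this sketch
        value = payload[name]
        if isinstance(value, dict) and collect_properties(subschema):
            # Nested object with declared properties: recurse.
            projected[name] = project(value, subschema)
        elif isinstance(value, list) and collect_properties(subschema.get("items", {})):
            # Array of objects: project each element against the items schema.
            items = subschema["items"]
            projected[name] = [
                project(v, items) if isinstance(v, dict) else v for v in value
            ]
        else:
            projected[name] = value
    return projected


schema = {
    "allOf": [{"properties": {"id": {"type": "string"}}}],
    "properties": {
        "title": {"type": "string"},
        "author": {"type": "object", "properties": {"name": {"type": "string"}}},
    },
}
payload = {
    "id": "1",
    "title": "t",
    "author": {"name": "n", "email": "e"},
    "internal": "x",
}
print(project(payload, schema))
```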
🔕 Per-call publish controls
BaseAdapter.publish recognises two extra kwargs that adapters propagate
through their create/update/delete method calls:
| Kwarg | Effect |
|---|---|
| publish=False | Skip the SNS publish for this call only. Defaults are unchanged. |
| publish_data=<payload> | Publish this payload to SNS instead of the data the adapter just wrote. Useful when the SNS consumers want a different shape than the row that was stored. |
adapter.create(data=row, publish=False) # write, do not notify
adapter.update(data=row, publish_data={"id": row["id"], "event": "updated"})
If both are passed, publish=False wins.
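The precedence rule can be written down in a few lines. This is a sketch of the decision only, under the assumption that publishing is on by default; `resolve_publish` is a hypothetical helper, not part of BaseAdapter.

```python
_UNSET = object()  # sentinel so that publish_data=None can still be an explicit payload


def resolve_publish(written, publish=True, publish_data=_UNSET):
    """Hypothetical helper: return (should_publish, payload_to_publish)."""
    if not publish:
        # publish=False wins, even when publish_data is also supplied.
        return False, None
    if publish_data is not _UNSET:
        # An explicit payload replaces the data that was just written.
        return True, publish_data
    return True, written


row = {"id": "42", "body": "large document"}

print(resolve_publish(row))
print(resolve_publish(row, publish_data={"id": row["id"], "event": "updated"}))
print(resolve_publish(row, publish=False, publish_data={"id": row["id"]}))
```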
📣 Event catalog generation (AsyncAPI)
Services that publish through daplug shouldn't hand-maintain a separate EVENTS.md.
The payload of every event is just a model snapshot, and that model is already
described in the service's openapi.yml under components/schemas. event_registry
and asyncapi_generator turn that into a generated, verifiable contract.
Generate-only today. The registry is read at build time to emit the spec. It does not validate or reshape payloads on the publish hot path (a possible future follow-up).
1. Register each event next to where it is published
from daplug_core import event_registry
event_registry.register_event(
"v1-documents-document-created", # the SNS `event` attribute
"api/v1/openapi.yml", # schema_file: where the payload schema lives
"Document", # schema_key: the components/schemas key
"A document was created", # optional human description
)
Registration is the single source that binds an event name to the model schema describing its payload. The same map can later gate CI (every published event name must be registered) and drive publish-time validation.
2. Generate the spec
python -m daplug_core.asyncapi_generator \
--title documents \
--version v1 \
--channel documents \
--bootstrap api.v1.persistence.events_registry \
--output api/v1/asyncapi.yml
--bootstrap imports the module(s) that call register_event so the registry is
populated before the spec is written (repeatable). The emitted asyncapi.yml is
AsyncAPI 3.0: each event becomes a message whose payload $refs
#/components/schemas/<schema_key>, pulled from your OpenAPI file via
schema_loader (so REST and events share one schema definition).
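For orientation, the sketch below assembles a document with the general AsyncAPI 3.0 layout described above: one channel, one message per event, and each payload $ref-ing a schema under components/schemas. `build_asyncapi` is a hypothetical function and the schemas are passed in by hand; the file that asyncapi_generator actually emits may contain more fields or be organised differently.

```python
import json


def build_asyncapi(title, version, channel, events, schemas):
    """Sketch of an AsyncAPI 3.0 document; the real generator's output may differ.

    events:  event name -> (schema_key, description)
    schemas: schema_key -> JSON schema (assumed already loaded from OpenAPI)
    """
    messages, channel_messages, used_schemas = {}, {}, {}
    for name, (schema_key, description) in events.items():
        messages[name] = {
            "name": name,
            "summary": description,
            # The payload points at the shared model schema instead of copying it.
            "payload": {"$ref": f"#/components/schemas/{schema_key}"},
        }
        channel_messages[name] = {"$ref": f"#/components/messages/{name}"}
        used_schemas[schema_key] = schemas[schema_key]
    return {
        "asyncapi": "3.0.0",
        "info": {"title": title, "version": version},
        "channels": {channel: {"address": channel, "messages": channel_messages}},
        "components": {"messages": messages, "schemas": used_schemas},
    }


spec = build_asyncapi(
    title="documents",
    version="v1",
    channel="documents",
    events={"v1-documents-document-created": ("Document", "A document was created")},
    schemas={"Document": {"type": "object", "properties": {"id": {"type": "string"}}}},
)
print(json.dumps(spec, indent=2))
```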
3. Verify and publish like OpenAPI
Treat the generated file exactly like openapi.yml: regenerate it in CI and diff
against the committed copy (the build fails on drift), then render it to Confluence.
Because the spec is generated from registered events, a registered event can't be
left out of the catalog and a stale payload can't survive the diff.
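The drift check itself needs nothing beyond the standard library. The sketch below compares two spec texts with difflib; `spec_drift` is a hypothetical helper, and in a real CI job the two strings would come from the committed asyncapi.yml and a freshly generated copy.

```python
import difflib
from typing import List


def spec_drift(committed: str, generated: str) -> List[str]:
    """Return unified-diff lines; an empty list means the committed spec is current."""
    return list(
        difflib.unified_diff(
            committed.splitlines(),
            generated.splitlines(),
            fromfile="committed",
            tofile="generated",
            lineterm="",
        )
    )


committed_text = "asyncapi: 3.0.0\ninfo:\n  title: documents\n  version: v1\n"
generated_text = "asyncapi: 3.0.0\ninfo:\n  title: documents\n  version: v2\n"

drift = spec_drift(committed_text, generated_text)
if drift:
    # A CI script would print the diff and exit non-zero here.
    print("\n".join(drift))
```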
Programmatic use
from daplug_core import asyncapi_generator
spec = asyncapi_generator.generate(title="documents", version="v1", channel="documents")
asyncapi_generator.write_spec(spec, "api/v1/asyncapi.yml")
🧭 Example: refactoring daplug-ddb
# before (inside daplug_ddb/common/publisher.py)
from . import logger
import boto3
# after
from daplug_core import publisher
publisher.publish(
arn=self.sns_arn,
data=payload,
fifo_group_id=fifo_group,
fifo_duplication_id=fifo_dedupe,
attributes={"source": "daplug-ddb"},
)
# before
from .common.dict_merger import merge
# after
from daplug_core import dict_merger
updated_item = dict_merger.merge(original, patch, update_list_operation="replace")
The same pattern applies inside daplug-cypher when merging node payloads or formatting SNS events.
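For readers unfamiliar with what a FIFO-aware SNS publish involves, the sketch below builds the keyword arguments that boto3's SNS `publish` call accepts (`TopicArn`, `Message`, `MessageAttributes`, `MessageGroupId`, `MessageDeduplicationId`). `build_publish_kwargs` is a hypothetical helper and the ARN is a placeholder; how `publisher.publish` maps its own arguments internally is an assumption here, not something taken from its source.

```python
import json


def build_publish_kwargs(arn, data, attributes=None,
                         fifo_group_id=None, fifo_duplication_id=None):
    """Hypothetical helper: assemble kwargs for boto3's sns.publish(**kwargs)."""
    kwargs = {"TopicArn": arn, "Message": json.dumps(data)}
    if attributes:
        # SNS message attributes carry an explicit data type per value.
        kwargs["MessageAttributes"] = {
            key: {"DataType": "String", "StringValue": str(value)}
            for key, value in attributes.items()
        }
    # The FIFO fields are only sent when the caller provides them.
    if fifo_group_id is not None:
        kwargs["MessageGroupId"] = fifo_group_id
    if fifo_duplication_id is not None:
        kwargs["MessageDeduplicationId"] = fifo_duplication_id
    return kwargs


kwargs = build_publish_kwargs(
    arn="arn:aws:sns:us-east-1:000000000000:example.fifo",  # placeholder ARN
    data={"id": "42"},
    attributes={"source": "daplug-ddb"},
    fifo_group_id="documents",
    fifo_duplication_id="42-updated",
)
print(kwargs)
# With AWS credentials configured: boto3.client("sns").publish(**kwargs)
```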
⚙️ Local development
git clone https://github.com/paulcruse3/daplug-core.git
cd daplug-core
pipenv install --dev
Run tests & coverage
pipenv run test # pytest tests/
pipenv run test-cov # pytest --cov=daplug_core --cov-report=term-missing
pipenv run lint # pylint --fail-under 10 daplug_core
Ship updates downstream
- Bump the version in setup.py (and setup.cfg if needed).
- Publish to PyPI or deliver a git tag.
- Update daplug-ddb and daplug-cypher to depend on the new version.
- Remove any residual common/ references in those repos and re-run their suites.
🤝 Contributing
Pull requests are welcome—especially improvements that make life easier for the DynamoDB and Cypher adapters. If you add a helper here, remember to wire it up in the consuming packages as well.
git checkout -b feat/better-schema-mapper
pipenv run test-cov
pipenv run lint
git commit -am "feat: better schema mapper"
📄 License
Apache 2.0 – see LICENSE.