Shared schema, merge, and SNS helpers powering daplug adapters.
🧩 daplug-core (da•plug)
Shared schema + event plumbing for daplug-* adapters
daplug-core is the tiny layer of glue that daplug-ddb and daplug-cypher previously kept in their own common/ directories, and that future daplug-* projects can build on. It bundles a publisher, a logging shim, schema utilities, and merge helpers so the higher-level adapters can stay laser-focused on their respective datastores. This repository is not meant to be a full-fledged adapter on its own; it simply centralizes the primitives the adapters share.
🌈 Why this exists
- Single source of truth – The DynamoDB and Cypher adapters used to carry duplicate copies of the same helpers. daplug-core keeps those modules in one place.
- Batteries-included SNS publishing – The base publisher encapsulates SNS fan-out, FIFO metadata, and logging so consuming packages can just hand it messages.
- Schema-first tooling – schema_loader and schema_mapper read OpenAPI/JSON schemas and project payloads into the shapes your adapters expect.
- Deterministic merging – dict_merger deep-merges nested payloads with configurable list/dict strategies (add, replace, remove) so you can keep optimistic writes tight.
- Model-driven event catalog – Publish a self-naming Pydantic event model, and asyncapi_generator reads those models to emit the AsyncAPI spec (exactly like OpenAPI generation reads decorated handlers). There is no hand-maintained catalog to drift, and required_payload_keys / required_headers enforce the body + metadata contract before anything is published.
If you are migrating daplug-ddb or daplug-cypher, remove their legacy common/ folder and import from daplug_core instead. Nothing else changes.
📦 Installation
```bash
pip install daplug-core
# or
pipenv install daplug-core
```
Not on PyPI yet? Until release, install straight from the repo:
```bash
pip install git+https://github.com/paulcruse3/daplug-core.git
```
🔁 How consuming packages use the base
- Declare the dependency in the adapter package (e.g. daplug-ddb) via Pipfile/pyproject.
- Drop the duplicated modules (common/logger.py, common/publisher.py, etc.).
- Import from daplug_core wherever those utilities were previously referenced.
```python
# inside daplug-ddb
from daplug_core import dict_merger, json_helper, publisher, schema_mapper

# original, incoming, and sns_arn come from the surrounding adapter code
merged = dict_merger.merge(original, incoming, update_list_operation="replace")
publisher.publish(arn=sns_arn, data=merged, attributes={"event": "updated"})
```
Because the API surface stayed the same, adapter code typically only needs import-path updates.
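The merge call above delegates to dict_merger. The following is a minimal, self-contained sketch of a deep merge with a per-call list strategy, included only to illustrate the idea. The function name deep_merge, its list_op parameter, and the exact add/remove/replace semantics are assumptions and are not daplug_core's documented behavior.

```python
import copy


def deep_merge(original: dict, incoming: dict, list_op: str = "add") -> dict:
    """Hypothetical deep merge; list_op is "add", "remove", or "replace"."""
    result = copy.deepcopy(original)  # never mutate the caller's dict
    for key, new_value in incoming.items():
        old_value = result.get(key)
        if isinstance(old_value, dict) and isinstance(new_value, dict):
            # Nested dicts are merged key by key instead of being overwritten.
            result[key] = deep_merge(old_value, new_value, list_op)
        elif isinstance(old_value, list) and isinstance(new_value, list):
            if list_op == "replace":
                result[key] = copy.deepcopy(new_value)
            elif list_op == "remove":
                # Drop every existing element that appears in the incoming list.
                result[key] = [item for item in old_value if item not in new_value]
            elif list_op == "add":
                # Append incoming elements the existing list does not already contain.
                result[key] = old_value + [
                    copy.deepcopy(item) for item in new_value if item not in old_value
                ]
            else:
                raise ValueError(f"unknown list_op: {list_op!r}")
        else:
            # Scalars, new keys, and type mismatches: the incoming value wins.
            result[key] = copy.deepcopy(new_value)
    return result
```

Making the list strategy a per-call argument lets the same patch behave as an append, a removal, or a full overwrite, depending on the write being performed.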
🧱 Building blocks
| Module | Purpose |
|---|---|
| `base_adapter.BaseAdapter` | Minimal SNS-aware adapter scaffold (used as a mixin by higher-level adapters). |
| `publisher.publish` | Thin wrapper over boto3 SNS clients with FIFO group/dedupe support and structured logging. |
| `logger.log` | Consistent JSON stdout logging that honors `RUN_MODE=unittest`. |
| `json_helper` | Best-effort `try_encode_json` / `try_decode_json` helpers used by loggers and publishers. |
| `schema_loader.load_schema` | Loads an OpenAPI/JSON schema and resolves `$ref`s using jsonref. |
| `schema_mapper.map_to_schema` | Recursively projects payloads into schema-shaped dictionaries (supports `allOf` inheritance). |
| `dict_merger.merge` | Deep merge with per-call list/dict strategies (add, remove, replace, upsert). |
| `base_adapter.required_payload_keys` / `required_headers` | Transport-agnostic publish contract: the body keys and metadata headers that must be present, validated before publish (raises `PublishContractError`). |
| `asyncapi_generator` | Globs your Pydantic event models (those with an `event_name` ClassVar) and emits an AsyncAPI 3.0 spec: payload from the model, headers from the metadata contract. No hand-maintained catalog. |
Mix and match these pieces inside datastore-specific adapters.
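This README does not document how schema_mapper.map_to_schema works internally. To illustrate what projecting a payload into a schema shape with allOf inheritance involves, here is a minimal sketch. The function names project and collect_properties are hypothetical, and so are their rules: keep only declared properties, recurse into nested objects and arrays, and union the properties of all allOf members. The sketch also assumes $refs have already been resolved, as schema_loader does via jsonref.

```python
from typing import Any


def collect_properties(schema: dict) -> dict:
    """Union a schema's own properties with those of every allOf member."""
    props = dict(schema.get("properties", {}))
    for member in schema.get("allOf", []):
        props.update(collect_properties(member))
    return props


def project(payload: Any, schema: dict) -> Any:
    """Keep only schema-declared fields, recursing into objects and arrays."""
    if isinstance(payload, dict):
        props = collect_properties(schema)
        if not props:
            return payload  # free-form object: nothing to project against
        return {key: project(payload[key], props[key]) for key in props if key in payload}
    if isinstance(payload, list) and "items" in schema:
        return [project(item, schema["items"]) for item in payload]
    return payload  # scalars are returned unchanged
```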
🔕 Per-call publish controls
BaseAdapter.publish recognises two extra kwargs that adapters propagate
through their create/update/delete method calls:
| Kwarg | Effect |
|---|---|
| `publish=False` | Skip the SNS publish for this call only. Defaults are unchanged. |
| `publish_data=<payload>` | Publish this payload to SNS instead of the data the adapter just wrote. Useful when the SNS consumers want a different shape than the row that was stored. |
```python
adapter.create(data=row, publish=False)  # write, do not notify
adapter.update(data=row, publish_data={"id": row["id"], "event": "updated"})
```
If both are passed, publish=False wins.
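The precedence rule can be stated compactly as code. The helper below is hypothetical and not part of BaseAdapter's API. It shows the decision an adapter makes on each create/update/delete call: return None to skip publishing, or otherwise return the payload to send.

```python
from typing import Any, Optional


def resolve_publish_payload(
    written: Any, publish: bool = True, publish_data: Optional[Any] = None
) -> Optional[Any]:
    """Hypothetical helper: decide what, if anything, to publish for one call."""
    if not publish:
        # publish=False wins, even when publish_data is also supplied.
        return None
    # Otherwise prefer the override payload, falling back to the stored row.
    return written if publish_data is None else publish_data
```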
📣 Event catalog generation (AsyncAPI)
This works exactly like OpenAPI generation: the developer writes normal typed
code, and generate-asyncapi reads that code to emit api/v1/asyncapi.yml,
which CI byte-diffs. There is no hand-maintained catalog — the event model is
the single source of truth, the same way a Pydantic response model is for OpenAPI.
An event is a Pydantic model that names itself via an event_name ClassVar;
its fields are the payload schema. Publish the model and daplug reads the name +
payload off it. The generator globs the event models, so an event cannot be
published without appearing in the spec.
1. Declare the event (the only thing the developer writes)
```python
# api/v1/events/document_events.py
from typing import ClassVar

from pydantic import BaseModel, Field


class DocumentCreated(BaseModel):
    """A document was created"""  # docstring -> AsyncAPI message title

    event_name: ClassVar[str] = "v1-documents-document-created"  # the ONLY place the name lives
    document_id: str = Field(examples=["doc-1"])
    user_id: str
    status: str = "draft"
    created: str
    modified: str
```
2. Publish it: daplug reads the name + payload off the model
```python
# the adapter detects the Pydantic model, dumps it, injects event_name as the
# 'event' header, and enforces the contract before anything reaches the transport
self.adapter.create(data=DocumentCreated.model_validate(document.to_dict()))
```
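Here is a minimal sketch of the detection step described in the comment above, assuming Pydantic v2's model_dump. It is duck-typed: any object whose class has a string event_name attribute and that exposes model_dump is treated as an event model, so the sketch runs without Pydantic installed. The name unpack_event and its (payload, headers) return shape are hypothetical and not daplug's API.

```python
from typing import Any, Dict, Optional, Tuple


def unpack_event(
    data: Any, headers: Optional[Dict[str, str]] = None
) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Hypothetical: split an event model (or plain dict) into (payload, headers)."""
    headers = dict(headers or {})
    event_name = getattr(type(data), "event_name", None)
    if isinstance(event_name, str) and hasattr(data, "model_dump"):
        # Pydantic v2 excludes ClassVars from model fields, so model_dump()
        # returns only the payload fields, never event_name itself.
        headers["event"] = event_name
        return data.model_dump(), headers
    # Anything else (e.g. a plain dict) is passed through as a shallow copy.
    return dict(data), headers
```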
Declare the contract once on the adapter (transport-agnostic):
```python
daplug_ddb.adapter(
    ...,
    required_payload_keys=["document_id", "user_id", "status", "created", "modified"],  # message body
    required_headers=["event", "service", "version"],  # message metadata
)
```
required_payload_keys (body) and required_headers (metadata) are validated before publish: a missing key raises PublishContractError rather than producing a silent half-event. They map onto the two halves of an AsyncAPI message, payload and headers. Transport bindings stay operator-prefixed (sns_arn, and in future kafka_brokers / eventbridge_bus), while the contract names stay generic.
3. Generate the spec (mirrors generate-openapi)
```bash
python -m daplug_core.asyncapi_generator \
  --title documents --version v1 --channel documents \
  --events 'api/v1/events/**/*.py' \
  --output api/v1/asyncapi.yml
```
--events globs and imports the event modules, discovers every Pydantic model
carrying an event_name, and emits AsyncAPI 3.0: payload from
model_json_schema() (nested models inlined, no external $refs) and headers
from the message metadata contract. Regenerate + diff in CI exactly like
openapi.yml; render to Confluence with confluence-ops/publish-asyncapi.
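To make the "payload from model_json_schema(), headers from the metadata contract" mapping concrete, here is a minimal sketch of the discovery and message-building steps. It assumes Pydantic v2's model_json_schema classmethod and the components.messages layout from AsyncAPI 3.0. discover_events and build_messages are hypothetical names. The real generator also handles the glob/import step, channels, and operations, all of which are omitted here.

```python
import inspect
from types import ModuleType
from typing import Dict, Iterable, List


def discover_events(modules: Iterable[ModuleType]) -> List[type]:
    """Collect classes carrying a string event_name and a model_json_schema method."""
    found: List[type] = []
    for module in modules:
        for _, obj in inspect.getmembers(module, inspect.isclass):
            is_event = isinstance(getattr(obj, "event_name", None), str)
            if is_event and hasattr(obj, "model_json_schema") and obj not in found:
                found.append(obj)  # skip duplicates re-imported into other modules
    return found


def build_messages(events: Iterable[type], required_headers: List[str]) -> Dict[str, dict]:
    """Map each event class to an AsyncAPI 3.0 message object."""
    headers_schema = {
        "type": "object",
        "properties": {name: {"type": "string"} for name in required_headers},
        "required": list(required_headers),
    }
    messages = {}
    for event in events:
        doc = (event.__doc__ or "").strip()  # class __doc__ is not inherited
        messages[event.event_name] = {
            "name": event.event_name,
            "title": doc.splitlines()[0] if doc else event.__name__,
            "payload": event.model_json_schema(),
            "headers": headers_schema,
        }
    return {"components": {"messages": messages}}
```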
The legacy --bootstrap / event_registry catalog path is retained but deprecated, solely so that in-flight repos can migrate one event at a time.
🧭 Example: refactoring daplug-ddb
```python
# before (inside daplug_ddb/common/publisher.py)
from . import logger
import boto3

# after
from daplug_core import publisher

publisher.publish(
    arn=self.sns_arn,
    data=payload,
    fifo_group_id=fifo_group,
    fifo_duplication_id=fifo_dedupe,
    attributes={"source": "daplug-ddb"},
)
```
```python
# before
from .common.dict_merger import merge

# after
from daplug_core import dict_merger

updated_item = dict_merger.merge(original, patch, update_list_operation="replace")
```
The same pattern applies inside daplug-cypher when merging node payloads or formatting SNS events.
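For context on what a wrapper like publisher.publish has to assemble, here is a minimal sketch. It builds the keyword arguments for the boto3 SNS client's publish call (TopicArn, Message, MessageAttributes, MessageGroupId, MessageDeduplicationId). build_sns_kwargs is hypothetical, and mapping daplug's fifo_group_id / fifo_duplication_id arguments onto the SNS FIFO fields is an assumption. The function performs no I/O; you would pass its result to boto3.client("sns").publish(**kwargs).

```python
import json
from typing import Any, Dict, Optional


def build_sns_kwargs(
    arn: str,
    data: Any,
    attributes: Optional[Dict[str, str]] = None,
    fifo_group_id: Optional[str] = None,
    fifo_duplication_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical: assemble kwargs for the boto3 SNS client's publish()."""
    kwargs: Dict[str, Any] = {
        "TopicArn": arn,
        # SNS message bodies are strings, so non-string data is serialized to JSON.
        "Message": data if isinstance(data, str) else json.dumps(data, default=str),
        "MessageAttributes": {
            key: {"DataType": "String", "StringValue": str(value)}
            for key, value in (attributes or {}).items()
        },
    }
    if arn.endswith(".fifo"):
        # FIFO topics need a group id; a dedupe id is needed unless the
        # topic has content-based deduplication enabled.
        if fifo_group_id:
            kwargs["MessageGroupId"] = fifo_group_id
        if fifo_duplication_id:
            kwargs["MessageDeduplicationId"] = fifo_duplication_id
    return kwargs
```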
⚙️ Local development
```bash
git clone https://github.com/paulcruse3/daplug-core.git
cd daplug-core
pipenv install --dev
```
Run tests & coverage
```bash
pipenv run test      # pytest tests/
pipenv run test-cov  # pytest --cov=daplug_core --cov-report=term-missing
pipenv run lint      # pylint --fail-under 10 daplug_core
```
Ship updates downstream
- Bump the version in setup.py (and setup.cfg if needed).
- Publish to PyPI or push a git tag.
- Update daplug-ddb and daplug-cypher to depend on the new version.
- Remove any residual common/ references in those repos and re-run their test suites.
🤝 Contributing
Pull requests are welcome, especially improvements that make life easier for the DynamoDB and Cypher adapters. If you add a helper here, remember to wire it up in the consuming packages as well.
```bash
git checkout -b feat/better-schema-mapper
pipenv run test-cov
pipenv run lint
git commit -am "feat: better schema mapper"
```
📄 License
Apache 2.0 – see LICENSE.
File details
Details for the file daplug_core-1.0.0b9.tar.gz.
File metadata
- Download URL: daplug_core-1.0.0b9.tar.gz
- Upload date:
- Size: 25.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.14
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | d1fa067f50bf0bb5e05814a71cce838126dd9386405d53bb458632db122c72f4 |
| MD5 | 2eab740c3e2491c858f992e7a720fefc |
| BLAKE2b-256 | fa122d0c79d9f506ff3e5495e2b0db0faa7cabaf8a59787679fd0e136dd4bc28 |
File details
Details for the file daplug_core-1.0.0b9-py3-none-any.whl.
File metadata
- Download URL: daplug_core-1.0.0b9-py3-none-any.whl
- Upload date:
- Size: 18.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.14
File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | 2fe089ade92996cc75a02e2c4df971c6667c8e91e13d93154a16f945f6552c7d |
| MD5 | 7a4be409cd1be1260f1e2030f61cbbea |
| BLAKE2b-256 | 52ea54097f8345d9c88985e9d2bd0af4715a881c39a3d36f9da6d053127787d0 |