Shared schema, merge, and SNS helpers powering daplug adapters.
# daplug-sql (da•plug)

Schema-Driven SQL Normalization & Event Publishing for Python
daplug-sql wraps psycopg2 / mysql-connector with optimistic CRUD helpers and SNS event fan-out so your Postgres and MySQL services stay DRY and event-driven.
Agents: a dedicated playbook lives in `.agents/AGENTS.md`.
## Key Features
- **Single adapter factory** – `daplug_sql.adapter(**kwargs)` returns a ready-to-go adapter configured for Postgres or MySQL based on the `engine` parameter.
- **Optimistic CRUD** – Identifier-aware `insert`, `update`, `upsert`, and `delete` guard against duplicates and emit SNS events automatically.
- **Connection reuse** – Thread-safe cache reuses connections per endpoint/database/user/port/engine and lazily closes them.
- **Integration-tested** – `pipenv run integration` spins up both Postgres and MySQL via docker-compose and runs the real test suite.
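The connection-reuse feature can be pictured as a small cache keyed by the connection parameters. This is an illustrative sketch, not the library's implementation; the class and method names here are invented:

```python
import threading

class ConnectionCache:
    """Simplified per-parameter connection cache (illustrative only)."""

    def __init__(self, factory):
        self._factory = factory        # opens a real connection when needed
        self._lock = threading.Lock()  # keeps the cache thread-safe
        self._cache = {}

    def get(self, endpoint, database, user, port, engine):
        key = (endpoint, database, user, port, engine)
        with self._lock:
            if key not in self._cache:
                self._cache[key] = self._factory(*key)
            return self._cache[key]

    def evict(self, endpoint, database, user, port, engine):
        # Close and drop the cached connection, mirroring close() semantics.
        key = (endpoint, database, user, port, engine)
        with self._lock:
            conn = self._cache.pop(key, None)
        if conn is not None:
            conn.close()
```

Two calls with identical parameters share one connection; `evict` closes it so the next call reconnects lazily.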
## Quick Start

### Installation

```shell
pip install daplug-sql
# pipenv install daplug-sql
# poetry add daplug-sql
# uv pip install daplug-sql
```

### Minimal Example
```python
from daplug_sql import adapter

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",  # "mysql" also supported
)
sql.connect()
sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
)
record = sql.get("abc123", table="customers", identifier="customer_id")
print(record)
sql.close()
```
## Configuration
| Parameter | Type | Required | Description |
|---|---|---|---|
| `endpoint` | `str` | Yes | Host/IP of the Postgres/MySQL server. |
| `database` | `str` | Yes | Database/schema name. |
| `user` | `str` | Yes | Database username. |
| `password` | `str` | Yes | Database password. |
| `engine` | `str` | No | `'postgres'` (default) or `'mysql'`. |
| `autocommit` | `bool` | No | Defaults to `True`; set `False` for manual transaction control. |
| `sns_arn` | `str` | No | SNS topic ARN used when publishing CRUD events. |
| `sns_endpoint` | `str` | No | Optional SNS endpoint URL (e.g., LocalStack). |
| `sns_attributes` | `dict` | No | Default SNS message attributes merged into every publish. |
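In practice these kwargs are often assembled from the environment. A sketch with hypothetical `DAPLUG_*` variable names; neither the helper nor the variables are part of the library:

```python
import os

def adapter_kwargs_from_env(prefix="DAPLUG_"):
    """Hypothetical helper: build adapter(**kwargs) from environment variables."""
    kwargs = {
        "endpoint": os.environ[prefix + "ENDPOINT"],
        "database": os.environ[prefix + "DATABASE"],
        "user": os.environ[prefix + "USER"],
        "password": os.environ[prefix + "PASSWORD"],
        # Optional parameters fall back to the documented defaults.
        "engine": os.environ.get(prefix + "ENGINE", "postgres"),
        "autocommit": os.environ.get(prefix + "AUTOCOMMIT", "true").lower() == "true",
    }
    sns_arn = os.environ.get(prefix + "SNS_ARN")
    if sns_arn:
        kwargs["sns_arn"] = sns_arn
    return kwargs
```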
### Per-Call Options
Every CRUD/query helper expects the target table and identifier column at call time so one adapter can manage multiple tables:
| Argument | Description |
|---|---|
| `table` | Table to operate on (`customers`, `orders`, etc.). |
| `identifier` | Column that uniquely identifies rows (`customer_id`). |
| `commit` | Override autocommit per call (`True`/`False`). |
| `debug` | Log SQL statements via the adapter logger when `True`. |
| `sns_attributes` | Per-call attributes merged with defaults before publish. |
| `fifo_group_id` / `fifo_duplication_id` | Optional FIFO metadata passed straight to SNS. |
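The merge behavior for `sns_attributes` can be sketched as a plain dict overlay, assuming (per the description above) that per-call attributes take precedence over adapter-level defaults:

```python
def merge_sns_attributes(defaults, per_call):
    """Sketch of the merge order: per-call attributes win over adapter defaults."""
    merged = dict(defaults or {})   # start from the adapter-level defaults
    merged.update(per_call or {})   # per-call values overlay and override
    return merged
```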
## SNS Publishing

`SQLAdapter` inherits daplug-core's SNS publisher. Provide the topic details when constructing the adapter:
```python
sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
    sns_arn="arn:aws:sns:us-east-1:123456789012:sql-events",
    sns_endpoint="http://localhost:4566",  # optional (LocalStack)
    sns_attributes={"service": "billing"},
)
```
- `sns_attributes` passed to `adapter(...)` become defaults for every publish.
- Each CRUD helper accepts its own `sns_attributes` to overlay call-specific metadata.
- FIFO topics are supported via the `fifo_group_id` and `fifo_duplication_id` kwargs on individual calls.
Example:
```python
sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
    sns_attributes={"event": "customer-created"},
    fifo_group_id="customers",
)
```
If `sns_arn` is omitted, publish calls are skipped automatically.
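How a CRUD helper might assemble the publish call, including the skip-when-unconfigured guard and the FIFO kwargs, can be sketched as follows. The function name is illustrative; the kwarg keys mirror boto3's `SNS.Client.publish`:

```python
def build_publish_kwargs(sns_arn, message, attributes=None,
                         fifo_group_id=None, fifo_duplication_id=None):
    """Sketch of assembling an SNS publish call; returns None when no
    topic ARN is configured, mirroring the documented skip behavior."""
    if not sns_arn:
        return None  # publish silently skipped
    kwargs = {"TopicArn": sns_arn, "Message": message}
    if attributes:
        # boto3 expects typed message attributes.
        kwargs["MessageAttributes"] = {
            key: {"DataType": "String", "StringValue": str(value)}
            for key, value in attributes.items()
        }
    if fifo_group_id:
        kwargs["MessageGroupId"] = fifo_group_id
    if fifo_duplication_id:
        kwargs["MessageDeduplicationId"] = fifo_duplication_id
    return kwargs
```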
## Public API Cheat Sheet

| Method | Description |
|---|---|
| `connect()` | Opens a connection + cursor using the engine-specific connector. |
| `close()` | Closes the cursor/connection and evicts the cached connector. |
| `commit(commit=True)` | Commits the underlying DB connection when `commit` is truthy. |
| `insert(data, table, identifier, **kwargs)` | Validates `data`, enforces uniqueness on the provided identifier, inserts the row, and publishes SNS. |
| `update(data, table, identifier, **kwargs)` | Fetches the existing row, merges via `dict_merger`, runs `UPDATE`, publishes SNS. |
| `upsert(data, table, identifier, **kwargs)` | Calls `update` when the row exists; falls back to `insert`. |
| `get(identifier_value, table, identifier, **kwargs)` | Returns the first matching row or `None`. |
| `read(identifier_value, table, identifier, **kwargs)` | Alias of `get`. |
| `query(query, params, table, identifier, **kwargs)` | Executes a read-only statement (`SELECT`) and returns all rows as dictionaries. |
| `delete(identifier_value, table, identifier, **kwargs)` | Deletes the row, publishes SNS, and ignores missing rows. |
| `create_index(table_name, index_columns)` | Issues `CREATE INDEX index_col1_col2 ON table_name (col1, col2)` using safe identifiers. |

All identifier-based helpers sanitize names with `SAFE_IDENTIFIER` to prevent SQL injection through table/column inputs.
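A minimal stand-in for that sanitization step might look like the following; the actual `SAFE_IDENTIFIER` rules in the library may differ:

```python
import re

# Allow only letters, digits, and underscores (not starting with a digit),
# so table/column names cannot smuggle SQL fragments into a statement.
_SAFE_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def safe_identifier(name):
    """Return the name unchanged if it is a safe SQL identifier, else raise."""
    if not _SAFE_IDENTIFIER.match(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name
```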
## Usage Examples

### Insert + Query (Postgres)
```python
sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
)
sql.connect()
sql.insert(data={"sku": "W-1000", "name": "Widget", "cost": 99}, table="inventory", identifier="sku")
rows = sql.query(
    query="SELECT sku, name FROM inventory WHERE cost >= %(min_cost)s",
    params={"min_cost": 50},
    table="inventory",
    identifier="sku",
)
print(rows)
sql.close()
```
### Transactions (MySQL)
```python
sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="mysql",
    autocommit=False,
)
sql.connect()
try:
    sql.insert(data={"order_id": "O-1", "status": "pending"}, table="orders", identifier="order_id", commit=False)
    sql.update(data={"order_id": "O-1", "status": "shipped"}, table="orders", identifier="order_id", commit=False)
    sql.commit(True)
finally:
    sql.close()
```
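With `autocommit=False`, the commit-on-success pattern can be made reusable with a small wrapper. This is a sketch: `commit()` matches the documented API, but the `rollback()` hook is hypothetical, since the public cheat sheet above lists no rollback method:

```python
from contextlib import contextmanager

@contextmanager
def transaction(adapter):
    """Commit when the block succeeds; otherwise try a rollback and re-raise."""
    try:
        yield adapter
    except Exception:
        rollback = getattr(adapter, "rollback", None)  # hypothetical hook
        if callable(rollback):
            rollback()  # undo the partial work if the adapter supports it
        raise
    else:
        adapter.commit(True)
```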
### Per-call Table Overrides

```python
# Share one adapter across multiple tables by overriding table + identifier per call
sql.insert(data=payload, table="orders", identifier="order_id")
sql.create_index("orders", ["status", "created_at"])
```
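The `CREATE INDEX` statement shape described in the cheat sheet can be reproduced with a small builder. A sketch only; the library's identifier sanitization is omitted here:

```python
def build_create_index(table_name, index_columns):
    """Build the documented CREATE INDEX index_col1_col2 ON table (col1, col2) form."""
    name = "index_" + "_".join(index_columns)  # e.g. index_status_created_at
    cols = ", ".join(index_columns)
    return f"CREATE INDEX {name} ON {table_name} ({cols})"
```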
## Testing & Tooling

| Command | Description |
|---|---|
| `pipenv run lint` | Runs pylint and exports HTML/JSON to `coverage/lint`. |
| `pipenv run type-check` | Runs mypy using the new Protocol types. |
| `pipenv run test` | Executes the unit suite (mocks only). |
| `pipenv run integration` | Starts Postgres + MySQL via docker-compose and runs `tests/integration`. |
| `pipenv run test_ci` | Runs unit tests and integration tests sequentially (no Docker management). |
| `pipenv run coverage` | Full coverage run producing HTML, XML, JUnit, and pretty reports. |
Integration tests rely on `tests/integration/docker-compose.yml`. The CircleCI pipeline mirrors this by launching Postgres and MySQL sidecars, waiting for them to be reachable, and then executing `pipenv run coverage` so artifacts are published automatically.
## Project Layout

```
daplug-sql/
├── daplug_sql/
│   ├── adapter.py         # SQLAdapter implementation
│   ├── exception.py       # Adapter-specific exceptions
│   ├── sql_connector.py   # Engine-aware connector wrapper
│   ├── sql_connection.py  # Connection caching decorators
│   ├── types/__init__.py  # Shared typing helpers (Protocols, aliases)
│   └── __init__.py        # Adapter factory export
├── tests/
│   ├── unit/              # Pure unit tests (mocks only)
│   └── integration/       # Integration tests (Postgres + MySQL)
├── tests/integration/docker-compose.yml
├── Pipfile / Pipfile.lock # Runtime + dev dependencies
├── setup.py               # Packaging metadata
├── README.md
└── .agents/AGENTS.md      # Automation/Triage playbook for agents
```
## Contributing

- Fork / branch (`git checkout -b feature/amazing`)
- `pipenv install --dev`
- Add/change code + tests
- Run `pipenv run lint && pipenv run type-check && pipenv run test && pipenv run integration`
- Open a pull request and tag `@dual`
## License

Apache License 2.0 – see `LICENSE`.
Built to keep SQL integrations event-driven and zero-boilerplate.
## File details: daplug_sql-1.0.0b1.tar.gz

- Download URL: daplug_sql-1.0.0b1.tar.gz
- Upload date:
- Size: 18.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17

| Algorithm | Hash digest |
|---|---|
| SHA256 | `055b6d10fcea34c9c18195d2787692cf5fd5a7acde2134a5ae97367f99b4d3f7` |
| MD5 | `098eeaa5ca3e562f26ee9d1ad456d421` |
| BLAKE2b-256 | `1234538982b0a3ee4ec5cc0102963ef0b1ddf46feb02ebc413394371e6ed3c42` |
## File details: daplug_sql-1.0.0b1-py3-none-any.whl

- Download URL: daplug_sql-1.0.0b1-py3-none-any.whl
- Upload date:
- Size: 15.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17

| Algorithm | Hash digest |
|---|---|
| SHA256 | `b838ad71561c7b2c3eee225246f4b4dac97ca45c646eac918e4dd3e2609af6c7` |
| MD5 | `3fc26c4e7eb51cf9d73fd1692cff7f94` |
| BLAKE2b-256 | `e26fba3f911d198d978a618e1ea60e1b2c38ede8da57148d4662c615b85e11e9` |