Shared schema, merge, and SNS helpers powering daplug adapters.


🔗 daplug-sql (da•plug)

Schema-Driven SQL Normalization & Event Publishing for Python


daplug-sql wraps psycopg2 / mysql-connector with optimistic CRUD helpers and SNS event fan-out so your Postgres and MySQL services stay DRY and event-driven.

📎 Agents – a dedicated playbook lives in .agents/AGENTS.md.


✨ Key Features

  • Single adapter factory – daplug_sql.adapter(**kwargs) returns a ready-to-go adapter configured for Postgres or MySQL based on the engine parameter.
  • Optimistic CRUD – Identifier-aware insert, update, upsert, and delete guard against duplicates and emit SNS events automatically.
  • Connection reuse – Thread-safe cache reuses connections per endpoint/database/user/port/engine and lazily closes them.
  • Integration-tested – pipenv run integration spins up both Postgres and MySQL via docker-compose and runs the real test suite.
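
The connection-reuse behavior can be pictured as a small thread-safe cache keyed by the connection parameters. This is a minimal sketch of the idea only; the library's actual implementation (in daplug_sql/sql_connection.py) may differ, and ConnectionCache here is a hypothetical name:

```python
import threading


class ConnectionCache:
    """Sketch of a thread-safe connection cache keyed by connection params."""

    def __init__(self, factory):
        self._factory = factory  # callable that opens a new connection
        self._cache = {}
        self._lock = threading.Lock()

    def get(self, endpoint, database, user, port, engine):
        key = (endpoint, database, user, port, engine)
        with self._lock:
            if key not in self._cache:
                # Lazily open a connection the first time this key is seen.
                self._cache[key] = self._factory(*key)
            return self._cache[key]

    def evict(self, endpoint, database, user, port, engine):
        with self._lock:
            conn = self._cache.pop((endpoint, database, user, port, engine), None)
        if conn is not None:
            conn.close()  # close outside the lock to avoid blocking other threads
```

Two calls with the same endpoint/database/user/port/engine share one connection; evicting the key closes it.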

🚀 Quick Start

Installation

pip install daplug-sql
# pipenv install daplug-sql
# poetry add daplug-sql
# uv pip install daplug-sql

Minimal Example

from daplug_sql import adapter

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",  # "mysql" also supported
)

sql.connect()
sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
)
record = sql.get("abc123", table="customers", identifier="customer_id")
print(record)
sql.close()

โš™๏ธ Configuration

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| endpoint | str | ✅ | Host/IP of the Postgres/MySQL server. |
| database | str | ✅ | Database/schema name. |
| user | str | ✅ | Database username. |
| password | str | ✅ | Database password. |
| engine | str | ➖ | 'postgres' (default) or 'mysql'. |
| autocommit | bool | ➖ | Defaults to True; set False for manual transaction control. |
| sns_arn | str | ➖ | SNS topic ARN used when publishing CRUD events. |
| sns_endpoint | str | ➖ | Optional SNS endpoint URL (e.g., LocalStack). |
| sns_attributes | dict | ➖ | Default SNS message attributes merged into every publish. |

Per-Call Options

Every CRUD/query helper expects the target table and identifier column at call time so one adapter can manage multiple tables:

| Argument | Description |
| --- | --- |
| table | Table to operate on (customers, orders, etc.). |
| identifier | Column that uniquely identifies rows (customer_id). |
| commit | Override autocommit per call (True/False). |
| debug | Log SQL statements via the adapter logger when True. |
| sns_attributes | Per-call attributes merged with defaults before publish. |
| fifo_group_id / fifo_duplication_id | Optional FIFO metadata passed straight to SNS. |
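
The interplay between the adapter-level autocommit default and the per-call commit override can be pictured with a tiny helper. resolve_commit is a hypothetical name for illustration; the real resolution logic is internal to SQLAdapter:

```python
from typing import Optional


def resolve_commit(autocommit: bool, commit: Optional[bool] = None) -> bool:
    """Return the effective commit decision for one call.

    A per-call ``commit`` argument, when given, wins over the adapter-level
    ``autocommit`` default; otherwise the default applies.
    """
    return autocommit if commit is None else commit
```

So an adapter built with autocommit=True still defers the commit when a call passes commit=False, as in the transaction example later in this README.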

SNS Publishing

SQLAdapter inherits daplug-core's SNS publisher. Provide the topic details when constructing the adapter:

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
    sns_arn="arn:aws:sns:us-east-1:123456789012:sql-events",
    sns_endpoint="http://localhost:4566",  # optional (LocalStack)
    sns_attributes={"service": "billing"},
)
  • sns_attributes passed to adapter(...) become defaults for every publish.
  • Each CRUD helper accepts its own sns_attributes to overlay call-specific metadata.
  • FIFO topics are supported via the fifo_group_id and fifo_duplication_id kwargs on individual calls.

Example:

sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
    sns_attributes={"event": "customer-created"},
    fifo_group_id="customers",
)

If sns_arn is omitted, publish calls are skipped automatically.
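
The attribute overlay described above behaves like a shallow dict merge, with per-call values winning over adapter-level defaults. A sketch of the behavior (not the library's exact code):

```python
def merge_sns_attributes(defaults, per_call=None):
    """Shallow-merge per-call SNS attributes over adapter-level defaults."""
    merged = dict(defaults or {})
    merged.update(per_call or {})
    return merged
```

For example, adapter defaults of {"service": "billing"} combined with a per-call {"event": "customer-created"} publish both attributes, while a per-call key that collides with a default replaces it.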


🧭 Public API Cheat Sheet

| Method | Description |
| --- | --- |
| connect() | Opens a connection + cursor using the engine-specific connector. |
| close() | Closes the cursor/connection and evicts the cached connector. |
| commit(commit=True) | Commits the underlying DB connection when commit is truthy. |
| insert(data, table, identifier, **kwargs) | Validates data, enforces uniqueness on the provided identifier, inserts the row, and publishes SNS. |
| update(data, table, identifier, **kwargs) | Fetches the existing row, merges via dict_merger, runs UPDATE, publishes SNS. |
| upsert(data, table, identifier, **kwargs) | Calls update when the row exists; falls back to insert. |
| get(identifier_value, table, identifier, **kwargs) | Returns the first matching row or None. |
| read(identifier_value, table, identifier, **kwargs) | Alias of get. |
| query(query, params, table, identifier, **kwargs) | Executes a read-only statement (SELECT) and returns all rows as dictionaries. |
| delete(identifier_value, table, identifier, **kwargs) | Deletes the row, publishes SNS, and ignores missing rows. |
| create_index(table_name, index_columns) | Issues CREATE INDEX index_col1_col2 ON table_name (col1, col2) using safe identifiers. |

All identifier-based helpers sanitize names with SAFE_IDENTIFIER to prevent SQL injection through table/column inputs.
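
A sanitizer like SAFE_IDENTIFIER typically whitelists characters rather than trying to escape dangerous ones. This is a minimal sketch under that assumption; the library's actual rules may differ:

```python
import re

# Allow only letters, digits, and underscores, starting with a letter or underscore.
SAFE_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def ensure_safe_identifier(name: str) -> str:
    """Reject table/column names that could smuggle SQL into a statement."""
    if not SAFE_IDENTIFIER.match(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name
```

Because table and column names cannot be sent as bound parameters, validating them before interpolation is the standard defense against injection through identifiers.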


📚 Usage Examples

Insert + Query (Postgres)

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
)
sql.connect()

sql.insert(data={"sku": "W-1000", "name": "Widget", "cost": 99}, table="inventory", identifier="sku")
rows = sql.query(
    query="SELECT sku, name FROM inventory WHERE cost >= %(min_cost)s",
    params={"min_cost": 50},
    table="inventory",
    identifier="sku",
)
print(rows)
sql.close()

Transactions (MySQL)

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="mysql",
    autocommit=False,
)
sql.connect()

try:
    sql.insert(data={"order_id": "O-1", "status": "pending"}, table="orders", identifier="order_id", commit=False)
    sql.update(data={"order_id": "O-1", "status": "shipped"}, table="orders", identifier="order_id", commit=False)
    sql.commit(True)
finally:
    sql.close()

Per-call Table Overrides

# Share one adapter across multiple tables by overriding table + identifier per call
sql.insert(data=payload, table="orders", identifier="order_id")
sql.create_index("orders", ["status", "created_at"])
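
The index name create_index issues is derived from the column list, per the cheat sheet above. Roughly (a sketch; the real method also runs the names through SAFE_IDENTIFIER and executes the statement):

```python
def index_statement(table_name, index_columns):
    """Build a CREATE INDEX statement named after its columns."""
    name = "index_" + "_".join(index_columns)   # e.g. index_status_created_at
    cols = ", ".join(index_columns)
    return f"CREATE INDEX {name} ON {table_name} ({cols})"
```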

🧪 Testing & Tooling

| Command | Description |
| --- | --- |
| pipenv run lint | Runs pylint and exports HTML/JSON to coverage/lint. |
| pipenv run type-check | Runs mypy using the new Protocol types. |
| pipenv run test | Executes the unit suite (mocks only). |
| pipenv run integration | Starts Postgres + MySQL via docker-compose and runs tests/integration. |
| pipenv run test_ci | Runs unit tests and integration tests sequentially (no Docker management). |
| pipenv run coverage | Full coverage run producing HTML, XML, JUnit, and pretty reports. |

Integration tests rely on tests/integration/docker-compose.yml. The CircleCI pipeline mirrors this by launching Postgres and MySQL sidecars, waiting for them to be reachable, and then executing pipenv run coverage so artifacts are published automatically.


🗂 Project Layout

daplug-sql/
├── daplug_sql/
│   ├── adapter.py           # SQLAdapter implementation
│   ├── exception.py         # Adapter-specific exceptions
│   ├── sql_connector.py     # Engine-aware connector wrapper
│   ├── sql_connection.py    # Connection caching decorators
│   ├── types/__init__.py    # Shared typing helpers (Protocols, aliases)
│   └── __init__.py          # Adapter factory export
├── tests/
│   ├── unit/                # Pure unit tests (mocks only)
│   └── integration/         # Integration tests (Postgres + MySQL)
│       └── docker-compose.yml
├── Pipfile / Pipfile.lock   # Runtime + dev dependencies
├── setup.py                 # Packaging metadata
├── README.md
└── .agents/AGENTS.md        # Automation/Triage playbook for agents

๐Ÿค Contributing

  1. Fork / branch (git checkout -b feature/amazing)
  2. pipenv install --dev
  3. Add/change code + tests
  4. Run pipenv run lint && pipenv run type-check && pipenv run test && pipenv run integration
  5. Open a pull request and tag @dual

📄 License

Apache License 2.0 – see LICENSE.


Built to keep SQL integrations event-driven and zero-boilerplate.
