
Shared schema, merge, and SNS helpers powering daplug adapters.


🔗 daplug-sql (da•plug)

Schema-Driven SQL Normalization & Event Publishing for Python


daplug-sql wraps psycopg2 / mysql-connector with optimistic CRUD helpers and SNS event fan-out so your Postgres and MySQL services stay DRY and event-driven.

📎 Agents – a dedicated playbook lives in .agents/AGENTS.md.


✨ Key Features

  • Single adapter factory – daplug_sql.adapter(**kwargs) returns a ready-to-go adapter configured for Postgres or MySQL based on the engine parameter.
  • Optimistic CRUD – Identifier-aware insert, update, upsert, and delete guard against duplicates and emit SNS events automatically.
  • Connection reuse – Thread-safe cache reuses connections per endpoint/database/user/port/engine and lazily closes them.
  • Integration-tested – pipenv run integration spins up both Postgres and MySQL via docker-compose and runs the real test suite.
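
To make the connection-reuse idea concrete, here is a minimal sketch of a thread-safe cache keyed by endpoint, database, user, port, and engine. It is not the library's implementation: ConnectionCache, its methods, and fake_connect are hypothetical names used only for illustration.

```python
import threading
from typing import Any, Callable, Dict, Tuple

CacheKey = Tuple[str, str, str, int, str]


class ConnectionCache:
    """Hypothetical cache: one connection per (endpoint, database, user, port, engine)."""

    def __init__(self, factory: Callable[..., Any]) -> None:
        self._factory = factory        # callable that opens a new connection
        self._lock = threading.Lock()  # guards the dictionary below
        self._connections: Dict[CacheKey, Any] = {}

    def get(self, endpoint: str, database: str, user: str, port: int, engine: str) -> Any:
        key = (endpoint, database, user, port, engine)
        with self._lock:
            # Open a connection only the first time this key is requested.
            if key not in self._connections:
                self._connections[key] = self._factory(*key)
            return self._connections[key]

    def evict(self, endpoint: str, database: str, user: str, port: int, engine: str) -> None:
        key = (endpoint, database, user, port, engine)
        with self._lock:
            self._connections.pop(key, None)


opened = []


def fake_connect(endpoint: str, database: str, user: str, port: int, engine: str) -> object:
    """Stand-in for a real driver call; it only records that it was invoked."""
    opened.append((endpoint, port, engine))
    return object()


cache = ConnectionCache(fake_connect)
first = cache.get("127.0.0.1", "daplug", "svc", 5432, "postgres")
second = cache.get("127.0.0.1", "daplug", "svc", 5432, "postgres")
other = cache.get("127.0.0.1", "daplug", "svc", 3306, "mysql")
```

In this sketch the second lookup with identical parameters returns the same object, while a different port/engine pair opens a separate connection.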

🚀 Quick Start

Installation

pip install daplug-sql
# pipenv install daplug-sql
# poetry add daplug-sql
# uv pip install daplug-sql

Minimal Example

from daplug_sql import adapter

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",  # "mysql" also supported
)

sql.connect()
sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
)
record = sql.get("abc123", table="customers", identifier="customer_id")
print(record)
sql.close()

โš™๏ธ Configuration

Parameter Type Required Description
endpoint str โœ… Host/IP of the Postgres/MySQL server.
database str โœ… Database/schema name.
user str โœ… Database username.
password str โœ… Database password.
engine str โž– 'postgres' (default) or 'mysql'.
autocommit bool โž– Defaults to True; set False for manual transaction control.
sns_arn str โž– SNS topic ARN used when publishing CRUD events.
sns_endpoint str โž– Optional SNS endpoint URL (e.g., LocalStack).
sns_attributes dict โž– Default SNS message attributes merged into every publish.

Per-Call Options

Every CRUD/query helper expects the target table and identifier column at call time so one adapter can manage multiple tables:

| Argument | Description |
| --- | --- |
| table | Table to operate on (customers, orders, etc.). |
| identifier | Column that uniquely identifies rows (customer_id). |
| commit | Override autocommit per call (True/False). |
| debug | Log SQL statements via the adapter logger when True. |
| sns_attributes | Per-call attributes merged with defaults before publish. |
| fifo_group_id / fifo_duplication_id | Optional FIFO metadata passed straight to SNS. |
| publish | Set to False to skip the SNS publish for this call only (default True). |
| publish_data | Replace the published payload entirely (the row write is unchanged). |

SNS Publishing

SQLAdapter inherits daplug-core's SNS publisher. Provide the topic details when constructing the adapter:

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
    sns_arn="arn:aws:sns:us-east-1:123456789012:sql-events",
    sns_endpoint="http://localhost:4566",  # optional (LocalStack)
    sns_attributes={"service": "billing"},
)
  • sns_attributes passed to adapter(...) become defaults for every publish.
  • Each CRUD helper accepts its own sns_attributes to overlay call-specific metadata.
  • FIFO topics are supported via the fifo_group_id and fifo_duplication_id kwargs on individual calls.

Example:

sql.insert(
    data={"customer_id": "abc123", "name": "Ada"},
    table="customers",
    identifier="customer_id",
    sns_attributes={"event": "customer-created"},
    fifo_group_id="customers",
)
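
As a rough illustration of how per-call attributes could overlay the adapter defaults, the sketch below merges two dictionaries. It assumes the per-call value wins when both define the same key; that precedence and the merge_sns_attributes name are assumptions, not confirmed library behavior.

```python
from typing import Dict, Optional


def merge_sns_attributes(
    defaults: Optional[Dict[str, str]],
    per_call: Optional[Dict[str, str]],
) -> Dict[str, str]:
    """Hypothetical helper: overlay per-call attributes on adapter defaults."""
    merged = dict(defaults or {})   # copy so the defaults are never mutated
    merged.update(per_call or {})   # assumed precedence: per-call keys win
    return merged


adapter_defaults = {"service": "billing"}
attributes = merge_sns_attributes(adapter_defaults, {"event": "customer-created"})
print(attributes)  # {'service': 'billing', 'event': 'customer-created'}
```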

If sns_arn is omitted, publish calls are skipped automatically. To skip a single call while keeping defaults intact, pass publish=False. To publish a different payload than the row that was written, pass publish_data={...}.

sql.insert(data=row, table="customers", identifier="customer_id", publish=False)

sql.update(
    data=row,
    table="customers",
    identifier="customer_id",
    publish_data={"id": row["customer_id"], "event": "updated"},
)
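
The three publish rules described above (no sns_arn, publish=False, and publish_data) can be summarized in a small decision function. This is a sketch of the documented behavior only; resolve_publish_payload is a hypothetical name and the real adapter may structure this differently.

```python
from typing import Any, Dict, Optional

Payload = Dict[str, Any]


def resolve_publish_payload(
    row: Payload,
    sns_arn: Optional[str],
    publish: bool = True,
    publish_data: Optional[Payload] = None,
) -> Optional[Payload]:
    """Return the payload to publish, or None when the publish is skipped."""
    if not sns_arn:
        return None  # no topic configured: nothing is published
    if not publish:
        return None  # caller opted out for this call only
    # publish_data replaces the payload; the written row itself is unchanged.
    return publish_data if publish_data is not None else row


row = {"customer_id": "abc123", "name": "Ada"}
arn = "arn:aws:sns:us-east-1:123456789012:sql-events"

print(resolve_publish_payload(row, sns_arn=None))                # None
print(resolve_publish_payload(row, sns_arn=arn, publish=False))  # None
print(resolve_publish_payload(row, sns_arn=arn))                 # the row itself
```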

🧭 Public API Cheat Sheet

| Method | Description |
| --- | --- |
| connect() | Opens a connection + cursor using the engine-specific connector. |
| close() | Closes the cursor/connection and evicts the cached connector. |
| commit(commit=True) | Commits the underlying DB connection when commit is truthy. |
| insert(data, table, identifier, **kwargs) | Validates data, enforces uniqueness on the provided identifier, inserts the row, and publishes SNS. |
| update(data, table, identifier, **kwargs) | Fetches the existing row, merges via dict_merger, runs UPDATE, publishes SNS. |
| upsert(data, table, identifier, **kwargs) | Calls update when the row exists; falls back to insert. |
| get(identifier_value, table, identifier, **kwargs) | Returns the first matching row or None. |
| read(identifier_value, table, identifier, **kwargs) | Alias of get. |
| query(query, params, table, identifier, **kwargs) | Executes a read-only statement (SELECT) and returns all rows as dictionaries. |
| delete(identifier_value, table, identifier, **kwargs) | Deletes the row, publishes SNS, and ignores missing rows. |
| create_index(table_name, index_columns) | Issues CREATE INDEX index_col1_col2 ON table_name (col1, col2) using safe identifiers. |
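
The insert/update/upsert relationship in the table can be sketched against an in-memory dictionary. This is an illustration under stated assumptions, not the adapter's code: a plain dict stands in for a table, a shallow merge stands in for dict_merger (whose real semantics are not described here), and the exception names are hypothetical.

```python
from typing import Any, Dict

Row = Dict[str, Any]
Table = Dict[Any, Row]  # rows keyed by their identifier value


class DuplicateRow(Exception):
    """Hypothetical error raised when the identifier already exists."""


class MissingRow(Exception):
    """Hypothetical error raised when there is no row to update."""


def insert(table: Table, data: Row, identifier: str) -> Row:
    key = data[identifier]
    if key in table:  # uniqueness check on the identifier
        raise DuplicateRow(key)
    table[key] = dict(data)
    return table[key]


def update(table: Table, data: Row, identifier: str) -> Row:
    key = data[identifier]
    if key not in table:
        raise MissingRow(key)
    # Shallow merge as a stand-in for dict_merger: new values win.
    table[key] = {**table[key], **data}
    return table[key]


def upsert(table: Table, data: Row, identifier: str) -> Row:
    # Update when the row exists; otherwise fall back to insert.
    if data[identifier] in table:
        return update(table, data, identifier)
    return insert(table, data, identifier)


customers: Table = {}
upsert(customers, {"customer_id": "abc123", "name": "Ada"}, "customer_id")
upsert(customers, {"customer_id": "abc123", "tier": "gold"}, "customer_id")
print(customers["abc123"])  # {'customer_id': 'abc123', 'name': 'Ada', 'tier': 'gold'}
```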

All identifier-based helpers sanitize names with SAFE_IDENTIFIER to prevent SQL injection through table/column inputs.
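
Identifier sanitization matters because table and column names cannot be passed as bound query parameters. The sketch below shows one common approach: an allow-list regular expression applied before a name is interpolated into SQL. The pattern, safe_identifier, and build_create_index are assumptions for illustration; the library's actual SAFE_IDENTIFIER rule may differ.

```python
import re
from typing import Sequence

# Assumed rule: a letter or underscore, then letters, digits, or underscores.
SAFE_IDENTIFIER_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def safe_identifier(name: str) -> str:
    """Return the name unchanged if it is safe, otherwise raise ValueError."""
    if not SAFE_IDENTIFIER_PATTERN.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


def build_create_index(table_name: str, index_columns: Sequence[str]) -> str:
    """Build a CREATE INDEX statement named index_<col1>_<col2>..."""
    if not index_columns:
        raise ValueError("at least one index column is required")
    table = safe_identifier(table_name)
    columns = [safe_identifier(column) for column in index_columns]
    index_name = "index_" + "_".join(columns)
    return f"CREATE INDEX {index_name} ON {table} ({', '.join(columns)})"


print(build_create_index("orders", ["status", "created_at"]))
# CREATE INDEX index_status_created_at ON orders (status, created_at)
```

A name such as "orders; DROP TABLE users" fails the pattern and raises ValueError before any SQL is built.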


📚 Usage Examples

Insert + Query (Postgres)

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="postgres",
)
sql.connect()

sql.insert(data={"sku": "W-1000", "name": "Widget", "cost": 99}, table="inventory", identifier="sku")
rows = sql.query(
    query="SELECT sku, name FROM inventory WHERE cost >= %(min_cost)s",
    params={"min_cost": 50},
    table="inventory",
    identifier="sku",
)
print(rows)
sql.close()

Transactions (MySQL)

sql = adapter(
    endpoint="127.0.0.1",
    database="daplug",
    user="svc",
    password="secret",
    engine="mysql",
    autocommit=False,
)
sql.connect()

try:
    sql.insert(data={"order_id": "O-1", "status": "pending"}, table="orders", identifier="order_id", commit=False)
    sql.update(data={"order_id": "O-1", "status": "shipped"}, table="orders", identifier="order_id", commit=False)
    sql.commit(True)
finally:
    sql.close()
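
The try/finally pattern above can be wrapped in a small context manager that relies only on the documented connect(), commit(), and close() methods. The transaction helper and the RecordingAdapter stand-in are hypothetical. The sketch also assumes that closing without committing discards pending work, which is the usual behavior for Postgres and transactional MySQL engines but is worth verifying in your setup.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


@contextmanager
def transaction(sql: Any) -> Iterator[Any]:
    """Hypothetical helper: commit once if the block succeeds, always close."""
    sql.connect()
    try:
        yield sql
        sql.commit(True)  # only reached when the with-block raised nothing
    finally:
        sql.close()       # runs on success and on failure


class RecordingAdapter:
    """Stand-in for the real adapter; it only records which methods ran."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def connect(self) -> None:
        self.calls.append("connect")

    def commit(self, commit: bool = True) -> None:
        if commit:
            self.calls.append("commit")

    def close(self) -> None:
        self.calls.append("close")


ok = RecordingAdapter()
with transaction(ok):
    pass  # real code would call sql.insert(..., commit=False) here

failed = RecordingAdapter()
try:
    with transaction(failed):
        raise RuntimeError("simulated failure mid-transaction")
except RuntimeError:
    pass

print(ok.calls)      # ['connect', 'commit', 'close']
print(failed.calls)  # ['connect', 'close']
```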

Per-call Table Overrides

# Share one adapter across multiple tables by overriding table + identifier per call
sql.insert(data=payload, table="orders", identifier="order_id")
sql.create_index("orders", ["status", "created_at"])

🧪 Testing & Tooling

| Command | Description |
| --- | --- |
| pipenv run lint | Runs pylint and exports HTML/JSON to coverage/lint. |
| pipenv run type-check | Runs mypy using the new Protocol types. |
| pipenv run test | Executes the unit suite (mocks only). |
| pipenv run integration | Starts Postgres + MySQL via docker-compose and runs tests/integration. |
| pipenv run test_ci | Runs unit tests and integration tests sequentially (no Docker management). |
| pipenv run coverage | Full coverage run producing HTML, XML, JUnit, and pretty reports. |

Integration tests rely on tests/integration/docker-compose.yml. The CircleCI pipeline mirrors this by launching Postgres and MySQL sidecars, waiting for them to be reachable, and then executing pipenv run coverage so artifacts are published automatically.
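
If you run the databases yourself rather than through the provided scripts, a readiness check along these lines can stand in for the "wait until reachable" step. It is a generic sketch using only the standard library; wait_for_port is a hypothetical helper, not part of this project or its CI configuration, and a successful TCP connect does not guarantee the database is ready to accept queries.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to host:port succeeds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False


if __name__ == "__main__":
    # 5432 and 3306 are the conventional Postgres and MySQL ports;
    # adjust them to match your docker-compose file.
    for name, port in (("postgres", 5432), ("mysql", 3306)):
        ready = wait_for_port("127.0.0.1", port, timeout=5.0)
        print(f"{name}: {'reachable' if ready else 'not reachable'}")
```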


🗂 Project Layout

daplug-sql/
├── daplug_sql/
│   ├── adapter.py           # SQLAdapter implementation
│   ├── exception.py         # Adapter-specific exceptions
│   ├── sql_connector.py     # Engine-aware connector wrapper
│   ├── sql_connection.py    # Connection caching decorators
│   ├── types/__init__.py    # Shared typing helpers (Protocols, aliases)
│   └── __init__.py          # Adapter factory export
├── tests/
│   ├── unit/                # Pure unit tests (mocks only)
│   └── integration/         # Integration tests (Postgres + MySQL)
├── tests/integration/docker-compose.yml
├── Pipfile / Pipfile.lock   # Runtime + dev dependencies
├── setup.py                 # Packaging metadata
├── README.md
└── .agents/AGENTS.md        # Automation/Triage playbook for agents

๐Ÿค Contributing

  1. Fork / branch (git checkout -b feature/amazing)
  2. pipenv install --dev
  3. Add/change code + tests
  4. Run pipenv run lint && pipenv run type-check && pipenv run test && pipenv run integration
  5. Open a pull request and tag @dual

📄 License

Apache License 2.0 – see LICENSE.


Built to keep SQL integrations event-driven and zero-boilerplate.
