A Neo4j and Neptune graph normalization layer using the adapter pattern.
Project description
🔗 daplug-cypher (da•plug)
Schema-Driven Cypher Normalization & Event Publishing for Python
daplug-cypher brings the ergonomics of an adapter patthern to graph databases. It bundles Cypher-friendly schema mapping, optimistic concurrency, and SNS event fan-out so your graph services stay DRY, version-safe, and event-driven—whether you deploy to Neo4j or AWS Neptune (openCypher).
✨ Key Features
- Unified factory –
daplug_cypher.adapter(**kwargs)returns a ready-to-go adapter with SNS support, just likedaplug_ddb. - Schema mapping – Reuse OpenAPI/JSON schemas to validate and normalize payloads before writing nodes or relationships.
- Optimistic concurrency – Guard updates with identifier + version keys; the adapter enforces atomic Cypher
SETsemantics. - Relationship helpers – Convenience methods that enforce safe Cypher patterns for creating/deleting relationships.
- Backend flexibility – Supply
bolt={...}for Neo4j,neptune={...}for Neptune, or both; the adapter chooses the right driver config automatically. - Per-operation targeting – Pass
node,identifier, andidempotence_keyto each call so shared adapters can manage multiple labels safely. - Per-call SNS metadata – Supply
sns_attributeswhen writing to annotate events with request-specific context.
🚀 Quick Start
Installation
pip install daplug-cypher
# pipenv install daplug-cypher
# poetry add daplug-cypher
# uv pip install daplug-cypher
Basic Usage
from daplug_cypher import adapter
graph = adapter(
bolt={
"url": "bolt://localhost:7687",
"user": "neo4j",
"password": "password",
},
schema_file="openapi.yml",
schema="CustomerModel",
)
payload = {
"customer_id": "abc123",
"name": "Ada",
"version": 1,
}
graph.create(data=payload, node="Customer")
result = graph.read(
query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
placeholder={"id": "abc123"},
node="Customer",
)
print(result["Customer"][0]["name"])
graph.create(
data=payload,
node="Customer",
sns_attributes={"source": "api"},
)
Because the adapter is schema-aware, every write can opt into mapping by passing schema. Skip it when you want to persist the payload exactly as provided. Select the node label (and identifiers) per call so a single adapter can service multiple models, and add sns_attributes when you want to decorate published events with request-specific context.
🔧 Advanced Configuration
Public API Cheat Sheet
from daplug_cypher import adapter
graph = adapter(bolt={"url": "bolt://localhost:7687", "user": "neo4j", "password": "password"})
# CREATE ---------------------------------------------------------------
graph.create(
data={"customer_id": "abc123", "name": "Ada", "version": 1},
node="Customer",
sns_attributes={"event": "customer-created"},
)
# READ / MATCH ---------------------------------------------------------
graph.read(
query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
placeholder={"id": "abc123"},
node="Customer",
)
# QUERY (raw parameterized Cypher) ------------------------------------
graph.query(
query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
placeholder={"id": "abc123"},
sns_attributes={"source": "reporting"},
)
# UPDATE (optimistic) --------------------------------------------------
graph.update(
data={"status": "vip"},
query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
placeholder={"id": "abc123"},
original_idempotence_value=1,
node="Customer",
identifier="customer_id",
idempotence_key="version",
sns_attributes={"event": "customer-updated"},
)
# DELETE ---------------------------------------------------------------
graph.delete(
delete_identifier="abc123",
node="Customer",
identifier="customer_id",
sns_attributes={"event": "customer-deleted"},
)
# RELATIONSHIP HELPERS -------------------------------------------------
graph.create_relationship(
query="""
MATCH (c:Customer), (o:Order)
WHERE c.customer_id = $customer AND o.order_id = $order
CREATE (c)-[:PLACED]->(o)
RETURN c, o
""",
placeholder={"customer": "abc123", "order": "o-789"},
sns_attributes={"event": "relationship-created"},
)
graph.delete_relationship(
query="""
MATCH (c:Customer)-[r:PLACED]->(o:Order)
WHERE c.customer_id = $customer AND o.order_id = $order
DETACH DELETE r
""",
placeholder={"customer": "abc123", "order": "o-789"},
sns_attributes={"event": "relationship-deleted"},
)
Each method mirrors the DynamoDB adapter API: provide per-call metadata, and the adapter handles schema normalization, optimistic locking, driver orchestration, and optional SNS fan-out.
Neo4j & Neptune Targets
graph = adapter(
bolt={"url": "bolt://localhost:7687", "user": "neo4j", "password": "password"},
neptune={"url": "bolt://neptune-endpoint:8182", "user": "user", "password": "secret"},
)
Provide both dictionaries to allow local Neo4j development with a production Neptune endpoint. When neptune is supplied it wins; otherwise bolt is used.
Use the same adapter instance for different node types by passing the appropriate label to each call (e.g., graph.create(..., node="Order")).
Optimistic Updates
graph.update(
data={"order_id": "abc123", "updated_at": 2, "status": "shipped"},
query="MATCH (o:Order) WHERE o.order_id = $id RETURN o",
placeholder={"id": "abc123"},
original_idempotence_value=1, # the previous value of updated_at
node="Order",
identifier="order_id",
idempotence_key="updated_at",
sns_attributes={"event": "status-change"},
)
If another session updates the node first, the adapter raises ValueError("ATOMIC ERROR...") rather than overwriting silently.
Relationship Helpers
graph.create(data={"customer_id": "abc123", "version": 1}, node="Customer")
graph.create(data={"order_id": "o-789", "version": 1}, node="Order")
graph.create_relationship(
query="""
MATCH (c:Customer), (o:Order)
WHERE c.customer_id = $customer AND o.order_id = $order
CREATE (c)-[:PLACED]->(o)
RETURN c, o
""",
placeholder={"customer": "abc123", "order": "o-789"},
)
graph.delete_relationship(
query="""
MATCH (c:Customer)-[r:PLACED]->(o:Order)
WHERE c.customer_id = $customer AND o.order_id = $order
DETACH DELETE r
""",
placeholder={"customer": "abc123", "order": "o-789"},
)
Validation ensures relationship queries include edge notation and destructive operations actually delete nodes/relationships.
SNS Event Publishing
graph = adapter(
bolt={...},
sns_arn="arn:aws:sns:us-east-2:123456789012:customers",
sns_attributes={"service": "crm"},
)
graph.delete(delete_identifier="abc123", node="Customer", identifier="customer_id")
Each CRUD helper automatically publishes an SNS message when sns_arn is set. Provide default metadata through sns_attributes at adapter construction (for example {"service": "crm"}) and add request-specific context per call: graph.create(..., sns_attributes={"source": "api"}). Per-call keys override adapter defaults, operation is injected automatically, and None values are stripped so events remain clean. Non-string values are sent using the appropriate SNS Number type.
🧪 Testing
We split fast unit tests from integration suites targeting Neo4j and Neptune-compatible endpoints.
# Unit tests (pure Python, heavy mocking)
pipenv run test
# Integration suites
pipenv run test_neo4j # requires Neo4j Bolt endpoint (defaults to bolt://localhost:7687)
pipenv run test_neptune # reuses Bolt settings, can point at Neptune or LocalStack
# Coverage (Neo4j suite under coverage)
pipenv run coverage
Environment variables to override defaults:
| Variable | Purpose | Default |
|---|---|---|
NEO4J_BOLT_URL |
Neo4j Bolt connection URI | bolt://localhost:7687 |
NEO4J_USER / _PASSWORD |
Neo4j credentials | neo4j / password |
NEPTUNE_BOLT_URL |
Neptune Bolt-compatible endpoint | falls back to Neo4j |
NEPTUNE_USER / _PASSWORD |
Neptune credentials | falls back to Neo4j |
🧰 Tooling & CI
.circleci/config.yml mirrors the DynamoDB project:
install-buildinstalls dependencies and persists the workspace.lintandtype-checkrunpipenv run lintandpipenv run mypy.test-neo4jandtest-neptunerun pytest markers in parallel; the Neptune job provisions a LocalStack container for compatibility checks.install-build-publishretains the token-based PyPI workflow.
🛠️ Local Development
Prerequisites
- Python 3.9+
- Pipenv
- Docker (for running Neo4j or LocalStack locally)
Environment Setup
git clone https://github.com/paulcruse3/daplug-cypher.git
cd daplug-cypher
pipenv install --dev
Workflow
pipenv run lint # pylint (JSON + HTML report)
pipenv run mypy # static typing (post-phase polish)
pipenv run test # unit tests
pipenv run test_neo4j # integration suite (requires Bolt endpoint)
pipenv run test_neptune # integration suite (LocalStack/Neptune)
When running Neo4j via Docker, set NEO4J_AUTH=neo4j/password before docker run so the tests can authenticate automatically.
🗂️ Project Structure
daplug-cypher/
├── daplug_cypher/
│ ├── adapter.py # Cypher adapter implementation
│ ├── common/ # Shared schema, merge, logging, publisher helpers
│ ├── cypher/ # Parameter + serialization utilities
│ ├── types/ # Shared TypedDict/type aliases (reused by common)
│ └── __init__.py # Public adapter factory & exports
├── tests/
│ ├── integration/ # Neo4j & Neptune pytest suites
│ └── unit/ # Mock-based unit coverage for every module
├── .circleci/config.yml # CI pipeline
├── Pipfile # Runtime & dev dependencies + scripts
├── setup.py / setup.cfg # Packaging metadata & pytest config
└── README.md # You are here
🤝 Contributing
We welcome issues and pull requests! Please ensure linting, typing, and both integration suites pass before submitting.
git checkout -b feature/amazing-cypher
# make your changes
pipenv run lint
pipenv run mypy
pipenv run test
pipenv run test_neo4j
pipenv run test_neptune
git commit -am "feat: amazing cypher enhancement"
git push origin feature/amazing-cypher
📄 License
Apache License 2.0 – see LICENSE for the full text.
Built to keep Cypher integrations as clean and predictable
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file daplug_cypher-1.0.0b3.tar.gz.
File metadata
- Download URL: daplug_cypher-1.0.0b3.tar.gz
- Upload date:
- Size: 22.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
101c0a78241551d56ce2c9e95c328cb4c51de3b4ce7bf832533cbb68c1d91ca7
|
|
| MD5 |
64864bd68cc27701517d197675331477
|
|
| BLAKE2b-256 |
20d6ae6e544aa8344f7a0602988860a78acd7936440e6900b027aa4f34b69916
|
File details
Details for the file daplug_cypher-1.0.0b3-py3-none-any.whl.
File metadata
- Download URL: daplug_cypher-1.0.0b3-py3-none-any.whl
- Upload date:
- Size: 22.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.9.17
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
64f66aa8f7a972b1ad73070eb265ea0fb60dc3844b42f5839e372311878274d9
|
|
| MD5 |
bca1da7ed753740ee61a79e759c4bd08
|
|
| BLAKE2b-256 |
dddec62a3b96543e1b421ddd8354fcaeaaa9bffbd1b455ea021257443b155935
|