A Neo4j and Neptune graph normalization layer using the adapter pattern.

🔗 daplug-cypher (da•plug)

Schema-Driven Cypher Normalization & Event Publishing for Python

daplug-cypher brings the ergonomics of an adapter pattern to graph databases. It bundles Cypher-friendly schema mapping, optimistic concurrency, and SNS event fan-out so your graph services stay DRY, version-safe, and event-driven, whether you deploy to Neo4j or AWS Neptune (openCypher).

✨ Key Features

  • Unified factory – daplug_cypher.adapter(**kwargs) returns a ready-to-go adapter with SNS support, just like daplug_ddb.
  • Schema mapping – Reuse OpenAPI/JSON schemas to validate and normalize payloads before writing nodes or relationships.
  • Optimistic concurrency – Guard updates with identifier + version keys; the adapter enforces atomic Cypher SET semantics.
  • Relationship helpers – Convenience methods that enforce safe Cypher patterns for creating/deleting relationships.
  • Backend flexibility – Supply bolt={...} for Neo4j, neptune={...} for Neptune, or both; the adapter chooses the right driver config automatically.
  • Per-operation targeting – Pass node, identifier, and idempotence_key to each call so shared adapters can manage multiple labels safely.
  • Per-call SNS metadata – Supply sns_attributes when writing to annotate events with request-specific context.

🚀 Quick Start

Installation

pip install daplug-cypher
# pipenv install daplug-cypher
# poetry add daplug-cypher
# uv pip install daplug-cypher

Basic Usage

from daplug_cypher import adapter

graph = adapter(
    bolt={
        "url": "bolt://localhost:7687",
        "user": "neo4j",
        "password": "password",
    },
    schema_file="openapi.yml",
    schema="CustomerModel",
)

payload = {
    "customer_id": "abc123",
    "name": "Ada",
    "version": 1,
}

graph.create(data=payload, node="Customer")
result = graph.read(
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    node="Customer",
)

print(result["Customer"][0]["name"])

graph.create(
    data=payload,
    node="Customer",
    sns_attributes={"source": "api"},
)

Because the adapter is schema-aware, every write can opt into mapping by passing schema. Skip it when you want to persist the payload exactly as provided. Select the node label (and identifiers) per call so a single adapter can service multiple models, and add sns_attributes when you want to decorate published events with request-specific context.
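To make the idea of schema mapping concrete, here is a minimal, self-contained sketch of what "normalize before writing" could look like. It does not use daplug-cypher; the `normalize` helper and the `CUSTOMER_SCHEMA` dictionary are hypothetical stand-ins, and the library's real mapping rules may differ.

```python
from typing import Any, Dict

# Hypothetical stand-in for a model loaded from openapi.yml; not the library's format.
CUSTOMER_SCHEMA: Dict[str, Any] = {
    "required": ["customer_id", "version"],
    "properties": {
        "customer_id": {"type": "string"},
        "name": {"type": "string"},
        "version": {"type": "integer"},
    },
}


def normalize(payload: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only schema-declared keys and fail fast when required ones are missing."""
    missing = [key for key in schema.get("required", []) if key not in payload]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return {key: value for key, value in payload.items() if key in schema["properties"]}


# The undeclared "debug" key is dropped before the payload would reach the graph.
normalized = normalize(
    {"customer_id": "abc123", "name": "Ada", "version": 1, "debug": True},
    CUSTOMER_SCHEMA,
)
```

Skipping the schema corresponds to writing the payload untouched, which is the trade-off the paragraph above describes.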

🔧 Advanced Configuration

Public API Cheat Sheet

from daplug_cypher import adapter

graph = adapter(bolt={"url": "bolt://localhost:7687", "user": "neo4j", "password": "password"})

# CREATE ---------------------------------------------------------------
graph.create(
    data={"customer_id": "abc123", "name": "Ada", "version": 1},
    node="Customer",
    sns_attributes={"event": "customer-created"},
)

# READ / MATCH ---------------------------------------------------------
graph.read(
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    node="Customer",
)

# QUERY (raw parameterized Cypher) ------------------------------------
graph.query(
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    sns_attributes={"source": "reporting"},
)

# UPDATE (optimistic) --------------------------------------------------
graph.update(
    data={"status": "vip"},
    query="MATCH (c:Customer) WHERE c.customer_id = $id RETURN c",
    placeholder={"id": "abc123"},
    original_idempotence_value=1,
    node="Customer",
    identifier="customer_id",
    idempotence_key="version",
    sns_attributes={"event": "customer-updated"},
)

# DELETE ---------------------------------------------------------------
graph.delete(
    delete_identifier="abc123",
    node="Customer",
    identifier="customer_id",
    sns_attributes={"event": "customer-deleted"},
)

# RELATIONSHIP HELPERS -------------------------------------------------
graph.create_relationship(
    query="""
        MATCH (c:Customer), (o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        CREATE (c)-[:PLACED]->(o)
        RETURN c, o
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
    sns_attributes={"event": "relationship-created"},
)

graph.delete_relationship(
    query="""
        MATCH (c:Customer)-[r:PLACED]->(o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        DETACH DELETE r
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
    sns_attributes={"event": "relationship-deleted"},
)

Each method mirrors the DynamoDB adapter API: provide per-call metadata, and the adapter handles schema normalization, optimistic locking, driver orchestration, and optional SNS fan-out.

Neo4j & Neptune Targets

graph = adapter(
    bolt={"url": "bolt://localhost:7687", "user": "neo4j", "password": "password"},
    neptune={"url": "bolt://neptune-endpoint:8182", "user": "user", "password": "secret"},
)

Provide both dictionaries to allow local Neo4j development with a production Neptune endpoint. When neptune is supplied it wins; otherwise bolt is used. Use the same adapter instance for different node types by passing the appropriate label to each call (e.g., graph.create(..., node="Order")).
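The precedence rule above can be sketched in a few lines. The `select_target` function is a hypothetical illustration of "neptune wins, otherwise bolt", not the adapter's internal code, and the error raised when neither is supplied is an assumption.

```python
from typing import Dict, Optional

ConnectionConfig = Dict[str, str]


def select_target(
    bolt: Optional[ConnectionConfig] = None,
    neptune: Optional[ConnectionConfig] = None,
) -> ConnectionConfig:
    """Return the connection settings to use: neptune first, bolt as fallback."""
    if neptune:
        return neptune
    if bolt:
        return bolt
    # Assumption: with no settings at all there is nothing to connect to.
    raise ValueError("provide bolt and/or neptune connection settings")


local = {"url": "bolt://localhost:7687", "user": "neo4j", "password": "password"}
remote = {"url": "bolt://neptune-endpoint:8182", "user": "user", "password": "secret"}

chosen_both = select_target(bolt=local, neptune=remote)   # neptune settings
chosen_local = select_target(bolt=local)                  # bolt settings
```

One way to use this in practice is to pass neptune only in production configuration, so local runs fall through to the Neo4j settings.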

Optimistic Updates

graph.update(
    data={"order_id": "abc123", "updated_at": 2, "status": "shipped"},
    query="MATCH (o:Order) WHERE o.order_id = $id RETURN o",
    placeholder={"id": "abc123"},
    original_idempotence_value=1,  # the previous value of updated_at
    node="Order",
    identifier="order_id",
    idempotence_key="updated_at",
    sns_attributes={"event": "status-change"},
)

If another session updates the node first, the adapter raises ValueError("ATOMIC ERROR...") rather than overwriting silently.
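The check behind an optimistic update can be illustrated without a database. The sketch below is an in-memory approximation under stated assumptions: `optimistic_update` is a hypothetical helper, the node is a plain dictionary, and the error text is illustrative rather than the adapter's actual message.

```python
from typing import Any, Dict


def optimistic_update(
    stored: Dict[str, Any],
    changes: Dict[str, Any],
    idempotence_key: str,
    original_value: Any,
) -> Dict[str, Any]:
    """Apply changes only if the stored idempotence value is the one the caller read."""
    if stored.get(idempotence_key) != original_value:
        # Illustrative message; the real adapter raises its own ValueError text.
        raise ValueError("stale idempotence value, update rejected")
    return {**stored, **changes}


order = {"order_id": "abc123", "updated_at": 1, "status": "pending"}

# First writer read updated_at == 1 and succeeds.
order = optimistic_update(order, {"updated_at": 2, "status": "shipped"}, "updated_at", 1)

# Second writer also read updated_at == 1, but the node has moved on to 2.
try:
    optimistic_update(order, {"updated_at": 2, "status": "cancelled"}, "updated_at", 1)
    conflict_detected = False
except ValueError:
    conflict_detected = True
```

Callers would typically catch the ValueError, re-read the node, and retry with the fresh idempotence value.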

Relationship Helpers

graph.create(data={"customer_id": "abc123", "version": 1}, node="Customer")
graph.create(data={"order_id": "o-789", "version": 1}, node="Order")

graph.create_relationship(
    query="""
        MATCH (c:Customer), (o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        CREATE (c)-[:PLACED]->(o)
        RETURN c, o
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
)

graph.delete_relationship(
    query="""
        MATCH (c:Customer)-[r:PLACED]->(o:Order)
        WHERE c.customer_id = $customer AND o.order_id = $order
        DETACH DELETE r
    """,
    placeholder={"customer": "abc123", "order": "o-789"},
)

Validation ensures relationship queries include edge notation and destructive operations actually delete nodes/relationships.
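As a rough illustration of that kind of validation, the sketch below checks a query string for edge notation and, for destructive calls, for a DELETE clause. The regular expressions and the `validate_relationship_query` name are assumptions made for this example; the library's own checks may be stricter or differently worded.

```python
import re

# Matches edge notation such as -[:PLACED]-> or -[r:PLACED]-
EDGE_PATTERN = re.compile(r"-\[[^\]]*\]-")
# Matches DELETE as a whole word, including the DELETE in DETACH DELETE
DELETE_PATTERN = re.compile(r"\bDELETE\b", re.IGNORECASE)


def validate_relationship_query(query: str, destructive: bool = False) -> None:
    """Raise ValueError when a relationship query lacks an edge or a DELETE clause."""
    if not EDGE_PATTERN.search(query):
        raise ValueError("relationship query must contain edge notation such as -[:TYPE]->")
    if destructive and not DELETE_PATTERN.search(query):
        raise ValueError("destructive relationship query must contain DELETE")


create_query = "MATCH (c:Customer), (o:Order) CREATE (c)-[:PLACED]->(o) RETURN c, o"
delete_query = "MATCH (c:Customer)-[r:PLACED]->(o:Order) DETACH DELETE r"

validate_relationship_query(create_query)                    # passes silently
validate_relationship_query(delete_query, destructive=True)  # passes silently
```

A query such as "MATCH (c:Customer) RETURN c" would be rejected by this sketch because it contains no edge pattern.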

SNS Event Publishing

graph = adapter(
    bolt={...},
    sns_arn="arn:aws:sns:us-east-2:123456789012:customers",
    sns_attributes={"service": "crm"},
)

graph.delete(delete_identifier="abc123", node="Customer", identifier="customer_id")

Each CRUD helper automatically publishes an SNS message when sns_arn is set. Provide default metadata through sns_attributes at adapter construction (for example {"service": "crm"}) and add request-specific context per call: graph.create(..., sns_attributes={"source": "api"}). Per-call keys override adapter defaults, operation is injected automatically, and None values are stripped so events remain clean. Non-string values are sent using the appropriate SNS Number type.
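The merge rules described above can be sketched as a small pure function that produces the MessageAttributes shape SNS expects. `build_message_attributes` is a hypothetical helper, and two details are assumptions: that the injected operation takes precedence over a user-supplied key of the same name, and that only int and float values map to the Number data type.

```python
from typing import Any, Dict, Optional


def build_message_attributes(
    defaults: Optional[Dict[str, Any]],
    per_call: Optional[Dict[str, Any]],
    operation: str,
) -> Dict[str, Dict[str, str]]:
    """Merge adapter defaults with per-call keys, inject operation, drop None values."""
    merged = {**(defaults or {}), **(per_call or {}), "operation": operation}
    attributes: Dict[str, Dict[str, str]] = {}
    for key, value in merged.items():
        if value is None:
            continue  # None values are stripped so events stay clean
        # bool is a subclass of int, so exclude it from the Number branch
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        attributes[key] = {
            "DataType": "Number" if is_number else "String",
            "StringValue": str(value),  # SNS carries numbers as strings too
        }
    return attributes


attrs = build_message_attributes(
    defaults={"service": "crm", "source": "batch"},
    per_call={"source": "api", "trace": None, "retries": 2},
    operation="create",
)
```

Here the per-call "source" replaces the default, "trace" disappears, and "retries" is typed as a Number.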

🧪 Testing

We split fast unit tests from integration suites targeting Neo4j and Neptune-compatible endpoints.

# Unit tests (pure Python, heavy mocking)
pipenv run test

# Integration suites
pipenv run test_neo4j     # requires Neo4j Bolt endpoint (defaults to bolt://localhost:7687)
pipenv run test_neptune   # reuses Bolt settings, can point at Neptune or LocalStack

# Coverage (Neo4j suite under coverage)
pipenv run coverage

Environment variables to override defaults:

| Variable | Purpose | Default |
| --- | --- | --- |
| NEO4J_BOLT_URL | Neo4j Bolt connection URI | bolt://localhost:7687 |
| NEO4J_USER / _PASSWORD | Neo4j credentials | neo4j / password |
| NEPTUNE_BOLT_URL | Neptune Bolt-compatible endpoint | falls back to Neo4j |
| NEPTUNE_USER / _PASSWORD | Neptune credentials | falls back to Neo4j |
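The fallback behaviour in the table can be sketched as follows. This is an assumption-laden illustration, not the test suite's actual fixture code: `resolve_test_settings` is a hypothetical name, and the sketch reads "_PASSWORD" in the table as NEO4J_PASSWORD and NEPTUNE_PASSWORD.

```python
import os
from typing import Dict, Mapping


def resolve_test_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Dict[str, str]]:
    """Resolve connection settings, letting Neptune values fall back to Neo4j ones."""
    neo4j = {
        "url": env.get("NEO4J_BOLT_URL", "bolt://localhost:7687"),
        "user": env.get("NEO4J_USER", "neo4j"),
        "password": env.get("NEO4J_PASSWORD", "password"),
    }
    neptune = {
        "url": env.get("NEPTUNE_BOLT_URL", neo4j["url"]),
        "user": env.get("NEPTUNE_USER", neo4j["user"]),
        "password": env.get("NEPTUNE_PASSWORD", neo4j["password"]),
    }
    return {"neo4j": neo4j, "neptune": neptune}


# Passing an explicit mapping keeps the example independent of the real environment.
settings = resolve_test_settings({"NEPTUNE_BOLT_URL": "bolt://neptune-endpoint:8182"})
```

With only NEPTUNE_BOLT_URL set, the Neptune suite would target the custom endpoint while reusing the default Neo4j credentials.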

🧰 Tooling & CI

.circleci/config.yml mirrors the DynamoDB project:

  • install-build installs dependencies and persists the workspace.
  • lint and type-check run pipenv run lint and pipenv run mypy.
  • test-neo4j and test-neptune run pytest markers in parallel; the Neptune job provisions a LocalStack container for compatibility checks.
  • install-build-publish retains the token-based PyPI workflow.

🛠️ Local Development

Prerequisites

  • Python 3.9+
  • Pipenv
  • Docker (for running Neo4j or LocalStack locally)

Environment Setup

git clone https://github.com/paulcruse3/daplug-cypher.git
cd daplug-cypher
pipenv install --dev

Workflow

pipenv run lint          # pylint (JSON + HTML report)
pipenv run mypy          # static typing (post-phase polish)
pipenv run test          # unit tests
pipenv run test_neo4j    # integration suite (requires Bolt endpoint)
pipenv run test_neptune  # integration suite (LocalStack/Neptune)

When running Neo4j via Docker, set NEO4J_AUTH=neo4j/password before docker run so the tests can authenticate automatically.

🗂️ Project Structure

daplug-cypher/
├── daplug_cypher/
│   ├── adapter.py         # Cypher adapter implementation
│   ├── common/            # Shared schema, merge, logging, publisher helpers
│   ├── cypher/            # Parameter + serialization utilities
│   ├── types/             # Shared TypedDict/type aliases (reused by common)
│   └── __init__.py        # Public adapter factory & exports
├── tests/
│   ├── integration/       # Neo4j & Neptune pytest suites
│   └── unit/              # Mock-based unit coverage for every module
├── .circleci/config.yml   # CI pipeline
├── Pipfile                # Runtime & dev dependencies + scripts
├── setup.py / setup.cfg   # Packaging metadata & pytest config
└── README.md              # You are here

🤝 Contributing

We welcome issues and pull requests! Please ensure linting, typing, and both integration suites pass before submitting.

git checkout -b feature/amazing-cypher
# make your changes
pipenv run lint
pipenv run mypy
pipenv run test
pipenv run test_neo4j
pipenv run test_neptune
git commit -am "feat: amazing cypher enhancement"
git push origin feature/amazing-cypher

📄 License

Apache License 2.0 – see LICENSE for the full text.


Built to keep Cypher integrations clean and predictable.
