Nexus Python SDK

Official Python SDK for Nexus graph database.

Installation

pip install hivehub-nexus-sdk

Quick Start (RPC — default)

import asyncio
from nexus_sdk import NexusClient

async def main():
    # Defaults to nexus://127.0.0.1:15475 (binary RPC).
    async with NexusClient() as client:
        result = await client.execute_cypher("RETURN 1 AS one")
        print(f"{result.rows[0]}  (transport: {client.endpoint_description()})")

asyncio.run(main())

Transports

URL form              Transport                  Default port   Use case
nexus://host[:port]   Binary RPC (MessagePack)   15475          Default. Lowest latency.
http://host[:port]    HTTP/JSON (httpx)          15474          Browser proxies, firewalls.
https://host[:port]   HTTPS/JSON                 443            Public-internet HTTP with TLS.
resp3://host[:port]   RESP3 (reserved)           15476          Not yet shipped; raises.

Precedence: URL scheme > NEXUS_SDK_TRANSPORT env var > transport kwarg > default (nexus).

# HTTP fallback
client = NexusClient(base_url="http://localhost:15474", api_key="nexus_sk_...")

# Transport hint on a bare URL
client = NexusClient(base_url="host:15474", transport="http")

# Env override (`NEXUS_SDK_TRANSPORT=http`) honoured automatically

Full cross-SDK spec: docs/specs/sdk-transport.md.
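The precedence rule and default ports above can be sketched as a small standalone function. This is only an illustration of the documented behaviour, not the SDK's actual resolver: `resolve_endpoint` and `DEFAULT_PORTS` are hypothetical names, and the way a bare `host:port` string is completed is an assumption.

```python
import os
from urllib.parse import urlsplit

# Default ports, copied from the transport table above.
DEFAULT_PORTS = {"nexus": 15475, "http": 15474, "https": 443, "resp3": 15476}


def resolve_endpoint(url, transport=None, environ=None):
    """Return (transport, host, port) following the documented precedence."""
    environ = os.environ if environ is None else environ

    if "://" in url:
        # 1. An explicit URL scheme always wins.
        chosen = url.split("://", 1)[0].lower()
        full_url = url
    else:
        # 2. NEXUS_SDK_TRANSPORT env var, 3. transport kwarg, 4. default (nexus).
        chosen = (environ.get("NEXUS_SDK_TRANSPORT") or transport or "nexus").lower()
        full_url = f"{chosen}://{url}"

    if chosen not in DEFAULT_PORTS:
        raise ValueError(f"unknown transport: {chosen!r}")

    parts = urlsplit(full_url)
    # Fall back to the transport's default port when the URL omits one.
    port = parts.port if parts.port is not None else DEFAULT_PORTS[chosen]
    return chosen, parts.hostname, port
```

Passing `environ` explicitly keeps the function easy to test; in normal use it falls back to `os.environ`.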

Usage

Basic Example

import asyncio
from nexus_sdk import NexusClient

async def main():
    async with NexusClient() as client:
        # Cypher (routes through the active transport)
        result = await client.execute_cypher("MATCH (n) RETURN n LIMIT 10")
        print(f"Found {len(result.rows)} rows")

        # Convenience helpers call into Cypher under the hood
        create_response = await client.create_node(
            labels=["Person"],
            properties={"name": "Alice"},
        )
        print(f"Created node with ID: {create_response.node_id}")

asyncio.run(main())

With Authentication

# Using API key
client = NexusClient(
    "http://localhost:15474",
    api_key="your-api-key"
)

# Or using username/password
client = NexusClient(
    "http://localhost:15474",
    username="user",
    password="pass"
)
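To keep credentials out of source code, the keyword arguments above can be built from environment variables. The following is a minimal sketch; the variable names `NEXUS_API_KEY`, `NEXUS_USERNAME` and `NEXUS_PASSWORD` and the helper `auth_kwargs_from_env` are assumptions made for illustration and are not read by the SDK itself.

```python
import os


def auth_kwargs_from_env(environ=None):
    """Build NexusClient auth kwargs from (hypothetical) environment variables."""
    environ = os.environ if environ is None else environ

    # Prefer an API key when one is set.
    api_key = environ.get("NEXUS_API_KEY")
    if api_key:
        return {"api_key": api_key}

    # Otherwise fall back to username/password, but only if both are present.
    username = environ.get("NEXUS_USERNAME")
    password = environ.get("NEXUS_PASSWORD")
    if username and password:
        return {"username": username, "password": password}

    # No credentials configured: connect unauthenticated.
    return {}
```

The result can then be unpacked into the constructor, for example `NexusClient("http://localhost:15474", **auth_kwargs_from_env())`.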

Schema Management

# Create a label
response = await client.create_label("Person")

# List all labels
labels = await client.list_labels()
print(f"Labels: {labels.labels}")

# Create a relationship type
response = await client.create_rel_type("KNOWS")

# List all relationship types
types = await client.list_rel_types()
print(f"Types: {types.types}")

Query Builder

from nexus_sdk import QueryBuilder

# Build queries type-safely
query = (
    QueryBuilder()
    .match_("(n:Person)")
    .where_("n.age > $min_age")
    .return_("n.name, n.age")
    .order_by("n.age DESC")
    .limit(10)
    .param("min_age", 25)
    .build()
)

result = await client.execute_cypher(query.query, query.params)
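To illustrate the fluent pattern used above, here is a toy builder that produces a query string and a parameter dict with the same method names. It is a sketch only: `MiniQueryBuilder` and `BuiltQuery` are hypothetical names, and the SDK's real `QueryBuilder` may validate or order clauses differently.

```python
from dataclasses import dataclass


@dataclass
class BuiltQuery:
    query: str
    params: dict


class MiniQueryBuilder:
    """Toy fluent builder: clauses are emitted in the order they are called."""

    def __init__(self):
        self._clauses = []
        self._params = {}

    def _add(self, keyword, text):
        self._clauses.append(f"{keyword} {text}")
        return self  # returning self is what makes chaining possible

    def match_(self, pattern):
        return self._add("MATCH", pattern)

    def where_(self, condition):
        return self._add("WHERE", condition)

    def return_(self, expression):
        return self._add("RETURN", expression)

    def order_by(self, expression):
        return self._add("ORDER BY", expression)

    def limit(self, count):
        return self._add("LIMIT", int(count))

    def param(self, name, value):
        # Values travel separately from the query text, never interpolated into it.
        self._params[name] = value
        return self

    def build(self):
        return BuiltQuery(" ".join(self._clauses), dict(self._params))
```

Keeping values in `params` rather than formatting them into the Cypher string is the main point of the pattern: the query text stays constant, and user input is never spliced into it.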

Batch Operations

# Batch create nodes
nodes = [
    {"labels": ["Person"], "properties": {"name": f"Person{i}", "age": 20 + i}}
    for i in range(10)
]
batch_response = await client.batch_create_nodes(nodes)
print(f"Created {len(batch_response.node_ids)} nodes")

# Batch create relationships, chaining the nodes created above
node_ids = batch_response.node_ids
relationships = [
    {
        "source_id": node_ids[i],
        "target_id": node_ids[i + 1],
        "rel_type": "KNOWS",
        "properties": {"since": 2020},
    }
    for i in range(len(node_ids) - 1)
]
rel_batch = await client.batch_create_relationships(relationships)
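For very large inputs it may be preferable to send several smaller batches instead of one huge request. The sketch below assumes only what the example above shows (a `batch_create_nodes` coroutine whose response has a `node_ids` list); the helper names and the default chunk size of 500 are arbitrary choices for illustration, not SDK limits.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


async def create_nodes_in_chunks(client, nodes, size=500):
    """Create nodes batch by batch and return all new node IDs in order."""
    node_ids = []
    for batch in chunked(nodes, size):
        response = await client.batch_create_nodes(batch)
        node_ids.extend(response.node_ids)
    return node_ids
```

Each chunk is a separate call, so a failure partway through leaves the earlier chunks in place.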

Performance Monitoring

# Get query statistics
stats = await client.get_query_statistics()
print(f"Total queries: {stats.statistics.total_queries}")
print(f"Average time: {stats.statistics.average_execution_time_ms}ms")

# Get slow queries
slow_queries = await client.get_slow_queries()
for query in slow_queries.queries:
    print(f"Slow query: {query.query} ({query.execution_time_ms}ms)")

# Get plan cache statistics
cache_stats = await client.get_plan_cache_statistics()
print(f"Hit rate: {cache_stats.hit_rate:.2%}")

# Clear plan cache
await client.clear_plan_cache()

Advanced Transactions

from nexus_sdk import Transaction

# Begin transaction with Transaction class
tx = await client.begin_transaction()
print(f"Transaction active: {tx.is_active()}")
print(f"Status: {tx.status()}")

# Execute queries within transaction
result = await tx.execute("CREATE (n:Person {name: $name}) RETURN n", {"name": "Alice"})

# Commit or rollback
await tx.commit()  # or tx.rollback()
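The commit-or-rollback decision can be wrapped in an async context manager so that a failure inside the block always rolls back. This is a sketch built only on the calls shown above (`begin_transaction`, `commit`, `rollback`); the `transaction` helper itself is hypothetical and not part of the SDK.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def transaction(client):
    """Commit if the block succeeds, roll back if it raises."""
    tx = await client.begin_transaction()
    try:
        yield tx
    except BaseException:
        # Includes task cancellation, so nothing is left half-applied.
        await tx.rollback()
        raise
    else:
        await tx.commit()
```

Usage would then look like `async with transaction(client) as tx: await tx.execute(...)`, with no explicit `commit()` or `rollback()` at the call site.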

Multi-Database Support

# List all databases
databases = await client.list_databases()
print(f"Available databases: {databases.databases}")
print(f"Default database: {databases.default_database}")

# Create a new database
create_result = await client.create_database("mydb")
print(f"Created: {create_result.name}")

# Switch to the new database
switch_result = await client.switch_database("mydb")
print("Switched to: mydb")

# Get current database
current_db = await client.get_current_database()
print(f"Current database: {current_db}")

# Create data in the current database
result = await client.execute_cypher(
    "CREATE (n:Product {name: $name}) RETURN n",
    {"name": "Laptop"}
)

# Get database information
db_info = await client.get_database("mydb")
print(f"Nodes: {db_info.node_count}, Relationships: {db_info.relationship_count}")

# Drop database (must not be current database)
await client.switch_database("neo4j")  # Switch away first
drop_result = await client.drop_database("mydb")

# Or connect directly to a specific database
async with NexusClient("http://localhost:15474", database="mydb") as client:
    # All operations will use 'mydb'
    result = await client.execute_cypher("MATCH (n) RETURN n LIMIT 10", None)
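When a database is only needed for a few statements, the switch-and-switch-back steps can be paired so the previous database is always restored. The sketch below assumes that `get_current_database()` returns a value that `switch_database()` accepts; `using_database` is a hypothetical helper, not an SDK function.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def using_database(client, name):
    """Temporarily switch `client` to database `name`, then restore the old one."""
    previous = await client.get_current_database()
    await client.switch_database(name)
    try:
        yield client
    finally:
        # Runs on success and on error, so the client never stays on `name`.
        await client.switch_database(previous)
```

Because the selected database appears to be state held by the client, this pattern is best avoided when several tasks share one client concurrently; passing `database=` to a dedicated client, as shown above, is the simpler option there.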

Using Context Manager

async with NexusClient("http://localhost:15474") as client:
    result = await client.execute_cypher("MATCH (n) RETURN n LIMIT 10", None)
    print(f"Found {len(result.rows)} rows")

High Availability with Replication

Nexus supports master-replica replication for high availability and read scaling. Use the master for all write operations and replicas for read operations.

import asyncio
from nexus_sdk import NexusClient

class NexusCluster:
    """Client for Nexus cluster with master-replica topology."""

    def __init__(self, master_url: str, replica_urls: list[str]):
        """
        Initialize cluster client.

        Args:
            master_url: URL of the master node (for writes)
            replica_urls: List of replica URLs (for reads)
        """
        self.master = NexusClient(master_url)
        self.replicas = [NexusClient(url) for url in replica_urls]
        self._replica_index = 0

    def _get_replica(self) -> NexusClient:
        """Round-robin replica selection."""
        if not self.replicas:
            return self.master
        replica = self.replicas[self._replica_index]
        self._replica_index = (self._replica_index + 1) % len(self.replicas)
        return replica

    async def write(self, query: str, params: dict = None):
        """Execute write query on master."""
        return await self.master.execute_cypher(query, params)

    async def read(self, query: str, params: dict = None):
        """Execute read query on replica (round-robin)."""
        return await self._get_replica().execute_cypher(query, params)

    async def close(self):
        """Close all connections."""
        await self.master.close()
        for replica in self.replicas:
            await replica.close()

async def main():
    # Connect to cluster
    # Master handles all writes, replicas handle reads
    cluster = NexusCluster(
        master_url="http://master:15474",
        replica_urls=[
            "http://replica1:15474",
            "http://replica2:15474",
        ]
    )

    try:
        # Write operations go to master
        await cluster.write(
            "CREATE (n:Person {name: $name, age: $age}) RETURN n",
            {"name": "Alice", "age": 30}
        )

        # Read operations are distributed across replicas
        result = await cluster.read(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            {"min_age": 25}
        )
        print(f"Found {len(result.rows)} persons")

        # High-volume reads are load-balanced
        for _ in range(100):
            result = await cluster.read("MATCH (n) RETURN count(n) as total", None)
    finally:
        # Close every connection even if a query above fails
        await cluster.close()

asyncio.run(main())

Replication Architecture

┌─────────────────────────────────────────────────────────────┐
│                      Application                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │              NexusCluster Client                     │   │
│   │   write() ──────────┐     read() ───────────────┐   │   │
│   └─────────────────────┼───────────────────────────┼───┘   │
└─────────────────────────┼───────────────────────────┼───────┘
                          │                           │
                          ▼                           ▼
              ┌───────────────────┐     ┌─────────────────────┐
              │      MASTER       │     │      REPLICAS       │
              │   (writes only)   │────▶│   (reads only)      │
              │                   │ WAL │  ┌───────────────┐  │
              │ • CREATE          │────▶│  │   Replica 1   │  │
              │ • UPDATE          │     │  └───────────────┘  │
              │ • DELETE          │     │  ┌───────────────┐  │
              │ • MERGE           │────▶│  │   Replica 2   │  │
              └───────────────────┘     │  └───────────────┘  │
                                        └─────────────────────┘
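If callers should not have to choose between `write()` and `read()` themselves, the split in the diagram can be approximated with a keyword check. This is a rough heuristic sketch, not part of the SDK: the keyword list is an assumption, and `is_write_query` and `run_query` are hypothetical names. It expects a cluster object with the `write` and `read` coroutines defined in the `NexusCluster` example.

```python
import re

# Cypher clauses treated as writes (assumed list; extend as needed).
_WRITE_KEYWORDS = re.compile(
    r"\b(CREATE|MERGE|DELETE|SET|REMOVE|DROP)\b", re.IGNORECASE
)


def is_write_query(query):
    """Return True if the query text contains a write clause keyword."""
    return _WRITE_KEYWORDS.search(query) is not None


async def run_query(cluster, query, params=None):
    """Send writes to the master and everything else to a replica."""
    if is_write_query(query):
        return await cluster.write(query, params)
    return await cluster.read(query, params)
```

The check errs towards the master: a keyword inside a string literal is treated as a write, which costs read scaling but not correctness. Procedure calls that modify data are not detected and should go through `write()` explicitly.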

Starting a Nexus Cluster

# Start master node
NEXUS_REPLICATION_ROLE=master \
NEXUS_REPLICATION_BIND_ADDR=0.0.0.0:15475 \
./nexus-server

# Start replica 1
NEXUS_REPLICATION_ROLE=replica \
NEXUS_REPLICATION_MASTER_ADDR=master:15475 \
./nexus-server

# Start replica 2
NEXUS_REPLICATION_ROLE=replica \
NEXUS_REPLICATION_MASTER_ADDR=master:15475 \
./nexus-server

Monitoring Replication Status

import httpx

async def check_replication_status(master_url: str):
    async with httpx.AsyncClient() as client:
        # Check master status
        response = await client.get(f"{master_url}/replication/status")
        status = response.json()
        print(f"Role: {status['role']}")
        print(f"Running: {status['running']}")
        print(f"Connected replicas: {status.get('replica_count', 0)}")

        # Get master stats
        response = await client.get(f"{master_url}/replication/master/stats")
        stats = response.json()
        print(f"Entries replicated: {stats['entries_replicated']}")
        print(f"Connected replicas: {stats['connected_replicas']}")

        # List replicas
        response = await client.get(f"{master_url}/replication/replicas")
        replicas = response.json()
        for replica in replicas['replicas']:
            print(f"  - {replica['id']}: lag={replica['lag']}, healthy={replica['healthy']}")
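The replica list returned above can also drive a simple alerting check. The sketch below works on the decoded JSON payload and relies only on the `id`, `lag` and `healthy` fields used in the example; the `flag_replicas` name and the threshold value are assumptions, and the threshold must be expressed in whatever unit the server reports for `lag`.

```python
def flag_replicas(payload, max_lag=1000):
    """Return IDs of replicas that are unhealthy or lagging beyond `max_lag`."""
    flagged = []
    for replica in payload.get("replicas", []):
        unhealthy = not replica.get("healthy", False)
        lagging = replica.get("lag", 0) > max_lag
        if unhealthy or lagging:
            flagged.append(replica["id"])
    return flagged
```

Inside `check_replication_status`, this could be called as `flag_replicas(replicas)` right after the `/replication/replicas` response is decoded.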

Features

  • ✅ Cypher query execution
  • ✅ Database statistics
  • ✅ Health check
  • ✅ Node CRUD operations (Create, Read, Update, Delete)
  • ✅ Relationship CRUD operations (Create, Update, Delete)
  • ✅ Schema management (Labels, Relationship Types)
  • ✅ Transaction support (BEGIN, COMMIT, ROLLBACK)
  • ✅ Batch operations (batch create nodes/relationships)
  • ✅ Performance monitoring (query statistics, slow queries, plan cache)
  • ✅ Query Builder (type-safe Cypher query construction)
  • ✅ Advanced Transaction (Transaction class with state management)
  • ✅ Multi-database support (create, list, switch, drop databases)
  • ✅ Proper error handling
  • ✅ Type-safe models with Pydantic
  • ✅ Async/await support

Dependencies

Install from PyPI:

pip install hivehub-nexus-sdk

Or install from source:

pip install -r requirements.txt

Core Dependencies

  • httpx>=0.24.0 - Modern HTTP client
  • pydantic>=2.0.0 - Data validation

Development Dependencies

pip install -r requirements-dev.txt

Examples

See the examples/ directory for complete examples:

  • basic_usage.py - Basic operations with nodes, relationships, and schema
  • with_auth.py - Authentication examples
  • transactions.py - Advanced transaction management with Transaction class
  • query_builder.py - Query builder usage examples
  • batch_operations.py - Batch create operations
  • performance_monitoring.py - Performance monitoring examples
  • multi_database.py - Multi-database support examples

Run examples with:

python examples/basic_usage.py
python examples/with_auth.py
python examples/transactions.py
python examples/query_builder.py
python examples/batch_operations.py
python examples/performance_monitoring.py
python examples/multi_database.py

Testing

Run tests with:

# Install development dependencies
pip install -e ".[dev]"

# Run unit tests
pytest

# Run with coverage
pytest --cov=nexus_sdk --cov-report=html

License

Licensed under the Apache License, Version 2.0.

See LICENSE for details.
