Skip to main content

Python SDK for MuninnDB — the cognitive memory database

Project description

MuninnDB Python SDK

An async-first Python client for MuninnDB, a cognitive memory database with semantic search, graph traversal, and real-time subscriptions.

Features

  • Async/await throughout — Built on httpx for concurrent, non-blocking operations
  • Semantic memory activation — Query memories by meaning, not keywords
  • Graph associations — Link engrams and traverse relationships
  • Real-time subscriptions — Server-Sent Events (SSE) with auto-reconnect
  • Automatic retry logic — Exponential backoff with jitter for transient failures
  • Type-safe — Full type hints for IDE support and runtime validation
  • Connection pooling — Configurable keepalive and concurrent connections

Installation

Requirements

  • Python 3.11+

Setup

pip install -r requirements.txt

Quick Start

import asyncio
from muninn import MuninnClient

async def main():
    async with MuninnClient("http://127.0.0.1:8476") as client:
        # Write a memory
        engram_id = await client.write(
            vault="default",
            concept="neural plasticity",
            content="The brain's ability to reorganize neural connections.",
            tags=["neuroscience", "learning"]
        )
        print(f"Created: {engram_id}")

        # Activate memory (semantic search)
        results = await client.activate(
            vault="default",
            context=["how does learning work?"],
            max_results=10
        )

        for item in results.activations:
            print(f"[{item.score:.2f}] {item.concept}")

asyncio.run(main())

API Reference

Core Methods

write(vault, concept, content, tags=None, confidence=0.9, stability=0.5) → str

Write an engram (memory) to the database.

engram_id = await client.write(
    vault="default",
    concept="example",
    content="Long-form content",
    tags=["tag1", "tag2"],
    confidence=0.95,
    stability=0.8
)

Returns: ULID string ID of created engram


activate(vault, context, max_results=10, threshold=0.1, brief_mode="auto") → ActivateResponse

Activate memory using semantic search and optional graph traversal.

result = await client.activate(
    vault="default",
    context=["query", "terms"],
    max_results=10,
    threshold=0.1,
    brief_mode="extractive"  # "auto", "extractive", "abstractive"
)

for item in result.activations:
    print(f"Score: {item.score}, Concept: {item.concept}")

for sentence in result.brief or []:
    print(f"Brief: {sentence.text}")

Returns: ActivateResponse with:

  • query_id — Query identifier
  • total_found — Total matching engrams
  • activations — List of ActivationItem (id, concept, content, score, confidence, why, hop_path, dormant)
  • latency_ms — Query latency
  • brief — Optional extractive/abstractive summary

read(id, vault="default") → ReadResponse

Read a specific engram by ID.

engram = await client.read("01JM2345...", vault="default")
print(engram.concept, engram.confidence)

Returns: ReadResponse with full engram details


forget(id, vault="default", hard=False) → bool

Delete an engram (soft or hard).

# Soft delete (recoverable)
await client.forget(engram_id, vault="default")

# Hard delete (permanent)
await client.forget(engram_id, vault="default", hard=True)

Returns: True on success


link(source_id, target_id, vault="default", rel_type=5, weight=1.0) → bool

Create an association between two engrams.

await client.link(
    source_id="01JM...",
    target_id="01JM...",
    vault="default",
    rel_type=5,
    weight=0.9
)

Returns: True on success


stats() → StatResponse

Get database statistics and coherence metrics.

stats = await client.stats()
print(f"Engrams: {stats.engram_count}")
print(f"Storage: {stats.storage_bytes} bytes")

if stats.coherence:
    for vault_name, coherence in stats.coherence.items():
        print(f"Vault {vault_name} coherence: {coherence.score:.2f}")

Returns: StatResponse with engram_count, vault_count, storage_bytes, and coherence dict


subscribe(vault="default", push_on_write=True, threshold=0.0) → SSEStream

Subscribe to real-time vault events via Server-Sent Events.

stream = client.subscribe(vault="default", push_on_write=True)
async for push in stream:
    print(f"New engram: {push.engram_id}")
    if condition:
        await stream.close()

Returns: Async iterable yielding Push events with:

  • subscription_id — Subscription ID
  • trigger — Event type ("new_write", etc.)
  • push_number — Push sequence number
  • engram_id — ID of written engram
  • at — Unix timestamp

health() → bool

Check if MuninnDB server is reachable and healthy.

if await client.health():
    print("Server OK")

Returns: True if server responds with 200 OK


Configuration

Create a client with custom settings:

client = MuninnClient(
    base_url="http://127.0.0.1:8476",      # Server address
    token="your-bearer-token",              # Optional auth token
    timeout=5.0,                            # Request timeout (seconds)
    max_retries=3,                          # Max retry attempts
    retry_backoff=0.5,                      # Initial backoff multiplier
    max_connections=20,                     # Max concurrent connections
    keepalive_connections=10                # Max keepalive pool size
)

Error Handling

The SDK raises semantic error types:

from muninn import (
    MuninnError,                 # Base error
    MuninnAuthError,             # 401 Unauthorized
    MuninnNotFound,              # 404 Not Found
    MuninnConflict,              # 409 Conflict
    MuninnServerError,           # 5xx Server errors
    MuninnConnectionError,       # Network errors
    MuninnTimeoutError           # Request timeout
)

async with MuninnClient() as client:
    try:
        result = await client.activate(vault="default", context=["query"])
    except MuninnAuthError:
        print("Invalid token")
    except MuninnNotFound:
        print("Vault not found")
    except MuninnConnectionError:
        print("Network error - will retry automatically")
    except MuninnError as e:
        print(f"Error: {e.status_code} - {e}")

Examples

Example 1: Write and Activate

python examples/write_activate.py

Writes multiple neuroscience engrams and activates them with a query.

Example 2: Real-Time Subscriptions

python examples/subscribe.py

Subscribes to a vault and writes an engram, demonstrating SSE push events.

Example 3: Cognitive Loop

python examples/cognitive_loop.py

Full workflow: write → activate → link → inspect coherence.

Retry Logic

The client automatically retries transient failures:

  • Retried errors: 502, 503, 504, network errors, timeouts
  • Not retried: 4xx client errors
  • Backoff: Exponential with jitter: retry_backoff * (2^attempt) + random(0, 0.1)
  • Max attempts: Configured via max_retries (default: 3)

Example with custom retry settings:

async with MuninnClient(
    base_url="http://127.0.0.1:8476",
    max_retries=5,
    retry_backoff=1.0
) as client:
    # This will retry up to 5 times with longer delays
    result = await client.activate(vault="default", context=["query"])

Type System

Full type hints enable IDE autocomplete:

from muninn import (
    MuninnClient,
    ActivateResponse,
    ActivationItem,
    ReadResponse,
    StatResponse,
    Push
)

async with MuninnClient() as client:
    result: ActivateResponse = await client.activate(vault="default", context=[])
    for item: ActivationItem in result.activations:
        print(item.score, item.concept)

Performance Tips

  1. Reuse client: Create one MuninnClient and reuse it for multiple operations
  2. Batch operations: Use concurrent tasks for parallel writes/activations
  3. Connection pooling: Adjust max_connections and keepalive_connections based on workload
  4. Timeout tuning: Increase timeout for large activation queries
import asyncio

async with MuninnClient() as client:
    # Parallel writes
    tasks = [
        client.write(vault="default", concept=f"item-{i}", content="...")
        for i in range(100)
    ]
    ids = await asyncio.gather(*tasks)

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

muninn_python-0.4.4a0.tar.gz (5.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

muninn_python-0.4.4a0-py3-none-any.whl (4.4 kB view details)

Uploaded Python 3

File details

Details for the file muninn_python-0.4.4a0.tar.gz.

File metadata

  • Download URL: muninn_python-0.4.4a0.tar.gz
  • Upload date:
  • Size: 5.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for muninn_python-0.4.4a0.tar.gz
Algorithm Hash digest
SHA256 30fdfae951eec39079b4dc9a7038bdcd4cd253f22be4b79c9d39a76327ae94eb
MD5 337dc1649476a76e5b5835f1323aea1b
BLAKE2b-256 ee030d54970f97ab1602e530a244c7dc71832808ccc5604760dd649533219e2c

See more details on using hashes here.

Provenance

The following attestation bundles were made for muninn_python-0.4.4a0.tar.gz:

Publisher: publish-sdk.yml on scrypster/muninndb

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file muninn_python-0.4.4a0-py3-none-any.whl.

File metadata

File hashes

Hashes for muninn_python-0.4.4a0-py3-none-any.whl
Algorithm Hash digest
SHA256 e27e429f7dfe8b335445016713cecb8b4c6ab3c34b1076278f6b19da85c966a5
MD5 b4ebc8b2c5701fc8f9e0ca8fd6c64d8a
BLAKE2b-256 8ee9b2f76ce0b3d89bdf53e274f3f3d020541007fffac32a81459215c8a81057

See more details on using hashes here.

Provenance

The following attestation bundles were made for muninn_python-0.4.4a0-py3-none-any.whl:

Publisher: publish-sdk.yml on scrypster/muninndb

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page