Skip to main content

Schema-first async persistence for NATS JetStream KV

Project description

Brainless DB

"Because without brain, you can’t have split-brain!"

Database build on top of NATS JetStream Key/Value Store. The main purpose is to maintain configuration for a multi-service, multi-node system where uptime is more important than strict consistency. Data is held in memory and automatically synchronized to NATS KV.

Quick Start

import brainlessdb

# Connect to NATS and setup
await brainlessdb.setup(nats, namespace="myapp", location="prague-1")

# Add entity - schema inferred from first add()
call = brainlessdb.call.add(channel_id="123", queue_id=5, state="waiting")

# Attribute access with auto dirty tracking
call.state = "ringing"  # marked dirty, flushed in background

# Find and filter
call = await brainlessdb.call.find(channel_id="123")
waiting = await brainlessdb.call.filter(state="waiting")

# Delete
del brainlessdb.call[call]

# Cleanup
await brainlessdb.stop()

Type-Safe Usage

Use dataclasses for type hints and IDE support:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Call:
    channel_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None  # auto-populated from entity

# Set type for collection - all queries return typed instances
brainlessdb.call.typed(Call)

# Now find/filter/all/order_by return Call instances
call = await brainlessdb.call.find(channel_id="123")
print(call.channel_id)  # IDE knows this is str
print(call.uuid)        # UUID auto-populated

# Add from dataclass
call = brainlessdb.call.add(Call(channel_id="456", queue_id=1, state="new"))

# Delete works with typed instances (uses uuid attribute)
del brainlessdb.call[call]
if call in brainlessdb.call:
    print("still exists")

One-off Type Conversion

Convert individual entities without setting collection type:

entity = await brainlessdb.call.find(channel_id="123")
call = entity.as_type(Call)  # Convert to dataclass
uuid = entity.uuid           # Still have access to entity

Collections

Collections are created on first access and persist to NATS KV buckets:

# These create/access collections automatically
brainlessdb.call.add(...)      # creates 'myapp-call' bucket
brainlessdb.user.add(...)      # creates 'myapp-user' bucket
brainlessdb.queue_item.add(...) # creates 'myapp-queue_item' bucket

Adding Entities

# From keyword arguments
call = brainlessdb.call.add(channel_id="123", state="waiting")

# From dictionary
call = brainlessdb.call.add({"channel_id": "456", "state": "ringing"})

# From dataclass instance
call = brainlessdb.call.add(Call(channel_id="789", queue_id=1, state="active"))

# Mixed - dataclass + overrides
call = brainlessdb.call.add(base_call, state="overridden")

Retrieving Entities

# By UUID
call = await brainlessdb.call.get("550e8400-e29b-41d4-a716-446655440000")

# Find first match (or None)
call = await brainlessdb.call.find(channel_id="123")
call = await brainlessdb.call.find(state="waiting", queue_id=5)

# Filter all matches
waiting = await brainlessdb.call.filter(state="waiting")
active_q5 = await brainlessdb.call.filter(queue_id=5, state="active")

# Get all
all_calls = await brainlessdb.call.all()

Nested Filtering

Use double underscore for nested field access (Django-style):

# Filter by nested fields
call = await brainlessdb.call.find(caller__city="Prague")
calls = await brainlessdb.call.filter(contact__address__zip="12345")

# Works with order_by too
calls = await brainlessdb.call.order_by("caller__name")

Sorting

# Ascending
calls = await brainlessdb.call.order_by("priority")

# Descending (minus prefix)
calls = await brainlessdb.call.order_by("-created_at")

# Multiple keys
calls = await brainlessdb.call.order_by("state", "-priority")

# With filter criteria
calls = await brainlessdb.call.order_by("-created_at", state="active")

Deleting Entities

# By UUID string
brainlessdb.call.delete("550e8400-e29b-41d4-a716-446655440000")

# By Entity instance
brainlessdb.call.delete(entity)

# By typed dataclass (uses uuid attribute)
brainlessdb.call.delete(call)

# Dict-style deletion
del brainlessdb.call[call]
del brainlessdb.call["550e8400-..."]

Collection Info

count = brainlessdb.call.count()
count = len(brainlessdb.call)

# Check existence (accepts uuid string, Entity, or typed object)
if "550e8400-..." in brainlessdb.call:
    ...
if call in brainlessdb.call:
    ...

# Iteration
for call in brainlessdb.call:
    print(call.channel_id)

# Dict-style access by UUID
call = brainlessdb.call["550e8400-..."]

Entity Access

Entities wrap stored data with attribute access and dirty tracking:

# Attribute access
print(call.state)
call.state = "ringing"  # auto-marks dirty

# Dict-style access
print(call["state"])
call["state"] = "active"

# Check field exists
if "queue_id" in call:
    print(call.queue_id)

# Get data
data = call.to_dict()  # {"uuid": "...", "channel_id": "123", ...}
data = call.data       # same but without uuid

# Properties
call.uuid   # entity UUID
call.dirty  # True if modified since last flush

Persistence

Background Flush

Changes are flushed to NATS automatically in the background:

# Default: flush every 0.5 seconds
await brainlessdb.setup(nats, namespace="app")

# Custom interval
await brainlessdb.setup(nats, namespace="app", flush_interval=1.0)

Manual Flush

# Flush all collections
await brainlessdb.flush()

# Flush single collection
await brainlessdb.call.flush()

Graceful Shutdown

Always call stop() for final flush:

await brainlessdb.stop()

Instance-Based Usage

For tests or multiple databases:

from brainlessdb import Brainless

db = Brainless(nats, namespace="test", location="local")
await db.start()

call = db.call.add(channel_id="456", state="waiting")
call = await db.call.find(channel_id="456")

await db.stop()

In-Memory Mode

Works without NATS for testing:

db = Brainless(None, namespace="test")
await db.start()

# All operations work, just no persistence
call = db.call.add(channel_id="123", state="new")
call = await db.call.find(channel_id="123")

await db.stop()

Use Cases

Queue Management

@dataclass
class QueueItem:
    priority: int
    caller_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None

brainlessdb.queue.typed(QueueItem)

# Add to queue
item = brainlessdb.queue.add(QueueItem(
    priority=1,
    caller_id="caller-123",
    queue_id=5,
    state="waiting"
))

# Get next item by priority
next_item = await brainlessdb.queue.order_by("priority", state="waiting")
if next_item:
    next_item[0].state = "processing"

# Remove completed
del brainlessdb.queue[item]

Session Storage

@dataclass
class Session:
    user_id: int
    token: str
    created_at: float
    last_seen: float
    uuid: Optional[str] = None

brainlessdb.session.typed(Session)

# Create session
session = brainlessdb.session.add(Session(
    user_id=42,
    token="abc123",
    created_at=time.time(),
    last_seen=time.time()
))

# Find by token
session = await brainlessdb.session.find(token="abc123")

# Update last seen
session.last_seen = time.time()

# Find stale sessions
stale = await brainlessdb.session.filter(last_seen__lt=time.time() - 3600)

Call State Tracking

@dataclass
class Call:
    channel_id: str
    state: str
    caller: dict
    queue_id: Optional[int] = None
    agent_id: Optional[int] = None
    uuid: Optional[str] = None

brainlessdb.call.typed(Call)

# New call
call = brainlessdb.call.add(Call(
    channel_id="chan-123",
    state="ringing",
    caller={"number": "+1234567890", "city": "Prague"}
))

# Find by nested field
call = await brainlessdb.call.find(caller__city="Prague")

# State transitions
call.state = "answered"
call.agent_id = 5

# Get calls by agent
agent_calls = await brainlessdb.call.filter(agent_id=5, state="answered")

Multi-Location Sync

# Prague datacenter
await brainlessdb.setup(nats, namespace="app", location="prague-1")

# UUIDs include location for conflict resolution
call = brainlessdb.call.add(channel_id="123", state="new")
print(call.uuid)  # "550e8400-e29b-41d4-a716-446655440000" (UUID1 with location-based node)

Architecture

  • Schema inference: Schema locked after first add() to collection
  • Lazy indexing: O(1) lookups, indexes built on first filter/find
  • Background flush: Async persistence with configurable interval
  • NATS KV storage: One bucket per collection ({namespace}-{collection})
  • UUID1 generation: Time-based UUIDs with location-derived node ID

API Reference

Global Functions

Function Description
await setup(nats, namespace, location, flush_interval) Initialize global instance
await stop() Stop and final flush
await flush() Flush all collections
brainlessdb.{name} Access collection by name

Collection Methods

Method Description
typed(cls) Set dataclass type for results
add(data, **kwargs) Add entity (dict, dataclass, or kwargs)
await get(uuid) Get by UUID
await find(**criteria) Find first match
await filter(**criteria) Find all matches
await all() Get all entities
await order_by(*keys, **criteria) Sorted results
delete(entity) Delete by uuid/entity/typed object
count() / len() Entity count
await load() Load from NATS
await flush() Flush to NATS
clear() Clear in-memory (not NATS)

Entity Properties & Methods

Property/Method Description
uuid Entity UUID
data Raw data dict (no uuid)
dirty Modified since flush
to_dict() Data with uuid
as_type(cls) Convert to dataclass

Status

🚧 Work in progress

  • Collections with add/get/delete/filter/find
  • Entity dirty tracking
  • NATS KV bucket integration
  • Background flush
  • Lazy auto-indexing for O(1) lookups
  • Sorted iteration with order_by
  • Type-safe dataclass conversion
  • UUID auto-population in typed results
  • Schema storage in NATS
  • Watch for changes from other nodes
  • CRDTs for conflict resolution

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

brainlessdb-0.1.1.tar.gz (20.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

brainlessdb-0.1.1-py3-none-any.whl (19.9 kB view details)

Uploaded Python 3

File details

Details for the file brainlessdb-0.1.1.tar.gz.

File metadata

  • Download URL: brainlessdb-0.1.1.tar.gz
  • Upload date:
  • Size: 20.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for brainlessdb-0.1.1.tar.gz
Algorithm Hash digest
SHA256 fae9559def62c3e3053821d082559f52548dbe1dfe0444f6df375b277c3bbde0
MD5 131ed7b1af76d8a210c04d0b82a0b90d
BLAKE2b-256 be722a8db1f64bff7de3a24c5dd16e2d2b86bbd1f657e8e5dedc43c6c6646532

See more details on using hashes here.

File details

Details for the file brainlessdb-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: brainlessdb-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 19.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for brainlessdb-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 38d4c1931389616834cecbcba9d94681285600dcd5fded2df9d711b43eebb378
MD5 b06eb03c2fb6677e0934ec054836bf82
BLAKE2b-256 4c2857696bd1b8235347e3b025a145e30df72e638d95dbf2d6a59ffec4a42709

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page