Skip to main content

Schema-first async persistence for NATS JetStream KV

Project description

Brainless

"Because without brain, you can’t have split-brain!"

Database build on top of NATS JetStream Key/Value Store. The main purpose is to maintain configuration for a multi-service, multi-node system where uptime is more important than strict consistency. Data is held in memory and automatically synchronized to NATS KV.

Quick Start

import brainless

# Connect to NATS and setup
await brainless.setup(nats, namespace="myapp", location="prague-1")

# Add entity - schema inferred from first add()
call = brainless.call.add(channel_id="123", queue_id=5, state="waiting")

# Attribute access with auto dirty tracking
call.state = "ringing"  # marked dirty, flushed in background

# Find and filter
call = await brainless.call.find(channel_id="123")
waiting = await brainless.call.filter(state="waiting")

# Delete
del brainless.call[call]

# Cleanup
await brainless.stop()

Type-Safe Usage

Use dataclasses for type hints and IDE support:

from dataclasses import dataclass
from typing import Optional

@dataclass
class Call:
    channel_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None  # auto-populated from entity

# Set type for collection - all queries return typed instances
brainless.call.typed(Call)

# Now find/filter/all/order_by return Call instances
call = await brainless.call.find(channel_id="123")
print(call.channel_id)  # IDE knows this is str
print(call.uuid)        # UUID auto-populated

# Add from dataclass
call = brainless.call.add(Call(channel_id="456", queue_id=1, state="new"))

# Delete works with typed instances (uses uuid attribute)
del brainless.call[call]
if call in brainless.call:
    print("still exists")

One-off Type Conversion

Convert individual entities without setting collection type:

entity = await brainless.call.find(channel_id="123")
call = entity.as_type(Call)  # Convert to dataclass
uuid = entity.uuid           # Still have access to entity

Collections

Collections are created on first access and persist to NATS KV buckets:

# These create/access collections automatically
brainless.call.add(...)      # creates 'myapp-call' bucket
brainless.user.add(...)      # creates 'myapp-user' bucket
brainless.queue_item.add(...) # creates 'myapp-queue_item' bucket

Adding Entities

# From keyword arguments
call = brainless.call.add(channel_id="123", state="waiting")

# From dictionary
call = brainless.call.add({"channel_id": "456", "state": "ringing"})

# From dataclass instance
call = brainless.call.add(Call(channel_id="789", queue_id=1, state="active"))

# Mixed - dataclass + overrides
call = brainless.call.add(base_call, state="overridden")

Retrieving Entities

# By UUID
call = await brainless.call.get("550e8400-e29b-41d4-a716-446655440000")

# Find first match (or None)
call = await brainless.call.find(channel_id="123")
call = await brainless.call.find(state="waiting", queue_id=5)

# Filter all matches
waiting = await brainless.call.filter(state="waiting")
active_q5 = await brainless.call.filter(queue_id=5, state="active")

# Get all
all_calls = await brainless.call.all()

Nested Filtering

Use double underscore for nested field access (Django-style):

# Filter by nested fields
call = await brainless.call.find(caller__city="Prague")
calls = await brainless.call.filter(contact__address__zip="12345")

# Works with order_by too
calls = await brainless.call.order_by("caller__name")

Sorting

# Ascending
calls = await brainless.call.order_by("priority")

# Descending (minus prefix)
calls = await brainless.call.order_by("-created_at")

# Multiple keys
calls = await brainless.call.order_by("state", "-priority")

# With filter criteria
calls = await brainless.call.order_by("-created_at", state="active")

Deleting Entities

# By UUID string
brainless.call.delete("550e8400-e29b-41d4-a716-446655440000")

# By Entity instance
brainless.call.delete(entity)

# By typed dataclass (uses uuid attribute)
brainless.call.delete(call)

# Dict-style deletion
del brainless.call[call]
del brainless.call["550e8400-..."]

Collection Info

count = brainless.call.count()
count = len(brainless.call)

# Check existence (accepts uuid string, Entity, or typed object)
if "550e8400-..." in brainless.call:
    ...
if call in brainless.call:
    ...

# Iteration
for call in brainless.call:
    print(call.channel_id)

# Dict-style access by UUID
call = brainless.call["550e8400-..."]

Entity Access

Entities wrap stored data with attribute access and dirty tracking:

# Attribute access
print(call.state)
call.state = "ringing"  # auto-marks dirty

# Dict-style access
print(call["state"])
call["state"] = "active"

# Check field exists
if "queue_id" in call:
    print(call.queue_id)

# Get data
data = call.to_dict()  # {"uuid": "...", "channel_id": "123", ...}
data = call.data       # same but without uuid

# Properties
call.uuid   # entity UUID
call.dirty  # True if modified since last flush

Persistence

Background Flush

Changes are flushed to NATS automatically in the background:

# Default: flush every 0.5 seconds
await brainless.setup(nats, namespace="app")

# Custom interval
await brainless.setup(nats, namespace="app", flush_interval=1.0)

Manual Flush

# Flush all collections
await brainless.flush()

# Flush single collection
await brainless.call.flush()

Graceful Shutdown

Always call stop() for final flush:

await brainless.stop()

Instance-Based Usage

For tests or multiple databases:

from brainless import Brainless

db = Brainless(nats, namespace="test", location="local")
await db.start()

call = db.call.add(channel_id="456", state="waiting")
call = await db.call.find(channel_id="456")

await db.stop()

In-Memory Mode

Works without NATS for testing:

db = Brainless(None, namespace="test")
await db.start()

# All operations work, just no persistence
call = db.call.add(channel_id="123", state="new")
call = await db.call.find(channel_id="123")

await db.stop()

Use Cases

Queue Management

@dataclass
class QueueItem:
    priority: int
    caller_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None

brainless.queue.typed(QueueItem)

# Add to queue
item = brainless.queue.add(QueueItem(
    priority=1,
    caller_id="caller-123",
    queue_id=5,
    state="waiting"
))

# Get next item by priority
next_item = await brainless.queue.order_by("priority", state="waiting")
if next_item:
    next_item[0].state = "processing"

# Remove completed
del brainless.queue[item]

Session Storage

@dataclass
class Session:
    user_id: int
    token: str
    created_at: float
    last_seen: float
    uuid: Optional[str] = None

brainless.session.typed(Session)

# Create session
session = brainless.session.add(Session(
    user_id=42,
    token="abc123",
    created_at=time.time(),
    last_seen=time.time()
))

# Find by token
session = await brainless.session.find(token="abc123")

# Update last seen
session.last_seen = time.time()

# Find stale sessions
stale = await brainless.session.filter(last_seen__lt=time.time() - 3600)

Call State Tracking

@dataclass
class Call:
    channel_id: str
    state: str
    caller: dict
    queue_id: Optional[int] = None
    agent_id: Optional[int] = None
    uuid: Optional[str] = None

brainless.call.typed(Call)

# New call
call = brainless.call.add(Call(
    channel_id="chan-123",
    state="ringing",
    caller={"number": "+1234567890", "city": "Prague"}
))

# Find by nested field
call = await brainless.call.find(caller__city="Prague")

# State transitions
call.state = "answered"
call.agent_id = 5

# Get calls by agent
agent_calls = await brainless.call.filter(agent_id=5, state="answered")

Multi-Location Sync

# Prague datacenter
await brainless.setup(nats, namespace="app", location="prague-1")

# UUIDs include location for conflict resolution
call = brainless.call.add(channel_id="123", state="new")
print(call.uuid)  # "550e8400-e29b-41d4-a716-446655440000" (UUID1 with location-based node)

Architecture

  • Schema inference: Schema locked after first add() to collection
  • Lazy indexing: O(1) lookups, indexes built on first filter/find
  • Background flush: Async persistence with configurable interval
  • NATS KV storage: One bucket per collection ({namespace}-{collection})
  • UUID1 generation: Time-based UUIDs with location-derived node ID

API Reference

Global Functions

Function Description
await setup(nats, namespace, location, flush_interval) Initialize global instance
await stop() Stop and final flush
await flush() Flush all collections
brainless.{name} Access collection by name

Collection Methods

Method Description
typed(cls) Set dataclass type for results
add(data, **kwargs) Add entity (dict, dataclass, or kwargs)
await get(uuid) Get by UUID
await find(**criteria) Find first match
await filter(**criteria) Find all matches
await all() Get all entities
await order_by(*keys, **criteria) Sorted results
delete(entity) Delete by uuid/entity/typed object
count() / len() Entity count
await load() Load from NATS
await flush() Flush to NATS
clear() Clear in-memory (not NATS)

Entity Properties & Methods

Property/Method Description
uuid Entity UUID
data Raw data dict (no uuid)
dirty Modified since flush
to_dict() Data with uuid
as_type(cls) Convert to dataclass

Status

🚧 Work in progress

  • Collections with add/get/delete/filter/find
  • Entity dirty tracking
  • NATS KV bucket integration
  • Background flush
  • Lazy auto-indexing for O(1) lookups
  • Sorted iteration with order_by
  • Type-safe dataclass conversion
  • UUID auto-population in typed results
  • Schema storage in NATS
  • Watch for changes from other nodes
  • CRDTs for conflict resolution

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

brainlessdb-0.1.0.tar.gz (20.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

brainlessdb-0.1.0-py3-none-any.whl (19.8 kB view details)

Uploaded Python 3

File details

Details for the file brainlessdb-0.1.0.tar.gz.

File metadata

  • Download URL: brainlessdb-0.1.0.tar.gz
  • Upload date:
  • Size: 20.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for brainlessdb-0.1.0.tar.gz
Algorithm Hash digest
SHA256 646b9b7094f1c28f4d9609f62aca8c4461530cee6b3cd76b80b35e52917d8816
MD5 45cd510082af89b4c8f51b453b5cc0bb
BLAKE2b-256 3b7ff049bfba64b82aeb1fffb5c21d159089a5d83bd184a4f28909faed0bdfa9

See more details on using hashes here.

File details

Details for the file brainlessdb-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: brainlessdb-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 19.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.21 {"installer":{"name":"uv","version":"0.9.21","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Arch Linux","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for brainlessdb-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 55c05ea6526258bc0220dc9b332873547cb602648b888e418a407480bb7394b0
MD5 78c53acb2fe21db75bc292934dfac24a
BLAKE2b-256 a832ef45eddf72d2f5138298b63c13a9267a1e01820029a6367172b6aa154f69

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page