Skip to main content

Typed collections backed by NATS JetStream KV

Project description

Brainless DB

Typed collections backed by NATS JetStream KV. In-memory with background sync.

Quick Start

from typing import Annotated
from msgspec import Meta
from brainlessdb import BrainlessDB, BrainlessDBFeat, BrainlessBucket

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]
    name: Annotated[str, Meta()] = ""

db = BrainlessDB(nats, namespace="app")
users = db.collection(UserV1)  # sync - just registers
await db.start()  # loads all collections and starts watching

# Create
user = users.add(UserV1(id=1, name="Alice"))

# Update
user.name = "Bob"
user.save()  # marks dirty, schedules flush

# Query (sync - operates on in-memory data)
user = users.find(id=1)  # uses index
all_users = users.filter(lambda u: u.id > 0)

await db.stop()

When to Use What

Class Use for Has UUID Stored in
BrainlessBucket Main entities (User, Order, etc.) Yes Own bucket(s)
BrainlessRegistry Singleton config (one per class) Derived from __key__ Shared brainless_registry bucket
BrainlessStruct Nested data, app-local data No Inside parent entity
from brainlessdb import BrainlessBucket, BrainlessStruct

# App-local data - BrainlessStruct (no UUID, not an entity)
class UcsLocal(BrainlessStruct):
    sid: int = 0
    pointer: int = 0

# Nested data - BrainlessStruct
class Address(BrainlessStruct):
    street: str = ""
    city: str = ""

# Main entity - BrainlessBucket (has UUID, stored in NATS)
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    address: Optional[Address] = None  # nested struct
    _: Optional[UcsLocal] = None       # app-local struct

Registry

A singleton config — one instance per class, not a collection. All registries share the brainless_registry bucket; each lives at its own uuid5(__key__) key.

from brainlessdb import BrainlessRegistry

class SipConfigV1(BrainlessRegistry):
    __key__ = "sip.config"        # readable id → uuid5 key
    host: str = "127.0.0.1"
    port: int = 5060

cfg = db.registry(SipConfigV1)    # the live singleton instance
cfg.host                          # "127.0.0.1"

cfg.host = "10.0.0.1"
cfg.save()                        # persist + sync

Remote changes update the instance in place, so a held reference stays current.

Global API

import brainlessdb

brainlessdb.setup(nats, namespace="app")  # sync
users = brainlessdb.collection(UserV1)  # sync
await brainlessdb.start()  # loads all registered collections
await brainlessdb.flush()  # manual flush
await brainlessdb.stop()

Field Types

Config Fields (default)

Persistent data synced across all instances:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    name: Annotated[str, Meta()] = ""

State Fields

Ephemeral data (separate bucket, faster sync):

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    status: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.STATE})] = 0

Indexed Fields

Fast O(1) lookups:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]

Unique Fields

Enforces uniqueness constraint (also auto-indexed):

class UserV1(BrainlessBucket):
    email: Annotated[Optional[str], Meta(extra={"brainlessdb_flags": BrainlessDBFeat.UNIQUE})] = None

Combine flags with |:

counter: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX | BrainlessDBFeat.STATE})] = 0

App-Local Fields

Data private to each namespace:

from brainlessdb import BrainlessBucket, BrainlessStruct

class UcsLocal(BrainlessStruct):
    sid: int = 0

class AriLocal(BrainlessStruct):
    channel_id: str = ""

# Entity with app-local field
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    _: Union[UcsLocal, AriLocal, None] = None

Each namespace only sees its own local data:

# In UCS app (namespace="ucs")
user = UserV1(id=1, _=UcsLocal(sid=123))
users.add(user)
user._.sid  # 123

# In ARI app (namespace="ari")  
user = users.find(id=1)
user._  # None - no ARI local data yet

CRUD Operations

# Add/update (validates types on add)
item = coll.add(MyStruct(...))

# Update via save()
item.field = value
item.save()

# Get by UUID
item = coll.get(uuid_str)

# Delete
coll.delete(item)
coll.delete(uuid_str)

# Clear all
coll.clear()

# Dict-style access
item = coll[uuid_str]
del coll[item]
len(coll)
for item in coll: ...
item in coll

Filtering

All filter/find methods are sync (operate on in-memory data):

# By predicate
items = coll.filter(lambda i: i.priority > 5)

# By field (uses index if available)
items = coll.filter(status=1)

# Nested fields
items = coll.filter(address__city="Prague")

# Combined
items = coll.filter(lambda i: i.active, status=1, limit=10)

# Find single
item = coll.find(id=123)

# Sort
items = coll.order_by("priority", reverse=True)

Events

Callbacks fire on remote changes by default. Set trigger_local=True to also fire on local changes.

# Any change
coll.on_change(lambda old, new: print(f"{old} -> {new}"))

# Deletion
coll.on_delete(lambda item: print(f"deleted: {item}"))

# Specific property
coll.on_property_change(
    status=lambda item, field, old, new: print(f"{field}: {old} -> {new}")
)

# Also trigger on local changes
coll.on_change(my_callback, trigger_local=True)

Watching

Watch starts automatically with start(). Manual control:

await db.unwatch()  # stop watching all
await db.watch()    # resume watching all

Flush Scheduling

  • Changes schedule flush after flush_interval (default 100ms)
  • Multiple changes batch into single flush
  • flush_interval=0 flushes immediately
  • await db.flush() forces immediate flush

Multi-Bucket Architecture

Each struct uses up to 3 NATS KV buckets:

  • {StructName} - config fields (persistent)
  • {StructName}-State - state fields (ephemeral)
  • {StructName}-{LocalClass} - app-local fields (per namespace)

Example: UserV1 with UCS namespace creates:

  • UserV1 (config)
  • UserV1-State (if state fields exist)
  • UserV1-UcsLocal (local data for UCS app)

Low-Level Bucket Access

Bucket exposes NATS KV metadata for consumers that need timestamps or revision info.

from brainlessdb.bucket import Bucket, WatchEntry

bucket = await Bucket.create(js, "UserV1")

# get_entry() — single key with metadata
entry = await bucket.get_entry("some-key")
entry.value       # bytes
entry.revision    # int
entry.created     # datetime (NATS server timestamp)

# all_entries() — all keys with metadata
entries = await bucket.all_entries()  # dict[str, WatchEntry]
for key, entry in entries.items():
    print(key, entry.created)

# watch() — created is included in every event
async for entry in bucket.watch():
    print(entry.key, entry.operation, entry.created)

# watch() with timeout — yields operation="TIMEOUT" on idle, watcher stays alive
async for entry in bucket.watch(timeout=3):
    if entry.operation == "TIMEOUT":
        continue
    ...

get() and all() remain unchanged (return raw bytes).

Dead Letter Queue

Entities that fail to deserialize (e.g. after a schema change) are automatically moved to the brainless_dlq NATS KV bucket and removed from the source bucket. This applies during both initial load and live watch updates.

DLQ key format: {SourceBucket}__{EntityUID}__{Timestamp} (double underscore — NATS KV keys disallow :)

Each entry contains source_bucket, key, error, timestamp, and the original value for recovery.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

brainlessdb-0.16.0.tar.gz (65.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

brainlessdb-0.16.0-py3-none-any.whl (20.7 kB view details)

Uploaded Python 3

File details

Details for the file brainlessdb-0.16.0.tar.gz.

File metadata

  • Download URL: brainlessdb-0.16.0.tar.gz
  • Upload date:
  • Size: 65.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.3

File hashes

Hashes for brainlessdb-0.16.0.tar.gz
Algorithm Hash digest
SHA256 a439b88093276f02c1f0de0172f5058e433b0bf834e50e25c110a30692b7bfa3
MD5 dbe67929ce0af84c5afe2986b7a548af
BLAKE2b-256 45bcfe36867cf7e27270fd4b373fa04c541fd01c143e15a6f8b5d177049de494

See more details on using hashes here.

File details

Details for the file brainlessdb-0.16.0-py3-none-any.whl.

File metadata

File hashes

Hashes for brainlessdb-0.16.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cabed20acca934418047dc32dceb4e658d3b692b3a143ad73e3780a59c49cef2
MD5 66e627776e813ebf494d0753583aeba9
BLAKE2b-256 f7fb18eedd3387ee00683102a848b59ab096b98af70367120ddf954584090556

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page