Brainless DB

Typed collections backed by NATS JetStream KV. Data is held in memory and synced in the background.

Quick Start

from typing import Annotated
from msgspec import Meta
from brainlessdb import BrainlessDB, BrainlessDBFeat, BrainlessBucket

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]
    name: Annotated[str, Meta()] = ""

db = BrainlessDB(nats, namespace="app")  # nats: NATS handle created elsewhere (not shown)
users = db.collection(UserV1)  # sync - just registers
await db.start()  # loads all collections and starts watching

# Create
user = users.add(UserV1(id=1, name="Alice"))

# Update
user.name = "Bob"
user.save()  # marks dirty, schedules flush

# Query (sync - operates on in-memory data)
user = users.find(id=1)  # uses index
all_users = users.filter(lambda u: u.id > 0)

await db.stop()

When to Use What

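Class            | Use for                            | Has UUID | Stored in
-----------------|------------------------------------|----------|---------------------
BrainlessBucket  | Main entities (User, Order, etc.)  | Yes      | Own bucket(s)
BrainlessStruct  | Nested data, app-local data        | No       | Inside parent entity
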
from typing import Annotated, Optional

from msgspec import Meta
from brainlessdb import BrainlessBucket, BrainlessStruct

# App-local data - BrainlessStruct (no UUID, not an entity)
class UcsLocal(BrainlessStruct):
    sid: int = 0
    pointer: int = 0

# Nested data - BrainlessStruct
class Address(BrainlessStruct):
    street: str = ""
    city: str = ""

# Main entity - BrainlessBucket (has UUID, stored in NATS)
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    address: Optional[Address] = None  # nested struct
    _: Optional[UcsLocal] = None       # app-local struct

Global API

import brainlessdb

brainlessdb.setup(nats, namespace="app")  # sync
users = brainlessdb.collection(UserV1)  # sync
await brainlessdb.start()  # loads all registered collections
await brainlessdb.flush()  # manual flush
await brainlessdb.stop()

Field Types

Config Fields (default)

Persistent data synced across all instances:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    name: Annotated[str, Meta()] = ""

State Fields

Ephemeral data, stored in a separate bucket for faster sync:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    status: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.STATE})] = 0

Indexed Fields

Fast O(1) lookups:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]
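
The O(1) claim follows from keeping a hash map from field value to entities. The sketch below is not brainlessdb's implementation; `FieldIndex`, `User`, and their methods are hypothetical names that illustrate the idea with a plain dict.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class User:
    uid: str  # stand-in for the entity UUID
    id: int


class FieldIndex:
    """Hypothetical value -> entities index for a single field."""

    def __init__(self, field: str) -> None:
        self.field = field
        self._by_value = defaultdict(dict)  # value -> {uid: entity}

    def add(self, item: User) -> None:
        self._by_value[getattr(item, self.field)][item.uid] = item

    def remove(self, item: User) -> None:
        self._by_value[getattr(item, self.field)].pop(item.uid, None)

    def lookup(self, value: object) -> list:
        # One dict lookup instead of scanning every entity.
        return list(self._by_value.get(value, {}).values())
```

Without an index, the same query would have to visit every entity in the collection, which is what a predicate-based filter does.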

Unique Fields

Enforce a uniqueness constraint (unique fields are also indexed automatically):

class UserV1(BrainlessBucket):
    email: Annotated[Optional[str], Meta(extra={"brainlessdb_flags": BrainlessDBFeat.UNIQUE})] = None

Combine flags with |:

counter: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX | BrainlessDBFeat.STATE})] = 0
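
Combining with | works the way bit flags usually do in Python. The sketch below uses a stand-in `Feat` enum built on the standard library's `enum.Flag`; it assumes, without confirmation from the source, that BrainlessDBFeat behaves similarly, and the member values are illustrative only.

```python
from enum import Flag, auto


class Feat(Flag):
    """Stand-in for BrainlessDBFeat; member values here are illustrative."""

    INDEX = auto()
    STATE = auto()
    UNIQUE = auto()


# A field marked as both indexed and state.
flags = Feat.INDEX | Feat.STATE

is_indexed = Feat.INDEX in flags   # True
is_unique = Feat.UNIQUE in flags   # False
```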

App-Local Fields

Data private to each namespace:

from typing import Annotated, Union

from msgspec import Meta
from brainlessdb import BrainlessBucket, BrainlessStruct

class UcsLocal(BrainlessStruct):
    sid: int = 0

class AriLocal(BrainlessStruct):
    channel_id: str = ""

# Entity with app-local field
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    _: Union[UcsLocal, AriLocal, None] = None

Each namespace only sees its own local data:

# In UCS app (namespace="ucs")
user = UserV1(id=1, _=UcsLocal(sid=123))
users.add(user)
user._.sid  # 123

# In ARI app (namespace="ari")
user = users.find(id=1)
user._  # None - no ARI local data yet

CRUD Operations

# Add/update (validates types on add)
item = coll.add(MyStruct(...))

# Update via save()
item.field = value
item.save()

# Get by UUID
item = coll.get(uuid_str)

# Delete
coll.delete(item)
coll.delete(uuid_str)

# Clear all
coll.clear()

# Dict-style access
item = coll[uuid_str]
del coll[item]
len(coll)
for item in coll: ...
item in coll

Filtering

All filter/find methods are sync (operate on in-memory data):

# By predicate
items = coll.filter(lambda i: i.priority > 5)

# By field (uses index if available)
items = coll.filter(status=1)

# Nested fields
items = coll.filter(address__city="Prague")

# Combined
items = coll.filter(lambda i: i.active, status=1, limit=10)

# Find single
item = coll.find(id=123)

# Sort
items = coll.order_by("priority", reverse=True)
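
The double-underscore syntax for nested fields can be read as a path of attribute names. The following sketch shows one way such a path could be resolved; `resolve` and `matches` are hypothetical helpers, not part of brainlessdb, and the dataclasses stand in for real entities.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Address:
    city: str = ""


@dataclass
class User:
    id: int
    address: Optional[Address] = None


def resolve(item: Any, path: str) -> Any:
    """Follow a path such as 'address__city'; return None if a step is missing."""
    value = item
    for name in path.split("__"):
        if value is None:
            return None
        value = getattr(value, name, None)
    return value


def matches(item: Any, **criteria: Any) -> bool:
    """True when every keyword criterion equals the resolved field value."""
    return all(resolve(item, path) == expected for path, expected in criteria.items())


users = [User(1, Address("Prague")), User(2, Address("Brno")), User(3)]
in_prague = [u for u in users if matches(u, address__city="Prague")]
```

Entities whose nested struct is None simply do not match, rather than raising an error.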

Events

Callbacks fire on remote changes by default. Set trigger_local=True to also fire on local changes.

# Any change
coll.on_change(lambda old, new: print(f"{old} -> {new}"))

# Deletion
coll.on_delete(lambda item: print(f"deleted: {item}"))

# Specific property
coll.on_property_change(
    status=lambda item, field, old, new: print(f"{field}: {old} -> {new}")
)

# Also trigger on local changes
coll.on_change(my_callback, trigger_local=True)
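
To make the trigger_local rule concrete, here is a minimal sketch of a callback registry that skips local changes unless a callback opted in. `ChangeEvents` and its `emit` method are hypothetical and only model the behaviour described above; the library's internals may differ.

```python
from typing import Any, Callable

ChangeCallback = Callable[[Any, Any], None]


class ChangeEvents:
    """Hypothetical registry modelling remote-by-default change callbacks."""

    def __init__(self) -> None:
        self._callbacks = []  # list of (callback, trigger_local) pairs

    def on_change(self, callback: ChangeCallback, trigger_local: bool = False) -> None:
        self._callbacks.append((callback, trigger_local))

    def emit(self, old: Any, new: Any, *, local: bool) -> None:
        for callback, trigger_local in self._callbacks:
            # Local changes reach only callbacks registered with trigger_local=True.
            if local and not trigger_local:
                continue
            callback(old, new)
```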

Watching

Watching starts automatically with start(). To control it manually:

await db.unwatch()  # stop watching all
await db.watch()    # resume watching all

Flush Scheduling

  • Each change schedules a flush after flush_interval (default 100 ms)
  • Changes made within that window are batched into a single flush
  • Setting flush_interval=0 flushes immediately
  • await db.flush() forces an immediate flush
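
The batching rules above can be sketched with plain asyncio. This is an illustration under assumptions, not the library's code: `FlushScheduler`, `mark_dirty`, and `flushed_batches` are hypothetical names, and the real flush would write to NATS KV instead of appending to a list.

```python
import asyncio
from typing import Optional


class FlushScheduler:
    """Hypothetical scheduler: coalesces dirty keys into one delayed flush."""

    def __init__(self, flush_interval: float = 0.1) -> None:
        self.flush_interval = flush_interval
        self.dirty = set()
        self.flushed_batches = []  # records each batch, for demonstration
        self._task: Optional[asyncio.Task] = None

    def mark_dirty(self, key: str) -> None:
        self.dirty.add(key)
        # Only the first change in a window schedules a flush; later ones join it.
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(self._delayed_flush())

    async def _delayed_flush(self) -> None:
        await asyncio.sleep(self.flush_interval)
        await self.flush()

    async def flush(self) -> None:
        # Swap out the dirty set so changes made during the flush start a new batch.
        batch, self.dirty = self.dirty, set()
        if batch:
            self.flushed_batches.append(batch)
```

Calling `flush()` directly empties the pending set right away, so the already scheduled task later finds nothing to write.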

Multi-Bucket Architecture

Each BrainlessBucket type uses up to three NATS KV buckets:

  • {StructName} - config fields (persistent)
  • {StructName}-State - state fields (ephemeral)
  • {StructName}-{LocalClass} - app-local fields (per namespace)

Example: UserV1 in the UCS namespace creates:

  • UserV1 (config)
  • UserV1-State (if state fields exist)
  • UserV1-UcsLocal (local data for UCS app)
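
The naming scheme can be expressed as a small function. `bucket_names` is a hypothetical helper written only from the patterns listed above; it assumes no extra prefixing is applied, which the source does not state either way.

```python
from typing import Optional


def bucket_names(struct_name: str, has_state_fields: bool, local_class: Optional[str]) -> list:
    """Return the KV bucket names implied by the patterns above."""
    names = [struct_name]  # config fields
    if has_state_fields:
        names.append(f"{struct_name}-State")  # state fields
    if local_class is not None:
        names.append(f"{struct_name}-{local_class}")  # app-local fields
    return names


ucs_buckets = bucket_names("UserV1", has_state_fields=True, local_class="UcsLocal")
# ['UserV1', 'UserV1-State', 'UserV1-UcsLocal']
```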

Low-Level Bucket Access

Bucket exposes NATS KV metadata for consumers that need timestamps or revision info.

from brainlessdb.bucket import Bucket, WatchEntry

bucket = await Bucket.create(js, "UserV1")

# get_entry() — single key with metadata
entry = await bucket.get_entry("some-key")
entry.value       # bytes
entry.revision    # int
entry.created     # datetime (NATS server timestamp)

# all_entries() — all keys with metadata
entries = await bucket.all_entries()  # dict[str, WatchEntry]
for key, entry in entries.items():
    print(key, entry.created)

# watch() — created is included in every event
async for entry in bucket.watch():
    print(entry.key, entry.operation, entry.created)

# watch() with timeout — yields operation="TIMEOUT" on idle, watcher stays alive
async for entry in bucket.watch(timeout=3):
    if entry.operation == "TIMEOUT":
        continue
    ...

get() and all() still return raw bytes, without metadata.
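
The timeout behaviour of watch() described above, where an idle period yields a TIMEOUT entry and the watcher keeps running, can be modelled with an async generator over a queue. This is a sketch under assumptions: `Entry` and this `watch` function are hypothetical stand-ins, and the `None` sentinel exists only so the demonstration can end.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class Entry:
    key: Optional[str]
    operation: str


async def watch(queue: asyncio.Queue, timeout: float) -> AsyncIterator[Entry]:
    """Yield entries from the queue; yield a TIMEOUT entry whenever it stays idle."""
    while True:
        try:
            entry = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            # Idle period: report it and keep watching instead of stopping.
            yield Entry(key=None, operation="TIMEOUT")
            continue
        if entry is None:  # sentinel used only to end this demonstration
            return
        yield entry
```

A consumer can use the TIMEOUT entries as a heartbeat, for example to run periodic housekeeping between updates.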

Dead Letter Queue

Entities that fail to deserialize (e.g. after a schema change) are automatically moved to the brainless_dlq NATS KV bucket and removed from the source bucket. This applies during both initial load and live watch updates.

DLQ key format: {SourceBucket}::{EntityUID}::{Timestamp}

Each entry contains source_bucket, key, error, timestamp, and the original value for recovery.
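
For recovery tooling, the key format can be built and split mechanically. The helpers below are hypothetical and treat the timestamp as an opaque string, because the source does not specify how it is encoded; they also assume that neither the bucket name nor the entity UID contains "::".

```python
from typing import NamedTuple


class DlqKey(NamedTuple):
    source_bucket: str
    entity_uid: str
    timestamp: str  # encoding not specified by the source; kept as-is


def make_dlq_key(source_bucket: str, entity_uid: str, timestamp: str) -> str:
    """Join the three parts using the documented '::' separator."""
    return "::".join((source_bucket, entity_uid, timestamp))


def parse_dlq_key(key: str) -> DlqKey:
    """Split a DLQ key back into its parts, rejecting malformed keys."""
    parts = key.split("::")
    if len(parts) != 3:
        raise ValueError(f"not a DLQ key: {key!r}")
    return DlqKey(*parts)
```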
