Skip to main content

Typed collections backed by NATS JetStream KV

Project description

Brainless DB

Typed collections backed by NATS JetStream KV. In-memory with background sync.

Quick Start

from typing import Annotated
from msgspec import Meta
from brainlessdb import BrainlessDB, BrainlessDBFeat, BrainlessBucket

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]
    name: Annotated[str, Meta()] = ""

db = BrainlessDB(nats, namespace="app")
users = db.collection(UserV1)  # sync - just registers
await db.start()  # loads all collections and starts watching

# Create
user = users.add(UserV1(id=1, name="Alice"))

# Update
user.name = "Bob"
user.save()  # marks dirty, schedules flush

# Query (sync - operates on in-memory data)
user = users.find(id=1)  # uses index
all_users = users.filter(lambda u: u.id > 0)

await db.stop()

When to Use What

Class Use for Has UUID Stored in
BrainlessBucket Main entities (User, Order, etc.) Yes Own bucket(s)
BrainlessStruct Nested data, app-local data No Inside parent entity
from brainlessdb import BrainlessBucket, BrainlessStruct

# App-local data - BrainlessStruct (no UUID, not an entity)
class UcsLocal(BrainlessStruct):
    sid: int = 0
    pointer: int = 0

# Nested data - BrainlessStruct
class Address(BrainlessStruct):
    street: str = ""
    city: str = ""

# Main entity - BrainlessBucket (has UUID, stored in NATS)
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    address: Optional[Address] = None  # nested struct
    _: Optional[UcsLocal] = None       # app-local struct

Global API

import brainlessdb

brainlessdb.setup(nats, namespace="app")  # sync
users = brainlessdb.collection(UserV1)  # sync
await brainlessdb.start()  # loads all registered collections
await brainlessdb.flush()  # manual flush
await brainlessdb.stop()

Field Types

Config Fields (default)

Persistent data synced across all instances:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    name: Annotated[str, Meta()] = ""

State Fields

Ephemeral data (separate bucket, faster sync):

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    status: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.STATE})] = 0

Indexed Fields

Fast O(1) lookups:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]

Unique Fields

Enforces uniqueness constraint (also auto-indexed):

class UserV1(BrainlessBucket):
    email: Annotated[Optional[str], Meta(extra={"brainlessdb_flags": BrainlessDBFeat.UNIQUE})] = None

Combine flags with |:

counter: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX | BrainlessDBFeat.STATE})] = 0

App-Local Fields

Data private to each namespace:

from brainlessdb import BrainlessBucket, BrainlessStruct

class UcsLocal(BrainlessStruct):
    sid: int = 0

class AriLocal(BrainlessStruct):
    channel_id: str = ""

# Entity with app-local field
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    _: Union[UcsLocal, AriLocal, None] = None

Each namespace only sees its own local data:

# In UCS app (namespace="ucs")
user = UserV1(id=1, _=UcsLocal(sid=123))
users.add(user)
user._.sid  # 123

# In ARI app (namespace="ari")  
user = users.find(id=1)
user._  # None - no ARI local data yet

CRUD Operations

# Add/update (validates types on add)
item = coll.add(MyStruct(...))

# Update via save()
item.field = value
item.save()

# Get by UUID
item = coll.get(uuid_str)

# Delete
coll.delete(item)
coll.delete(uuid_str)

# Clear all
coll.clear()

# Dict-style access
item = coll[uuid_str]
del coll[item]
len(coll)
for item in coll: ...
item in coll

Filtering

All filter/find methods are sync (operate on in-memory data):

# By predicate
items = coll.filter(lambda i: i.priority > 5)

# By field (uses index if available)
items = coll.filter(status=1)

# Nested fields
items = coll.filter(address__city="Prague")

# Combined
items = coll.filter(lambda i: i.active, status=1, limit=10)

# Find single
item = coll.find(id=123)

# Sort
items = coll.order_by("priority", reverse=True)

Events

Callbacks fire on remote changes by default. Set trigger_local=True to also fire on local changes.

# Any change
coll.on_change(lambda old, new: print(f"{old} -> {new}"))

# Deletion
coll.on_delete(lambda item: print(f"deleted: {item}"))

# Specific property
coll.on_property_change(
    status=lambda item, field, old, new: print(f"{field}: {old} -> {new}")
)

# Also trigger on local changes
coll.on_change(my_callback, trigger_local=True)

Watching

Watch starts automatically with start(). Manual control:

await db.unwatch()  # stop watching all
await db.watch()    # resume watching all

Flush Scheduling

  • Changes schedule flush after flush_interval (default 100ms)
  • Multiple changes batch into single flush
  • flush_interval=0 flushes immediately
  • await db.flush() forces immediate flush

Multi-Bucket Architecture

Each struct uses up to 3 NATS KV buckets:

  • {StructName} - config fields (persistent)
  • {StructName}-State - state fields (ephemeral)
  • {StructName}-{LocalClass} - app-local fields (per namespace)

Example: UserV1 with UCS namespace creates:

  • UserV1 (config)
  • UserV1-State (if state fields exist)
  • UserV1-UcsLocal (local data for UCS app)

Low-Level Bucket Access

Bucket exposes NATS KV metadata for consumers that need timestamps or revision info.

from brainlessdb.bucket import Bucket, WatchEntry

bucket = await Bucket.create(js, "UserV1")

# get_entry() — single key with metadata
entry = await bucket.get_entry("some-key")
entry.value       # bytes
entry.revision    # int
entry.created     # datetime (NATS server timestamp)

# all_entries() — all keys with metadata
entries = await bucket.all_entries()  # dict[str, WatchEntry]
for key, entry in entries.items():
    print(key, entry.created)

# watch() — created is included in every event
async for entry in bucket.watch():
    print(entry.key, entry.operation, entry.created)

# watch() with timeout — yields operation="TIMEOUT" on idle, watcher stays alive
async for entry in bucket.watch(timeout=3):
    if entry.operation == "TIMEOUT":
        continue
    ...

get() and all() remain unchanged (return raw bytes).

Dead Letter Queue

Entities that fail to deserialize (e.g. after a schema change) are automatically moved to the brainless_dlq NATS KV bucket and removed from the source bucket. This applies during both initial load and live watch updates.

DLQ key format: {SourceBucket}__{EntityUID}__{Timestamp} (double underscore — NATS KV keys disallow :)

Each entry contains source_bucket, key, error, timestamp, and the original value for recovery.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

brainlessdb-0.14.5.tar.gz (62.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

brainlessdb-0.14.5-py3-none-any.whl (17.1 kB view details)

Uploaded Python 3

File details

Details for the file brainlessdb-0.14.5.tar.gz.

File metadata

  • Download URL: brainlessdb-0.14.5.tar.gz
  • Upload date:
  • Size: 62.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.3

File hashes

Hashes for brainlessdb-0.14.5.tar.gz
Algorithm Hash digest
SHA256 5f7dd25308104bfc1db55bc83735d9c11a364bc20c9e05bda5c3d21edce5ad40
MD5 6678090f01eac5961b8545d1380248c6
BLAKE2b-256 42c58ca31362d2f0be056b905deae5098f75155ff8aa8325f5886d9a43617ed8

See more details on using hashes here.

File details

Details for the file brainlessdb-0.14.5-py3-none-any.whl.

File metadata

File hashes

Hashes for brainlessdb-0.14.5-py3-none-any.whl
Algorithm Hash digest
SHA256 286210d8d675d44d9c016cea92dabe3c062c8cd6d7772ca44d5beb99a49a50ee
MD5 27892f32e80409fdbd7524377e6a8813
BLAKE2b-256 8e95dad9d6a38f6b2eeff450ad889989e3ee850565c8e4eaf25d387c3aa52fac

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page