
Brainless DB

Typed collections backed by NATS JetStream KV. In-memory with background sync.

Quick Start

from typing import Annotated
from msgspec import Meta
from brainlessdb import BrainlessDB, BrainlessDBFeat, BrainlessBucket

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]
    name: Annotated[str, Meta()] = ""

db = BrainlessDB(nats, namespace="app")  # nats: an existing NATS client connection
users = db.collection(UserV1)  # sync - just registers
await db.start()  # loads all collections and starts watching

# Create
user = users.add(UserV1(id=1, name="Alice"))

# Update
user.name = "Bob"
user.save()  # marks dirty, schedules flush

# Query (sync - operates on in-memory data)
user = users.find(id=1)  # uses index
all_users = users.filter(lambda u: u.id > 0)

await db.stop()

When to Use What

Class             Use for                             Has UUID   Stored in
BrainlessBucket   Main entities (User, Order, etc.)   Yes        Own bucket(s)
BrainlessStruct   Nested data, app-local data         No         Inside parent entity

from typing import Annotated, Optional
from msgspec import Meta
from brainlessdb import BrainlessBucket, BrainlessStruct

# App-local data - BrainlessStruct (no UUID, not an entity)
class UcsLocal(BrainlessStruct):
    sid: int = 0
    pointer: int = 0

# Nested data - BrainlessStruct
class Address(BrainlessStruct):
    street: str = ""
    city: str = ""

# Main entity - BrainlessBucket (has UUID, stored in NATS)
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    address: Optional[Address] = None  # nested struct
    _: Optional[UcsLocal] = None       # app-local struct

Global API

import brainlessdb

brainlessdb.setup(nats, namespace="app")  # sync
users = brainlessdb.collection(UserV1)  # sync
await brainlessdb.start()  # loads all registered collections
await brainlessdb.flush()  # manual flush
await brainlessdb.stop()

Field Types

Config Fields (default)

Persistent data synced across all instances:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    name: Annotated[str, Meta()] = ""

State Fields

Ephemeral data (separate bucket, faster sync):

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    status: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.STATE})] = 0

Indexed Fields

Fast O(1) lookups:

class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]

Unique Fields

Enforces uniqueness constraint (also auto-indexed):

class UserV1(BrainlessBucket):
    email: Annotated[Optional[str], Meta(extra={"brainlessdb_flags": BrainlessDBFeat.UNIQUE})] = None

Combine flags with |:

counter: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX | BrainlessDBFeat.STATE})] = 0

App-Local Fields

Data private to each namespace:

from typing import Annotated, Union
from msgspec import Meta
from brainlessdb import BrainlessBucket, BrainlessStruct

class UcsLocal(BrainlessStruct):
    sid: int = 0

class AriLocal(BrainlessStruct):
    channel_id: str = ""

# Entity with app-local field
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    _: Union[UcsLocal, AriLocal, None] = None

Each namespace only sees its own local data:

# In UCS app (namespace="ucs")
user = UserV1(id=1, _=UcsLocal(sid=123))
users.add(user)
user._.sid  # 123

# In ARI app (namespace="ari")  
user = users.find(id=1)
user._  # None - no ARI local data yet

CRUD Operations

# Add/update (validates types on add)
item = coll.add(MyStruct(...))

# Update via save()
item.field = value
item.save()

# Get by UUID
item = coll.get(uuid_str)

# Delete
coll.delete(item)
coll.delete(uuid_str)

# Clear all
coll.clear()

# Dict-style access
item = coll[uuid_str]
del coll[item]
len(coll)
for item in coll: ...
item in coll

Filtering

All filter/find methods are sync (operate on in-memory data):

# By predicate
items = coll.filter(lambda i: i.priority > 5)

# By field (uses index if available)
items = coll.filter(status=1)

# Nested fields
items = coll.filter(address__city="Prague")

# Combined
items = coll.filter(lambda i: i.active, status=1, limit=10)

# Find single
item = coll.find(id=123)

# Sort
items = coll.order_by("priority", reverse=True)
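The double-underscore syntax in `address__city` can be read as attribute traversal. A minimal sketch of how such a lookup could resolve, using plain dataclasses (the `resolve_path` helper is hypothetical, for illustration; it is not part of the brainlessdb API):

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Address:
    street: str = ""
    city: str = ""

@dataclass
class User:
    id: int = 0
    address: Optional[Address] = None

def resolve_path(obj: Any, path: str) -> Any:
    # Split "address__city" into attribute hops and walk them,
    # short-circuiting to None when an intermediate value is missing.
    for part in path.split("__"):
        if obj is None:
            return None
        obj = getattr(obj, part)
    return obj

users = [
    User(id=1, address=Address(city="Prague")),
    User(id=2, address=Address(city="Brno")),
    User(id=3),  # no address set
]

# Roughly equivalent to coll.filter(address__city="Prague")
matches = [u for u in users if resolve_path(u, "address__city") == "Prague"]
```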

Events

Callbacks fire on remote changes by default. Set trigger_local=True to also fire on local changes.

# Any change
coll.on_change(lambda old, new: print(f"{old} -> {new}"))

# Deletion
coll.on_delete(lambda item: print(f"deleted: {item}"))

# Specific property
coll.on_property_change(
    status=lambda item, field, old, new: print(f"{field}: {old} -> {new}")
)

# Also trigger on local changes
coll.on_change(my_callback, trigger_local=True)

Watching

Watch starts automatically with start(). Manual control:

await db.unwatch()  # stop watching all
await db.watch()    # resume watching all

Flush Scheduling

  • Changes schedule flush after flush_interval (default 100ms)
  • Multiple changes batch into single flush
  • flush_interval=0 flushes immediately
  • await db.flush() forces immediate flush
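The batching behavior above follows a common debounce pattern: each change re-arms a timer, so a burst of changes collapses into one flush. A self-contained asyncio sketch of that pattern (not brainlessdb internals):

```python
import asyncio
from typing import Optional

class DebouncedFlusher:
    def __init__(self, flush_interval: float = 0.1):
        self.flush_interval = flush_interval
        self.flush_count = 0
        self._task: Optional[asyncio.Task] = None

    def mark_dirty(self) -> None:
        # Each change cancels the pending flush and reschedules it,
        # so rapid changes batch into a single write.
        if self._task is not None:
            self._task.cancel()
        self._task = asyncio.ensure_future(self._delayed_flush())

    async def _delayed_flush(self) -> None:
        await asyncio.sleep(self.flush_interval)
        self.flush_count += 1  # one flush for the whole batch

async def main() -> int:
    flusher = DebouncedFlusher(flush_interval=0.05)
    for _ in range(10):       # ten rapid changes...
        flusher.mark_dirty()
    await asyncio.sleep(0.2)  # ...settle into a single flush
    return flusher.flush_count

count = asyncio.run(main())
```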

Multi-Bucket Architecture

Each BrainlessBucket class uses up to three NATS KV buckets:

  • {StructName} - config fields (persistent)
  • {StructName}-State - state fields (ephemeral)
  • {StructName}-{LocalClass} - app-local fields (per namespace)

Example: UserV1 loaded by the UCS app creates:

  • UserV1 (config)
  • UserV1-State (if state fields exist)
  • UserV1-UcsLocal (local data for UCS app)
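The naming scheme can be summed up in a small helper (hypothetical, shown only to make the convention explicit):

```python
from typing import List, Optional

def bucket_names(struct_name: str,
                 has_state_fields: bool = False,
                 local_class: Optional[str] = None) -> List[str]:
    # The config bucket always exists; State and app-local
    # buckets are created only when the struct needs them.
    names = [struct_name]
    if has_state_fields:
        names.append(f"{struct_name}-State")
    if local_class is not None:
        names.append(f"{struct_name}-{local_class}")
    return names

# UserV1 with state fields, loaded by the UCS app:
names = bucket_names("UserV1", has_state_fields=True, local_class="UcsLocal")
```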

Low-Level Bucket Access

Bucket exposes NATS KV metadata for consumers that need timestamps or revision info.

from brainlessdb.bucket import Bucket, WatchEntry

bucket = await Bucket.create(js, "UserV1")

# get_entry() — single key with metadata
entry = await bucket.get_entry("some-key")
entry.value       # bytes
entry.revision    # int
entry.created     # datetime (NATS server timestamp)

# all_entries() — all keys with metadata
entries = await bucket.all_entries()  # dict[str, WatchEntry]
for key, entry in entries.items():
    print(key, entry.created)

# watch() — created is included in every event
async for entry in bucket.watch():
    print(entry.key, entry.operation, entry.created)

# watch() with timeout — yields operation="TIMEOUT" on idle, watcher stays alive
async for entry in bucket.watch(timeout=3):
    if entry.operation == "TIMEOUT":
        continue
    ...

get() and all() return raw bytes without metadata.

Dead Letter Queue

Entities that fail to deserialize (e.g. after a schema change) are automatically moved to the brainless_dlq NATS KV bucket and removed from the source bucket. This applies during both initial load and live watch updates.

DLQ key format: {SourceBucket}::{EntityUID}::{Timestamp}

Each entry contains source_bucket, key, error, timestamp, and the original value for recovery.
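Given the key format above, a DLQ key can be split back into its parts. A sketch with a hypothetical parse helper and made-up example values (the helper is not part of brainlessdb):

```python
from typing import NamedTuple

class DLQKey(NamedTuple):
    source_bucket: str
    entity_uid: str
    timestamp: str

def parse_dlq_key(key: str) -> DLQKey:
    # Keys look like "{SourceBucket}::{EntityUID}::{Timestamp}".
    source_bucket, entity_uid, timestamp = key.split("::", 2)
    return DLQKey(source_bucket, entity_uid, timestamp)

# Illustrative key; real UIDs and timestamps depend on your data.
parsed = parse_dlq_key("UserV1::0f8c-1234::1717000000")
```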
