# Brainless DB
Typed collections backed by NATS JetStream KV. In-memory with background sync.
## Quick Start

```python
from typing import Annotated

from msgspec import Meta

from brainlessdb import BrainlessDB, BrainlessDBFeat, BrainlessBucket


class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]
    name: Annotated[str, Meta()] = ""


db = BrainlessDB(nats, namespace="app")
users = db.collection(UserV1)  # sync - just registers

await db.start()  # loads all collections and starts watching

# Create
user = users.add(UserV1(id=1, name="Alice"))

# Update
user.name = "Bob"
user.save()  # marks dirty, schedules flush

# Query (sync - operates on in-memory data)
user = users.find(id=1)  # uses index
all_users = users.filter(lambda u: u.id > 0)

await db.stop()
```
## When to Use What

| Class | Use for | Has UUID | Stored in |
|---|---|---|---|
| `BrainlessBucket` | Main entities (User, Order, etc.) | Yes | Own bucket(s) |
| `BrainlessStruct` | Nested data, app-local data | No | Inside parent entity |
```python
from typing import Annotated, Optional

from msgspec import Meta

from brainlessdb import BrainlessBucket, BrainlessStruct


# App-local data - BrainlessStruct (no UUID, not an entity)
class UcsLocal(BrainlessStruct):
    sid: int = 0
    pointer: int = 0


# Nested data - BrainlessStruct
class Address(BrainlessStruct):
    street: str = ""
    city: str = ""


# Main entity - BrainlessBucket (has UUID, stored in NATS)
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    address: Optional[Address] = None  # nested struct
    _: Optional[UcsLocal] = None  # app-local struct
```
## Global API

```python
import brainlessdb

brainlessdb.setup(nats, namespace="app")  # sync
users = brainlessdb.collection(UserV1)  # sync

await brainlessdb.start()  # loads all registered collections
await brainlessdb.flush()  # manual flush
await brainlessdb.stop()
```
## Field Types

### Config Fields (default)

Persistent data synced across all instances:

```python
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    name: Annotated[str, Meta()] = ""
```
### State Fields

Ephemeral data (separate bucket, faster sync):

```python
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    status: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.STATE})] = 0
```
### Indexed Fields

Fast O(1) lookups:

```python
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX})]
```
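The O(1) lookup can be pictured as a plain dict keyed by the indexed field's value. A minimal standalone sketch of the idea (illustrative only, not BrainlessDB's actual internals):

```python
# Illustrative value -> item index; not the library's implementation
class FieldIndex:
    def __init__(self, field: str):
        self.field = field
        self._index: dict = {}  # field value -> item

    def add(self, item) -> None:
        self._index[getattr(item, self.field)] = item

    def find(self, value):
        # O(1) dict lookup instead of scanning the whole collection
        return self._index.get(value)


class User:
    def __init__(self, id: int, name: str):
        self.id, self.name = id, name


idx = FieldIndex("id")
idx.add(User(1, "Alice"))
idx.add(User(2, "Bob"))
print(idx.find(2).name)  # Bob
```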
### Unique Fields

Enforces a uniqueness constraint (also auto-indexed):

```python
class UserV1(BrainlessBucket):
    email: Annotated[Optional[str], Meta(extra={"brainlessdb_flags": BrainlessDBFeat.UNIQUE})] = None
```
Combine flags with `|`:

```python
counter: Annotated[int, Meta(extra={"brainlessdb_flags": BrainlessDBFeat.INDEX | BrainlessDBFeat.STATE})] = 0
```
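Flag combination behaves like any Python `enum.Flag`: `|` yields a value carrying both bits, and membership can be tested with `in`. A self-contained sketch with a stand-in enum (the actual `BrainlessDBFeat` member values are not documented here):

```python
from enum import Flag, auto


# Stand-in for BrainlessDBFeat; member values here are assumptions
class Feat(Flag):
    INDEX = auto()
    STATE = auto()
    UNIQUE = auto()


flags = Feat.INDEX | Feat.STATE
print(Feat.INDEX in flags)   # True
print(Feat.UNIQUE in flags)  # False
```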
### App-Local Fields

Data private to each namespace:

```python
from typing import Annotated, Union

from msgspec import Meta

from brainlessdb import BrainlessBucket, BrainlessStruct


class UcsLocal(BrainlessStruct):
    sid: int = 0


class AriLocal(BrainlessStruct):
    channel_id: str = ""


# Entity with an app-local field
class UserV1(BrainlessBucket):
    id: Annotated[int, Meta()]
    _: Union[UcsLocal, AriLocal, None] = None
```
Each namespace only sees its own local data:

```python
# In the UCS app (namespace="ucs")
user = UserV1(id=1, _=UcsLocal(sid=123))
users.add(user)
user._.sid  # 123

# In the ARI app (namespace="ari")
user = users.find(id=1)
user._  # None - no ARI local data yet
```
## CRUD Operations

```python
# Add/update (validates types on add)
item = coll.add(MyStruct(...))

# Update via save()
item.field = value
item.save()

# Get by UUID
item = coll.get(uuid_str)

# Delete
coll.delete(item)
coll.delete(uuid_str)

# Clear all
coll.clear()

# Dict-style access
item = coll[uuid_str]
del coll[item]
len(coll)
for item in coll: ...
item in coll
```
## Filtering

All filter/find methods are sync (they operate on in-memory data):

```python
# By predicate
items = coll.filter(lambda i: i.priority > 5)

# By field (uses index if available)
items = coll.filter(status=1)

# Nested fields
items = coll.filter(address__city="Prague")

# Combined
items = coll.filter(lambda i: i.active, status=1, limit=10)

# Find single
item = coll.find(id=123)

# Sort
items = coll.order_by("priority", reverse=True)
```
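The `address__city` double-underscore syntax resolves a chain of nested attributes, in the style of Django lookups. A hedged sketch of such a resolver (illustrative; not the library's code):

```python
def resolve(item, lookup: str):
    """Resolve an 'a__b__c' lookup by walking nested attributes."""
    for part in lookup.split("__"):
        if item is None:
            return None  # short-circuit on missing nested struct
        item = getattr(item, part)
    return item


class Address:
    def __init__(self, city: str):
        self.city = city


class User:
    def __init__(self, address):
        self.address = address


u = User(Address("Prague"))
print(resolve(u, "address__city"))  # Prague
print(resolve(User(None), "address__city"))  # None
```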
## Events

Callbacks fire on remote changes by default. Set `trigger_local=True` to also fire on local changes.

```python
# Any change
coll.on_change(lambda old, new: print(f"{old} -> {new}"))

# Deletion
coll.on_delete(lambda item: print(f"deleted: {item}"))

# Specific property
coll.on_property_change(
    status=lambda item, field, old, new: print(f"{field}: {old} -> {new}")
)

# Also trigger on local changes
coll.on_change(my_callback, trigger_local=True)
```
## Watching

Watching starts automatically with `start()`. Manual control:

```python
await db.unwatch()  # stop watching all
await db.watch()    # resume watching all
```
## Flush Scheduling

- Changes schedule a flush after `flush_interval` (default 100ms)
- Multiple changes batch into a single flush
- `flush_interval=0` flushes immediately
- `await db.flush()` forces an immediate flush
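The batching behavior can be sketched as an asyncio debounce: the first change arms a timer, further changes within the interval piggyback on it, and one flush runs when it fires. Illustrative only, not the library's actual scheduler:

```python
import asyncio


class FlushScheduler:
    """Debounce sketch: many dirty marks within flush_interval -> one flush."""

    def __init__(self, flush_interval: float):
        self.flush_interval = flush_interval
        self.flush_count = 0
        self._task = None

    def mark_dirty(self) -> None:
        # Only arm a new timer if no flush is already pending
        if self._task is None or self._task.done():
            self._task = asyncio.ensure_future(self._flush_later())

    async def _flush_later(self) -> None:
        await asyncio.sleep(self.flush_interval)
        self.flush_count += 1  # stand-in for writing dirty items to KV


async def demo() -> int:
    sched = FlushScheduler(flush_interval=0.05)
    for _ in range(10):  # ten rapid changes...
        sched.mark_dirty()
    await asyncio.sleep(0.1)
    return sched.flush_count


flushes = asyncio.run(demo())
print(flushes)  # ...batch into a single flush: 1
```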
## Multi-Bucket Architecture

Each struct uses up to 3 NATS KV buckets:

- `{StructName}` - config fields (persistent)
- `{StructName}-State` - state fields (ephemeral)
- `{StructName}-{LocalClass}` - app-local fields (per namespace)

Example: `UserV1` with the UCS namespace creates:

- `UserV1` (config)
- `UserV1-State` (if state fields exist)
- `UserV1-UcsLocal` (local data for the UCS app)
## Low-Level Bucket Access

`Bucket` exposes NATS KV metadata for consumers that need timestamps or revision info.

```python
from brainlessdb.bucket import Bucket, WatchEntry

bucket = await Bucket.create(js, "UserV1")

# get_entry() - single key with metadata
entry = await bucket.get_entry("some-key")
entry.value  # bytes
entry.revision  # int
entry.created  # datetime (NATS server timestamp)

# all_entries() - all keys with metadata
entries = await bucket.all_entries()  # dict[str, WatchEntry]
for key, entry in entries.items():
    print(key, entry.created)

# watch() - created is included in every event
async for entry in bucket.watch():
    print(entry.key, entry.operation, entry.created)

# watch() with timeout - yields operation="TIMEOUT" on idle; the watcher stays alive
async for entry in bucket.watch(timeout=3):
    if entry.operation == "TIMEOUT":
        continue
    ...
```

`get()` and `all()` remain unchanged (they return raw bytes).
## Dead Letter Queue

Entities that fail to deserialize (e.g. after a schema change) are automatically moved to the `brainless_dlq` NATS KV bucket and removed from the source bucket. This applies during both the initial load and live watch updates.

DLQ key format: `{SourceBucket}::{EntityUID}::{Timestamp}`

Each entry contains `source_bucket`, `key`, `error`, `timestamp`, and the original value for recovery.
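Given the key format above, a recovery script could split DLQ keys back into their parts. A hedged sketch; only the `::`-separated format is taken from the docs, and the example key is made up:

```python
from typing import NamedTuple


class DlqKey(NamedTuple):
    source_bucket: str
    entity_uid: str
    timestamp: str


def parse_dlq_key(key: str) -> DlqKey:
    """Split '{SourceBucket}::{EntityUID}::{Timestamp}' into its parts."""
    source_bucket, entity_uid, timestamp = key.split("::", 2)
    return DlqKey(source_bucket, entity_uid, timestamp)


# Hypothetical example key
k = parse_dlq_key("UserV1::abc123::1717000000")
print(k.source_bucket, k.entity_uid, k.timestamp)  # UserV1 abc123 1717000000
```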