Schema-first async persistence for NATS JetStream KV
Brainless DB
"Because without brain, you can’t have split-brain!"
A database built on top of the NATS JetStream Key/Value Store. Its main purpose is to maintain configuration for a multi-service, multi-node system where uptime is more important than strict consistency. Data is held in memory and automatically synchronized to NATS KV.
Quick Start
import brainlessdb
# Connect to NATS and setup
await brainlessdb.setup(nats, namespace="myapp", location="prague-1")
# Add entity - schema inferred from first add()
call = brainlessdb.call.add(channel_id="123", queue_id=5, state="waiting")
# Attribute access with auto dirty tracking
call.state = "ringing" # marked dirty, flushed in background
# Find and filter
call = await brainlessdb.call.find(channel_id="123")
waiting = await brainlessdb.call.filter(state="waiting")
# Delete
del brainlessdb.call[call]
# Cleanup
await brainlessdb.stop()
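The quick start notes that the schema is inferred from the first add(). As a rough illustration of that idea, here is a minimal standalone sketch. The `SchemaLock` class and its rule of rejecting unknown fields are assumptions made for the example and are not brainlessdb's actual implementation.

```python
from typing import Any, Optional


class SchemaLock:
    """Hypothetical helper: remembers field names and types from the first record."""

    def __init__(self) -> None:
        self.schema: Optional[dict[str, type]] = None

    def check(self, record: dict[str, Any]) -> None:
        if self.schema is None:
            # First record seen: infer the schema from it and lock it.
            self.schema = {name: type(value) for name, value in record.items()}
            return
        # Later records: reject fields that the locked schema does not know.
        unknown = set(record) - set(self.schema)
        if unknown:
            raise ValueError(f"fields not in schema: {sorted(unknown)}")


lock = SchemaLock()
lock.check({"channel_id": "123", "queue_id": 5, "state": "waiting"})
lock.check({"channel_id": "456", "state": "ringing"})  # subset of known fields passes
```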
Type-Safe Usage
Use dataclasses for type hints and IDE support:
from dataclasses import dataclass
from typing import Optional
@dataclass
class Call:
    channel_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None # auto-populated from entity
# Set type for collection - all queries return typed instances
brainlessdb.call.typed(Call)
# Now find/filter/all/order_by return Call instances
call = await brainlessdb.call.find(channel_id="123")
print(call.channel_id) # IDE knows this is str
print(call.uuid) # UUID auto-populated
# Add from dataclass
call = brainlessdb.call.add(Call(channel_id="456", queue_id=1, state="new"))
# Delete works with typed instances (uses uuid attribute)
del brainlessdb.call[call]
if call in brainlessdb.call:
    print("still exists")
One-off Type Conversion
Convert individual entities without setting collection type:
entity = await brainlessdb.call.find(channel_id="123")
call = entity.as_type(Call) # Convert to dataclass
uuid = entity.uuid # Still have access to entity
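To make the conversion step more concrete, the following standalone sketch shows one way stored data plus a UUID could be mapped onto a dataclass. The `to_dataclass` helper is hypothetical and only approximates what as_type() is described as doing; the library's real behavior may differ.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional, Type, TypeVar

T = TypeVar("T")


def to_dataclass(cls: Type[T], uuid: str, data: dict[str, Any]) -> T:
    """Hypothetical helper: build a dataclass instance from stored data."""
    names = {f.name for f in fields(cls)}
    # Keep only the keys the dataclass declares; extra stored fields are ignored.
    kwargs = {key: value for key, value in data.items() if key in names}
    # Populate the uuid attribute when the dataclass declares one.
    if "uuid" in names:
        kwargs["uuid"] = uuid
    return cls(**kwargs)


@dataclass
class Call:
    channel_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None


call = to_dataclass(
    Call,
    "550e8400-e29b-41d4-a716-446655440000",
    {"channel_id": "123", "queue_id": 5, "state": "waiting"},
)
```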
Collections
Collections are created on first access and persist to NATS KV buckets:
# These create/access collections automatically
brainlessdb.call.add(...) # creates 'myapp-call' bucket
brainlessdb.user.add(...) # creates 'myapp-user' bucket
brainlessdb.queue_item.add(...) # creates 'myapp-queue_item' bucket
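Creating collections on first attribute access can be pictured with Python's `__getattr__` hook. The sketch below is a simplified stand-in, not the library's code: the `Database` and `Collection` classes are invented for illustration, and only the `{namespace}-{collection}` bucket naming is taken from this document.

```python
class Collection:
    """Hypothetical collection that only remembers its bucket name."""

    def __init__(self, bucket: str) -> None:
        self.bucket = bucket


class Database:
    """Hypothetical database object that creates collections on first access."""

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace
        self._collections: dict[str, Collection] = {}

    def __getattr__(self, name: str) -> Collection:
        # __getattr__ runs only when normal attribute lookup fails,
        # so real attributes such as namespace are unaffected.
        if name.startswith("_"):
            raise AttributeError(name)
        if name not in self._collections:
            self._collections[name] = Collection(f"{self.namespace}-{name}")
        return self._collections[name]


db = Database("myapp")
print(db.call.bucket)
print(db.queue_item.bucket)
```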
Adding Entities
# From keyword arguments
call = brainlessdb.call.add(channel_id="123", state="waiting")
# From dictionary
call = brainlessdb.call.add({"channel_id": "456", "state": "ringing"})
# From dataclass instance
call = brainlessdb.call.add(Call(channel_id="789", queue_id=1, state="active"))
# Mixed - dataclass + overrides
call = brainlessdb.call.add(base_call, state="overridden")
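The four input styles above all end up as one record. As a sketch of how such normalization could work, assuming keyword overrides win over the base data, the hypothetical `normalize` function below merges them with the standard `dataclasses` helpers. It is not taken from brainlessdb.

```python
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Optional


def normalize(data: Any = None, **overrides: Any) -> dict[str, Any]:
    """Hypothetical helper: turn kwargs, a dict, or a dataclass into one dict."""
    if data is None:
        record: dict[str, Any] = {}
    elif is_dataclass(data) and not isinstance(data, type):
        # asdict() accepts dataclass instances only, not the class itself.
        record = asdict(data)
    elif isinstance(data, dict):
        record = dict(data)  # copy so the caller's dict is not mutated
    else:
        raise TypeError(f"unsupported data type: {type(data).__name__}")
    # Keyword arguments override values from the base data.
    record.update(overrides)
    return record


@dataclass
class Call:
    channel_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None


base_call = Call(channel_id="789", queue_id=1, state="active")
merged = normalize(base_call, state="overridden")
```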
Retrieving Entities
# By UUID
call = await brainlessdb.call.get("550e8400-e29b-41d4-a716-446655440000")
# Find first match (or None)
call = await brainlessdb.call.find(channel_id="123")
call = await brainlessdb.call.find(state="waiting", queue_id=5)
# Filter all matches
waiting = await brainlessdb.call.filter(state="waiting")
active_q5 = await brainlessdb.call.filter(queue_id=5, state="active")
# Get all
all_calls = await brainlessdb.call.all()
Nested Filtering
Use double underscore for nested field access (Django-style):
# Filter by nested fields
call = await brainlessdb.call.find(caller__city="Prague")
calls = await brainlessdb.call.filter(contact__address__zip="12345")
# Works with order_by too
calls = await brainlessdb.call.order_by("caller__name")
Sorting
# Ascending
calls = await brainlessdb.call.order_by("priority")
# Descending (minus prefix)
calls = await brainlessdb.call.order_by("-created_at")
# Multiple keys
calls = await brainlessdb.call.order_by("state", "-priority")
# With filter criteria
calls = await brainlessdb.call.order_by("-created_at", state="active")
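Mixed ascending and descending keys can be implemented with repeated stable sorts, applied from the least significant key to the most significant one. The sketch below assumes plain dictionaries and a hypothetical `order_by` function; it only illustrates the minus-prefix convention and is not the library's implementation.

```python
from typing import Any


def order_by(records: list[dict[str, Any]], *keys: str) -> list[dict[str, Any]]:
    """Sort by several keys; a leading minus means descending."""
    result = list(records)
    # list.sort is stable, so sorting by the last key first and the
    # first key last yields a correct multi-key ordering.
    for key in reversed(keys):
        descending = key.startswith("-")
        field = key[1:] if descending else key
        result.sort(key=lambda record, f=field: record[f], reverse=descending)
    return result


calls = [
    {"state": "active", "priority": 1},
    {"state": "waiting", "priority": 3},
    {"state": "active", "priority": 2},
]
ordered = order_by(calls, "state", "-priority")
```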
Deleting Entities
# By UUID string
brainlessdb.call.delete("550e8400-e29b-41d4-a716-446655440000")
# By Entity instance
brainlessdb.call.delete(entity)
# By typed dataclass (uses uuid attribute)
brainlessdb.call.delete(call)
# Dict-style deletion
del brainlessdb.call[call]
del brainlessdb.call["550e8400-..."]
Collection Info
count = brainlessdb.call.count()
count = len(brainlessdb.call)
# Check existence (accepts uuid string, Entity, or typed object)
if "550e8400-..." in brainlessdb.call:
    ...
if call in brainlessdb.call:
    ...
# Iteration
for call in brainlessdb.call:
    print(call.channel_id)
# Dict-style access by UUID
call = brainlessdb.call["550e8400-..."]
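The len(), in, iteration, and subscript forms above map onto Python's container protocol. The following sketch uses a hypothetical `Collection` class with a simplified `add(uuid, **data)` signature to show which special methods would be involved. It is an illustration under those assumptions, not the library's source.

```python
from typing import Any, Iterator


class Collection:
    """Hypothetical in-memory collection keyed by UUID string."""

    def __init__(self) -> None:
        self._items: dict[str, dict[str, Any]] = {}

    @staticmethod
    def _key(item: Any) -> str:
        # Accept a UUID string or any object carrying a uuid attribute.
        return getattr(item, "uuid", item)

    def add(self, uuid: str, **data: Any) -> None:
        self._items[uuid] = data

    def __len__(self) -> int:
        return len(self._items)

    def __contains__(self, item: Any) -> bool:
        return self._key(item) in self._items

    def __iter__(self) -> Iterator[dict[str, Any]]:
        # Iterate over a snapshot so deletions during iteration are safe.
        return iter(list(self._items.values()))

    def __getitem__(self, item: Any) -> dict[str, Any]:
        return self._items[self._key(item)]

    def __delitem__(self, item: Any) -> None:
        del self._items[self._key(item)]


calls = Collection()
calls.add("u1", state="waiting")
calls.add("u2", state="ringing")
del calls["u1"]
```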
Entity Access
Entities wrap stored data with attribute access and dirty tracking:
# Attribute access
print(call.state)
call.state = "ringing" # auto-marks dirty
# Dict-style access
print(call["state"])
call["state"] = "active"
# Check field exists
if "queue_id" in call:
print(call.queue_id)
# Get data
data = call.to_dict() # {"uuid": "...", "channel_id": "123", ...}
data = call.data # same but without uuid
# Properties
call.uuid # entity UUID
call.dirty # True if modified since last flush
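Dirty tracking through attribute access can be built on `__setattr__`. The class below is a minimal standalone sketch of that technique; it reuses the property names listed above (uuid, data, dirty, to_dict) for readability, but the internals are assumptions and not brainlessdb's actual Entity.

```python
from typing import Any


class Entity:
    """Hypothetical entity wrapper with attribute access and dirty tracking."""

    def __init__(self, uuid: str, data: dict[str, Any]) -> None:
        # Bypass our own __setattr__ so internal state is not stored as entity data.
        object.__setattr__(self, "uuid", uuid)
        object.__setattr__(self, "data", dict(data))
        object.__setattr__(self, "dirty", False)

    def __getattr__(self, name: str) -> Any:
        # Called only when normal lookup fails: fall back to the stored fields.
        data = self.__dict__.get("data", {})
        if name in data:
            return data[name]
        raise AttributeError(name)

    def __setattr__(self, name: str, value: Any) -> None:
        # Every field assignment updates the data and marks the entity dirty.
        self.data[name] = value
        object.__setattr__(self, "dirty", True)

    def __getitem__(self, name: str) -> Any:
        return self.data[name]

    def __setitem__(self, name: str, value: Any) -> None:
        setattr(self, name, value)

    def __contains__(self, name: str) -> bool:
        return name in self.data

    def to_dict(self) -> dict[str, Any]:
        return {"uuid": self.uuid, **self.data}


call = Entity("u1", {"state": "waiting"})
call.state = "ringing"  # sets data["state"] and flips dirty to True
```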
Persistence
Background Flush
Changes are flushed to NATS automatically in the background:
# Default: flush every 0.5 seconds
await brainlessdb.setup(nats, namespace="app")
# Custom interval
await brainlessdb.setup(nats, namespace="app", flush_interval=1.0)
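A periodic background flush is commonly written as an asyncio task that sleeps for the interval and then writes out whatever is dirty. The sketch below shows that pattern in isolation. The `Flusher` class is hypothetical, and the write to the KV store is replaced by appending to a list.

```python
import asyncio
from typing import Optional


class Flusher:
    """Hypothetical background flusher for a set of dirty keys."""

    def __init__(self, interval: float = 0.5) -> None:
        self.interval = interval
        self.dirty: set[str] = set()
        self.flushed: list[str] = []
        self._task: Optional[asyncio.Task] = None

    async def flush(self) -> None:
        # Swap the dirty set first so new changes land in a fresh set.
        pending, self.dirty = self.dirty, set()
        for key in sorted(pending):
            # A real implementation would write to the KV store here.
            self.flushed.append(key)

    async def _run(self) -> None:
        while True:
            await asyncio.sleep(self.interval)
            await self.flush()

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        await self.flush()  # final flush so nothing pending is lost


async def demo() -> list[str]:
    flusher = Flusher(interval=0.01)
    flusher.start()
    flusher.dirty.add("call-1")
    await asyncio.sleep(0.05)  # give the background task time to run
    flusher.dirty.add("call-2")
    await flusher.stop()
    return flusher.flushed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The final flush in stop() is what makes a graceful shutdown safe: changes made after the last timer tick are still written.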
Manual Flush
# Flush all collections
await brainlessdb.flush()
# Flush single collection
await brainlessdb.call.flush()
Graceful Shutdown
Always call stop() on shutdown; it performs a final flush:
await brainlessdb.stop()
Instance-Based Usage
For tests or multiple databases:
from brainlessdb import Brainless
db = Brainless(nats, namespace="test", location="local")
await db.start()
call = db.call.add(channel_id="456", state="waiting")
call = await db.call.find(channel_id="456")
await db.stop()
In-Memory Mode
Works without NATS for testing:
db = Brainless(None, namespace="test")
await db.start()
# All operations work, just no persistence
call = db.call.add(channel_id="123", state="new")
call = await db.call.find(channel_id="123")
await db.stop()
Use Cases
Queue Management
@dataclass
class QueueItem:
    priority: int
    caller_id: str
    queue_id: int
    state: str
    uuid: Optional[str] = None
brainlessdb.queue.typed(QueueItem)
# Add to queue
item = brainlessdb.queue.add(QueueItem(
    priority=1,
    caller_id="caller-123",
    queue_id=5,
    state="waiting"
))
# Get next item by priority
next_item = await brainlessdb.queue.order_by("priority", state="waiting")
if next_item:
    next_item[0].state = "processing"
# Remove completed
del brainlessdb.queue[item]
Session Storage
import time
@dataclass
class Session:
    user_id: int
    token: str
    created_at: float
    last_seen: float
    uuid: Optional[str] = None
brainlessdb.session.typed(Session)
# Create session
session = brainlessdb.session.add(Session(
    user_id=42,
    token="abc123",
    created_at=time.time(),
    last_seen=time.time()
))
# Find by token
session = await brainlessdb.session.find(token="abc123")
# Update last seen
session.last_seen = time.time()
# Find stale sessions
stale = await brainlessdb.session.filter(last_seen__lt=time.time() - 3600)
Call State Tracking
@dataclass
class Call:
    channel_id: str
    state: str
    caller: dict
    queue_id: Optional[int] = None
    agent_id: Optional[int] = None
    uuid: Optional[str] = None
brainlessdb.call.typed(Call)
# New call
call = brainlessdb.call.add(Call(
    channel_id="chan-123",
    state="ringing",
    caller={"number": "+1234567890", "city": "Prague"}
))
# Find by nested field
call = await brainlessdb.call.find(caller__city="Prague")
# State transitions
call.state = "answered"
call.agent_id = 5
# Get calls by agent
agent_calls = await brainlessdb.call.filter(agent_id=5, state="answered")
Multi-Location Sync
# Prague datacenter
await brainlessdb.setup(nats, namespace="app", location="prague-1")
# UUIDs include location for conflict resolution
call = brainlessdb.call.add(channel_id="123", state="new")
print(call.uuid) # "550e8400-e29b-41d4-a716-446655440000" (UUID1 with location-based node)
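A UUID1 with a location-based node can be produced with the standard library by passing an explicit `node` to `uuid.uuid1`. How brainlessdb derives its node ID is not stated here, so the hashing scheme in `node_for` below is purely an assumption chosen for the example.

```python
import hashlib
import uuid


def node_for(location: str) -> int:
    """Hypothetical mapping from a location name to a 48-bit node ID."""
    digest = hashlib.sha256(location.encode("utf-8")).digest()
    node = int.from_bytes(digest[:6], "big")
    # Set the multicast bit, as RFC 4122 recommends for node IDs
    # that are not real hardware addresses.
    return node | (1 << 40)


def new_uuid(location: str) -> uuid.UUID:
    # uuid1 combines the current timestamp, a clock sequence, and the node.
    return uuid.uuid1(node=node_for(location))


generated = new_uuid("prague-1")
print(generated, generated.version)
```

Because the node is derived from the location name, two UUIDs created at the same location share their last twelve hex digits, which makes the origin of an entity visible in its identifier.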
Architecture
- Schema inference: Schema locked after first `add()` to collection
- Lazy indexing: O(1) lookups, indexes built on first filter/find
- Background flush: Async persistence with configurable interval
- NATS KV storage: One bucket per collection (`{namespace}-{collection}`)
- UUID1 generation: Time-based UUIDs with location-derived node ID
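The lazy indexing idea from the list above can be sketched as a per-field dictionary that is built the first time a field is queried and reused afterwards. The `LazyIndex` class is a hypothetical illustration that assumes hashable field values; a real index would also need updating whenever entities are added, changed, or deleted.

```python
from collections import defaultdict
from typing import Any


class LazyIndex:
    """Hypothetical per-field index built on first lookup."""

    def __init__(self, records: dict[str, dict[str, Any]]) -> None:
        self.records = records  # uuid -> record
        self._indexes: dict[str, dict[Any, set[str]]] = {}

    def _index(self, field: str) -> dict[Any, set[str]]:
        if field not in self._indexes:
            # First query on this field: scan all records once.
            index: dict[Any, set[str]] = defaultdict(set)
            for uuid, record in self.records.items():
                index[record.get(field)].add(uuid)
            self._indexes[field] = index
        return self._indexes[field]

    def filter(self, field: str, value: Any) -> list[str]:
        # Later queries on the same field are a single dictionary lookup.
        return sorted(self._index(field).get(value, set()))


index = LazyIndex({
    "u1": {"state": "waiting", "queue_id": 5},
    "u2": {"state": "active", "queue_id": 5},
    "u3": {"state": "waiting", "queue_id": 7},
})
waiting = index.filter("state", "waiting")
```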
API Reference
Global Functions
| Function | Description |
|---|---|
| `await setup(nats, namespace, location, flush_interval)` | Initialize global instance |
| `await stop()` | Stop and final flush |
| `await flush()` | Flush all collections |
| `brainlessdb.{name}` | Access collection by name |
Collection Methods
| Method | Description |
|---|---|
| `typed(cls)` | Set dataclass type for results |
| `add(data, **kwargs)` | Add entity (dict, dataclass, or kwargs) |
| `await get(uuid)` | Get by UUID |
| `await find(**criteria)` | Find first match |
| `await filter(**criteria)` | Find all matches |
| `await all()` | Get all entities |
| `await order_by(*keys, **criteria)` | Sorted results |
| `delete(entity)` | Delete by uuid/entity/typed object |
| `count()` / `len()` | Entity count |
| `await load()` | Load from NATS |
| `await flush()` | Flush to NATS |
| `clear()` | Clear in-memory (not NATS) |
Entity Properties & Methods
| Property/Method | Description |
|---|---|
| `uuid` | Entity UUID |
| `data` | Raw data dict (no uuid) |
| `dirty` | Modified since flush |
| `to_dict()` | Data with uuid |
| `as_type(cls)` | Convert to dataclass |
Status
🚧 Work in progress
- Collections with add/get/delete/filter/find
- Entity dirty tracking
- NATS KV bucket integration
- Background flush
- Lazy auto-indexing for O(1) lookups
- Sorted iteration with order_by
- Type-safe dataclass conversion
- UUID auto-population in typed results
- Schema storage in NATS
- Watch for changes from other nodes
- CRDTs for conflict resolution