Python binding for the obj embedded document database.

Project description

obj — Python binding

Python bindings for obj, the embedded document database.

The wheel exposes a single extension module named obj. The Rust crate name is obj-py; the import name is obj.

import obj

with obj.Db("app.obj") as db:
    with db.transaction() as tx:
        doc_id = tx.insert("orders", b"<your payload bytes>")

    with db.read_transaction() as tx:
        payload = tx.get("orders", doc_id)
        for (id_, bytes_) in tx.iter_all("orders"):
            ...

Payload contract

obj-py ships two Python surfaces side by side:

  • Bytes API on WriteTxn / ReadTxn. Payloads cross the boundary as bytes / bytearray in and bytes out. The library does NOT serialise dicts, dataclasses, or JSON for you on this path — encode your payloads however you like (json, msgpack, postcard, pickle, ...) and pass the resulting bytes through. This mirrors the obj C ABI's contract.
  • Typed-document API on Db and WriteTxn (Phase 6.5 + issue #1). Wrap a @dataclass with @obj.document(collection="orders", version=1) and the ergonomic methods db.insert(order) / db.get(Order, id) / db.update(Order, id, fn) / db.all(Order) route through a schema-driven Dynamic codec that produces postcard bytes byte-identical to Rust's #[derive(Document)] writer for the same logical schema. db.update(...) is an atomic read-modify-write: the read and the write-back happen inside one write transaction (no lost-update window), and a raising fn rolls the change back.
from dataclasses import dataclass
import obj

@obj.document(collection="orders", version=1)
@dataclass
class Order:
    customer_id: int
    total: float
    status: str

with obj.Db("app.obj") as db:
    doc_id = db.insert(Order(customer_id=1, total=99.5, status="pending"))
    order = db.get(Order, doc_id)
    for (oid, o) in db.all(Order):
        ...
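
On the bytes path, encoding is the caller's job. A minimal sketch of one option, using the standard-library json module; the helper names encode_order and decode_order are hypothetical and not part of obj:

```python
import json

def encode_order(order: dict) -> bytes:
    """Serialise a dict to compact UTF-8 JSON bytes."""
    return json.dumps(order, sort_keys=True, separators=(",", ":")).encode("utf-8")

def decode_order(payload: bytes) -> dict:
    """Inverse of encode_order."""
    return json.loads(payload.decode("utf-8"))

payload = encode_order({"customer_id": 1, "total": 99.5, "status": "pending"})
roundtrip = decode_order(payload)

# With a database open, the bytes would pass straight through, e.g.:
#   doc_id = tx.insert("orders", payload)
#   roundtrip = decode_order(tx.get("orders", doc_id))
```

Any codec that yields bytes works the same way; json is used here only because it needs no extra dependency.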

Typed docs inside an explicit transaction

WriteTxn overloads its CRUD methods by argument type, so typed documents compose with explicit transactions. Pass a @obj.document instance (or class) for the typed path, or a collection str plus bytes for the raw path. This lets you batch many typed writes into a single commit / single WAL fsync instead of one transaction per db.insert:

with obj.Db("app.obj") as db:
    with db.transaction() as tx:
        for i in range(1000):
            tx.insert(Order(customer_id=i, total=float(i), status="new"))
        # one commit + one fsync for the whole batch on __exit__

        # reads inside the txn see its own uncommitted writes:
        first = tx.get(Order, 1)
        tx.update(Order, 1, lambda o: setattr(o, "status", "shipped"))
        tx.upsert(Order, 2, Order(customer_id=2, total=2.0, status="done"))
        tx.delete(Order, 3)

        # the raw-bytes overload still works on the same handle:
        tx.insert("audit_log", b"<raw bytes>")

The typed WriteTxn methods reuse the exact encode/decode pipeline that Db uses, so on-disk bytes are identical regardless of which surface wrote them. Passing a value that is not a @obj.document to the typed path raises obj.InvalidArgumentError with a clear message.

For ad-hoc dict-shaped writes (no @document boilerplate), call the same CRUD methods with a collection str as the first argument (dict-native overload):

doc_id = db.insert("events", {"event": "click", "user_id": 42})
event = db.get("events", doc_id)

Per-document lazy migration mirrors Rust's Migrate trait via a history=[...] arg and a cls.migrate(doc, from_version) classmethod.
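
A rough sketch of what a migrate classmethod might look like. Only the cls.migrate(doc, from_version) signature comes from the text above; the shape of doc (assumed here to be a dict of the old version's fields), the return value (assumed to be an instance of the class), and the version history are all assumptions. The @obj.document decorator is left off so the sketch runs without a database:

```python
from dataclasses import dataclass

@dataclass
class Order:
    customer_id: int
    total: float
    status: str  # hypothetical: field added in version 2

    @classmethod
    def migrate(cls, doc, from_version):
        # Assumption: doc is a dict holding the fields stored at from_version.
        fields = dict(doc)  # copy so the caller's dict is not mutated
        if from_version < 2:
            fields.setdefault("status", "pending")  # default for the new field
        return cls(**fields)

old = {"customer_id": 1, "total": 99.5}
upgraded = Order.migrate(old, from_version=1)
```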

Secondary indexes

Declare indexes on an @obj.document dataclass with typing.Annotated field markers — obj.Index (standard), obj.Unique, obj.Each (multi-value, on a list[...] field) — plus a decorator indexes=[obj.Composite((...), name=...)] for composite indexes. This mirrors Rust's #[obj(index ...)] derive attributes. The typed write path builds and maintains the index B-trees on every insert/update/delete/upsert:

from dataclasses import dataclass
from typing import Annotated
import obj

@obj.document(
    collection="orders", version=1,
    indexes=[obj.Composite(("region", "status"), name="by_region_status")],
)
@dataclass
class Order:
    email:  Annotated[str, obj.Unique]        # unique index "email"
    region: Annotated[str, obj.Index]         # standard index "region"
    tags:   Annotated[list[str], obj.Each]    # multi-value index "tags"
    status: str = "new"

with obj.Db("app.obj") as db:
    db.insert(Order(email="a@b.com", region="us", tags=["vip", "new"]))

    # exact lookup on a unique index (native value or pre-encoded bytes):
    order = db.find_unique(Order, "email", "a@b.com")

    # half-open range scan over a standard/composite index, in key order:
    for oid, o in db.index_range(Order, "region", "us", "us"):
        ...

A duplicate Unique key raises obj.InvalidArgumentError and rolls the write back atomically (the primary record is not inserted). Updating an indexed field moves its entry; deleting removes it.
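
For readers unfamiliar with typing.Annotated, the sketch below shows how field markers of this kind can be discovered at runtime. It uses stand-in marker classes (Index, Unique, Each defined locally) rather than obj's own, and the declared_indexes helper is hypothetical; it does not claim to reproduce how obj reads them:

```python
from dataclasses import dataclass
from typing import Annotated, get_type_hints

# Stand-in markers, defined locally so the sketch runs without obj installed.
class Index: ...
class Unique: ...
class Each: ...

@dataclass
class Order:
    email: Annotated[str, Unique]
    region: Annotated[str, Index]
    tags: Annotated[list[str], Each]
    status: str = "new"

def declared_indexes(cls):
    """Map field name -> marker name for every annotated index marker."""
    found = {}
    hints = get_type_hints(cls, include_extras=True)  # keep Annotated metadata
    for name, hint in hints.items():
        for marker in getattr(hint, "__metadata__", ()):
            if marker in (Index, Unique, Each):
                found[name] = marker.__name__
    return found

indexes = declared_indexes(Order)
```

The unannotated status field carries no metadata, so it does not appear in the result.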

Note. obj's on-disk encoding is positional (schema-driven, no field names in the bytes), and the schema registry is process-global per (collection, version). Two Db handles in one process that declare the same collection name with a different shape will raise obj.InvalidArgumentError rather than risk a silent mis-encode — use distinct collection names (or bump version=) per shape.
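
To see why a positional encoding makes a shape mismatch dangerous, consider this toy illustration. It uses the standard-library struct module, not obj's postcard codec, and the two shapes are hypothetical:

```python
import struct

# Two hypothetical shapes of the same byte width, with the fields swapped.
SHAPE_A = "<qd"  # (customer_id: int64, total: float64)
SHAPE_B = "<dq"  # (total: float64, customer_id: int64)

payload = struct.pack(SHAPE_A, 1, 99.5)

right = struct.unpack(SHAPE_A, payload)  # decoded with the writer's shape
wrong = struct.unpack(SHAPE_B, payload)  # decoded with a different shape
```

Decoding with SHAPE_B raises no error; it simply yields different values, because nothing in the bytes names the fields. Rejecting a conflicting shape up front avoids that class of silent mis-decode.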

Querying

db.query(...) returns a lazy, immutable builder. Pass an @obj.document class for typed results or a collection str for dict-native results:

# typed: returns Order instances
top = (db.query(Order)
         .filter(lambda o: o.status == "shipped")   # Python predicate, AND-combined
         .sort_by(lambda o: o.total)                 # order-preserving key
         .limit(10)
         .fetch())                                   # -> list[tuple[int, Order]]

# count() uses the engine's no-decode fast path when there is no filter;
# with a filter, as here, it counts the documents that pass the predicate:
large_count = db.query(Order).filter(lambda o: o.total > 100).count()

# scan a declared index slice, then compose:
us = db.query(Order).index_range("region", "us", "us").fetch()

Each builder call returns a fresh Query (the original is unchanged). The sort buffer is bounded (obj.MAX_SORT_BUFFER, overridable per query with .sort_buffer_limit(n)); an over-cap sort raises obj.InvalidArgumentError rather than allocating without limit.
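
The "fresh Query per call" behaviour can be pictured with a toy immutable builder over an in-memory tuple. ToyQuery is a hypothetical stand-in written for illustration only; it is not obj's Query and says nothing about how the engine executes a query:

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Optional

@dataclass(frozen=True)
class ToyQuery:
    rows: tuple
    predicates: tuple = ()
    sort_key: Optional[Callable[[Any], Any]] = None
    max_rows: Optional[int] = None

    def filter(self, pred):
        # Returns a new builder; predicates accumulate and are AND-combined.
        return replace(self, predicates=self.predicates + (pred,))

    def sort_by(self, key):
        return replace(self, sort_key=key)

    def limit(self, n):
        return replace(self, max_rows=n)

    def fetch(self):
        # Nothing runs until fetch(): filter, then sort, then truncate.
        out = [r for r in self.rows if all(p(r) for p in self.predicates)]
        if self.sort_key is not None:
            out.sort(key=self.sort_key)
        return out if self.max_rows is None else out[: self.max_rows]

base = ToyQuery(rows=(5, 3, 8, 1))
small = base.filter(lambda r: r > 2).sort_by(lambda r: r).limit(2)
```

Because every call returns a new object, base can be reused as a starting point for several different queries without one affecting another.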

Typed collection handles

Inside an explicit transaction, tx.collection(Cls) binds a class once and exposes typed CRUD scoped to that transaction:

with db.transaction() as tx:
    orders = tx.collection(Order)
    oid = orders.insert(Order(email="c@d.com", region="eu", tags=[]))
    assert orders.get(oid).region == "eu"
    orders.count_all()        # reflects this txn's own uncommitted writes

Multi-file attach

Open a second .obj file's collections read-only under a namespace, addressed as "namespace.collection":

with obj.Db("app.obj") as db:
    db.attach("archive.obj", "archive")
    # reads route to the attached file; declare a class with the
    # namespaced collection name (matching the archive's shape) to decode:
    archived = list(db.all("archive.orders"))    # or db.get / db.query
    db.detach("archive")

Attachments are read-only — a write to a namespaced collection raises obj.InvalidArgumentError. A namespaced read needs its schema registered under the namespaced name (declare a class with collection="archive.orders", or read raw bytes); an unregistered namespaced read fails loud with obj.InvalidArgumentError rather than returning garbage.

Async

obj.AsyncDb mirrors the blocking Db for asyncio callers. It is a thin wrapper that offloads each blocking call to a thread executor (the GIL is released around the engine work); there is no new runtime dependency.

import asyncio
import obj

async def main():
    adb = await obj.AsyncDb.open("app.obj")
    oid = await adb.insert(Order(email="e@f.com", region="us", tags=[]))
    order = await adb.get(Order, oid)
    async for oid, o in adb.all(Order):
        ...
    results = await adb.query(Order).filter(lambda o: o.region == "us").fetch()
    async with adb.transaction() as tx:        # commits on clean exit
        await tx.insert(Order(email="g@h.com", region="eu", tags=[]))
    await adb.close()

asyncio.run(main())

Each async transaction pins a single worker thread for its lifetime (the underlying txn handle is not Send), so async with adb.transaction() is safe to drive op-by-op.
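
The thread-pinning idea can be sketched with the standard library alone. The example below is an illustration of the general technique (a one-worker executor), under the assumption that this resembles what the wrapper does; run_pinned is a hypothetical helper, not part of obj:

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

async def run_pinned(ops):
    """Run blocking callables one by one on a single dedicated worker thread."""
    loop = asyncio.get_running_loop()
    # max_workers=1: every op submitted to this pool lands on the same thread.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return [await loop.run_in_executor(pool, op) for op in ops]

# Each op reports the id of the thread it ran on.
thread_ids = asyncio.run(run_pinned([threading.get_ident] * 3))
main_thread_id = threading.get_ident()
```

All three ops report the same worker thread, which is the property a thread-bound handle needs while the event loop stays free between awaits.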

Diagnostics

stat = db.stat()
for cs in stat.collections:
    print(cs.name, cs.doc_count, cs.file_size_bytes)
    for idx in cs.indexes:                 # secondary-index descriptors
        print(" ", idx.name, idx.kind, idx.key_paths, idx.status)

# low-level, type-erased dump of a collection's primary B-tree:
for rec in db.dump_raw("orders", max_records=1000):
    rec.id              # document id
    rec.header          # DocumentHeader (collection_id, type_version, ...)
    rec.payload         # raw postcard bytes

Checkpointing

Writes land in a write-ahead log (<db>.obj-wal) first; the main <db>.obj file stays sparse until a checkpoint folds the committed WAL pages into it and resets the WAL back to its 64-byte header. A checkpoint fires automatically once the WAL reaches ~1000 frames, so after only a handful of writes the data still lives entirely in the -wal file. Call db.checkpoint() to fold it on demand:

with obj.Db("app.obj") as db:
    for note in notes:
        db.insert(note)
    db.checkpoint()   # fold the WAL into app.obj, reset app.obj-wal

checkpoint() is a harmless no-op when there is nothing to fold, and is deferred (partial / no-op) if a concurrent reader has pinned a snapshot below the end of the WAL — the frames that reader still needs stay in place. Retry once the reader has finished. It raises obj.ObjError on a read-only handle or on an I/O failure.
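
One way to check from outside whether a fold has happened is to look at the WAL file's size. The sketch below relies on the "-wal" suffix and the 64-byte header mentioned above, and assumes that any bytes beyond the header mean unfolded frames remain; wal_has_frames is a hypothetical helper, and the demo uses stand-in files rather than a real database:

```python
import os
import tempfile

WAL_HEADER_BYTES = 64  # header size stated in the text above

def wal_has_frames(db_path):
    """True if <db_path>-wal exists and is larger than its bare header."""
    try:
        return os.path.getsize(db_path + "-wal") > WAL_HEADER_BYTES
    except FileNotFoundError:
        return False

# Demo with stand-in files (not a real obj database).
with tempfile.TemporaryDirectory() as tmp:
    db_path = os.path.join(tmp, "app.obj")
    with open(db_path + "-wal", "wb") as f:
        f.write(b"\0" * WAL_HEADER_BYTES)        # header only
    header_only = wal_has_frames(db_path)
    with open(db_path + "-wal", "ab") as f:
        f.write(b"\0" * 4096)                    # pretend a frame was appended
    with_frames = wal_has_frames(db_path)
    missing = wal_has_frames(os.path.join(tmp, "other.obj"))
```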

Checkpoint on clean close

You usually do not need an explicit checkpoint(): a clean close() — including a with obj.Db(...) as db: block that exits without raising — folds the WAL into the main file for you, so the .obj file is self-contained after a normal shutdown.

with obj.Db("app.obj") as db:
    db.insert(note)
# block exited cleanly -> WAL folded into app.obj, app.obj-wal reset

The close-time checkpoint is best-effort and non-fatal: a failure (reader-pinned deferral, I/O error during shutdown, read-only handle) is swallowed and never turns a successful with block into a raised error — the committed data is already durable in the WAL, so a failed fold loses nothing.

If the block exits via an exception, the checkpoint is skipped and the exception is propagated unchanged — the close-time fold never masks your error.

Trade-off: every clean close ends in an fsync. If you open and close many short-lived handles on a hot path, that is one fsync per close; prefer a single long-lived handle (and an occasional explicit checkpoint()) when the per-close fsync is a bottleneck.

Local development loop

# One-time setup: a fresh venv + maturin + pytest.
python3 -m venv .venv
source .venv/bin/activate
pip install maturin pytest

# From the workspace root:
cd crates/obj-py
maturin develop          # builds the cdylib + installs it editable
                         # into the active venv.
pytest tests/ -v         # run the Python test suite.

maturin develop rebuilds the extension module on every invocation; the typical dev loop is "edit Rust → maturin develop → pytest".

For a release-style wheel:

maturin build --release          # writes target/wheels/obj-*.whl
pip install target/wheels/obj-*.whl

Exception hierarchy

Every error raised by an obj operation is an instance of obj.ObjError. The sub-exceptions narrow the diagnosis:

Exception                     When raised
obj.NotFoundError             document / collection / index / namespace absent
obj.BusyError                 lock contention (pager mutex, writer lock, cross-process)
obj.CorruptionError           on-disk format / checksum / B-tree invariant violation
obj.IntegrityError            Db.integrity_check() found at least one failure
obj.InvalidArgumentError      caller-side argument problem (encoding, range, type, schema)
obj.EncryptionError           missing / wrong / mismatched encryption key
obj.FeatureUnsupportedError   file uses a build-time feature this wheel was compiled without

ObjError itself is the catch-all base and subclasses Exception. Use except obj.ObjError if you don't care which subclass fired; catch the narrow ones when you want to recover from a specific failure.
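
As an example of recovering from one narrow exception, lock contention is often handled by retrying with a short backoff. The sketch below is generic: retry_busy is a hypothetical helper, FakeBusy stands in for obj.BusyError so the code runs without a database, and whether a retry is appropriate for a given operation is an assumption left to the caller:

```python
import time

def retry_busy(op, busy_exc, attempts=5, base_delay=0.01):
    """Call op(); on busy_exc, back off exponentially and try again."""
    for attempt in range(attempts):
        try:
            return op()
        except busy_exc:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller see the error
            time.sleep(base_delay * (2 ** attempt))

class FakeBusy(Exception):
    """Stand-in for obj.BusyError."""

calls = []

def flaky():
    # Fails twice with the busy error, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise FakeBusy("locked")
    return "ok"

result = retry_busy(flaky, FakeBusy, base_delay=0.0)
```

With the real library this would be called as retry_busy(lambda: db.insert(order), obj.BusyError); any other obj.ObjError subclass still propagates immediately.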

Download files

Source Distribution

obj_db-1.1.0.tar.gz (738.7 kB view details)

Uploaded Source

Built Distribution

obj_db-1.1.0-cp39-abi3-macosx_11_0_arm64.whl (718.2 kB view details)

Uploaded CPython 3.9+macOS 11.0+ ARM64

File details

Details for the file obj_db-1.1.0.tar.gz.

File metadata

  • Download URL: obj_db-1.1.0.tar.gz
  • Upload date:
  • Size: 738.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.13.3

File hashes

Hashes for obj_db-1.1.0.tar.gz
Algorithm    Hash digest
SHA256       746a5797987c0ae8a59fc5bc75f69fbb9e71b6b4580516f8452c211f0af809a2
MD5          e54643c63f41ca475481e930b8a2b7b3
BLAKE2b-256  9f929f4a7e5cc37f638b3dfca8c728e21fe82fde7b9ed8e05e7962723ede5e7c

File details

Details for the file obj_db-1.1.0-cp39-abi3-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for obj_db-1.1.0-cp39-abi3-macosx_11_0_arm64.whl
Algorithm    Hash digest
SHA256       72722942a131c85d01d135a1a07c8015a12f6d3d7ec62969b5ffb6f8144d1639
MD5          730e2cceafea04d4020f0728d2a71eb8
BLAKE2b-256  29071c3a7f4dfa876ecba10cc4111fe94d5436d007c082dc911d451e74c814e9
