Skip to main content

Python client for the State-Oriented Data Protocol

Project description

sodp

PyPI Python license

Python asyncio client for the State-Oriented Data Protocol (SODP) — a WebSocket-based protocol for continuous state synchronization.

Instead of polling or request/response, SODP streams every change as a minimal delta to all connected subscribers. One mutation to a 100-field object sends exactly the changed fields.

Protocol spec & server


Install

pip install sodp-client

Requires Python 3.11+ and a running asyncio event loop.


Quick start

import asyncio
from sodp import SodpClient

async def main():
    client = SodpClient("ws://localhost:7777")
    await client.ready

    # Subscribe
    def on_player(value, meta):
        print(f"player: {value}  version={meta.version}")

    unsub = client.watch("game.player", on_player)

    # Mutate
    await client.set("game.player", {"name": "Alice", "health": 100})
    await client.patch("game.player", {"health": 80})   # only health changes

    await asyncio.sleep(1)

    unsub()        # remove this callback
    client.close() # close the connection

asyncio.run(main())

Authentication

# Static token
client = SodpClient("wss://sodp.example.com", token="eyJhbG...")

# Dynamic token provider — called on every connect/reconnect
async def get_token() -> str:
    async with aiohttp.ClientSession() as s:
        return await (await s.get("/api/sodp-token")).text()

client = SodpClient("wss://sodp.example.com", token_provider=get_token)

# Sync provider is also accepted
client = SodpClient(url, token_provider=lambda: os.environ["SODP_TOKEN"])

API reference

SodpClient(url, *, ...)

client = SodpClient(
    url,                        # WebSocket URL, e.g. "ws://localhost:7777"
    token=None,                 # static JWT string
    token_provider=None,        # callable → str | Awaitable[str]; supersedes token
    reconnect=True,             # auto-reconnect on disconnect
    reconnect_delay=1.0,        # base reconnect delay in seconds (doubles per attempt)
    max_reconnect_delay=30.0,   # maximum reconnect delay in seconds
    on_connect=None,            # called each time the connection is established
    on_disconnect=None,         # called each time the connection drops
)

The client connects immediately in the background. Use await client.ready (or await client) to wait for the first successful authentication before sending commands.


await client.ready

Awaitable that resolves once the client is connected and authenticated. You can also await client directly:

await client.ready  # explicit
await client        # same thing

client.watch(key, callback) → unsub

Subscribe to a state key. callback(value, meta) fires on every update and immediately with the cached value if the key is already known.

  • value — current state (any JSON-compatible type), or None if the key has no value yet

  • meta.version — monotonically increasing version number (int)

  • meta.initializedFalse when the key has never been written to the server

  • meta.source — origin of this callback invocation:

    • "cache" — fired synchronously from watch() with an already-cached value
    • "init" — the server's STATE_INIT baseline (initial load or post-reconnect)
    • "delta" — an incremental mutation (DELTA frame)

    Use meta.source — not meta.initialized — to distinguish the initial baseline from subsequent changes. initialized only tells you whether the key has ever been written on the server.

callback may be a plain function or an async function.

Returns an unsubscribe callable. Multiple watch() calls for the same key share a single server subscription.

def on_player(value, meta):
    if not meta.initialized:
        return
    print(value["name"], value["health"])

unsub = client.watch("game.player", on_player)

# Async callback also works:
async def on_score(value, meta):
    await db.update_score(value)

unsub2 = client.watch("game.score", on_score)

client.state(key) → StateRef

Returns a key-scoped handle for cleaner per-key code:

player = client.state("game.player")

unsub = player.watch(lambda v, m: print(v))

await player.set({"name": "Alice", "health": 100, "position": {"x": 0, "y": 0}})
await player.patch({"health": 80})           # only health changes
await player.set_in("/position/x", 5)        # atomic nested field update
await player.delete()                        # remove the key entirely
await player.presence("/alice", {"line": 1}) # session-lifetime path

current = player.get()                       # cached snapshot
player.unwatch()                             # cancel subscription

await client.call(method, args) → data

Invoke a built-in server method:

Method Args Effect
"state.set" {"state": key, "value": v} Replace full value
"state.patch" {"state": key, "patch": {...}} Deep-merge partial update
"state.set_in" {"state": key, "path": "/a/b", "value": v} Set nested field by JSON Pointer
"state.delete" {"state": key} Remove key entirely
"state.presence" {"state": key, "path": "/p", "value": v} Session-lifetime path
await client.call("state.set", {"state": "game.score", "value": {"value": 0}})

Convenience methods

await client.set("game.score", {"value": 42})
await client.patch("game.player", {"health": 80})
await client.presence("collab.cursors", "/alice", {"name": "Alice", "line": 3})

client.unwatch(key)

Cancel the server subscription and clear all local state for a key.


client.get(key) → Any

Synchronously read the cached value without subscribing. Returns None if the key is not being watched or has no value.


client.is_watching(key) → bool

Returns True if this client has an active subscription for key.


client.close()

Gracefully close the connection and stop reconnecting.


Presence

Presence binds a nested path to the session lifetime. The server automatically removes it and notifies all watchers when the client disconnects for any reason — no ghost cursors or stale "online" flags:

# Bind cursor to this session — auto-removed if the process crashes or disconnects
await client.presence("collab.cursors", "/alice", {"name": "Alice", "line": 1})

# Or via StateRef:
cursors = client.state("collab.cursors")
await cursors.presence("/alice", {"name": "Alice", "line": 1})

Auto-reconnect & RESUME

The client reconnects with exponential backoff (1 s → 2 s → 4 s → … → 30 s). After reconnecting:

  • Keys with a known version send RESUME — the server replays missed deltas, then resumes live streaming
  • Keys never seen yet send WATCH — you receive the current snapshot

No data is lost during short disconnections as long as the server's delta log is not full (1 000 deltas per key).


StateRef API summary

ref = client.state("my.key")

ref.watch(callback)          # subscribe; returns unsub callable
ref.get()                    # cached value
ref.is_watching()            # True if subscribed
ref.unwatch()                # cancel subscription + clear local state
await ref.set(value)         # replace full value
await ref.patch(partial)     # deep-merge partial
await ref.set_in(path, val)  # set nested field by JSON Pointer
await ref.delete()           # remove key from server
await ref.presence(path, v)  # session-lifetime path binding

FastAPI example

from contextlib import asynccontextmanager
from fastapi import FastAPI
from sodp import SodpClient

client: SodpClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    global client
    client = SodpClient("ws://sodp-server:7777", token=os.environ["SODP_TOKEN"])
    await client.ready
    yield
    client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/score/{value}")
async def set_score(value: int):
    await client.set("game.score", {"value": value})
    return {"ok": True}

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sodp_client-0.2.1.tar.gz (16.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sodp_client-0.2.1-py3-none-any.whl (12.2 kB view details)

Uploaded Python 3

File details

Details for the file sodp_client-0.2.1.tar.gz.

File metadata

  • Download URL: sodp_client-0.2.1.tar.gz
  • Upload date:
  • Size: 16.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sodp_client-0.2.1.tar.gz
Algorithm Hash digest
SHA256 8fefaf27cac36d003d34e3cc285b1f6d96f080976159ddd0d9963aac3b8882ff
MD5 a58feb0a360718680958ac6c73f8608b
BLAKE2b-256 69c0f20472707788e3961c0dd9e30f2ea6fc1f15c058abdb9107e3d978ab94b5

See more details on using hashes here.

Provenance

The following attestation bundles were made for sodp_client-0.2.1.tar.gz:

Publisher: release-py.yml on orkestri/SODP

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file sodp_client-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: sodp_client-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 12.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for sodp_client-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 17e6bedb2d784a89bd393192fc8673a61ab1b7e8ca82e44e6e6d8c1fb3e141f1
MD5 67d4947edf0da73fc5eda2fd82b4cb32
BLAKE2b-256 78b07a032bc51900a2744f190eba6ad1817e8404cf3c212e8c7fdb8030ec9ff9

See more details on using hashes here.

Provenance

The following attestation bundles were made for sodp_client-0.2.1-py3-none-any.whl:

Publisher: release-py.yml on orkestri/SODP

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page