Python client for the State-Oriented Data Protocol
sodp
Python asyncio client for the State-Oriented Data Protocol (SODP) — a WebSocket-based protocol for continuous state synchronization.
Instead of polling or request/response, SODP streams every change as a minimal delta to all connected subscribers. One mutation to a 100-field object sends exactly the changed fields.
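To make the "minimal delta" idea concrete, here is a small local sketch. It is not the protocol's actual diff algorithm or wire format; `shallow_delta` is a hypothetical helper that compares only top-level fields and ignores removed keys.

```python
from typing import Any


def shallow_delta(old: dict[str, Any], new: dict[str, Any]) -> dict[str, Any]:
    """Return only the top-level fields whose values differ between old and new.

    Hypothetical helper for illustration; removed keys are not handled.
    """
    return {k: v for k, v in new.items() if k not in old or old[k] != v}


# A 100-field object in which a single field changes.
before = {f"field_{i}": i for i in range(100)}
after = {**before, "field_7": "changed"}

delta = shallow_delta(before, after)
print(delta)  # {'field_7': 'changed'}
```

Only the one changed field would need to travel to subscribers; the other 99 stay untouched.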
Install
pip install sodp-client
Requires Python 3.11+ and a running asyncio event loop.
Quick start
import asyncio
from sodp import SodpClient

async def main():
    client = SodpClient("ws://localhost:7777")
    await client.ready

    # Subscribe
    def on_player(value, meta):
        print(f"player: {value} version={meta.version}")

    unsub = client.watch("game.player", on_player)

    # Mutate
    await client.set("game.player", {"name": "Alice", "health": 100})
    await client.patch("game.player", {"health": 80})  # only health changes

    await asyncio.sleep(1)
    unsub()         # remove this callback
    client.close()  # close the connection

asyncio.run(main())
Authentication
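import os
import aiohttp
from sodp import SodpClient

# Static token
client = SodpClient("wss://sodp.example.com", token="eyJhbG...")

# Dynamic token provider, called on every connect/reconnect
async def get_token() -> str:
    async with aiohttp.ClientSession() as s:
        # aiohttp needs an absolute URL unless the session is created with base_url;
        # the host below is a placeholder for your own token endpoint.
        async with s.get("https://app.example.com/api/sodp-token") as resp:
            return await resp.text()

client = SodpClient("wss://sodp.example.com", token_provider=get_token)

# Sync provider is also accepted
client = SodpClient("wss://sodp.example.com", token_provider=lambda: os.environ["SODP_TOKEN"])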
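One way a client could accept both sync and async providers is to call the provider and await the result only when it is awaitable. The sketch below shows that pattern in isolation; `resolve_token` and the two providers are hypothetical names, and this is not taken from the sodp source.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

TokenProvider = Callable[[], Union[str, Awaitable[str]]]


async def resolve_token(provider: TokenProvider) -> str:
    """Call the provider and await its result only if it is awaitable."""
    result = provider()
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_provider() -> str:
    return "token-from-sync"


async def async_provider() -> str:
    await asyncio.sleep(0)  # stands in for a network call
    return "token-from-async"


async def demo() -> list[str]:
    return [await resolve_token(sync_provider), await resolve_token(async_provider)]


tokens = asyncio.run(demo())
print(tokens)  # ['token-from-sync', 'token-from-async']
```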
API reference
SodpClient(url, *, ...)
client = SodpClient(
    url,                       # WebSocket URL, e.g. "ws://localhost:7777"
    token=None,                # static JWT string
    token_provider=None,       # callable → str | Awaitable[str]; supersedes token
    reconnect=True,            # auto-reconnect on disconnect
    reconnect_delay=1.0,       # base reconnect delay in seconds (doubles per attempt)
    max_reconnect_delay=30.0,  # maximum reconnect delay in seconds
    on_connect=None,           # called each time the connection is established
    on_disconnect=None,        # called each time the connection drops
)
The client connects immediately in the background. Use await client.ready (or await client) to wait for the first successful authentication before sending commands.
await client.ready
Awaitable that resolves once the client is connected and authenticated. You can also await client directly:
await client.ready # explicit
await client # same thing
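Because the readiness signal is an awaitable, it can be wrapped in `asyncio.wait_for` to avoid waiting forever on an unreachable server. The sketch below uses plain futures as stand-ins for `client.ready`, so it runs without a server; `wait_ready` is a hypothetical helper, not part of the sodp API.

```python
import asyncio
from typing import Any, Awaitable


async def wait_ready(ready: Awaitable[Any], timeout: float) -> bool:
    """Return True if `ready` resolves within `timeout` seconds, else False."""
    try:
        await asyncio.wait_for(ready, timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple[bool, bool]:
    loop = asyncio.get_running_loop()
    resolved = loop.create_future()
    resolved.set_result(None)       # stands in for a client that has authenticated
    pending = loop.create_future()  # stands in for a server that never answers
    return await wait_ready(resolved, 0.1), await wait_ready(pending, 0.1)


results = asyncio.run(demo())
print(results)  # (True, False)
```

With a real client the call would look like `await wait_ready(client.ready, 5.0)`, assuming `client.ready` behaves like any other awaitable.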
client.watch(key, callback) → unsub
Subscribe to a state key. callback(value, meta) fires on every update and immediately with the cached value if the key is already known.
- value: current state (any JSON-compatible type), or None if the key has no value yet
- meta.version: monotonically increasing version number (int)
- meta.initialized: False when the key has never been written to the server
- meta.source: origin of this callback invocation:
  - "cache": fired synchronously from watch() with an already-cached value
  - "init": the server's STATE_INIT baseline (initial load or post-reconnect)
  - "delta": an incremental mutation (DELTA frame)
Use meta.source, not meta.initialized, to distinguish the initial baseline from subsequent changes. initialized only tells you whether the key has ever been written on the server.
callback may be a plain function or an async function.
Returns an unsubscribe callable. Multiple watch() calls for the same key share a single server subscription.
def on_player(value, meta):
    if not meta.initialized:
        return
    print(value["name"], value["health"])

unsub = client.watch("game.player", on_player)

# Async callback also works (db is an application-defined object, not part of sodp):
async def on_score(value, meta):
    await db.update_score(value)

unsub2 = client.watch("game.score", on_score)
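The distinction between meta.source and meta.initialized can be exercised without a server. In the sketch below, `FakeMeta` is a hypothetical stand-in for the real meta object that carries only the three fields documented above, and the version numbers are made up for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeMeta:
    """Hypothetical stand-in for the meta object passed to watch callbacks."""
    version: int
    initialized: bool
    source: str  # "cache", "init", or "delta"


events: list[str] = []


def on_player(value: Any, meta: FakeMeta) -> None:
    if not meta.initialized:
        events.append("unset")                       # key never written
    elif meta.source in ("cache", "init"):
        events.append(f"baseline v{meta.version}")   # full snapshot
    else:
        events.append(f"change v{meta.version}")     # incremental update


# Simulated callback invocations (values and versions are illustrative).
on_player(None, FakeMeta(0, False, "init"))
on_player({"health": 100}, FakeMeta(1, True, "init"))
on_player({"health": 80}, FakeMeta(2, True, "delta"))
print(events)  # ['unset', 'baseline v1', 'change v2']
```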
client.state(key) → StateRef
Returns a key-scoped handle for cleaner per-key code:
player = client.state("game.player")
unsub = player.watch(lambda v, m: print(v))
await player.set({"name": "Alice", "health": 100, "position": {"x": 0, "y": 0}})
await player.patch({"health": 80}) # only health changes
await player.set_in("/position/x", 5) # atomic nested field update
await player.delete() # remove the key entirely
await player.presence("/alice", {"line": 1}) # session-lifetime path
current = player.get() # cached snapshot
player.unwatch() # cancel subscription
await client.call(method, args) → data
Invoke a built-in server method:
| Method | Args | Effect |
|---|---|---|
| "state.set" | {"state": key, "value": v} | Replace full value |
| "state.patch" | {"state": key, "patch": {...}} | Deep-merge partial update |
| "state.set_in" | {"state": key, "path": "/a/b", "value": v} | Set nested field by JSON Pointer |
| "state.delete" | {"state": key} | Remove key entirely |
| "state.presence" | {"state": key, "path": "/p", "value": v} | Session-lifetime path |
await client.call("state.set", {"state": "game.score", "value": {"value": 0}})
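To illustrate what "deep-merge" and "set nested field by JSON Pointer" mean, here is a local sketch of both operations on plain dictionaries. It is an approximation under stated assumptions, not the server's implementation: `deep_merge` and `set_in` are hypothetical helpers, the pointer must be non-empty, and only objects (not arrays) are traversed.

```python
import copy
from typing import Any


def deep_merge(base: Any, patch: Any) -> Any:
    """Recursively merge `patch` into a copy of `base`; non-dict values replace."""
    if not (isinstance(base, dict) and isinstance(patch, dict)):
        return copy.deepcopy(patch)
    merged = copy.deepcopy(base)
    for key, value in patch.items():
        merged[key] = deep_merge(merged.get(key), value)
    return merged


def set_in(doc: dict, pointer: str, value: Any) -> dict:
    """Set the field addressed by a JSON Pointer such as "/a/b" (objects only)."""
    result = copy.deepcopy(doc)
    # RFC 6901 escapes: "~1" means "/", "~0" means "~" (decode "~1" first).
    tokens = [t.replace("~1", "/").replace("~0", "~") for t in pointer.split("/")[1:]]
    node = result
    for token in tokens[:-1]:
        node = node.setdefault(token, {})
    node[tokens[-1]] = value
    return result


player = {"name": "Alice", "health": 100, "position": {"x": 0, "y": 0}}
patched = deep_merge(player, {"health": 80})
moved = set_in(patched, "/position/x", 5)
print(moved)
```

Both helpers return new objects and leave their inputs unchanged, which makes the before and after states easy to compare.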
Convenience methods
await client.set("game.score", {"value": 42})
await client.patch("game.player", {"health": 80})
await client.presence("collab.cursors", "/alice", {"name": "Alice", "line": 3})
client.unwatch(key)
Cancel the server subscription and clear all local state for a key.
client.get(key) → Any
Synchronously read the cached value without subscribing. Returns None if the key is not being watched or has no value.
client.is_watching(key) → bool
Returns True if this client has an active subscription for key.
client.close()
Gracefully close the connection and stop reconnecting.
Presence
Presence binds a nested path to the session lifetime. The server automatically removes it and notifies all watchers when the client disconnects for any reason — no ghost cursors or stale "online" flags:
# Bind cursor to this session — auto-removed if the process crashes or disconnects
await client.presence("collab.cursors", "/alice", {"name": "Alice", "line": 1})
# Or via StateRef:
cursors = client.state("collab.cursors")
await cursors.presence("/alice", {"name": "Alice", "line": 1})
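The session-lifetime behaviour can be pictured with a toy in-memory model. This is only a sketch of the idea, not how the SODP server is implemented: `ToyPresenceStore` and the session ids are hypothetical, and only single-segment paths such as "/alice" are handled.

```python
from collections import defaultdict
from typing import Any


class ToyPresenceStore:
    """In-memory toy model: each presence entry is owned by a session id."""

    def __init__(self) -> None:
        self.state: dict[str, dict[str, Any]] = defaultdict(dict)
        self.owned: dict[str, list[tuple[str, str]]] = defaultdict(list)

    def presence(self, session: str, key: str, path: str, value: Any) -> None:
        field = path.lstrip("/")
        self.state[key][field] = value
        self.owned[session].append((key, field))

    def disconnect(self, session: str) -> None:
        # Remove every entry the session bound, whatever the disconnect reason.
        for key, field in self.owned.pop(session, []):
            self.state[key].pop(field, None)


store = ToyPresenceStore()
store.presence("s1", "collab.cursors", "/alice", {"name": "Alice", "line": 1})
store.presence("s2", "collab.cursors", "/bob", {"name": "Bob", "line": 9})
store.disconnect("s1")  # Alice's session ends; her cursor disappears
print(store.state["collab.cursors"])  # {'bob': {'name': 'Bob', 'line': 9}}
```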
Auto-reconnect & RESUME
The client reconnects with exponential backoff (1 s → 2 s → 4 s → … → 30 s). After reconnecting:
- Keys with a known version send RESUME: the server replays missed deltas, then resumes live streaming
- Keys never seen yet send WATCH: you receive the current snapshot
No data is lost during a short disconnection as long as the deltas missed for a key still fit in the server's delta log (1 000 deltas per key).
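The reconnect schedule and the RESUME/WATCH choice described above can be sketched as two small functions. Both are hypothetical illustrations: the defaults mirror the documented reconnect_delay and max_reconnect_delay, while the tuple shapes returned by `plan_resubscribe` are invented for readability and are not the protocol's frame format.

```python
def backoff_delays(base: float = 1.0, cap: float = 30.0, attempts: int = 7) -> list[float]:
    """Delay before each reconnect attempt: doubles per attempt, capped at `cap`."""
    return [min(base * 2 ** n, cap) for n in range(attempts)]


def plan_resubscribe(watched: list[str], versions: dict[str, int]) -> list[tuple]:
    """Pick RESUME for keys with a known version, WATCH for keys never seen."""
    frames = []
    for key in watched:
        if key in versions:
            frames.append(("RESUME", key, versions[key]))
        else:
            frames.append(("WATCH", key))
    return frames


delays = backoff_delays()
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]

plan = plan_resubscribe(["game.player", "game.score"], {"game.player": 12})
print(plan)  # [('RESUME', 'game.player', 12), ('WATCH', 'game.score')]
```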
StateRef API summary
ref = client.state("my.key")
ref.watch(callback) # subscribe; returns unsub callable
ref.get() # cached value
ref.is_watching() # True if subscribed
ref.unwatch() # cancel subscription + clear local state
await ref.set(value) # replace full value
await ref.patch(partial) # deep-merge partial
await ref.set_in(path, val) # set nested field by JSON Pointer
await ref.delete() # remove key from server
await ref.presence(path, v) # session-lifetime path binding
FastAPI example
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sodp import SodpClient

client: SodpClient

@asynccontextmanager
async def lifespan(app: FastAPI):
    global client
    # Connect once at startup and wait for authentication before serving requests
    client = SodpClient("ws://sodp-server:7777", token=os.environ["SODP_TOKEN"])
    await client.ready
    yield
    client.close()

app = FastAPI(lifespan=lifespan)

@app.post("/score/{value}")
async def set_score(value: int):
    await client.set("game.score", {"value": value})
    return {"ok": True}