Integration SDK for the Syncropel protocol — async client, grammar enforcement, canonical references, fail-open transport
syncropel
Python SDK for the Syncropel protocol — emit content-addressed records, query threads, validate grammar, fail open on transport errors.
import asyncio

from syncropel import Client, Identity, Ref


async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:my-app"),
    ) as client:
        result = await client.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow", "artists": ["Zonke"]},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print(result.success, result.record_id)


asyncio.run(main())
One call validates the grammar, builds the record envelope, retries on transient errors, and resolves cleanly on transport failures so a flaky network never crashes your handler.
Features
- Async client: emit, query, query_thread, intend, fulfill, plus reserved-kind helpers
- Sync helper: emit_sync() for scripts / CLIs without an event loop
- Grammar enforcement: body.kind validated before any network call
- Canonical references: 11 community ref constructors (@music.track, @code.file, @social.person, …) for cross-publisher correlation
- Fail-open transport: every emit returns a result; transport errors never raise
- Identity-aware: every record signed with the configured DID
- Single runtime dependency: httpx for HTTP
- In-memory MockKernel at syncropel.testing: write adapter tests without a server
Install
pip install syncropel
Python 3.10+.
Quickstart
Start a Syncropel server locally, then emit your first record:
# hello.py
import asyncio

from syncropel import Client, Identity, Ref


async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:me"),
    ) as c:
        result = await c.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow"},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print("emitted:", result.record_id)

        records = await c.query_thread("music.library")
        print(f"thread has {len(records)} record(s)")


asyncio.run(main())
python hello.py
See syncropel.com for installing the Syncropel server (spl) and a full hosted-vs-local guide.
Concepts in 60 seconds
- Record: the immutable, content-addressed unit. It has 8 fields; the SDK builds the envelope for you.
- Kind: body.kind names what the record is about, e.g. music.catalog.track. It follows a strict grammar: scope.category.entity[.version].
- Thread: a logical conversation / workflow. Records share a thread when they're part of the same activity.
- Actor: who emitted the record, expressed as a DID.
- Ref: a canonical pointer to a real-world entity (a song, a file, a person…) so records about the same thing correlate across apps.
API reference
Client
| Method | Purpose |
|---|---|
| emit(act, kind, body, thread, refs=, parents=, data_type=, clock=) | Primary emit. Validates kind, builds the envelope, retries on 5xx / network errors. Returns EmitResult. Never raises on transport failures. |
| emit_sync(...) | Synchronous variant for scripts / CLIs without an event loop. Uses a persistent httpx.Client across calls so TCP keep-alive amortises setup across bulk emits. |
| intend(goal, thread=, ...) | Open a thread with an INTEND record. Generates a random 64-hex thread id if none supplied; returns it on result.thread. |
| fulfill(thread, summary, fulfills=, ...) | Close a thread with a KNOW record. fulfills accepts a single record id or a list. |
| emit_correction(corrects, revised_fields, reason, thread, ...) | Reserved-kind helper for core.correction: supersede earlier records with revised values. |
| emit_erasure(erases, reason, thread, ...) | Reserved-kind helper for core.erasure: mark records as erased (e.g. for compliance). |
| emit_alias(old_kind, new_kind, reason, thread, ...) | Reserved-kind helper for core.alias: declare that one kind supersedes another. |
| emit_scope_transfer(scope, from_publisher, to_publisher, reason, thread, ...) | Reserved-kind helper for core.scope_transfer. |
| emit_scope_claim(scope, governance_policy, thread, ...) | Reserved-kind helper for core.scope_claim: claim a scope with a governance policy. |
| query_thread(thread, limit=100, since=None) | All records in a thread. Fail-open (returns [] on transport error). |
| query(kind=, actor=, thread=, since=, limit=100, where=None) | Filtered records. At least one of kind/actor/thread is required. |
| health() | Server health probe. Fail-open (returns {} on failure). |
| close() | Release the underlying HTTP client. |
Constructor kwargs: endpoint, identity (required), timeout=30.0, max_retries=2, backoff_ms=250.0, on_emit=None, api_key=None, transport=None (custom httpx transport — useful for tests).
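The intend() entry above mentions a random 64-hex thread id when none is supplied. As a rough illustration of what such an id looks like, here is a minimal sketch using only the standard library. The helper name resolve_thread is hypothetical and this is not the SDK's own code; it only assumes that "64-hex" means 32 random bytes rendered as hexadecimal.

```python
import secrets
from typing import Optional


def resolve_thread(thread: Optional[str] = None) -> str:
    """Return the supplied thread id, or a fresh random 64-hex id.

    Hypothetical helper for illustration; the SDK may generate ids differently.
    """
    if thread:
        return thread
    # 32 random bytes rendered as hex give exactly 64 characters.
    return secrets.token_hex(32)
```

A caller-supplied id such as "music.library" passes through unchanged, so named threads and generated threads can share one code path.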
Identity
| Form | Status |
|---|---|
| Identity.static(did) | Available |
| Identity.key(path_or_bytes) | Planned; raises NotImplementedError if called today |
| Identity.federated(...) | Planned; raises NotImplementedError if called today |
Ref — canonical reference constructors
| Constructor | Canonical | ID scheme |
|---|---|---|
| Ref.music_track(isrc= / spotify_id= / apple_id=) | @music.track | isrc:<ISRC> (preferred) |
| Ref.code_file(repo= / git_url=, path=) | @code.file | github:<repo>:<path> or git:<url>:<path> |
| Ref.ops_incident(pagerduty= / linear= / url=) | @ops.incident | pagerduty:<id> etc. |
| Ref.cal_event(uid=) | @cal.event | icalendar:<uid> |
| Ref.social_person(did= / email= / handle=+name=) | @social.person | DID pass-through, email:<x>, <platform>:<handle> |
| Ref.media_photo(sha256= / url=) | @media.photo | sha256:<hex> (preferred) |
| Ref.media_video(youtube= / vimeo= / sha256=) | @media.video | youtube:<id> etc. |
| Ref.doc_text(doi= / url= / platform_id=) | @doc.text | doi:<id> (preferred) |
| Ref.fin_transaction(stripe= / plaid= / iso20022=) | @fin.transaction | stripe:<id> etc. |
| Ref.research_paper(doi= / arxiv= / s2=) | @research.paper | doi:<id> (preferred) |
| Ref.core_thread(id=) | @core.thread | thread:<id> |
Each returns {"kind": "@...", "id": "..."}. Pass a list as refs= to emit(); the SDK merges them into body._refs. Two records anywhere in the network sharing the same canonical ref are joinable.
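To make the ref shape and the body._refs merge concrete, here is a minimal sketch in plain Python. The functions music_track_ref, merge_refs and joinable are hypothetical stand-ins written for illustration; they mirror the {"kind": ..., "id": ...} shape and the isrc:<ISRC> scheme described above but are not the SDK's implementation.

```python
from typing import Any


def music_track_ref(isrc: str) -> dict[str, str]:
    """Hypothetical stand-in for Ref.music_track(isrc=...)."""
    return {"kind": "@music.track", "id": f"isrc:{isrc}"}


def merge_refs(body: dict[str, Any], refs: list[dict[str, str]]) -> dict[str, Any]:
    """Return a copy of body with the refs stored under the _refs key."""
    merged = dict(body)
    merged["_refs"] = list(body.get("_refs", [])) + list(refs)
    return merged


def joinable(a: dict[str, Any], b: dict[str, Any]) -> bool:
    """Two bodies are joinable when they share at least one (kind, id) ref."""
    keys_a = {(r["kind"], r["id"]) for r in a.get("_refs", [])}
    keys_b = {(r["kind"], r["id"]) for r in b.get("_refs", [])}
    return bool(keys_a & keys_b)
```

Because the join key is the (kind, id) pair rather than anything publisher-specific, two apps that never coordinated still correlate as long as both use the same ISRC.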
EmitResult
@dataclass
class EmitResult:
    success: bool
    record_id: str | None = None
    clock: int | None = None
    error: str | None = None
    retried: int = 0
    kind: str = ""
    act: str = ""
    thread: str = ""
SyncropelKindError
Raised synchronously from validate_kind() and every emit* method when body.kind violates the grammar. Subclasses ValueError.
Fail-open contract
emit() never raises on network errors, 4xx, 5xx, or timeouts. Every call returns an EmitResult and your code inspects .success:
result = await client.emit(act="PUT", kind="music.catalog.track", body={}, thread="t")
if not result.success:
    # Transient failure: log and keep going.
    log.warning("emit failed: %s (retried %d)", result.error, result.retried)
A flaky network can't bring down your handler. You decide whether to drop the failure, retry, or escalate.
Grammar errors are different. SyncropelKindError always raises — it indicates programmer error (an invalid body.kind), and no retry will fix it. Failing loud at development time is correct.
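The fail-open behaviour described above can be sketched as a retry loop that converts transport exceptions into a result object instead of letting them propagate. This is an illustrative sketch under stated assumptions, not the SDK's code: SketchResult, emit_fail_open and the send callable are hypothetical names, and the linear backoff schedule is a guess since the source only names max_retries and backoff_ms.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class SketchResult:
    """Simplified, hypothetical stand-in for EmitResult."""
    success: bool
    record_id: Optional[str] = None
    error: Optional[str] = None
    retried: int = 0


async def emit_fail_open(
    send: Callable[[], Awaitable[str]],
    max_retries: int = 2,
    backoff_ms: float = 250.0,
) -> SketchResult:
    """Call send(); retry on transport errors; never raise them."""
    retried = 0
    while True:
        try:
            record_id = await send()
            return SketchResult(success=True, record_id=record_id, retried=retried)
        except (OSError, asyncio.TimeoutError) as exc:
            # ConnectionError and friends are OSError subclasses.
            if retried >= max_retries:
                return SketchResult(success=False, error=str(exc), retried=retried)
            retried += 1
            # Assumed schedule: wait a little longer before each retry.
            await asyncio.sleep(backoff_ms * retried / 1000)
```

With max_retries=2 the sketch makes at most three attempts. Only transport-level exceptions are caught, so a grammar error raised before send() is ever called would still surface to the caller.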
Observability hook
def on_emit(result: EmitResult) -> None:
    my_metrics.increment("syncropel.emit", tags={"success": result.success})

client = Client(..., on_emit=on_emit)
Fires on every success and failure. Hook exceptions are swallowed with a warning — a broken metrics pipeline can't break your emit path.
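One way to get the "swallowed with a warning" behaviour is to wrap the hook call in a broad try/except that logs and moves on. The sketch below shows that pattern with the standard logging module; the notify function and the logger name are hypothetical and only illustrate the idea, not how the SDK wires its hook internally.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("emit_hook_sketch")


def notify(hook: Optional[Callable[[Any], None]], result: Any) -> None:
    """Invoke the hook if one is configured; never let it raise."""
    if hook is None:
        return
    try:
        hook(result)
    except Exception:
        # Log with traceback so the broken hook is visible, then carry on.
        logger.warning("on_emit hook raised; ignoring", exc_info=True)
```

Catching Exception rather than BaseException leaves KeyboardInterrupt and task cancellation untouched, which is usually what you want in an emit path.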
Grammar reference
Every record's body.kind follows the scope.category.entity[.version] grammar. The SDK validates at emit time:
| Kind | Valid? |
|---|---|
| music.catalog.track | ✓: publisher scope, 3 segments |
| music.catalog.track.v2 | ✓: versioned |
| @music.track | ✓: community canonical (2-segment allowed for @<community> and core scopes) |
| core.alias | ✓: reserved core primitive |
| music.track_imported | ✗: 2-segment publisher scope forbidden |
| Music.Catalog.Track | ✗: uppercase forbidden |
| music.catalog.track.v2.foo | ✗: 5 segments |
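The rows in the table above can be approximated with two regular expressions. This is a sketch that reproduces only those seven rows, not the SDK's validator. The segment alphabet (lowercase letters, digits, underscores), the v<digits> version form, and the names check_kind, is_valid_kind and KindGrammarError are all assumptions made for illustration.

```python
import re

# Assumption: a segment starts with a lowercase letter and may contain
# lowercase letters, digits and underscores.
_SEG = r"[a-z][a-z0-9_]*"

# Publisher kinds: exactly three segments plus an optional version suffix.
_PUBLISHER = re.compile(rf"{_SEG}\.{_SEG}\.{_SEG}(\.v[0-9]+)?")

# Community (@scope) and core kinds: two segments are allowed.
_SHORT = re.compile(rf"(@{_SEG}|core)\.{_SEG}")


class KindGrammarError(ValueError):
    """Hypothetical stand-in for the SDK's SyncropelKindError."""


def check_kind(kind: str) -> str:
    """Return kind unchanged if it fits the sketched grammar, else raise."""
    if _PUBLISHER.fullmatch(kind) or _SHORT.fullmatch(kind):
        return kind
    raise KindGrammarError(f"invalid kind: {kind!r}")


def is_valid_kind(kind: str) -> bool:
    """Boolean convenience wrapper around check_kind."""
    try:
        check_kind(kind)
    except KindGrammarError:
        return False
    return True
```

Cases the table does not cover, such as a three-segment @community kind, are rejected by this sketch; whether the real grammar accepts them is not stated here.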
When a canonical ref exists for your domain, use it:
refs=[Ref.music_track(isrc="USJI19810404")]
This makes your record correlatable with every other music record across publishers. Without refs, nothing breaks — you lose cross-app correlation.
Testing your adapter
The SDK ships a canonical mock at syncropel.testing — exercise your adapter end-to-end without running a server:
from syncropel import Client, Identity, Ref
from syncropel.testing import MockKernel


async def test_my_adapter():
    server = MockKernel()
    client = Client(
        endpoint="http://mock",
        identity=Identity.static("did:test:me"),
        transport=server.transport(),
    )
    await my_adapter.import_library(client, tracks=fixtures)

    tracks = server.records_by_kind("music.catalog.track")
    assert len(tracks) == len(fixtures)
    for t in tracks:
        assert t["body"]["_refs"][0]["kind"] == "@music.track"
Record IDs produced by MockKernel use the same SHA-256-of-canonical-JSON rule as the real server, so result.record_id matches what you'd see in production. The mock also enforces DuplicateClock on (thread, actor, clock).
Failure injection for fail-open coverage: server.fail_next_post(n) and server.fail_next_get(n).
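To show what "SHA-256 of canonical JSON" and a duplicate-clock check could look like, here is a small self-contained sketch. It is not MockKernel: the canonicalisation used (sorted keys, compact separators, UTF-8) is an assumption, as are the record field names thread, actor and clock and the names record_id, TinyStore and DuplicateClockError.

```python
import hashlib
import json
from typing import Any


def record_id(record: dict[str, Any]) -> str:
    """Hash a record by its canonical JSON form (assumed canonicalisation)."""
    canonical = json.dumps(
        record, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class DuplicateClockError(Exception):
    """Raised when (thread, actor, clock) has already been used."""


class TinyStore:
    """Hypothetical in-memory store, far simpler than a real mock server."""

    def __init__(self) -> None:
        self._records: dict[str, dict[str, Any]] = {}
        self._clocks: set[tuple[str, str, int]] = set()

    def put(self, record: dict[str, Any]) -> str:
        key = (record["thread"], record["actor"], record["clock"])
        if key in self._clocks:
            raise DuplicateClockError(f"clock already used: {key}")
        self._clocks.add(key)
        rid = record_id(record)
        self._records[rid] = record
        return rid

    def records_by_kind(self, kind: str) -> list[dict[str, Any]]:
        return [r for r in self._records.values() if r["body"].get("kind") == kind]
```

Sorting keys before hashing is what makes the id depend on content rather than on dict insertion order, so two clients that build the same record in a different field order arrive at the same id.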
Versioning + stability
Independent semver. Patch releases ship freely; minor versions can change public behaviour with a CHANGELOG note.
| SDK version | Highlights |
|---|---|
| 0.1.x | Current. Async Client, grammar enforcement, canonical refs, reserved-kind helpers, MockKernel, fail-open transport, auto-clock fill. |
| 0.2.x (planned) | Subscribe / SSE, batch emit, signing-key identity. |
| 1.0.x (planned) | Federated identity, typed kind parameter. |
License