Integration SDK for the Syncropel protocol — async client, grammar enforcement, canonical references, fail-open transport

syncropel

Python SDK for the Syncropel protocol — emit content-addressed records, query threads, validate grammar, fail open on transport errors.

import asyncio
from syncropel import Client, Identity, Ref

async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:my-app"),
    ) as client:
        result = await client.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow", "artists": ["Zonke"]},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print(result.success, result.record_id)

asyncio.run(main())

One call validates the grammar, builds the record envelope, retries on transient errors, and resolves cleanly on transport failures so a flaky network never crashes your handler.


Features

  • Async client: emit, query, query_thread, intend, fulfill, plus reserved-kind helpers
  • Sync helper: emit_sync() for scripts / CLIs without an event loop
  • Grammar enforcement: body.kind validated before any network call
  • Canonical references: 11 community ref constructors (@music.track, @code.file, @social.person, …) for cross-publisher correlation
  • Fail-open transport: every emit returns a result; transport errors never raise
  • Identity-aware: every record signed with the configured DID
  • Single runtime dependency: httpx for HTTP
  • In-memory MockKernel at syncropel.testing: write adapter tests without a server

Install

pip install syncropel

Requires Python 3.10+.


Quickstart

Start a Syncropel server locally, then emit your first record:

# hello.py
import asyncio
from syncropel import Client, Identity, Ref

async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:me"),
    ) as c:
        result = await c.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow"},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print("emitted:", result.record_id)

        records = await c.query_thread("music.library")
        print(f"thread has {len(records)} record(s)")

asyncio.run(main())

Run it:

python hello.py

See syncropel.com for installing the Syncropel server (spl) and a full hosted-vs-local guide.


Concepts in 60 seconds

  • Record: the immutable, content-addressed unit. 8 fields; the SDK builds the envelope for you.
  • Kind: body.kind names what the record is about, e.g. music.catalog.track. Follows a strict grammar: scope.category.entity[.version].
  • Thread: a logical conversation / workflow. Records share a thread when they're part of the same activity.
  • Actor: who emitted the record, expressed as a DID.
  • Ref: a canonical pointer to a real-world entity (a song, a file, a person…) so records about the same thing correlate across apps.

API reference

Client

| Method | Purpose |
| --- | --- |
| emit(act, kind, body, thread, refs=, parents=, data_type=, clock=) | Primary emit. Validates kind, builds the envelope, retries on 5xx / network errors. Returns EmitResult. Never raises on transport failures. |
| emit_sync(...) | Synchronous variant for scripts / CLIs without an event loop. Uses a persistent httpx.Client across calls so TCP keep-alive amortises setup across bulk emits. |
| intend(goal, thread=, ...) | Open a thread with an INTEND record. Generates a random 64-hex thread id if none supplied; returns it on result.thread. |
| fulfill(thread, summary, fulfills=, ...) | Close a thread with a KNOW record. fulfills accepts a single record id or a list. |
| emit_correction(corrects, revised_fields, reason, thread, ...) | Reserved-kind helper for core.correction: supersede earlier records with revised values. |
| emit_erasure(erases, reason, thread, ...) | Reserved-kind helper for core.erasure: mark records as erased (e.g. for compliance). |
| emit_alias(old_kind, new_kind, reason, thread, ...) | Reserved-kind helper for core.alias: declare that one kind supersedes another. |
| emit_scope_transfer(scope, from_publisher, to_publisher, reason, thread, ...) | Reserved-kind helper for core.scope_transfer. |
| emit_scope_claim(scope, governance_policy, thread, ...) | Reserved-kind helper for core.scope_claim: claim a scope with a governance policy. |
| query_thread(thread, limit=100, since=None) | Returns all records in a thread. Fail-open (returns [] on transport error). |
| query(kind=, actor=, thread=, since=, limit=100, where=None) | Returns filtered records. At least one of kind/actor/thread is required. |
| health() | Server health probe. Fail-open (returns {} on failure). |
| close() | Release the underlying HTTP client. |

Constructor kwargs: endpoint, identity (required), timeout=30.0, max_retries=2, backoff_ms=250.0, on_emit=None, api_key=None, transport=None (custom httpx transport — useful for tests).
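As a small illustration of the "random 64-hex thread id" that intend() generates when no thread is supplied, here is a minimal sketch. The helper name new_thread_id is hypothetical, and the use of the secrets module is an assumption; the SDK may source its randomness differently.

```python
import secrets


def new_thread_id() -> str:
    """Return a random thread id: 32 random bytes as 64 lowercase hex characters.

    Hypothetical helper for illustration only; not part of the SDK's API.
    """
    return secrets.token_hex(32)
```

An id produced this way has the same shape as the one the table describes, so it could also be generated up front and passed as thread= when several records need to share a thread before the first emit.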

Identity

| Form | Status |
| --- | --- |
| Identity.static(did) | Available |
| Identity.key(path_or_bytes) | Planned; raises NotImplementedError if called today |
| Identity.federated(...) | Planned; raises NotImplementedError if called today |

Ref — canonical reference constructors

| Constructor | Canonical ID scheme |
| --- | --- |
| Ref.music_track(isrc= / spotify_id= / apple_id=) | @music.track isrc:<ISRC> (preferred) |
| Ref.code_file(repo= / git_url=, path=) | @code.file github:<repo>:<path> or git:<url>:<path> |
| Ref.ops_incident(pagerduty= / linear= / url=) | @ops.incident pagerduty:<id> etc. |
| Ref.cal_event(uid=) | @cal.event icalendar:<uid> |
| Ref.social_person(did= / email= / handle=+name=) | @social.person DID pass-through, email:<x>, <platform>:<handle> |
| Ref.media_photo(sha256= / url=) | @media.photo sha256:<hex> (preferred) |
| Ref.media_video(youtube= / vimeo= / sha256=) | @media.video youtube:<id> etc. |
| Ref.doc_text(doi= / url= / platform_id=) | @doc.text doi:<id> (preferred) |
| Ref.fin_transaction(stripe= / plaid= / iso20022=) | @fin.transaction stripe:<id> etc. |
| Ref.research_paper(doi= / arxiv= / s2=) | @research.paper doi:<id> (preferred) |
| Ref.core_thread(id=) | @core.thread thread:<id> |

Each returns {"kind": "@...", "id": "..."}. Pass a list as refs= to emit(); the SDK merges them into body._refs. Two records anywhere in the network sharing the same canonical ref are joinable.
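To make "joinable" concrete, the sketch below groups plain record dicts by the refs stored under body._refs. It assumes only the {"kind": ..., "id": ...} ref shape described above; the helper name index_by_ref and the sample records are hypothetical, and no SDK call is involved.

```python
from collections import defaultdict


def index_by_ref(records):
    """Group records by each canonical ref they carry in body._refs.

    Illustrative helper, not an SDK function. Returns a dict mapping
    (ref kind, ref id) to the list of records that share that ref.
    """
    index = defaultdict(list)
    for record in records:
        for ref in record.get("body", {}).get("_refs", []):
            index[(ref["kind"], ref["id"])].append(record)
    return dict(index)


# Two hypothetical records from different publishers about the same track.
track_ref = {"kind": "@music.track", "id": "isrc:USJI19810404"}
records = [
    {"actor": "did:example:app-a", "body": {"title": "Glow", "_refs": [track_ref]}},
    {"actor": "did:example:app-b", "body": {"rating": 5, "_refs": [track_ref]}},
    {"actor": "did:example:app-c", "body": {"note": "no refs here"}},
]
joined = index_by_ref(records)
```

Here joined has a single key, ("@music.track", "isrc:USJI19810404"), holding the first two records; the third carries no refs and so correlates with nothing.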

EmitResult

@dataclass
class EmitResult:
    success: bool
    record_id: str | None = None
    clock: int | None = None
    error: str | None = None
    retried: int = 0
    kind: str = ""
    act: str = ""
    thread: str = ""

SyncropelKindError

Raised synchronously from validate_kind() and every emit* method when body.kind violates the grammar. Subclasses ValueError.


Fail-open contract

emit() never raises on network errors, 4xx, 5xx, or timeouts. Every call returns an EmitResult and your code inspects .success:

result = await client.emit(act="PUT", kind="music.catalog.track", body={}, thread="t")
if not result.success:
    # Transport or server failure: log and keep going.
    log.warning("emit failed: %s (retried %d)", result.error, result.retried)

A flaky network can't bring down your handler. You decide whether to drop the failure, retry, or escalate.

Grammar errors are different. SyncropelKindError always raises — it indicates programmer error (an invalid body.kind), and no retry will fix it. Failing loud at development time is correct.
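The fail-open behaviour can be pictured as a retry loop that converts transport exceptions into a result object instead of letting them propagate. The sketch below is not the SDK's implementation: Outcome is a hypothetical stand-in for EmitResult, send is any caller-supplied coroutine function, and the doubling backoff is an assumption (the constructor only documents max_retries and backoff_ms).

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class Outcome:
    """Hypothetical stand-in for EmitResult, reduced to the fields used here."""
    success: bool
    value: Any = None
    error: Optional[str] = None
    retried: int = 0


async def fail_open(
    send: Callable[[], Awaitable[Any]],
    max_retries: int = 2,
    backoff_ms: float = 250.0,
) -> Outcome:
    """Call send(), retrying on transport-style errors, and never raise them."""
    attempt = 0
    while True:
        try:
            return Outcome(True, value=await send(), retried=attempt)
        except (OSError, asyncio.TimeoutError) as exc:
            if attempt >= max_retries:
                # Retries exhausted: report the failure instead of raising.
                return Outcome(False, error=str(exc), retried=attempt)
            # Assumed policy: double the delay after each failed attempt.
            await asyncio.sleep(backoff_ms * (2 ** attempt) / 1000)
            attempt += 1
```

Only transport-style exceptions are caught, so a programming error such as an invalid kind would still propagate, which mirrors the split between fail-open transport and fail-loud grammar described above.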

Observability hook

def on_emit(result: EmitResult) -> None:
    my_metrics.increment("syncropel.emit", tags={"success": result.success})

client = Client(..., on_emit=on_emit)

Fires on every success and failure. Hook exceptions are swallowed with a warning — a broken metrics pipeline can't break your emit path.
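One way to get "swallowed with a warning" semantics is to wrap the hook call in a try/except that logs and moves on. This is a minimal sketch of that pattern, not the SDK's code; fire_hook and the logger name are hypothetical.

```python
import logging

log = logging.getLogger("emit_hook_sketch")  # hypothetical logger name


def fire_hook(hook, result) -> None:
    """Invoke an optional observability hook without letting it break the caller."""
    if hook is None:
        return
    try:
        hook(result)
    except Exception:
        # A failing metrics pipeline is logged, never propagated.
        log.warning("on_emit hook raised; ignoring", exc_info=True)
```

Because the exception never leaves fire_hook, the emit path returns its result regardless of what the hook does, so hooks are best kept cheap and free of side effects the caller depends on.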


Grammar reference

Every record's body.kind follows the scope.category.entity[.version] grammar. The SDK validates at emit time:

| Kind | Valid? |
| --- | --- |
| music.catalog.track | ✓ (publisher scope, 3 segments) |
| music.catalog.track.v2 | ✓ (versioned) |
| @music.track | ✓ (community canonical; 2 segments allowed for @<community> and core scopes) |
| core.alias | ✓ (reserved core primitive) |
| music.track_imported | ✗ (2-segment publisher scope forbidden) |
| Music.Catalog.Track | ✗ (uppercase forbidden) |
| music.catalog.track.v2.foo | ✗ (5 segments) |
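The rules in this table can be approximated with two regular expressions. The sketch below is inferred from the examples only and is not the SDK's validate_kind(): the allowed segment characters (lowercase letters, digits, underscore) and the vN shape of the version segment are assumptions, and is_valid_kind is a hypothetical name.

```python
import re

# Assumed segment shape: a lowercase letter, then lowercase letters, digits or underscores.
_SEGMENT = r"[a-z][a-z0-9_]*"

# Publisher scope: scope.category.entity with an optional version segment (assumed "vN").
_PUBLISHER = re.compile(rf"{_SEGMENT}\.{_SEGMENT}\.{_SEGMENT}(\.v[0-9]+)?")

# Two-segment form, allowed only for @<community> scopes and the core scope.
_SHORT = re.compile(rf"(@{_SEGMENT}|core)\.{_SEGMENT}")


def is_valid_kind(kind: str) -> bool:
    """Return True if kind matches the grammar as inferred from the table above."""
    return bool(_PUBLISHER.fullmatch(kind) or _SHORT.fullmatch(kind))
```

A check like this is useful in adapter unit tests to catch a bad kind before the code ever reaches emit(), where the real validator raises SyncropelKindError.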

When a canonical ref exists for your domain, use it:

refs=[Ref.music_track(isrc="USJI19810404")]

This makes your record correlatable with every other music record across publishers. Without refs, nothing breaks — you lose cross-app correlation.


Testing your adapter

The SDK ships a canonical mock at syncropel.testing — exercise your adapter end-to-end without running a server:

from syncropel import Client, Identity, Ref
from syncropel.testing import MockKernel

async def test_my_adapter():
    server = MockKernel()
    client = Client(
        endpoint="http://mock",
        identity=Identity.static("did:test:me"),
        transport=server.transport(),
    )
    await my_adapter.import_library(client, tracks=fixtures)

    tracks = server.records_by_kind("music.catalog.track")
    assert len(tracks) == len(fixtures)
    for t in tracks:
        assert t["body"]["_refs"][0]["kind"] == "@music.track"

Record IDs produced by MockKernel use the same SHA-256-of-canonical-JSON rule as the real server, so result.record_id matches what you'd see in production. The mock also enforces DuplicateClock on (thread, actor, clock).
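The two behaviours in this paragraph can be sketched in a few lines. This is an illustration under stated assumptions, not the server's or the mock's actual code: the canonical form is assumed to be JSON with sorted keys, compact separators and UTF-8 encoding, and the ClockGuard class and the shape of the DuplicateClock exception are hypothetical.

```python
import hashlib
import json


def record_id(record: dict) -> str:
    """SHA-256 hex digest of an assumed canonical JSON form of the record."""
    canonical = json.dumps(
        record, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class DuplicateClock(Exception):
    """Hypothetical exception: a (thread, actor, clock) triple was seen twice."""


class ClockGuard:
    """Rejects a second record with the same (thread, actor, clock)."""

    def __init__(self) -> None:
        self._seen = set()

    def check(self, thread: str, actor: str, clock: int) -> None:
        key = (thread, actor, clock)
        if key in self._seen:
            raise DuplicateClock(f"duplicate clock: {key}")
        self._seen.add(key)
```

Because the id is derived from content alone, two dicts with the same fields in a different key order hash to the same id, which is what makes a mock-generated id comparable to a production one.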

Failure injection for fail-open coverage: server.fail_next_post(n) and server.fail_next_get(n).


Versioning + stability

The SDK is versioned independently under semver. Patch releases ship freely; minor versions can change public behaviour, with a CHANGELOG note.

| SDK version | Highlights |
| --- | --- |
| 0.1.x | Current. Async Client, grammar enforcement, canonical refs, reserved-kind helpers, MockKernel, fail-open transport, auto-clock fill. |
| 0.2.x (planned) | Subscribe / SSE, batch emit, signing-key identity. |
| 1.0.x (planned) | Federated identity, typed kind parameter. |

License

Apache-2.0.
