Skip to main content

Integration SDK for the Syncropel protocol — async client, grammar enforcement, canonical references, fail-open transport

Project description

syncropel

Python SDK for the Syncropel protocol — emit content-addressed records, query threads, validate grammar, fail open on transport errors.

import asyncio
from syncropel import Client, Identity, Ref

async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:my-app"),
    ) as client:
        result = await client.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow", "artists": ["Zonke"]},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print(result.success, result.record_id)

asyncio.run(main())

One call validates the grammar, builds the record envelope, retries on transient errors, and resolves cleanly on transport failures so a flaky network never crashes your handler.


Features

  • Async clientemit, query, query_thread, intend, fulfill, plus reserved-kind helpers
  • Sync helperemit_sync() for scripts / CLIs without an event loop
  • Grammar enforcementbody.kind validated before any network call
  • Canonical references — 11 community ref constructors (@music.track, @code.file, @social.person, …) for cross-publisher correlation
  • Fail-open transport — every emit returns a result; transport errors never raise
  • Identity-aware — every record signed with the configured DID
  • Single runtime dependencyhttpx for HTTP
  • In-memory MockKernel at syncropel.testing — write adapter tests without a server

Install

pip install syncropel

Python 3.10+.


Quickstart

Start a Syncropel server locally, then emit your first record:

# hello.py
import asyncio
from syncropel import Client, Identity, Ref

async def main():
    async with Client(
        endpoint="http://localhost:9100",
        identity=Identity.static("did:example:me"),
    ) as c:
        result = await c.emit(
            act="PUT",
            kind="music.catalog.track",
            body={"title": "Glow"},
            refs=[Ref.music_track(isrc="USJI19810404")],
            thread="music.library",
        )
        print("emitted:", result.record_id)

        records = await c.query_thread("music.library")
        print(f"thread has {len(records)} record(s)")

asyncio.run(main())
python hello.py

See syncropel.com for installing the Syncropel server (spl) and a full hosted-vs-local guide.


Concepts in 60 seconds

  • Record — the immutable, content-addressed unit. 8 fields; the SDK builds the envelope for you.
  • Kindbody.kind names what the record is about, e.g. music.catalog.track. Follows a strict grammar: scope.category.entity[.version].
  • Thread — a logical conversation / workflow. Records share a thread when they're part of the same activity.
  • Actor — who emitted the record, expressed as a DID.
  • Ref — a canonical pointer to a real-world entity (a song, a file, a person…) so records about the same thing correlate across apps.

API reference

Client

Method Purpose
emit(act, kind, body, thread, refs=, parents=, data_type=, clock=) Primary emit. Validates kind, builds the envelope, retries on 5xx / network errors. Returns EmitResult. Never raises on transport failures.
emit_sync(...) Synchronous variant for scripts / CLIs without an event loop. Uses a persistent httpx.Client across calls so TCP keep-alive amortises setup across bulk emits.
intend(goal, thread=, ...) Open a thread with an INTEND record. Generates a random 64-hex thread id if none supplied; returns it on result.thread.
fulfill(thread, summary, fulfills=, ...) Close a thread with a KNOW record. fulfills accepts a single record id or list.
emit_correction(corrects, revised_fields, reason, thread, ...) Reserved-kind helper for core.correction — supersede earlier records with revised values.
emit_erasure(erases, reason, thread, ...) Reserved-kind helper for core.erasure — mark records as erased (e.g. for compliance).
emit_alias(old_kind, new_kind, reason, thread, ...) Reserved-kind helper for core.alias — declare that one kind supersedes another.
emit_scope_transfer(scope, from_publisher, to_publisher, reason, thread, ...) Reserved-kind helper for core.scope_transfer.
emit_scope_claim(scope, governance_policy, thread, ...) Reserved-kind helper for core.scope_claim — claim a scope with a governance policy.
query_thread(thread, limit=100, since=None) All records in a thread. Fail-open (returns [] on transport error).
query(kind=, actor=, thread=, since=, limit=100, where=None) Filtered records. At least one of kind/actor/thread is required.
health() Server health probe. Fail-open (returns {} on failure).
close() Release the underlying HTTP client.

Constructor kwargs: endpoint, identity (required), timeout=30.0, max_retries=2, backoff_ms=250.0, on_emit=None, api_key=None, transport=None (custom httpx transport — useful for tests).

Identity

Form Status
Identity.static(did) Available
Identity.key(path_or_bytes) Planned — raises NotImplementedError if called today
Identity.federated(...) Planned — raises NotImplementedError if called today

Ref — canonical reference constructors

Constructor Canonical ID scheme
Ref.music_track(isrc= / spotify_id= / apple_id=) @music.track isrc:<ISRC> (preferred)
Ref.code_file(repo= / git_url=, path=) @code.file github:<repo>:<path> or git:<url>:<path>
Ref.ops_incident(pagerduty= / linear= / url=) @ops.incident pagerduty:<id> etc.
Ref.cal_event(uid=) @cal.event icalendar:<uid>
Ref.social_person(did= / email= / handle=+name=) @social.person DID pass-through, email:<x>, <platform>:<handle>
Ref.media_photo(sha256= / url=) @media.photo sha256:<hex> (preferred)
Ref.media_video(youtube= / vimeo= / sha256=) @media.video youtube:<id> etc.
Ref.doc_text(doi= / url= / platform_id=) @doc.text doi:<id> (preferred)
Ref.fin_transaction(stripe= / plaid= / iso20022=) @fin.transaction stripe:<id> etc.
Ref.research_paper(doi= / arxiv= / s2=) @research.paper doi:<id> (preferred)
Ref.core_thread(id=) @core.thread thread:<id>

Each returns {"kind": "@...", "id": "..."}. Pass a list as refs= to emit(); the SDK merges them into body._refs. Two records anywhere in the network sharing the same canonical ref are joinable.

EmitResult

@dataclass
class EmitResult:
    success: bool
    record_id: str | None = None
    clock: int | None = None
    error: str | None = None
    retried: int = 0
    kind: str = ""
    act: str = ""
    thread: str = ""

SyncropelKindError

Raised synchronously from validate_kind() and every emit* method when body.kind violates the grammar. Subclasses ValueError.


Fail-open contract

emit() never raises on network errors, 4xx, 5xx, or timeouts. Every call returns an EmitResult and your code inspects .success:

result = await client.emit(act="PUT", kind="music.catalog.track", body={}, thread="t")
if not result.success:
    # Transient failure — log and keep going.
    log.warning("emit failed: %s (retried %d)", result.error, result.retried)

A flaky network can't bring down your handler. You decide whether to drop the failure, retry, or escalate.

Grammar errors are different. SyncropelKindError always raises — it indicates programmer error (an invalid body.kind), and no retry will fix it. Failing loud at development time is correct.

Observability hook

def on_emit(result: EmitResult) -> None:
    my_metrics.increment("syncropel.emit", tags={"success": result.success})

client = Client(..., on_emit=on_emit)

Fires on every success and failure. Hook exceptions are swallowed with a warning — a broken metrics pipeline can't break your emit path.


Grammar reference

Every record's body.kind follows the scope.category.entity[.version] grammar. The SDK validates at emit time:

Kind Valid?
music.catalog.track ✓ — publisher scope, 3 segments
music.catalog.track.v2 ✓ — versioned
@music.track ✓ — community canonical (2-segment allowed for @<community> and core scopes)
core.alias ✓ — reserved core primitive
music.track_imported ✗ — 2-segment publisher scope forbidden
Music.Catalog.Track ✗ — uppercase forbidden
music.catalog.track.v2.foo ✗ — 5 segments

When a canonical ref exists for your domain, use it:

refs=[Ref.music_track(isrc="USJI19810404")]

This makes your record correlatable with every other music record across publishers. Without refs, nothing breaks — you lose cross-app correlation.


Testing your adapter

The SDK ships a canonical mock at syncropel.testing — exercise your adapter end-to-end without running a server:

from syncropel import Client, Identity, Ref
from syncropel.testing import MockKernel

async def test_my_adapter():
    server = MockKernel()
    client = Client(
        endpoint="http://mock",
        identity=Identity.static("did:test:me"),
        transport=server.transport(),
    )
    await my_adapter.import_library(client, tracks=fixtures)

    tracks = server.records_by_kind("music.catalog.track")
    assert len(tracks) == len(fixtures)
    for t in tracks:
        assert t["body"]["_refs"][0]["kind"] == "@music.track"

Record IDs produced by MockKernel use the same SHA-256-of-canonical-JSON rule as the real server, so result.record_id matches what you'd see in production. The mock also enforces DuplicateClock on (thread, actor, clock).

Failure injection for fail-open coverage: server.fail_next_post(n) and server.fail_next_get(n).


Versioning + stability

Independent semver. Patch releases ship freely; minor versions can change public behaviour with a CHANGELOG note.

SDK version Highlights
0.1.x Current. Async Client, grammar enforcement, canonical refs, reserved-kind helpers, MockKernel, fail-open transport, auto-clock fill.
0.2.x (planned) Subscribe / SSE, batch emit, signing-key identity.
1.0.x (planned) Federated identity, typed kind parameter.

License

Apache-2.0.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

syncropel-0.2.0.tar.gz (51.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

syncropel-0.2.0-py3-none-any.whl (32.3 kB view details)

Uploaded Python 3

File details

Details for the file syncropel-0.2.0.tar.gz.

File metadata

  • Download URL: syncropel-0.2.0.tar.gz
  • Upload date:
  • Size: 51.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for syncropel-0.2.0.tar.gz
Algorithm Hash digest
SHA256 d3971febf3cb29b2460c6075a94860ddb5d6386f18cd2b5405129f48eb0a913e
MD5 b7358c35566c0add87eb0abe502dcf18
BLAKE2b-256 d07d2693567ac041efe0400df1774e77e3546fb755a4e6df2fc1d1a3c23d0e5d

See more details on using hashes here.

File details

Details for the file syncropel-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: syncropel-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 32.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for syncropel-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b3deeb66d18de95db61f67807c9e4b0fbdf98292d7943f68dfc2ce3cd3372dec
MD5 a9aaf3231f7b76a2c1cfa57e054737fa
BLAKE2b-256 dbd4a8cfbf7a80c327190e065897d98a404fc09d764f78062fbdfdddf02a1c0e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page