Skip to main content

Keep your tools in sync on autopilot — durable, idempotent data sync from any source into any destination, built on Temporal.

Project description

durable-sync

Keep the tools your team lives in automatically in sync — your events, videos, repos, and published content flowing into the catalog or tracker you actually use — without the brittle script that silently dies at 2am.

Teams end up copy-pasting between tools, or babysitting a homegrown script that breaks the moment an API hiccups or a token expires. durable-sync is a small Python library for building syncs that just keep running: pull records from a source (your YouTube channel, your Luma events, a Contentful CMS, a GitHub org) and keep them continuously, accurately mirrored into a destination (a Notion database, an Asana project). For example —

  • every new YouTube video shows up as a row in your Notion content database,
  • your Luma events stay mirrored into an Asana project,
  • your published Contentful articles land in a marketing calendar.

You write a little Python to say where to read and where to write; the library makes it durable. Built on Temporal. GitHub → Notion is the reference wiring.

Why bother (vs. a quick script)

A weekend script works until it doesn't. durable-sync gives you, out of the box:

  • It just stays current. Each sync runs on its own schedule, forever, keeping itself up to date — no cron job to babysit.
  • No duplicates, ever. Re-runs and retries update the existing row instead of creating a second copy (every record carries a stable id).
  • It survives outages. If your machine restarts or a service goes down mid-sync, it resumes exactly where it left off.
  • It waits instead of flailing. When a login expires or is revoked, the sync pauses and tells you — rather than hammering a dead credential.
  • No admin required. For tools like Notion you can authorize as yourself (no IT-issued API key), and your login is refreshed safely in the background.
  • It scales. From 10 records to hundreds of thousands, it pages through them without falling over.

(Under the hood that's durable orchestration, idempotent upserts, headless OAuth, and rate-limit backoff — all inherited from the library, none of it your problem.)

The mental model: two seams

  Source.fetch(spec) ─► [Record, …] ─► Destination upserts (idempotent, keyed on primary_key)
  • Record = {primary_key, properties, body}. properties are neutral Python values (str/int/bool/list/date/datetime); the destination owns all wire-encoding, so a source author never learns a destination's quirks.
  • primary_key is the immutable idempotency key (a repo id, an event id) — never a name or URL. This is the single most important field: it's what makes retries safe.

Everything else — orchestration, OAuth, backoff — lives in the "spine" and is shared by every connector.

Requirements

Quickstart: see it run in two minutes

This runs the whole spine end-to-end with a network-free in-memory destination — no tokens, no external services.

pip install "durable-sync[all,dev]"

# In one terminal: a local Temporal dev server
temporal server start-dev

# In another: the offline spine smoke (fetches fake records, upserts them twice,
# proves the second pass updates instead of duplicating)
PYTHONPATH=. python tests/smoke_spine.py

You should see a first pass create rows and a second pass update the same rows — idempotency in action. Open the Temporal UI (http://localhost:8233) to watch the workflow.

Wire your own sync

A source and a destination are just two small classes. Here's a complete, runnable sketch:

import asyncio
from contextlib import asynccontextmanager

from durable_sync.core import Record, SourceSpec
from durable_sync.worker import run_worker
from durable_sync.bootstrap import start_sources


# 1) A SOURCE: produce neutral Records, keyed on a stable primary_key.
class TasksSource:
    name = "tasks"

    def specs(self):
        # One SourceSpec per independent unit of work — each gets its own workflow.
        return [SourceSpec(key="all", interval_minutes=15)]

    async def fetch(self, spec, only_items=None):
        rows = await my_api.list_tasks()        # however you read your data
        return [
            Record(primary_key=str(r["id"]),    # immutable id — NOT the title
                   properties={"Title": r["title"], "Done": r["completed"]})
            for r in rows
        ]


# 2) A DESTINATION: idempotent upsert. query_existing_ids() decides create vs update.
class PrinterDestination:
    name = "printer"
    configured = True                            # spine refuses to sync if False
    config_hint = "(always configured)"
    create_only_properties = set()               # props written once, never overwritten

    @asynccontextmanager
    async def connect(self):
        yield self                               # this object is also the session

    async def query_existing_ids(self):
        return {}                                # {primary_key: destination_id} already present

    async def create(self, record, synced_at):
        print("CREATE", record.primary_key, record.properties); return True

    async def update(self, existing_id, record, synced_at):
        print("UPDATE", existing_id, record.properties); return True

    @staticmethod
    def is_auth_error(err):
        return False                             # no interactive auth to break


SOURCE, DESTINATION = TasksSource(), PrinterDestination()

async def main():
    await start_sources(SOURCE)                  # ensure one entity workflow per spec (idempotent)
    await run_worker(SOURCE, DESTINATION)        # host the workflow + activities; runs forever

asyncio.run(main())

Operate the running sync from the Temporal CLI — the workflow id is durable-sync:<spec.key>:

# Trigger a sync now instead of waiting for the interval:
temporal workflow signal --workflow-id "durable-sync:all" --name sync_now --input '[]'

# See when it last ran, its stats, and any error:
temporal workflow query  --workflow-id "durable-sync:all" --type status

That's the whole contract. For the real interfaces (optional body, the destination session split, source enrichment hooks, paginated fetch_page, the transform seam), see CONTRIBUTING.md.

Connectors

Reuse a built-in connector instead of writing your own. Each lives in durable_sync/connectors/<system>/:

System Source Destination Notes
GitHub Orgs + named repos; per-repo enrichment hook
YouTube A channel's uploads
Luma Calendar events (REST); destination needs a LinkStore
Contentful REST source (CDA/CMA); destination via REST CMA or MCP-over-OAuth for SSO-blocked spaces
Spotify Liked Songs, keyed on ISRC; workflow-owned OAuth (PKCE, no admin token)
Notion MCP transport + workflow-owned OAuth (no admin token needed)
Asana Direct REST + a self-serve personal token

A connector is grouped by system, not direction, because a system is often both a source and a destination and the two sides share a client + auth. Under the hood, a connector composes a transport (MCP or REST/http.py) with an auth mechanism (workflow-owned OAuth, or an inline token) — the two axes are independent.

Key concepts

  • One workflow per source unit. Source.specs() returns a list of SourceSpecs; each becomes a long-lived entity workflow that is its own timer (sleeps interval_minutes, wakes early on a sync_now signal) and uses continue-as-new to bound history. No external scheduler.
  • Idempotency is keyed, never inferred. The upsert does query_existing_ids() → update-or-create per primary_key. Sync only ever creates/updates rows it fetched — it never deletes — so hand-added data survives.
  • OAuth as a workflow. For services where you can't get an admin token, a OAuthTokenWorkflow owns the rotating refresh token, serializes refreshes (no rotation race), and serves fresh access tokens via query so the secret stays out of history. (Pair with the opt-in AES-GCM payload codec to encrypt secrets at rest in history too.)
  • LinkStore for FK-less destinations. Some systems (Luma, Contentful over MCP) can't store your primary_key on their own objects, so the correspondence lives in an app-provided durable store. In-memory and SQLite references ship; use a real datastore in production.
  • Scales by paging. Large sources implement fetch_page so the spine fetches + upserts page-by-page, keeping every payload under Temporal's limits. See the Scaling section of CONTRIBUTING.md.

Install

pip install "durable-sync[notion]"     # a destination: notion / asana
pip install "durable-sync[github]"     # a source: github / luma / youtube / contentful / spotify
pip install "durable-sync[crypto]"     # opt-in AES-GCM payload encryption
pip install "durable-sync[all,dev]"    # everything + test deps

Configuration

All runtime config is environment variables (see durable_sync/config.py):

Variable Purpose
TEMPORAL_ADDRESS / TEMPORAL_NAMESPACE Cluster to connect to (defaults to localhost:7233 / default)
TEMPORAL_API_KEY Set for Temporal Cloud (enables TLS)
DURABLE_SYNC_TASK_QUEUE Task queue name
DURABLE_SYNC_ENC_KEY base64 AES-256 key to encrypt payloads in history (python -m durable_sync.codec generates one)
DURABLE_SYNC_BUILD_ID Opt-in Worker Versioning for safe redeploys of the long-lived workflows

Connector-specific config (which org, which Notion database, which token env var) lives in the source/destination you wire up — never in config.py.

Project layout

durable_sync/
├── core.py             Record + Source/Destination protocols (the contract)
├── activities.py       generic fetch_source / sync_records
├── workflows/sync.py   SourceSyncWorkflow — one durable entity workflow per source unit
├── worker.py           run_worker(SOURCE, DESTINATION)
├── bootstrap.py        start_sources(SOURCE) — one workflow per spec (idempotent)
├── codec.py            opt-in AES-GCM payload codec
├── auth/oauth/         OAuth-as-a-workflow toolkit (token-owner workflow + flow)
├── transport/mcp.py    generic Model Context Protocol transport (Notion + Contentful)
├── http.py             shared httpx retry/backoff for REST connectors
├── linkstore.py        idempotency map for FK-less destinations
├── route.py            Route = source -> (transform, field ownership) -> destination
└── connectors/         one subpackage per system (github, youtube, luma, contentful, notion, asana)

Contributing

CONTRIBUTING.md is the authoritative guide for adding a source, destination, auth mechanism, or transformation — with real signatures, the testing pattern, and the hard-won gotchas (workflow determinism, signal handlers, history limits, scaling).

License

MIT — see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

durable_sync-0.2.0.tar.gz (115.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

durable_sync-0.2.0-py3-none-any.whl (129.2 kB view details)

Uploaded Python 3

File details

Details for the file durable_sync-0.2.0.tar.gz.

File metadata

  • Download URL: durable_sync-0.2.0.tar.gz
  • Upload date:
  • Size: 115.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for durable_sync-0.2.0.tar.gz
Algorithm Hash digest
SHA256 fd92a3a18b3a85c63c3f0e6c0eb9145b090bf9a0f807b7e72b4a7d5cd4981d16
MD5 b15dfd2c6289ae7ca80f3b26aa543fb2
BLAKE2b-256 aae3faa5d5dcbb4debef5461c79a8bc03dbe16cb60c78a17fdaf977504bd8e34

See more details on using hashes here.

Provenance

The following attestation bundles were made for durable_sync-0.2.0.tar.gz:

Publisher: publish.yml on temporal-community/durable-sync

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file durable_sync-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: durable_sync-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 129.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for durable_sync-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 76d19c9c803ddec34cbf94bbeb420a7cad867c04f3924f6405aee29ee5657671
MD5 4d6de6b6059f5fb5dc36db34f068f7b9
BLAKE2b-256 94af160b29905074ff20f82bd6d66ea46482cdf3d83223daf23fd2d7c2b37d5d

See more details on using hashes here.

Provenance

The following attestation bundles were made for durable_sync-0.2.0-py3-none-any.whl:

Publisher: publish.yml on temporal-community/durable-sync

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page