Python SDK for Teicor external app integrations

Teicor Python SDK

For full external integration setup (install flow, auth, core/runtime interaction, and troubleshooting), see:

This SDK helps installed Teicor apps interact with:

  • Direct runtime APIs via runtime_url
  • Core app-scoped state in Teicor core DB

Install from PyPI

Add to your requirements.txt (pin versions):

    teicor-sdk==0.3.0

Example install command from PyPI:

pip install teicor-sdk==0.3.0

Usage

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

context = client.get_context()
state = client.get_core_state()
state = client.patch_core_state({"cursor": "next-page-token"})
rows = client.runtime_request(method="GET", runtime_path="api/schemas")

runtime_request (and CRUD helpers that call it) uses direct runtime access:

  • Reads runtime_url/auth_url from get_context().
  • Fetches/caches authority tokens by scope and calls runtime directly.
  • On direct 401/403, refreshes token once and retries once.
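
The refresh-once, retry-once behavior described above can be sketched in plain Python. This is an illustrative sketch only, not the SDK's internal code: ScopedTokenCache, AuthError, call_with_one_refresh, and the fetch_token/send callables are hypothetical names standing in for the token endpoint and the HTTP call.

```python
from typing import Any, Callable, Dict


class AuthError(Exception):
    """Raised by the (hypothetical) transport on a 401/403 response."""


class ScopedTokenCache:
    """Caches one authority token per scope and refetches on demand."""

    def __init__(self, fetch_token: Callable[[str], str]) -> None:
        self._fetch_token = fetch_token
        self._tokens: Dict[str, str] = {}

    def get(self, scope: str, force_refresh: bool = False) -> str:
        # Fetch only when nothing is cached for this scope or a refresh is forced.
        if force_refresh or scope not in self._tokens:
            self._tokens[scope] = self._fetch_token(scope)
        return self._tokens[scope]


def call_with_one_refresh(
    cache: ScopedTokenCache,
    scope: str,
    send: Callable[[str], Any],
) -> Any:
    """Call send(token); on an auth failure refresh the token once and retry once."""
    try:
        return send(cache.get(scope))
    except AuthError:
        # A second AuthError is not caught here and propagates to the caller.
        return send(cache.get(scope, force_refresh=True))
```

Bounding the retry to a single attempt keeps a revoked or mis-scoped token from turning into a refresh loop.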

SDK CRUD quickstart (schema/table/column/record)

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

# Context + runtime access descriptor
ctx = client.get_context()
print(ctx.runtime_url)
print(ctx.auth_url)

authority = client.get_runtime_authority_token(scope="write")
print(authority.runtime_authority_token)
print(authority.runtime_url)
print(authority.expires_in_seconds)

# ---------- Schema ----------
client.create_schema("app_shopify")
schemas = client.list_schemas()
client.update_schema("app_shopify", new_schema_name="app_shopify_v2")

# ---------- Table ----------
table = client.create_table(
    name="oauth_tokens",
    schema_name="app_shopify_v2",
    is_private=True,
)
table_id = str(table["id"])

tables = client.list_tables()
table_details = client.get_table(table_id)
client.update_table(table_id, name="oauth_tokens_v2")

# ---------- Column ----------
column = client.create_column(
    table_id=table_id,
    name="Provider Account Id",
    column_type="single_line_text",  # see TeicorClient.COLUMN_TYPES
    config={"unique": True},
)
column_id = str(column["id"])

columns = client.list_columns(table_id)
client.update_column(
    table_id=table_id,
    column_id=column_id,
    name="Provider Account ID",
    config={"unique": True},
)

# ---------- Record ----------
created = client.create_record(
    table_id=table_id,
    data={column_id: "acct_123"},
)
record_id = str(created["id"])

records = client.list_records(
    table_id=table_id,
    limit=50,
    offset=0,
    filter_expr={
        "and": [
            {
                "column_id": column_id,
                "op": "contains",  # see TeicorClient.FILTER_OPS
                "value": "acct_",
            }
        ]
    },
    sort=[{"column_id": column_id, "direction": "asc"}],
)

first = client.find_record(
    table_id=table_id,
    filter_expr={
        "and": [
            {"column_id": column_id, "op": "eq", "value": "acct_123"}
        ]
    },
)

client.update_record(
    table_id=table_id,
    record_id=record_id,
    data={column_id: "acct_456"},
)

client.delete_record(table_id=table_id, record_id=record_id)

# ---------- Cleanup ----------
client.delete_column(table_id=table_id, column_id=column_id)
client.delete_table(table_id)
client.delete_schema("app_shopify_v2")

Query APIs in SDK

The SDK now exposes query helpers for external integrations:

  • list_queries(group_id=None, include_all=False, updated_since=None)
  • get_query(query_id)
  • execute_query(query_id)
  • get_query_data(query_id, if_newer_than_watermark=None)
  • sync_queries(...) convenience full-loop helper

sync_queries(...) returns a typed QuerySyncSummary dataclass.

Basic usage:

queries = client.list_queries(include_all=True)
query_id = str(queries[0]["id"])

definition = client.get_query(query_id)
snapshot = client.execute_query(query_id)
incremental = client.get_query_data(
    query_id,
    if_newer_than_watermark=snapshot.get("source_watermark"),
)

Updated-since two-gate sync (recommended)

Use a two-gate strategy to avoid heavy query calls when no fresh data exists.

  1. Definition gate (cheap): call list_queries(updated_since=...).
  2. Data gate (targeted): for changed query IDs, call get_query_data(if_newer_than_watermark=...).

Query Sync Steps

  1. Read persisted state:
    • last_sync_at (team-level ISO timestamp)
    • query_watermarks (map of query_id -> source_watermark)
  2. Call list_queries(include_all=True, updated_since=last_sync_at).
  3. If last_sync_at is empty (first sync), call without updated_since to fetch all query definitions once.
  4. If response is empty, stop (no heavy calls).
  5. For each returned query:
    • read previous watermark for that query
    • call get_query_data(query_id, if_newer_than_watermark=prev)
    • if payload has unchanged=True, skip downstream work
    • else process rows and persist returned source_watermark
  6. Advance last_sync_at to the max updated_at returned in step 2.

End-to-end example

from datetime import datetime, timezone

state = client.get_core_state()
last_sync_at = state.get("last_sync_at")
query_watermarks = state.get("query_watermarks") or {}

# Gate 1 (cheap): list only query definitions updated since the last sync.
# updated_since=None (first sync) is the same as omitting the argument.
changed_queries = client.list_queries(
    include_all=True,
    updated_since=last_sync_at,
)

if not changed_queries:
    print("No query definition updates since last sync.")
else:
    latest_updated_at = last_sync_at
    for query in changed_queries:
        query_id = str(query["id"])

        # Step 6: track the max updated_at across every returned query,
        # including ones whose data turns out to be unchanged below.
        # ISO timestamps compare correctly as strings only when they share
        # one format and timezone.
        updated_at = query.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_updated_at is None or updated_at > latest_updated_at
        ):
            latest_updated_at = updated_at

        # Gate 2 (targeted): fetch data only if newer than our watermark.
        data = client.get_query_data(
            query_id,
            if_newer_than_watermark=query_watermarks.get(query_id),
        )
        if data.get("unchanged") is True:
            continue

        rows = data.get("rows") or []
        # TODO: upsert rows into external system

        watermark = data.get("source_watermark")
        if isinstance(watermark, str) and watermark:
            query_watermarks[query_id] = watermark

    next_state = {
        "query_watermarks": query_watermarks,
        "last_sync_at": latest_updated_at
        or datetime.now(timezone.utc).isoformat(),
    }
    client.patch_core_state(next_state)
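
The example above compares updated_at values as plain strings, which orders them correctly only when every timestamp uses the same format and UTC offset. If that is not guaranteed, a parsed comparison is safer. The sketch below uses hypothetical helpers (parse_iso and latest_timestamp are not SDK functions) and assumes the timestamps are ISO 8601 strings.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional


def parse_iso(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, treating naive values as UTC."""
    # Older Python versions reject a trailing "Z", so normalize it first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def latest_timestamp(
    current: Optional[str],
    candidates: Iterable[object],
) -> Optional[str]:
    """Return the chronologically latest timestamp string seen so far."""
    best = current
    for candidate in candidates:
        # Skip missing or non-string values, as the example above does.
        if not isinstance(candidate, str) or not candidate:
            continue
        if best is None or parse_iso(candidate) > parse_iso(best):
            best = candidate
    return best
```

With these helpers, last_sync_at could be advanced with something like latest_timestamp(last_sync_at, (q.get("updated_at") for q in changed_queries)).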

Query Convenience Method

def handle_query_result(query: dict, result: dict) -> None:
    if result.get("unchanged") is True:
        return
    rows = result.get("rows") or []
    # TODO: sync rows to external system

summary = client.sync_queries(
    persist_state=True,
    on_query_result=handle_query_result,
)
print(summary)

Reliable large payload sync (recommended)

For large writes (for example 70,000+ rows), use chunked sync with retry/backoff:

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

def on_chunk(result, response):
    print(
        f"chunk={result.chunk_index + 1} "
        f"size={result.chunk_size} attempts={result.attempts}"
    )

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,                 # list[dict]
    on_conflict="overwrite",        # idempotent upsert behavior
    chunk_size=500,                  # runtime default max
    max_retries=5,
    retry_base_seconds=0.5,
    retry_max_seconds=8.0,
    retry_jitter_ratio=0.2,
    on_chunk_complete=on_chunk,
)

print(summary)
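
To make the retry parameters concrete, here is one common way such settings translate into wait times: capped exponential backoff with symmetric jitter. The formula is an assumption for illustration; the SDK's actual schedule may differ. backoff_delay is a hypothetical helper whose keyword names merely mirror retry_base_seconds, retry_max_seconds, and retry_jitter_ratio.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_seconds: float = 0.5,
    max_seconds: float = 8.0,
    jitter_ratio: float = 0.2,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before retry number `attempt` (1-based)."""
    source = rng if rng is not None else random
    # Double the delay on each attempt, but never exceed the cap.
    capped = min(max_seconds, base_seconds * (2 ** (attempt - 1)))
    # Spread the delay by up to +/- jitter_ratio so clients do not retry in lockstep.
    jitter = capped * jitter_ratio * source.uniform(-1.0, 1.0)
    return max(0.0, capped + jitter)


if __name__ == "__main__":
    # Nominal schedule without jitter for the values used above.
    print([backoff_delay(n, jitter_ratio=0.0) for n in range(1, 6)])
```

Under this assumed formula, the nominal waits for max_retries=5 are 0.5, 1, 2, 4, and 8 seconds, each varied by up to 20%.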

Resume from core-state checkpoint after interruption:

checkpoint_state = client.get_bulk_sync_checkpoint()

def save_checkpoint(checkpoint):
    client.save_bulk_sync_checkpoint(checkpoint=checkpoint)

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    start_chunk_index=client.parse_bulk_sync_checkpoint(checkpoint_state),
    on_checkpoint=save_checkpoint,
)

if summary.records_processed == summary.total_records:
    client.clear_bulk_sync_checkpoint()

Single-call convenience wrapper (load/save/clear checkpoint automatically):

summary = client.sync_records_bulk_with_core_checkpoint(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    on_conflict="overwrite",
)

Notes:

  • Keep deterministic unique/primary-key values so retries are safe.
  • Keep chunk size at or below runtime max bulk setting (default 500).
  • Use summary/chunk callbacks to checkpoint progress in your app.
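
The first two notes matter most when resuming: a saved chunk index points at the same rows only if the record order and chunk size are identical across runs. The sketch below shows that relationship with a hypothetical iter_chunks helper. It is not the SDK's implementation, and it assumes chunk indexes are zero-based, as the chunk_index + 1 display in the callback above suggests.

```python
from typing import Any, Dict, Iterator, List, Sequence, Tuple

Record = Dict[str, Any]


def iter_chunks(
    records: Sequence[Record],
    chunk_size: int = 500,
    start_chunk_index: int = 0,
) -> Iterator[Tuple[int, List[Record]]]:
    """Yield (chunk_index, chunk) pairs, skipping chunks before start_chunk_index."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for chunk_index, start in enumerate(range(0, len(records), chunk_size)):
        if chunk_index < start_chunk_index:
            continue  # already written in an earlier run
        yield chunk_index, list(records[start:start + chunk_size])


if __name__ == "__main__":
    # Sort by a stable key so chunk boundaries are the same on every run.
    rows = sorted(({"id": i} for i in range(1200)), key=lambda r: r["id"])
    for index, chunk in iter_chunks(rows, chunk_size=500, start_chunk_index=1):
        print(index, len(chunk))
```

If the input order can change between runs, resuming by index may skip or repeat rows, which is why idempotent upserts on a deterministic key are the safer default.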

Table two-gate sync (recommended)

Table sync follows the same two-gate pattern:

  1. Table definition gate via list_tables(updated_since=...)
  2. Table data gate via get_table_sync_status(if_newer_than_watermark=...)

SDK helpers:

  • list_tables(updated_since=None)
  • get_table_sync_status(table_id, if_newer_than_watermark=None)
  • list_records(table_id=..., ...) for the actual row fetch after gates pass
  • sync_tables(...) convenience full-loop helper

sync_tables(...) returns a typed TableSyncSummary dataclass.

Table Sync Steps

  1. Read state:
    • tables_last_sync_at (team-level timestamp)
    • table_watermarks (table_id -> source_watermark)
  2. Call list_tables(updated_since=tables_last_sync_at).
  3. If tables_last_sync_at is empty (first sync), call without updated_since to discover all tables once.
  4. If the response is empty, stop.
  5. For each changed table:
    • call get_table_sync_status(table_id, if_newer_than_watermark=...)
    • if has_changes is false, skip row fetch
    • if has_changes is true, call list_records(...) and process rows
    • persist returned source_watermark for that table
  6. Advance tables_last_sync_at to max table updated_at from step 2.

Example

state = client.get_core_state()
tables_last_sync_at = state.get("tables_last_sync_at")
table_watermarks = state.get("table_watermarks") or {}

# Gate 1: list only tables whose definition changed since the last sync.
changed_tables = client.list_tables(updated_since=tables_last_sync_at)

if changed_tables:
    latest_table_updated_at = tables_last_sync_at

    for table in changed_tables:
        table_id = str(table["id"])

        # Step 6: track the max updated_at across every returned table,
        # including tables that gate 2 reports as unchanged.
        updated_at = table.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_table_updated_at is None
            or updated_at > latest_table_updated_at
        ):
            latest_table_updated_at = updated_at

        # Gate 2: ask whether table data changed since our watermark.
        status = client.get_table_sync_status(
            table_id,
            if_newer_than_watermark=table_watermarks.get(table_id),
        )
        if not status.get("has_changes"):
            continue

        # Fetch rows only when gate 2 says table data changed.
        # This reads the first page only; advance offset to read the rest.
        records = client.list_records(table_id=table_id, limit=200, offset=0)
        # TODO: sync records into external system

        next_watermark = status.get("source_watermark")
        if isinstance(next_watermark, str) and next_watermark:
            table_watermarks[table_id] = next_watermark

    client.patch_core_state(
        {
            "tables_last_sync_at": latest_table_updated_at,
            "table_watermarks": table_watermarks,
        }
    )
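
The list_records call in the example uses limit=200 and offset=0, so it reads a single page. For larger tables, a limit/offset loop along the following lines can walk every page. This is a sketch under assumptions: iter_all_records and fetch_page are hypothetical names, and each page is assumed to come back as a plain list (adapt fetch_page if list_records returns a wrapper object).

```python
from typing import Any, Callable, Dict, Iterator, List

Record = Dict[str, Any]


def iter_all_records(
    fetch_page: Callable[[int, int], List[Record]],
    page_size: int = 200,
) -> Iterator[Record]:
    """Yield every record by calling fetch_page(limit, offset) until exhausted."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        if not page:
            return  # no more rows
        yield from page
        if len(page) < page_size:
            return  # a short page is the last page
        offset += page_size
```

In the table loop this could be wired up as fetch_page = lambda limit, offset: client.list_records(table_id=table_id, limit=limit, offset=offset), assuming that return shape holds.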

Table Convenience Method

def handle_table_records(table: dict, records: list[dict]) -> None:
    # TODO: upsert records into external system
    pass

summary = client.sync_tables(
    records_page_size=200,
    persist_state=True,
    on_table_records=handle_table_records,
)
print(summary)

Package maintainers

Build artifacts:

python -m pip install --upgrade build
python -m build

Publish to PyPI:

python -m pip install --upgrade twine
python -m twine upload dist/*

Auth headers used

  • Authorization: Bearer <service_token>
  • X-Teicor-App-Slug: <app_slug>
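
For debugging with a raw HTTP client, the two headers above can be assembled as shown below. build_auth_headers is a hypothetical helper for illustration; it is not part of the SDK, which sets these headers itself.

```python
from typing import Dict


def build_auth_headers(service_token: str, app_slug: str) -> Dict[str, str]:
    """Return the two auth headers listed above for a given token and app slug."""
    if not service_token or not app_slug:
        raise ValueError("service_token and app_slug are required")
    return {
        "Authorization": f"Bearer {service_token}",
        "X-Teicor-App-Slug": app_slug,
    }
```

Avoid logging the returned dictionary, since it contains the service token.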

Notes

  • core/state is app-scoped per team installation.
  • Runtime calls use direct runtime URLs with short-lived authority tokens.
  • Source package is in sdk/python/src/teicor_sdk.
