
Python SDK for Teicor external app integrations


Teicor Python SDK

For full external integration setup (install flow, auth, core/runtime interaction, and troubleshooting), see:

This SDK helps installed Teicor apps interact with:

  • Runtime proxy APIs
  • Core app-scoped state in Teicor core DB

Install from PyPI

Add to your requirements.txt (pin versions):

    teicor-sdk==0.2.8

Or install directly with pip:

pip install teicor-sdk==0.2.8

Usage

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

context = client.get_context()
state = client.get_core_state()
state = client.patch_core_state({"cursor": "next-page-token"})
rows = client.runtime_request(method="GET", runtime_path="api/schemas")

runtime_request (and the CRUD helpers that call it) now auto-routes by mode; a rough sketch follows the list:

  • proxy mode: calls the Teicor proxy (/v1/teams/<team_slug>/runtime/...).
  • direct mode: fetches/refreshes a runtime authority token and calls runtime_api_base_url directly.
  • If a direct call fails with a network error, a timeout, or one of the fallback 5xx statuses, the SDK retries once through the proxy, following the deterministic fallback guidance from the context.
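
A minimal sketch of what that routing amounts to, assuming hypothetical internals (the real logic lives inside TeicorClient.runtime_request; the exact proxy URL layout and the set of fallback statuses are assumptions here):

import requests

def routed_request(ctx, authority_token, service_headers, method, runtime_path, team_slug):
    # Hypothetical sketch only; mode, URLs, and fallback guidance come from
    # the context descriptor returned by client.get_context().
    proxy_url = (
        f"{ctx.runtime_proxy_base_url}/v1/teams/{team_slug}/runtime/{runtime_path}"
    )
    if ctx.runtime_access_mode == "direct":
        try:
            resp = requests.request(
                method,
                f"{ctx.runtime_api_base_url}/{runtime_path}",
                headers={"Authorization": f"Bearer {authority_token}"},
                timeout=10,
            )
            if resp.status_code < 500:  # assumed: only 5xx triggers fallback
                return resp
        except requests.RequestException:
            pass  # network error or timeout: fall through to the proxy
    # proxy mode, or the single proxy retry after a failed direct call
    return requests.request(method, proxy_url, headers=service_headers, timeout=10)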

SDK CRUD quickstart (schema/table/column/record)

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

# Context + runtime access descriptor
ctx = client.get_context()
print(ctx.runtime_access_mode)      # proxy (today), direct for eligible installs
print(ctx.runtime_proxy_base_url)   # always available
print(ctx.runtime_api_base_url)     # used for direct mode
print(ctx.runtime_fallback)         # deterministic proxy fallback guidance

if ctx.runtime_access_mode == "direct":
    authority = client.get_runtime_authority_token(scope="write")
    print(authority.runtime_authority_token)
    print(authority.expires_in_seconds)

# ---------- Schema ----------
client.create_schema("app_shopify")
schemas = client.list_schemas()
client.update_schema("app_shopify", new_schema_name="app_shopify_v2")

# ---------- Table ----------
table = client.create_table(
    name="oauth_tokens",
    schema_name="app_shopify_v2",
    is_private=True,
)
table_id = str(table["id"])

tables = client.list_tables()
table_details = client.get_table(table_id)
client.update_table(table_id, name="oauth_tokens_v2")

# ---------- Column ----------
column = client.create_column(
    table_id=table_id,
    name="Provider Account Id",
    column_type="single_line_text",  # see TeicorClient.COLUMN_TYPES
    config={"unique": True},
)
column_id = str(column["id"])

columns = client.list_columns(table_id)
client.update_column(
    table_id=table_id,
    column_id=column_id,
    name="Provider Account ID",
    config={"unique": True},
)

# ---------- Record ----------
created = client.create_record(
    table_id=table_id,
    data={column_id: "acct_123"},
)
record_id = str(created["id"])

records = client.list_records(
    table_id=table_id,
    limit=50,
    offset=0,
    filter_expr={
        "and": [
            {
                "column_id": column_id,
                "op": "contains",  # see TeicorClient.FILTER_OPS
                "value": "acct_",
            }
        ]
    },
    sort=[{"column_id": column_id, "direction": "asc"}],
)

first = client.find_record(
    table_id=table_id,
    filter_expr={
        "and": [
            {"column_id": column_id, "op": "eq", "value": "acct_123"}
        ]
    },
)

client.update_record(
    table_id=table_id,
    record_id=record_id,
    data={column_id: "acct_456"},
)

client.delete_record(table_id=table_id, record_id=record_id)

# ---------- Cleanup ----------
client.delete_column(table_id=table_id, column_id=column_id)
client.delete_table(table_id)
client.delete_schema("app_shopify_v2")

Query APIs in SDK

The SDK now exposes query helpers for external integrations:

  • list_queries(group_id=None, include_all=False, updated_since=None)
  • get_query(query_id)
  • execute_query(query_id)
  • get_query_data(query_id, if_newer_than_watermark=None)
  • sync_queries(...) convenience full-loop helper

sync_queries(...) returns a typed QuerySyncSummary dataclass.

Basic usage:

queries = client.list_queries(include_all=True)
query_id = str(queries[0]["id"])

definition = client.get_query(query_id)
snapshot = client.execute_query(query_id)
incremental = client.get_query_data(
    query_id,
    if_newer_than_watermark=snapshot.get("source_watermark"),
)

Updated-since two-gate sync (recommended)

Use a two-gate strategy to avoid heavy query calls when no fresh data exists.

  1. Definition gate (cheap): call list_queries(updated_since=...).
  2. Data gate (targeted): for changed query IDs, call get_query_data(if_newer_than_watermark=...).

Query Sync Steps

  1. Read persisted state:
    • last_sync_at (team-level ISO timestamp)
    • query_watermarks (map of query_id -> source_watermark)
  2. Call list_queries(include_all=True, updated_since=last_sync_at).
  3. If last_sync_at is empty (first sync), call without updated_since to fetch all query definitions once.
  4. If response is empty, stop (no heavy calls).
  5. For each returned query:
    • read previous watermark for that query
    • call get_query_data(query_id, if_newer_than_watermark=prev)
    • if payload has unchanged=True, skip downstream work
    • else process rows and persist returned source_watermark
  6. Advance last_sync_at to the max updated_at returned in step 2.

End-to-end example

from datetime import datetime, timezone

state = client.get_core_state()
last_sync_at = state.get("last_sync_at")
query_watermarks = state.get("query_watermarks") or {}

changed_queries = client.list_queries(
    include_all=True,
    updated_since=last_sync_at,
)

if not changed_queries:
    print("No query definition updates since last sync.")
else:
    latest_updated_at = last_sync_at
    for query in changed_queries:
        query_id = str(query["id"])
        previous_watermark = query_watermarks.get(query_id)

        data = client.get_query_data(
            query_id,
            if_newer_than_watermark=previous_watermark,
        )

        if data.get("unchanged") is True:
            continue

        rows = data.get("rows") or []
        # TODO: upsert rows into external system

        watermark = data.get("source_watermark")
        if isinstance(watermark, str) and watermark:
            query_watermarks[query_id] = watermark

        updated_at = query.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_updated_at is None or updated_at > latest_updated_at
        ):
            latest_updated_at = updated_at

    next_state = {
        "query_watermarks": query_watermarks,
        "last_sync_at": latest_updated_at
        or datetime.now(timezone.utc).isoformat(),
    }
    client.patch_core_state(next_state)

Query Convenience Method

def handle_query_result(query: dict, result: dict) -> None:
    if result.get("unchanged") is True:
        return
    rows = result.get("rows") or []
    # TODO: sync rows to external system

summary = client.sync_queries(
    persist_state=True,
    on_query_result=handle_query_result,
)
print(summary)

Reliable large payload sync (recommended)

For large writes (for example 70,000+ rows), use chunked sync with retry/backoff:

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

def on_chunk(result, response):
    print(
        f"chunk={result.chunk_index + 1} "
        f"size={result.chunk_size} attempts={result.attempts}"
    )

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,                # list[dict]
    on_conflict="overwrite",        # idempotent upsert behavior
    chunk_size=500,                 # runtime default max
    max_retries=5,
    retry_base_seconds=0.5,
    retry_max_seconds=8.0,
    retry_jitter_ratio=0.2,
    on_chunk_complete=on_chunk,
)

print(summary)
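
The retry knobs above suggest capped exponential backoff with jitter. Assuming those semantics (an assumption; check the SDK source for the exact formula), the delay before retry attempt n works out to roughly:

import random

def retry_delay(attempt, base=0.5, cap=8.0, jitter_ratio=0.2):
    # Capped exponential backoff: 0.5s, 1s, 2s, 4s, 8s, 8s, ...
    delay = min(base * (2 ** attempt), cap)
    # +/- 20% random jitter so parallel workers don't retry in lockstep
    return delay * (1 + random.uniform(-jitter_ratio, jitter_ratio))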

Resume from core-state checkpoint after interruption:

checkpoint_state = client.get_bulk_sync_checkpoint()

def save_checkpoint(checkpoint):
    client.save_bulk_sync_checkpoint(checkpoint=checkpoint)

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    start_chunk_index=client.parse_bulk_sync_checkpoint(checkpoint_state),
    on_checkpoint=save_checkpoint,
)

if summary.records_processed == summary.total_records:
    client.clear_bulk_sync_checkpoint()

Single-call convenience wrapper (load/save/clear checkpoint automatically):

summary = client.sync_records_bulk_with_core_checkpoint(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    on_conflict="overwrite",
)

Notes:

  • Keep deterministic unique/primary-key values so retries are safe (see the sketch after these notes).
  • Keep chunk size at or below runtime max bulk setting (default 500).
  • Use summary/chunk callbacks to checkpoint progress in your app.
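
On the first note: "deterministic" means the same source row always produces the same key value, so a retried chunk overwrites the earlier write instead of creating a duplicate. A hypothetical sketch (the column ID and source-row shape are placeholders):

provider_account_col = "<unique-column-id>"  # column created with config={"unique": True}
source_rows = [{"account_id": "acct_123"}]   # placeholder source data

# Same source row -> same key on every run, so on_conflict="overwrite"
# makes retried chunks idempotent rather than duplicating records.
records = [{provider_account_col: row["account_id"]} for row in source_rows]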

Table two-gate sync (recommended)

Table sync follows the same two-gate pattern:

  1. Table definition gate via list_tables(updated_since=...)
  2. Table data gate via get_table_sync_status(if_newer_than_watermark=...)

SDK helpers:

  • list_tables(updated_since=None)
  • get_table_sync_status(table_id, if_newer_than_watermark=None)
  • list_records(table_id=..., ...) for the actual row fetch after gates pass
  • sync_tables(...) convenience full-loop helper

sync_tables(...) returns a typed TableSyncSummary dataclass.

Table Sync Steps

  1. Read state:
    • tables_last_sync_at (team-level timestamp)
    • table_watermarks (table_id -> source_watermark)
  2. Call list_tables(updated_since=tables_last_sync_at).
  3. If tables_last_sync_at is empty (first sync), call without updated_since to discover all tables once.
  4. If empty, stop.
  5. For each changed table:
    • call get_table_sync_status(table_id, if_newer_than_watermark=...)
    • if has_changes is false, skip row fetch
    • if has_changes is true, call list_records(...) and process rows
    • persist returned source_watermark for that table
  6. Advance tables_last_sync_at to max table updated_at from step 2.

Example

state = client.get_core_state()
tables_last_sync_at = state.get("tables_last_sync_at")
table_watermarks = state.get("table_watermarks") or {}

changed_tables = client.list_tables(updated_since=tables_last_sync_at)

if changed_tables:
    latest_table_updated_at = tables_last_sync_at

    for table in changed_tables:
        table_id = str(table["id"])
        prev_watermark = table_watermarks.get(table_id)

        status = client.get_table_sync_status(
            table_id,
            if_newer_than_watermark=prev_watermark,
        )
        if not status.get("has_changes"):
            continue

        # Fetch rows only when gate 2 says table data changed (first page
        # only here; see the paging sketch after this example).
        records = client.list_records(table_id=table_id, limit=200, offset=0)
        # TODO: sync records into external system

        next_watermark = status.get("source_watermark")
        if isinstance(next_watermark, str) and next_watermark:
            table_watermarks[table_id] = next_watermark

        updated_at = table.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_table_updated_at is None
            or updated_at > latest_table_updated_at
        ):
            latest_table_updated_at = updated_at

    client.patch_core_state(
        {
            "tables_last_sync_at": latest_table_updated_at,
            "table_watermarks": table_watermarks,
        }
    )
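
The example above fetches only the first page of rows. A paging loop, assuming list_records returns a plain list that comes back short (or empty) on the final page, might look like:

def fetch_all_records(client, table_id, page_size=200):
    # Drain pages until a short page signals the end (assumed behavior).
    rows, offset = [], 0
    while True:
        page = client.list_records(table_id=table_id, limit=page_size, offset=offset)
        rows.extend(page)
        if len(page) < page_size:
            return rows
        offset += page_size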

Table Convenience Method

def handle_table_records(table: dict, records: list[dict]) -> None:
    # TODO: upsert records into external system
    pass

summary = client.sync_tables(
    records_page_size=200,
    persist_state=True,
    on_table_records=handle_table_records,
)
print(summary)

Package maintainers

Build artifacts:

python -m pip install --upgrade build
python -m build

Publish to PyPI:

python -m pip install --upgrade twine
python -m twine upload dist/*

Auth headers used

  • Authorization: Bearer <service_token>
  • X-Teicor-App-Slug: <app_slug>
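
For debugging outside the SDK, a raw proxy call carries the same headers. A sketch using the quickstart values (the full proxy path is inferred from the mode list earlier, so treat it as an assumption):

import requests

resp = requests.get(
    "https://api.teicor.com/v1/teams/client-a/runtime/api/schemas",
    headers={
        "Authorization": "Bearer tkmkt_...",
        "X-Teicor-App-Slug": "shopify",
    },
    timeout=10,
)
print(resp.status_code)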

Notes

  • core/state is app-scoped per team installation.
  • Runtime calls default to Teicor's proxy; eligible installs can use short-lived direct runtime authority tokens.
  • Source package is in sdk/python/src/teicor_sdk.
