Skip to main content

Python SDK for Teicor external app integrations

Project description

Teicor Python SDK

For full external integration setup (install flow, auth, core/runtime interaction, and troubleshooting), see:

This SDK helps installed Teicor apps interact with:

  • Runtime proxy APIs
  • Core app-scoped state in Teicor core DB

Install from PyPI

Add to your requirements.txt (pin versions):

teicor-sdk==0.2.6

Example install command from PyPI:

pip install teicor-sdk==0.2.6

Usage

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

context = client.get_context()
state = client.get_core_state()
state = client.patch_core_state({"cursor": "next-page-token"})
rows = client.runtime_request(method="GET", runtime_path="api/schemas")

SDK CRUD quickstart (schema/table/column/record)

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

# Context + runtime access descriptor
ctx = client.get_context()
print(ctx.runtime_access_mode)      # proxy (today), direct for eligible installs
print(ctx.runtime_proxy_base_url)   # always available
print(ctx.runtime_api_base_url)     # used for direct mode
print(ctx.runtime_fallback)         # deterministic proxy fallback guidance

if ctx.runtime_access_mode == "direct":
    authority = client.get_runtime_authority_token(scope="write")
    print(authority.runtime_authority_token)
    print(authority.expires_in_seconds)

# ---------- Schema ----------
client.create_schema("app_shopify")
schemas = client.list_schemas()
client.update_schema("app_shopify", new_schema_name="app_shopify_v2")

# ---------- Table ----------
table = client.create_table(
    name="oauth_tokens",
    schema_name="app_shopify_v2",
    is_private=True,
)
table_id = str(table["id"])

tables = client.list_tables()
table_details = client.get_table(table_id)
client.update_table(table_id, name="oauth_tokens_v2")

# ---------- Column ----------
column = client.create_column(
    table_id=table_id,
    name="Provider Account Id",
    column_type="single_line_text",  # see TeicorClient.COLUMN_TYPES
    config={"unique": True},
)
column_id = str(column["id"])

columns = client.list_columns(table_id)
client.update_column(
    table_id=table_id,
    column_id=column_id,
    name="Provider Account ID",
    config={"unique": True},
)

# ---------- Record ----------
created = client.create_record(
    table_id=table_id,
    data={column_id: "acct_123"},
)
record_id = str(created["id"])

records = client.list_records(
    table_id=table_id,
    limit=50,
    offset=0,
    filter_expr={
        "and": [
            {
                "column_id": column_id,
                "op": "contains",  # see TeicorClient.FILTER_OPS
                "value": "acct_",
            }
        ]
    },
    sort=[{"column_id": column_id, "direction": "asc"}],
)

first = client.find_record(
    table_id=table_id,
    filter_expr={
        "and": [
            {"column_id": column_id, "op": "eq", "value": "acct_123"}
        ]
    },
)

client.update_record(
    table_id=table_id,
    record_id=record_id,
    data={column_id: "acct_456"},
)

client.delete_record(table_id=table_id, record_id=record_id)

# ---------- Cleanup ----------
client.delete_column(table_id=table_id, column_id=column_id)
client.delete_table(table_id)
client.delete_schema("app_shopify_v2")

Query APIs in SDK

The SDK now exposes query helpers for external integrations:

  • list_queries(group_id=None, include_all=False, updated_since=None)
  • get_query(query_id)
  • execute_query(query_id)
  • get_query_data(query_id, if_newer_than_watermark=None)
  • sync_queries(...) convenience full-loop helper

sync_queries(...) returns a typed QuerySyncSummary dataclass.

Basic usage:

queries = client.list_queries(include_all=True)
query_id = str(queries[0]["id"])

definition = client.get_query(query_id)
snapshot = client.execute_query(query_id)
incremental = client.get_query_data(
    query_id,
    if_newer_than_watermark=snapshot.get("source_watermark"),
)

Updated-since two-gate sync (recommended)

Use a two-gate strategy to avoid heavy query calls when no fresh data exists.

  1. Definition gate (cheap): call list_queries(updated_since=...).
  2. Data gate (targeted): for changed query IDs, call get_query_data(if_newer_than_watermark=...).

Query Sync Steps

  1. Read persisted state:
    • last_sync_at (team-level ISO timestamp)
    • query_watermarks (map of query_id -> source_watermark)
  2. Call list_queries(include_all=True, updated_since=last_sync_at).
  3. If last_sync_at is empty (first sync), call without updated_since to fetch all query definitions once.
  4. If response is empty, stop (no heavy calls).
  5. For each returned query:
    • read previous watermark for that query
    • call get_query_data(query_id, if_newer_than_watermark=prev)
    • if payload has unchanged=True, skip downstream work
    • else process rows and persist returned source_watermark
  6. Advance last_sync_at to the max updated_at returned in step 2.

End-to-end example

from datetime import datetime, timezone

state = client.get_core_state()
last_sync_at = state.get("last_sync_at")
query_watermarks = state.get("query_watermarks") or {}

changed_queries = client.list_queries(
    include_all=True,
    updated_since=last_sync_at,
)

if not changed_queries:
    print("No query definition updates since last sync.")
else:
    latest_updated_at = last_sync_at
    for query in changed_queries:
        query_id = str(query["id"])
        previous_watermark = query_watermarks.get(query_id)

        data = client.get_query_data(
            query_id,
            if_newer_than_watermark=previous_watermark,
        )

        if data.get("unchanged") is True:
            continue

        rows = data.get("rows") or []
        # TODO: upsert rows into external system

        watermark = data.get("source_watermark")
        if isinstance(watermark, str) and watermark:
            query_watermarks[query_id] = watermark

        updated_at = query.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_updated_at is None or updated_at > latest_updated_at
        ):
            latest_updated_at = updated_at

    next_state = {
        "query_watermarks": query_watermarks,
        "last_sync_at": latest_updated_at
        or datetime.now(timezone.utc).isoformat(),
    }
    client.patch_core_state(next_state)

Query Convenience Method

def handle_query_result(query: dict, result: dict) -> None:
    if result.get("unchanged") is True:
        return
    rows = result.get("rows") or []
    # TODO: sync rows to external system

summary = client.sync_queries(
    persist_state=True,
    on_query_result=handle_query_result,
)
print(summary)

Reliable large payload sync (recommended)

For large writes (for example 70,000+ rows), use chunked sync with retry/backoff:

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

def on_chunk(result, response):
    print(
        f"chunk={result.chunk_index + 1} "
        f"size={result.chunk_size} attempts={result.attempts}"
    )

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,                 # list[dict]
    on_conflict="overwrite",        # idempotent upsert behavior
    chunk_size=500,                  # runtime default max
    max_retries=5,
    retry_base_seconds=0.5,
    retry_max_seconds=8.0,
    retry_jitter_ratio=0.2,
    on_chunk_complete=on_chunk,
)

print(summary)

Resume from core-state checkpoint after interruption:

checkpoint_state = client.get_bulk_sync_checkpoint()

def save_checkpoint(checkpoint):
    client.save_bulk_sync_checkpoint(checkpoint=checkpoint)

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    start_chunk_index=client.parse_bulk_sync_checkpoint(checkpoint_state),
    on_checkpoint=save_checkpoint,
)

if summary.records_processed == summary.total_records:
    client.clear_bulk_sync_checkpoint()

Single-call convenience wrapper (load/save/clear checkpoint automatically):

summary = client.sync_records_bulk_with_core_checkpoint(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    on_conflict="overwrite",
)

Notes:

  • Keep deterministic unique/primary-key values so retries are safe.
  • Keep chunk size at or below runtime max bulk setting (default 500).
  • Use summary/chunk callbacks to checkpoint progress in your app.

Table two-gate sync (recommended)

Table sync follows the same two-gate pattern:

  1. Table definition gate via list_tables(updated_since=...)
  2. Table data gate via get_table_sync_status(if_newer_than_watermark=...)

SDK helpers:

  • list_tables(updated_since=None)
  • get_table_sync_status(table_id, if_newer_than_watermark=None)
  • list_records(table_id=..., ...) for the actual row fetch after gates pass
  • sync_tables(...) convenience full-loop helper

sync_tables(...) returns a typed TableSyncSummary dataclass.

Table Sync Steps

  1. Read state:
    • tables_last_sync_at (team-level timestamp)
    • table_watermarks (table_id -> source_watermark)
  2. Call list_tables(updated_since=tables_last_sync_at).
  3. If tables_last_sync_at is empty (first sync), call without updated_since to discover all tables once.
  4. If empty, stop.
  5. For each changed table:
    • call get_table_sync_status(table_id, if_newer_than_watermark=...)
    • if has_changes is false, skip row fetch
    • if has_changes is true, call list_records(...) and process rows
    • persist returned source_watermark for that table
  6. Advance tables_last_sync_at to max table updated_at from step 2.

Example

state = client.get_core_state()
tables_last_sync_at = state.get("tables_last_sync_at")
table_watermarks = state.get("table_watermarks") or {}

changed_tables = client.list_tables(updated_since=tables_last_sync_at)

if changed_tables:
    latest_table_updated_at = tables_last_sync_at

    for table in changed_tables:
        table_id = str(table["id"])
        prev_watermark = table_watermarks.get(table_id)

        status = client.get_table_sync_status(
            table_id,
            if_newer_than_watermark=prev_watermark,
        )
        if not status.get("has_changes"):
            continue

        # Fetch rows only when gate 2 says table data changed.
        records = client.list_records(table_id=table_id, limit=200, offset=0)
        # TODO: sync records into external system

        next_watermark = status.get("source_watermark")
        if isinstance(next_watermark, str) and next_watermark:
            table_watermarks[table_id] = next_watermark

        updated_at = table.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_table_updated_at is None
            or updated_at > latest_table_updated_at
        ):
            latest_table_updated_at = updated_at

    client.patch_core_state(
        {
            "tables_last_sync_at": latest_table_updated_at,
            "table_watermarks": table_watermarks,
        }
    )

Table Convenience Method

def handle_table_records(table: dict, records: list[dict]) -> None:
    # TODO: upsert records into external system
    pass

summary = client.sync_tables(
    records_page_size=200,
    persist_state=True,
    on_table_records=handle_table_records,
)
print(summary)

Package maintainers

Build artifacts:

python -m pip install --upgrade build
python -m build

Publish to PyPI:

python -m pip install --upgrade twine
python -m twine upload dist/*

Auth headers used

  • Authorization: Bearer <service_token>
  • X-Teicor-App-Slug: <app_slug>

Notes

  • core/state is app-scoped per team installation.
  • Runtime calls default to Teicor's proxy; eligible installs can use short-lived direct runtime authority tokens.
  • Source package is in sdk/python/src/teicor_sdk.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

teicor_sdk-0.2.6.tar.gz (15.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

teicor_sdk-0.2.6-py3-none-any.whl (12.1 kB view details)

Uploaded Python 3

File details

Details for the file teicor_sdk-0.2.6.tar.gz.

File metadata

  • Download URL: teicor_sdk-0.2.6.tar.gz
  • Upload date:
  • Size: 15.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for teicor_sdk-0.2.6.tar.gz
Algorithm Hash digest
SHA256 b6211d597dde8db6937fc7d58de12b99fee8923814d9d08c766d73c106de6d77
MD5 72700ebe695a9d87ab1f472d0a224875
BLAKE2b-256 b95d886e3bc75067f9f2f7452391867b8bc6dbf2b56c1c7b77b53c430db0019c

See more details on using hashes here.

File details

Details for the file teicor_sdk-0.2.6-py3-none-any.whl.

File metadata

  • Download URL: teicor_sdk-0.2.6-py3-none-any.whl
  • Upload date:
  • Size: 12.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for teicor_sdk-0.2.6-py3-none-any.whl
Algorithm Hash digest
SHA256 6fef048d97a6df96c81b89ac8841d43f77e6caff2fa04acdb0f01ad3ccc69e1f
MD5 6d50c51faf033bdaa903c99ee55e597e
BLAKE2b-256 760571cc10970f5c8b40efb1212c81280ceb6230dcde55a7b42d108b05b4f432

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page