
Python SDK for Teicor external app integrations


Teicor Python SDK

For full external integration setup (install flow, auth, core/runtime interaction, and troubleshooting), see:

This SDK helps installed Teicor apps interact with:

  • Runtime proxy APIs
  • Core app-scoped state in Teicor core DB

Install from PyPI

Add to your requirements.txt (pin versions):

teicor-sdk==0.2.4

Or install directly with pip:

pip install teicor-sdk==0.2.4

Usage

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

context = client.get_context()
state = client.get_core_state()
state = client.patch_core_state({"cursor": "next-page-token"})
rows = client.runtime_request(method="GET", runtime_path="api/schemas")

SDK CRUD quickstart (schema/table/column/record)

from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

# Context + runtime access descriptor
ctx = client.get_context()
print(ctx.runtime_access_mode)      # proxy (today)
print(ctx.runtime_proxy_base_url)   # use now
print(ctx.runtime_api_base_url)     # future direct mode

# ---------- Schema ----------
client.create_schema("app_shopify")
schemas = client.list_schemas()
client.update_schema("app_shopify", new_schema_name="app_shopify_v2")

# ---------- Table ----------
table = client.create_table(
    name="oauth_tokens",
    schema_name="app_shopify_v2",
    is_private=True,
)
table_id = str(table["id"])

tables = client.list_tables()
table_details = client.get_table(table_id)
client.update_table(table_id, name="oauth_tokens_v2")

# ---------- Column ----------
column = client.create_column(
    table_id=table_id,
    name="Provider Account Id",
    column_type="single_line_text",  # see TeicorClient.COLUMN_TYPES
    config={"unique": True},
    set_as_primary_key=True,
)
column_id = str(column["id"])

columns = client.list_columns(table_id)
client.update_column(
    table_id=table_id,
    column_id=column_id,
    name="Provider Account ID",
    config={"unique": True},
    set_as_primary_key=True,
)

# You can also manage PK explicitly:
client.get_primary_key(table_id)
client.update_primary_key(table_id=table_id, column_id=column_id)

# ---------- Record ----------
created = client.create_record(
    table_id=table_id,
    data={column_id: "acct_123"},
)
record_id = str(created["id"])

records = client.list_records(
    table_id=table_id,
    limit=50,
    offset=0,
    filter_expr={
        "and": [
            {
                "column_id": column_id,
                "op": "contains",  # see TeicorClient.FILTER_OPS
                "value": "acct_",
            }
        ]
    },
    sort=[{"column_id": column_id, "direction": "asc"}],
)

first = client.find_record(
    table_id=table_id,
    filter_expr={
        "and": [
            {"column_id": column_id, "op": "eq", "value": "acct_123"}
        ]
    },
)

client.update_record(
    table_id=table_id,
    record_id=record_id,
    data={column_id: "acct_456"},
)

client.delete_record(table_id=table_id, record_id=record_id)

# ---------- Bulk upsert ----------
rows = [
    {column_id: "acct_789"},
    {column_id: "acct_999"},
]

# Uses runtime bulk create with on_conflict=overwrite by default
# (upsert semantics for primary-key conflicts).
client.upsert_records_bulk(
    table_id=table_id,
    records=rows,
)

# Optional conflict policies: overwrite (default), error, keep_both
client.upsert_records_bulk(
    table_id=table_id,
    records=rows,
    conflict_policy="keep_both",
)

# ---------- Cleanup ----------
client.delete_column(table_id=table_id, column_id=column_id)
client.delete_table(table_id)
client.delete_schema("app_shopify_v2")

Query APIs in SDK

The SDK now exposes query helpers for external integrations:

  • list_queries(group_id=None, include_all=False, updated_since=None)
  • get_query(query_id)
  • execute_query(query_id)
  • get_query_data(query_id, if_newer_than_watermark=None)
  • sync_queries(...) convenience full-loop helper

sync_queries(...) returns a typed QuerySyncSummary dataclass.

Basic usage:

queries = client.list_queries(include_all=True)
query_id = str(queries[0]["id"])

definition = client.get_query(query_id)
snapshot = client.execute_query(query_id)
incremental = client.get_query_data(
    query_id,
    if_newer_than_watermark=snapshot.get("source_watermark"),
)

Updated-since two-gate sync (recommended)

Use a two-gate strategy to avoid heavy query calls when no fresh data exists.

  1. Definition gate (cheap): call list_queries(updated_since=...).
  2. Data gate (targeted): for changed query IDs, call get_query_data(if_newer_than_watermark=...).

Query Sync Steps

  1. Read persisted state:
    • last_sync_at (team-level ISO timestamp)
    • query_watermarks (map of query_id -> source_watermark)
  2. Call list_queries(include_all=True, updated_since=last_sync_at).
  3. If last_sync_at is empty (first sync), call without updated_since to fetch all query definitions once.
  4. If response is empty, stop (no heavy calls).
  5. For each returned query:
    • read previous watermark for that query
    • call get_query_data(query_id, if_newer_than_watermark=prev)
    • if payload has unchanged=True, skip downstream work
    • else process rows and persist returned source_watermark
  6. Advance last_sync_at to the max updated_at returned in step 2.

End-to-end example

from datetime import datetime, timezone

state = client.get_core_state()
last_sync_at = state.get("last_sync_at")
query_watermarks = state.get("query_watermarks") or {}

changed_queries = client.list_queries(
    include_all=True,
    updated_since=last_sync_at,
)

if not changed_queries:
    print("No query definition updates since last sync.")
else:
    latest_updated_at = last_sync_at
    for query in changed_queries:
        query_id = str(query["id"])
        previous_watermark = query_watermarks.get(query_id)

        data = client.get_query_data(
            query_id,
            if_newer_than_watermark=previous_watermark,
        )

        if data.get("unchanged") is True:
            continue

        rows = data.get("rows") or []
        # TODO: upsert rows into external system

        watermark = data.get("source_watermark")
        if isinstance(watermark, str) and watermark:
            query_watermarks[query_id] = watermark

        updated_at = query.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_updated_at is None or updated_at > latest_updated_at
        ):
            latest_updated_at = updated_at

    next_state = {
        "query_watermarks": query_watermarks,
        "last_sync_at": latest_updated_at
        or datetime.now(timezone.utc).isoformat(),
    }
    client.patch_core_state(next_state)
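The loop above compares `updated_at` values as plain strings. That is safe for ISO-8601 timestamps as long as they share the same UTC offset and precision, because lexicographic order then matches chronological order. A quick illustration (the sample timestamps are ours):

```python
from datetime import datetime

# ISO-8601 timestamps with a fixed offset (here "+00:00") sort
# lexicographically in the same order as chronologically, which is why
# the sync loop can track latest_updated_at with string comparison.
a = "2024-05-01T09:30:00+00:00"
b = "2024-05-01T10:00:00+00:00"

assert a < b  # string order matches time order here

# Mixed offsets or mixed precision break this property; normalize via
# datetime parsing first if the API can return heterogeneous formats.
parsed_a = datetime.fromisoformat(a)
parsed_b = datetime.fromisoformat(b)
assert (parsed_a < parsed_b) == (a < b)
```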

Query Convenience Method

def handle_query_result(query: dict, result: dict) -> None:
    if result.get("unchanged") is True:
        return
    rows = result.get("rows") or []
    # TODO: sync rows to external system

summary = client.sync_queries(
    persist_state=True,
    on_query_result=handle_query_result,
)
print(summary)

Table two-gate sync (recommended)

Table sync follows the same two-gate pattern:

  1. Table definition gate via list_tables(updated_since=...)
  2. Table data gate via get_table_sync_status(if_newer_than_watermark=...)

SDK helpers:

  • list_tables(updated_since=None)
  • get_table_sync_status(table_id, if_newer_than_watermark=None)
  • list_records(table_id=..., ...) for the actual row fetch after gates pass
  • sync_tables(...) convenience full-loop helper

sync_tables(...) returns a typed TableSyncSummary dataclass.

Table Sync Steps

  1. Read state:
    • tables_last_sync_at (team-level timestamp)
    • table_watermarks (table_id -> source_watermark)
  2. Call list_tables(updated_since=tables_last_sync_at).
  3. If tables_last_sync_at is empty (first sync), call without updated_since to discover all tables once.
  4. If empty, stop.
  5. For each changed table:
    • call get_table_sync_status(table_id, if_newer_than_watermark=...)
    • if has_changes is false, skip row fetch
    • if has_changes is true, call list_records(...) and process rows
    • persist returned source_watermark for that table
  6. Advance tables_last_sync_at to max table updated_at from step 2.

Example

state = client.get_core_state()
tables_last_sync_at = state.get("tables_last_sync_at")
table_watermarks = state.get("table_watermarks") or {}

changed_tables = client.list_tables(updated_since=tables_last_sync_at)

if changed_tables:
    latest_table_updated_at = tables_last_sync_at

    for table in changed_tables:
        table_id = str(table["id"])
        prev_watermark = table_watermarks.get(table_id)

        status = client.get_table_sync_status(
            table_id,
            if_newer_than_watermark=prev_watermark,
        )
        if not status.get("has_changes"):
            continue

        # Fetch rows only when gate 2 says table data changed.
        records = client.list_records(table_id=table_id, limit=200, offset=0)
        # TODO: sync records into external system

        next_watermark = status.get("source_watermark")
        if isinstance(next_watermark, str) and next_watermark:
            table_watermarks[table_id] = next_watermark

        updated_at = table.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_table_updated_at is None
            or updated_at > latest_table_updated_at
        ):
            latest_table_updated_at = updated_at

    client.patch_core_state(
        {
            "tables_last_sync_at": latest_table_updated_at,
            "table_watermarks": table_watermarks,
        }
    )
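The example above fetches only the first page of records. When a changed table can hold more than one page, drain the `limit`/`offset` endpoint in a loop; the helper below sketches that pattern (the helper name, the page size, and the assumption that `list_records` returns a plain list of row dicts are ours, not part of the SDK contract), demonstrated here with a stand-in for `client.list_records`:

```python
def fetch_all_records(list_records, table_id, page_size=200):
    """Drain all pages from a limit/offset endpoint.

    `list_records` is any callable with the SDK's
    (table_id=..., limit=..., offset=...) keyword signature that returns
    a list; iteration stops on the first short (or empty) page.
    """
    rows, offset = [], 0
    while True:
        page = list_records(table_id=table_id, limit=page_size, offset=offset)
        rows.extend(page)
        if len(page) < page_size:
            return rows
        offset += page_size

# Demonstration with an in-memory stand-in for client.list_records.
_fake_db = [{"id": i} for i in range(450)]

def _fake_list_records(table_id, limit, offset):
    return _fake_db[offset:offset + limit]

all_rows = fetch_all_records(_fake_list_records, table_id="tbl_1")
assert len(all_rows) == 450
```

In the sync loop above, `client.list_records` would be passed in place of the stand-in.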

Table Convenience Method

def handle_table_records(table: dict, records: list[dict]) -> None:
    # TODO: upsert records into external system
    pass

summary = client.sync_tables(
    records_page_size=200,
    persist_state=True,
    on_table_records=handle_table_records,
)
print(summary)

Package maintainers

Build artifacts:

python -m pip install --upgrade build
python -m build

Publish to PyPI:

python -m pip install --upgrade twine
python -m twine upload dist/*

Auth headers used

  • Authorization: Bearer <service_token>
  • X-Teicor-App-Slug: <app_slug>
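These are the same headers the client attaches for you. If you ever need to call an endpoint outside the SDK, a minimal sketch of building them by hand looks like this (the helper name is ours, not part of the SDK):

```python
def teicor_auth_headers(service_token: str, app_slug: str) -> dict:
    """Build the two auth headers listed above for a raw HTTP call."""
    return {
        "Authorization": f"Bearer {service_token}",
        "X-Teicor-App-Slug": app_slug,
    }

headers = teicor_auth_headers("tkmkt_...", "shopify")
assert headers["Authorization"].startswith("Bearer ")
assert headers["X-Teicor-App-Slug"] == "shopify"
```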

Notes

  • core/state is app-scoped per team installation.
  • Runtime calls are proxied through Teicor backend.
  • Source package is in sdk/python/src/teicor_sdk.
