Teicor Python SDK
This README covers the SDK itself; full external integration setup (install flow, auth, core/runtime interaction, and troubleshooting) is documented separately.
This SDK helps installed Teicor apps interact with:
- Direct runtime APIs via `runtime_url`
- Core app-scoped state in the Teicor core DB
Install from PyPI
Add to your requirements.txt (pin versions):

```
teicor-sdk==0.3.1
```

Example install command from PyPI:

```shell
pip install teicor-sdk==0.3.1
```
Usage
```python
from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

# Installation context and app-scoped state
context = client.get_context()
state = client.get_core_state()
state = client.patch_core_state({"cursor": "next-page-token"})

# Direct runtime request
rows = client.runtime_request(method="GET", runtime_path="api/schemas")
```
`runtime_request` (and the CRUD helpers that call it) uses direct runtime access:

- Reads `runtime_url`/`auth_url` from `get_context()`.
- Fetches and caches authority tokens by scope and calls the runtime directly.
- On a direct `401`/`403`, refreshes the token once and retries once.
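As an illustration of that flow, here is a minimal token-cache-and-retry sketch. The class name, callback signatures, and tuple shapes are hypothetical and only model the behavior described above, not the SDK's internals:

```python
import time


class RuntimeCaller:
    """Sketch of the direct-runtime flow: cache an authority token per
    scope, call the runtime, and on 401/403 refresh once and retry once."""

    def __init__(self, fetch_token, send):
        self._fetch_token = fetch_token  # scope -> (token, expires_at)
        self._send = send                # (token, method, path) -> (status, body)
        self._cache = {}                 # scope -> (token, expires_at)

    def _token(self, scope, force=False):
        cached = self._cache.get(scope)
        if force or cached is None or cached[1] <= time.time():
            cached = self._fetch_token(scope)
            self._cache[scope] = cached
        return cached[0]

    def request(self, method, path, scope="read"):
        status, body = self._send(self._token(scope), method, path)
        if status in (401, 403):
            # Refresh the token once and retry once, then give up.
            status, body = self._send(self._token(scope, force=True), method, path)
        return status, body
```

Refreshing at most once keeps a revoked or misconfigured token from turning into an infinite retry loop.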
SDK CRUD quickstart (schema/table/column/record)
```python
from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

# Context + runtime access descriptor
ctx = client.get_context()
print(ctx.runtime_url)
print(ctx.auth_url)

authority = client.get_runtime_authority_token(scope="write")
print(authority.runtime_authority_token)
print(authority.runtime_url)
print(authority.expires_in_seconds)

# ---------- Schema ----------
client.create_schema("app_shopify")
schemas = client.list_schemas()
client.update_schema("app_shopify", new_schema_name="app_shopify_v2")

# ---------- Table ----------
table = client.create_table(
    name="oauth_tokens",
    schema_name="app_shopify_v2",
    is_private=True,
)
table_id = str(table["id"])
tables = client.list_tables()
table_details = client.get_table(table_id)
client.update_table(table_id, name="oauth_tokens_v2")

# ---------- Column ----------
column = client.create_column(
    table_id=table_id,
    name="Provider Account Id",
    column_type="single_line_text",  # see TeicorClient.COLUMN_TYPES
    config={"unique": True},
    set_as_primary_key=True,
)
column_id = str(column["id"])
columns = client.list_columns(table_id)
client.update_column(
    table_id=table_id,
    column_id=column_id,
    name="Provider Account ID",
    config={"unique": True},
    set_as_primary_key=True,
)
pk = client.get_primary_key(table_id)
print(pk.get("column_id"))

# Switch primary key back to record-id mode.
client.update_primary_key(table_id=table_id, column_id=None)

# ---------- Record ----------
created = client.create_record(
    table_id=table_id,
    data={column_id: "acct_123"},
)
record_id = str(created["id"])
records = client.list_records(
    table_id=table_id,
    limit=50,
    offset=0,
    filter_expr={
        "and": [
            {
                "column_id": column_id,
                "op": "contains",  # see TeicorClient.FILTER_OPS
                "value": "acct_",
            }
        ]
    },
    sort=[{"column_id": column_id, "direction": "asc"}],
)
first = client.find_record(
    table_id=table_id,
    filter_expr={
        "and": [
            {"column_id": column_id, "op": "eq", "value": "acct_123"}
        ]
    },
)
client.update_record(
    table_id=table_id,
    record_id=record_id,
    data={column_id: "acct_456"},
)
client.delete_record(table_id=table_id, record_id=record_id)

# ---------- Cleanup ----------
client.delete_column(table_id=table_id, column_id=column_id)
client.delete_table(table_id)
client.delete_schema("app_shopify_v2")
```
Query APIs in SDK
The SDK now exposes query helpers for external integrations:
- `list_queries(group_id=None, include_all=False, updated_since=None)`
- `get_query(query_id)`
- `execute_query(query_id)`
- `get_query_data(query_id, if_newer_than_watermark=None)`
- `sync_queries(...)` convenience full-loop helper

`sync_queries(...)` returns a typed `QuerySyncSummary` dataclass.
Basic usage:
```python
queries = client.list_queries(include_all=True)
query_id = str(queries[0]["id"])

definition = client.get_query(query_id)
snapshot = client.execute_query(query_id)
incremental = client.get_query_data(
    query_id,
    if_newer_than_watermark=snapshot.get("source_watermark"),
)
```
Updated-since two-gate sync (recommended)
Use a two-gate strategy to avoid heavy query calls when no fresh data exists.
- Definition gate (cheap): call `list_queries(updated_since=...)`.
- Data gate (targeted): for changed query IDs, call `get_query_data(if_newer_than_watermark=...)`.
Query Sync Steps
1. Read persisted state:
   - `last_sync_at` (team-level ISO timestamp)
   - `query_watermarks` (map of `query_id -> source_watermark`)
2. Call `list_queries(include_all=True, updated_since=last_sync_at)`.
   - If `last_sync_at` is empty (first sync), call without `updated_since` to fetch all query definitions once.
   - If the response is empty, stop (no heavy calls).
3. For each returned query:
   - read the previous watermark for that query
   - call `get_query_data(query_id, if_newer_than_watermark=prev)`
   - if the payload has `unchanged=True`, skip downstream work
   - else process rows and persist the returned `source_watermark`
4. Advance `last_sync_at` to the max `updated_at` returned in step 2.
End-to-end example
```python
from datetime import datetime, timezone

state = client.get_core_state()
last_sync_at = state.get("last_sync_at")
query_watermarks = state.get("query_watermarks") or {}

changed_queries = client.list_queries(
    include_all=True,
    updated_since=last_sync_at,
)

if not changed_queries:
    print("No query definition updates since last sync.")
else:
    latest_updated_at = last_sync_at
    for query in changed_queries:
        query_id = str(query["id"])
        previous_watermark = query_watermarks.get(query_id)
        data = client.get_query_data(
            query_id,
            if_newer_than_watermark=previous_watermark,
        )
        if data.get("unchanged") is True:
            continue
        rows = data.get("rows") or []
        # TODO: upsert rows into external system
        watermark = data.get("source_watermark")
        if isinstance(watermark, str) and watermark:
            query_watermarks[query_id] = watermark
        updated_at = query.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_updated_at is None or updated_at > latest_updated_at
        ):
            latest_updated_at = updated_at

    next_state = {
        "query_watermarks": query_watermarks,
        "last_sync_at": latest_updated_at
        or datetime.now(timezone.utc).isoformat(),
    }
    client.patch_core_state(next_state)
```
Query Convenience Method
```python
def handle_query_result(query: dict, result: dict) -> None:
    if result.get("unchanged") is True:
        return
    rows = result.get("rows") or []
    # TODO: sync rows to external system

summary = client.sync_queries(
    persist_state=True,
    on_query_result=handle_query_result,
)
print(summary)
```
Reliable large payload sync (recommended)
For large writes (for example 70,000+ rows), use chunked sync with retry/backoff:
```python
from teicor_sdk import TeicorClient

client = TeicorClient(
    base_url="https://api.teicor.com",
    team_slug="client-a",
    app_slug="shopify",
    service_token="tkmkt_...",
)

def on_chunk(result, response):
    print(
        f"chunk={result.chunk_index + 1} "
        f"size={result.chunk_size} attempts={result.attempts}"
    )

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,  # list[dict]
    on_conflict="overwrite",  # idempotent upsert behavior
    chunk_size=500,  # runtime default max
    max_retries=5,
    retry_base_seconds=0.5,
    retry_max_seconds=8.0,
    retry_jitter_ratio=0.2,
    on_chunk_complete=on_chunk,
)
print(summary)
```
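The three `retry_*` parameters describe exponential backoff with jitter. The delay schedule they imply can be sketched as follows; this formula is an illustration derived from the parameter names, not the SDK's confirmed internals:

```python
import random


def retry_delay(attempt, base=0.5, cap=8.0, jitter_ratio=0.2):
    """Illustrative backoff: base * 2**attempt, capped at `cap`, with
    +/- `jitter_ratio` random jitter (attempt is 0-based).

    Mirrors retry_base_seconds / retry_max_seconds / retry_jitter_ratio.
    """
    delay = min(base * (2 ** attempt), cap)
    jitter = delay * jitter_ratio
    return delay + random.uniform(-jitter, jitter)
```

With the defaults above, successive attempts wait roughly 0.5, 1, 2, 4, then 8 seconds (each within a 20% jitter band), so a burst of failing chunks spreads out instead of hammering the runtime in lockstep.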
Resume from core-state checkpoint after interruption:
```python
checkpoint_state = client.get_bulk_sync_checkpoint()

def save_checkpoint(checkpoint):
    client.save_bulk_sync_checkpoint(checkpoint=checkpoint)

summary = client.sync_records_bulk(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    start_chunk_index=client.parse_bulk_sync_checkpoint(checkpoint_state),
    on_checkpoint=save_checkpoint,
)
if summary.records_processed == summary.total_records:
    client.clear_bulk_sync_checkpoint()
```
Single-call convenience wrapper (load/save/clear checkpoint automatically):
```python
summary = client.sync_records_bulk_with_core_checkpoint(
    table_id="<table-id>",
    records=records,
    chunk_size=500,
    on_conflict="overwrite",
)
```
Notes:
- Keep deterministic unique/primary-key values so retries are safe.
- Keep chunk size at or below the runtime max bulk setting (default `500`).
- Use summary/chunk callbacks to checkpoint progress in your app.
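On the first note: one way to keep unique-key values deterministic is to derive them from the record's natural key, so a retried or replayed chunk upserts the same rows instead of creating duplicates. The hashing scheme and field names below are a hypothetical example, not an SDK requirement:

```python
import hashlib
import json


def deterministic_key(record, fields=("provider", "account_id")):
    """Build a stable unique-key value from the record's natural-key
    fields; the same input record always yields the same key."""
    natural_key = {f: record.get(f) for f in fields}
    # Canonical JSON (sorted keys, no whitespace) so field order is irrelevant.
    payload = json.dumps(natural_key, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:32]
```

Paired with `on_conflict="overwrite"`, this makes the whole bulk sync idempotent: re-running a failed chunk converges on the same rows.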
Table two-gate sync (recommended)
Table sync follows the same two-gate pattern:
- Table definition gate via `list_tables(updated_since=...)`
- Table data gate via `get_table_sync_status(if_newer_than_watermark=...)`
SDK helpers:
- `list_tables(updated_since=None)`
- `get_table_sync_status(table_id, if_newer_than_watermark=None)`
- `list_records(table_id=..., ...)` for the actual row fetch after both gates pass
- `sync_tables(...)` convenience full-loop helper

`sync_tables(...)` returns a typed `TableSyncSummary` dataclass.
Table Sync Steps
1. Read state:
   - `tables_last_sync_at` (team-level timestamp)
   - `table_watermarks` (`table_id -> source_watermark`)
2. Call `list_tables(updated_since=tables_last_sync_at)`.
   - If `tables_last_sync_at` is empty (first sync), call without `updated_since` to discover all tables once.
   - If the response is empty, stop.
3. For each changed table:
   - call `get_table_sync_status(table_id, if_newer_than_watermark=...)`
   - if `has_changes` is false, skip the row fetch
   - if `has_changes` is true, call `list_records(...)` and process rows
   - persist the returned `source_watermark` for that table
4. Advance `tables_last_sync_at` to the max table `updated_at` from step 2.
Example
```python
state = client.get_core_state()
tables_last_sync_at = state.get("tables_last_sync_at")
table_watermarks = state.get("table_watermarks") or {}

changed_tables = client.list_tables(updated_since=tables_last_sync_at)

if changed_tables:
    latest_table_updated_at = tables_last_sync_at
    for table in changed_tables:
        table_id = str(table["id"])
        prev_watermark = table_watermarks.get(table_id)
        status = client.get_table_sync_status(
            table_id,
            if_newer_than_watermark=prev_watermark,
        )
        if not status.get("has_changes"):
            continue
        # Fetch rows only when gate 2 says table data changed.
        records = client.list_records(table_id=table_id, limit=200, offset=0)
        # TODO: sync records into external system
        next_watermark = status.get("source_watermark")
        if isinstance(next_watermark, str) and next_watermark:
            table_watermarks[table_id] = next_watermark
        updated_at = table.get("updated_at")
        if isinstance(updated_at, str) and (
            latest_table_updated_at is None
            or updated_at > latest_table_updated_at
        ):
            latest_table_updated_at = updated_at

    client.patch_core_state(
        {
            "tables_last_sync_at": latest_table_updated_at,
            "table_watermarks": table_watermarks,
        }
    )
```
Table Convenience Method
```python
def handle_table_records(table: dict, records: list[dict]) -> None:
    # TODO: upsert records into external system
    pass

summary = client.sync_tables(
    records_page_size=200,
    persist_state=True,
    on_table_records=handle_table_records,
)
print(summary)
```
Package maintainers
Build artifacts:
```shell
python -m pip install --upgrade build
python -m build
```
Publish to PyPI:
```shell
python -m pip install --upgrade twine
python -m twine upload dist/*
```
Auth headers used
- `Authorization: Bearer <service_token>`
- `X-Teicor-App-Slug: <app_slug>`
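As a minimal sketch, a client built outside the SDK would attach the same two headers; the helper function here is hypothetical:

```python
def auth_headers(service_token: str, app_slug: str) -> dict:
    """Build the two auth headers listed above for a core API request."""
    return {
        "Authorization": f"Bearer {service_token}",
        "X-Teicor-App-Slug": app_slug,
    }
```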
Notes
- `core/state` is app-scoped per team installation.
- Runtime calls use direct runtime URLs with short-lived authority tokens.
- The source package is in `sdk/python/src/teicor_sdk`.