# DeltaCAT Client

Lightweight REST client for the DeltaCAT API server.

`deltacat-client` is the Python REST client for DeltaCAT. It provides a lightweight way to interact with a DeltaCAT API server from external applications, workers, agents, and orchestration systems without installing the full DeltaCAT storage, compute, or server runtime stack.
## Overview

The client is organized around a small root `Client` object with resource-oriented subclients:

- `client.jobs` — job submission, claiming, lifecycle
- `client.catalog` — namespace/table CRUD, read planning, data placement
- `client.subscriptions` — incremental data processing pipelines
- `client.publications` — crawler and data publishing workflows
- `client.transforms` — source-to-sink data transformations
- `client.pipelines` — multi-stage pipeline orchestration
This Client facade wraps the lower-level REST bindings generated from
DeltaCAT's OpenAPI schema.
## Installation

```shell
uv pip install deltacat-client
```

`deltacat-client` depends on the shared `deltacat-io-core` package for local plan execution and local file materialization. You normally install the client extras below rather than installing `deltacat-io-core` directly.
Package boundary:

- `deltacat-client` depends on `deltacat-io-core`
- `deltacat-client` and `deltacat-io-core` are intended to work without the thick `deltacat` package being installed
- use the thick `deltacat` package only when you explicitly want the native catalog/storage/compute runtime or want to execute thin plans through the thick public API
Naming convention:

- package/distribution name: `deltacat-io-core`
- Python import module: `deltacat_io_core`
Optional extras:

- `deltacat-client[io]` for local plan execution and local file-based read/write helpers
- `deltacat-client[pandas]` for Pandas dataframe input to `client.catalog.write(...)`
- `deltacat-client[polars]` for Polars dataframe input and AVRO materialization
- `deltacat-client[lance]` for local Lance dataset read/write support
- `deltacat-client[all]` for the full client-side read/write UX stack
High-level `client.catalog.write(...)` supports:
- PyArrow tables
- Pandas DataFrames
- Polars DataFrames
- Daft DataFrames
- NumPy arrays
- Ray Datasets
- local Parquet / CSV / TSV / PSV / Feather / JSON / ORC / AVRO files
- local Lance dataset directories
## Quick Start

### First-Run Setup

Before using the client, ensure a DeltaCAT API server is running. For local development:

```shell
pip install deltacat
python -m deltacat.server --catalog-root /tmp/my-catalog --transport http --port 8080
```
For an existing production server, get the base URL from your team
(e.g., https://deltacat-api.example.com).
### Connect and Explore

```python
from deltacat_client import Client

# Local development
client = Client("http://localhost:8080")

# Production (with auth)
client = Client(
    "https://deltacat-api.example.com",
    catalog="robotics-prod",
    user_id="researcher@example.com",
    bearer_token="...",
)

# Discover what's available
catalogs = client.catalog.list_catalogs()
namespaces = client.catalog.list_namespaces()
tables = client.catalog.list_tables(namespace="robotics")

# Inspect a table
desc = client.catalog.describe_table(namespace="robotics", table="episodes")
print(f"Schema: {desc.schema}")
print(f"Rows: {desc.total_records}")
```
### Read Data

```python
# Direct read: the client plans and executes inline
pandas_df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
    offset=0,
)

# Advanced: build a reusable read plan when you want to inspect or reuse it
plan = client.catalog.plan(namespace="robotics", table="episodes")
print(f"Files: {len(plan.all_files())}")
print(f"Partitions: {len(plan.partition_summaries())}")

# Execute the reusable plan directly through the client
reused_df = client.catalog.read(plan=plan, read_as="pandas")

# The thick deltacat package can also execute the same thin plan
import deltacat as dc
dataset = dc.read_table("episodes", namespace="robotics", plan=plan)
```
For schemaless tables, client.catalog.read(...) follows the same contract as
DeltaCAT itself and returns a flattened manifest table rather than the file
contents. The manifest rows include file paths and metadata such as content
type, sizes, and record counts. Materializing actual rows from a schemaless
table remains an explicit second step via
client.catalog.from_manifest_table(...).
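To make the schemaless contract concrete, here is a minimal sketch of working with a flattened manifest; the row fields below are illustrative assumptions, not the authoritative DeltaCAT manifest schema:

```python
# Hypothetical flattened manifest rows for a schemaless table. The field names
# ("path", "content_type", "content_length", "record_count") are illustrative
# stand-ins for the file paths and metadata the manifest table carries.
manifest_rows = [
    {"path": "s3://bucket/episodes/ep-0001.mp4", "content_type": "video/mp4",
     "content_length": 10_485_760, "record_count": 1},
    {"path": "s3://bucket/episodes/ep-0002.mp4", "content_type": "video/mp4",
     "content_length": 9_437_184, "record_count": 1},
]

# Work with file-level metadata first; materializing actual file contents
# remains an explicit second step (client.catalog.from_manifest_table(...)).
paths = [row["path"] for row in manifest_rows]
total_bytes = sum(row["content_length"] for row in manifest_rows)
```

This lets a reader budget or filter a scan from metadata alone before deciding whether to materialize anything.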
### Write Data

```python
import pyarrow as pa

# High-level write: the client manages staging and commit internally
result = client.catalog.write(
    pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]}),
    namespace="robotics",
    table="predictions",
    mode="add",
    format="parquet",
)
```
### Transactions

Within a transaction context, supported catalog operations bind to the active transaction automatically. You only need to pass `transaction=...` when you want to override the ambient context explicitly.

```python
import pyarrow as pa

with client.transaction(commit_message="Create and backfill predictions") as tx:
    client.catalog.create_namespace(namespace="robotics_staging")
    client.catalog.create_table(
        namespace="robotics_staging",
        table="predictions",
        schema_def=[
            {"name": "episode_id", "type": "int64", "is_merge_key": True},
            {"name": "score", "type": "float64"},
        ],
        content_type="parquet",
        auto_create_namespace=False,
    )
    client.catalog.write(
        pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]}),
        namespace="robotics_staging",
        table="predictions",
        mode="add",
        format="parquet",
    )
    preview = client.catalog.read(
        namespace="robotics_staging",
        table="predictions",
        read_as="pandas",
    )
```
The transaction context manager heartbeats while it is open, commits on normal exit, and aborts on exception.
Use an explicit transaction handle when you want to control commit and abort yourself:
```python
import pyarrow as pa

tx = client.transaction(commit_message="Backfill prediction scores")
try:
    client.catalog.write(
        pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
        transaction=tx,
    )
    preview = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
        transaction=tx,
    )
    tx.commit()
except Exception:
    tx.abort(reason="backfill failed")
    raise
```
Historic transactions are also supported for time-travel reads:
```python
checkpoint_ns = 1712697600000000000

with client.transaction(as_of=checkpoint_ns) as historic_tx:
    historic_df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
        transaction=historic_tx,
    )
```
An as_of transaction reads a historical snapshot and is read-only. Write
operations against a historic transaction are rejected.
Transaction-aware catalog APIs include:

- namespace operations: `create_namespace(...)`, `get_namespace(...)`, `namespace_exists(...)`, `alter_namespace(...)`, `drop_namespace(..., purge=False)`, `list_namespaces(...)`
- table operations: `create_table(...)`, `describe_table(...)`, `table_exists(...)`, `alter_table(...)`, `rename_table(...)`, `drop_table(..., purge=False)`, `list_tables(...)`, `list_table_versions(...)`
- data operations: `plan(...)`, `read(...)`, `stage(...)`, `write(...)`, `commit(...)`
Transaction-bound mutation results include transaction_id and
transaction_version so callers can observe the authoritative server-side
version when needed.
Fetched transactions and write sessions remain bound to their originating catalog and transaction metadata:
```python
tx = client.transaction(catalog="robotics-prod", commit_message="stage files")
session = client.catalog.stage(
    namespace="robotics",
    table="predictions",
    mode="merge",
    format="parquet",
    transaction=tx,
)

fetched_session = client.catalog.get_write_session(session.session_id)
fetched_tx = client.catalog.get_transaction(tx.id)

attached = fetched_session.commit(
    entries=[{"path": "/tmp/output.parquet", "record_count": 1024}]
)
latest_tx = fetched_tx.get()
```
Important transaction rules:

- within `with client.transaction(...):`, supported catalog operations bind to the ambient transaction automatically
- explicit `transaction=...` overrides the ambient transaction, but it must belong to the same `Client` instance
- `catalog=...` is not allowed when an active transaction context is already in scope
- fetched write sessions and fetched transactions keep their bound catalog; a conflicting explicit `catalog=...` is rejected
- some catalog APIs intentionally remain non-transactional and raise if called inside a transaction context, including `write_data(...)`, `truncate_table(...)`, `rollback_table(...)`, placement APIs, retention, and langolier / compaction status helpers
Typical next steps after discovery are:

- inspect a specific table with `client.catalog.describe_table(...)`
- read directly with `client.catalog.read(...)`
- build a read plan with `client.catalog.plan(...)` when you want to inspect, page, cache, or reuse it
- create or manage orchestration resources such as subscriptions and pipelines
When used as a context manager, client.transaction(...) automatically
heartbeats in the background while the transaction is open and idle.
Likewise, a standalone WriteSession returned by client.catalog.stage(...)
can be used as a context manager to auto-heartbeat while files are being
written, and it will abort the session on exception.
When you want an explicit reusable plan, `plan(...)` supports:

- `filter_predicate`: structured JSON AST pushed into DeltaCAT partition pruning, file-stat pruning, and Polars row filtering
- `max_files` / `file_offset`: page very large plans instead of returning every file path in one response
- `include_column_stats`: opt in only when the client actually needs per-file statistics
`client.catalog.read(...)` returns a PyArrow table by default and also accepts:

- `read_as="pandas"`
- `read_as="polars"`
- `read_as="numpy"`
- `read_as="lance"` for a lazy Lance dataset when the plan resolves to a single Lance dataset path
- `read_as="pyarrow_parquet"` for a lazy `pyarrow.parquet.ParquetFile` when the plan resolves to a single Parquet file
- `read_as="daft"`
- `read_as="ray_dataset"`
For schemaless tables, those supported read_as conversions apply to the
flattened manifest table, not to the underlying file contents.
Use client.catalog.from_manifest_table(...) when you explicitly want to
materialize the referenced files into a structured dataset.
## Supported Orchestration Path

The DeltaCAT client is the supported control plane for running managed work.

- `client.jobs.submit_compaction(...)`
- `client.jobs.submit_langolier(...)`
- `client.jobs.submit_retention(...)`
- `client.jobs.submit_gc_scan(...)`
- `client.jobs.submit_gc_full(...)`
- `client.jobs.submit_gc_outbox(...)`
- `client.jobs.submit_gc_archive(...)`
- `client.jobs.submit_gc_purge(...)`
- `client.jobs.submit_gc_restore(...)`
- `client.jobs.recover_pending_delete(...)`
- `client.jobs.audit_pending_delete(...)`
- `client.jobs.get_gc_stats(...)`
- `client.jobs.list_gc_exempt(...)`
- `client.jobs.submit_janitor(...)`
- `client.publications.create_crawler(...)` + `client.publications.run(...)`
- `client.transforms.create(...)` with `DiffSubscriber` + `client.transforms.run(...)` or `trigger(...)`
- `client.catalog.place(...)` + `client.catalog.replication_status(...)` + `client.catalog.unplace(...)`
## Managed Workflows

### Dispatch modes

Use `DispatchMode` for `dispatch_mode`: `LOCAL` (run on the API host), `CUSTOM` (enqueue a job for external workers, the usual production choice), or `OSMO` (NVIDIA OSMO). For `CUSTOM`, workers pull work with `client.jobs.claim()`. See the subscriptions, publications, transforms, and pipelines sections for the full lifecycle.
### Periodic triggers

Trigger configuration is part of each subscription, publication, and transform. Create or update each resource with:

- `trigger={"mode": ...}`
- `trigger={"mode": "schedule", "schedule": {...}}` for scheduled resources
Schedule shapes are strict:

- `{"kind": "interval", "interval_seconds": N}` only accepts `interval_seconds`
- `{"kind": "cron", "cron": "...", "timezone": "Area/City"}` accepts `cron` plus optional `timezone`
- mixed interval/cron fields are rejected instead of being silently ignored
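That strictness can be illustrated with a small client-side validator sketch; this is illustrative only, since the server performs the authoritative validation:

```python
def validate_schedule(schedule: dict) -> None:
    """Illustrative check mirroring the strict schedule shapes above.
    The DeltaCAT API server is the authoritative validator."""
    kind = schedule.get("kind")
    if kind == "interval":
        # Interval schedules accept interval_seconds and nothing else.
        extra = set(schedule) - {"kind", "interval_seconds"}
        if extra:
            raise ValueError(f"interval schedules only accept interval_seconds: {extra}")
    elif kind == "cron":
        # Cron schedules accept cron plus an optional timezone.
        extra = set(schedule) - {"kind", "cron", "timezone"}
        if extra:
            raise ValueError(f"cron schedules accept cron plus optional timezone: {extra}")
    else:
        raise ValueError(f"unknown schedule kind: {kind!r}")

validate_schedule({"kind": "interval", "interval_seconds": 300})
validate_schedule({"kind": "cron", "cron": "30 8 * * *", "timezone": "UTC"})
```

A schedule mixing `interval_seconds` and `cron` fails this check, matching the "rejected instead of silently ignored" rule.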
Use the trigger types as follows:

- `manual`: never auto-trigger; only `run(...)` / `trigger(...)` starts work
- `event`: auto-trigger on upstream table updates (subscriptions and transforms)
- `schedule`: auto-trigger on the server-managed periodic scheduler
- `custom`: publication-only; intended for external orchestrators
Defaults:

- subscriptions default to `event`
- transforms default to `event`
- root publications default to `manual`
The API server runs the built-in pipeline schedule manager. Scheduled subscriptions, publications, and transforms do not require scheduler enable flags on the server, and each resource controls its own schedule independently. Scheduled work uses the same durable job path as manual triggers, so DispatchMode.CUSTOM produces claimable jobs for external workers.
You can change trigger policy after creation:
```python
client.subscriptions.update(
    "events-sub",
    trigger={
        "mode": "schedule",
        "schedule": {"kind": "interval", "interval_seconds": 300},
    },
)

client.publications.update(
    "crawl-root",
    trigger={
        "mode": "schedule",
        "schedule": {
            "kind": "cron",
            "cron": "30 8,12 * * *",
            "timezone": "America/Los_Angeles",
        },
    },
)

client.transforms.update(
    "events-diff",
    trigger={"mode": "manual"},
)
```
Auth for trigger-policy changes follows the resource auth model: for subscriptions, publications, and transforms alike, trigger policy can be changed by the resource creator, its managers, or an ADMIN.
The annotations example below uses a scheduled root crawl publication plus event transforms so each crawl completion updates the catalog and downstream stages fire automatically.
### Crawl as a Publication

Use a crawler publication when the crawl should be a named, managed DeltaCAT workflow:

```python
from deltacat_client import DispatchMode

pub = client.publications.create_crawler(
    publication_id="crawl-openstack-swift-primary",
    name="OpenStack Swift PRIMARY Crawl",
    sink_namespace="crawled",
    sink_table="openstack_swift_primary",
    crawl_params={
        "seed_paths": [{"path": "s3://data-home"}],
        "filesystem_id": "openstack_swift_fs",
        "storage_backend": "openstack_swift",
        "crawl_strategy": "adaptive",
        "max_parallelism": 32,
    },
    dispatch_mode=DispatchMode.CUSTOM,
)

run = client.publications.run(pub.publication_id)
print(run.success, run.job_id if hasattr(run, "job_id") else None)
```
The declared publication sink is authoritative. The client/server force the
crawler to write to sink_namespace.sink_table, even if the embedded
crawl_params contains a different namespace or table name.
### Diff as a Transform

Use a first-class transform when the diff should be part of a managed CDC pipeline:

```python
import json

from deltacat_client import DispatchMode

xform = client.transforms.create(
    transform_id="openstack-swift-primary-diff",
    name="OpenStack Swift PRIMARY Diff",
    source_tables=[{"namespace": "crawled", "table": "openstack_swift_primary"}],
    sink_tables=[{"namespace": "changes", "table": "openstack_swift_primary_events"}],
    subscriber_class=(
        "deltacat.compute.pipelines.subscriber.diff_subscriber.DiffSubscriber"
    ),
    subscriber_mode="version",
    config_json=json.dumps(
        {
            "subscriber_kwargs": {
                "primary_key": "file_path",
                "version_key": "modified_at",
            }
        }
    ),
    dispatch_mode=DispatchMode.CUSTOM,
)

run = client.transforms.run(xform.transform_id)
print(run.success)
```
### Maintenance Jobs

Use `client.jobs` for maintenance and operational workflows:

```python
client.jobs.submit_gc_scan(
    seed_paths=["s3://bucket-a", "s3://bucket-b"],
    filesystem_id="amlfs01",
    max_depth=5,
)

client.jobs.submit_gc_full(
    pending_delete_root="s3://bucket/pending-delete",
    dry_run=True,
    source_types=["gc_scanner"],
)

client.jobs.submit_gc_outbox(
    outbox_root="s3://bucket/outbox",
    dry_run=True,
)

client.jobs.submit_gc_purge(
    pending_delete_root="s3://bucket/pending-delete",
    dry_run=True,
)

recoverable = client.jobs.recover_pending_delete(
    pending_delete_root="s3://bucket/pending-delete",
)

audit = client.jobs.audit_pending_delete(
    pending_delete_root="s3://bucket/pending-delete",
)

stats = client.jobs.get_gc_stats(filesystem_id="amlfs01")
exempt_dirs = client.jobs.list_gc_exempt()

client.jobs.submit_langolier(namespace="robotics", table="episodes")
client.jobs.submit_retention(namespace="robotics", table="episodes")
client.jobs.submit_janitor(dry_run=True)
```
## AI Agent / MCP Integration

DeltaCAT exposes an MCP (Model Context Protocol) server that enables AI agents like Claude, Cursor, and other LLM-powered tools to interact with the catalog through natural language. The MCP server exposes 46+ tools for catalog management, data operations, and pipeline orchestration.

### Starting the MCP Server

```shell
# For Claude Code / Cursor:
claude mcp add deltacat -- python -m deltacat.server --catalog-root /path/to/catalog

# For HTTP-based MCP clients:
python -m deltacat.server --catalog-root /path/to/catalog --transport http --port 8080
```
### Key MCP Tools for Agents

Discovery and inspection:

| Tool | Purpose |
|---|---|
| `deltacat_list_catalogs` | List available catalogs |
| `deltacat_list_namespaces` | List namespaces in a catalog |
| `deltacat_list_tables` | Discover tables in a namespace |
| `deltacat_describe_table` | Inspect schema, row count, size, properties |
| `deltacat_list_table_versions` | List version history for a table |
| `deltacat_preview_data` | Bounded server-side data sample |
| `deltacat_guide` | Get contextual help about DeltaCAT concepts |
Catalog management:

| Tool | Purpose |
|---|---|
| `deltacat_create_namespace` | Create a new namespace |
| `deltacat_create_table` | Create a table with schema, properties, content type |
| `deltacat_alter_table` | Update table properties or description |
| `deltacat_rename_table` | Rename a table |
| `deltacat_drop_table` | Delete a table (with optional data purge) |
| `deltacat_drop_namespace` | Delete a namespace |
| `deltacat_rollback` | Restore a table to a prior point in time |
| `deltacat_truncate_table` | Remove all data from a table |
Reading and writing data:

| Tool | Purpose |
|---|---|
| `deltacat_plan_read` | Build a read plan for client-side execution |
| `deltacat_write_data` | Small inline writes (< 10k rows) |
| `deltacat_stage_write` | Start a staged write session (large writes) |
| `deltacat_commit_write` | Commit staged files to the catalog |
Placement and replication:

| Tool | Purpose |
|---|---|
| `deltacat_manage_placement` | Set, query, or remove data placement policies across roots |
Pipelines and orchestration:

| Tool | Purpose |
|---|---|
| `deltacat_create_subscription` | Set up incremental data processing |
| `deltacat_create_transform` | Define a data transformation |
| `deltacat_create_publication` | Define sink tables |
| `deltacat_create_pipeline` | Group pipeline nodes into a named DAG |
| `deltacat_trigger_subscription` | Trigger async processing |
| `deltacat_run_subscription` | Run processing synchronously |
Jobs and workers:

| Tool | Purpose |
|---|---|
| `deltacat_claim_job` | Claim a pending job (with worker tag affinity) |
| `deltacat_submit_preprocessing_job` | Submit a PackDS preprocessing job |
### Example: Building the Annotations Pipeline via MCP

An AI agent can build a complete annotations pipeline described in lab design docs using MCP tools alone:

1. Create the raw annotations table using `deltacat_stage_write` + `deltacat_commit_write` (the agent writes schema-conformant Parquet files)

2. Create a crawler subscription to watch for new annotation files:

   ```python
   deltacat_create_subscription(
       subscriber_id="annotations-crawler",
       source_tables=[{"namespace": "vision", "table": "crawler_root"}],
       subscriber_type="custom",
       subscriber_mode="version",
       dispatch_mode="custom",
       dispatch_config='{"required_worker_tags": ["cpu"]}'
   )
   ```

3. Create a diff subscription for change detection:

   ```python
   deltacat_create_subscription(
       subscriber_id="annotations-diff",
       source_tables=[{"namespace": "vision", "table": "crawler_output"}],
       subscriber_type="diff",
       primary_key="episode_id",
       version_key="modified_timestamp_ns"
   )
   ```

4. Create a transform for curated output:

   ```python
   deltacat_create_transform(
       transform_id="annotations-curate",
       name="Curate Annotations",
       source_tables=[{"namespace": "vision", "table": "annotations_diff"}],
       sink_tables=[{"namespace": "vision", "table": "annotations"}],
       select_columns=["episode_id", "key", "value", "start_time_ns", "end_time_ns"]
   )
   ```

5. Wire into a pipeline and trigger:

   ```python
   deltacat_create_pipeline(
       pipeline_id="annotations-pipeline",
       name="Annotations Pipeline",
       node_ids=["annotations-curate"]
   )
   deltacat_trigger_subscription("annotations-crawler")
   ```
For the complete MCP tool reference and configuration guide, see deltacat/docs/mcp/README.md.
## Common Workflows

### Catalog Management

Create, inspect, and manage namespaces and tables.
```python
from deltacat_client import Client

client = Client("http://localhost:8080")

# --- Namespaces ---

# Create a namespace
client.catalog.create_namespace(namespace="robotics")

# Check existence
if client.catalog.namespace_exists(namespace="robotics"):
    print("Namespace exists")

# Get namespace details
ns = client.catalog.get_namespace(namespace="robotics")
print(ns.properties)

# Update namespace properties
client.catalog.alter_namespace(
    namespace="robotics",
    properties={"team": "manipulation", "env": "prod"},
)

# --- Tables ---

# Create a table with properties
client.catalog.create_table(
    namespace="robotics",
    table="episodes",
    content_type="parquet",
    table_properties={"owner": "alice", "retention_days": "90"},
)

# Check existence
if client.catalog.table_exists(namespace="robotics", table="episodes"):
    print("Table exists")

# Inspect a table
desc = client.catalog.describe_table(namespace="robotics", table="episodes")
print(f"Schema: {desc.schema}")
print(f"Rows: {desc.total_records}")
print(f"Properties: {desc.properties}")

# Update table properties
client.catalog.alter_table(
    namespace="robotics",
    table="episodes",
    table_properties={"retention_days": "180"},
)

# List table version history
versions = client.catalog.list_table_versions(
    namespace="robotics", table="episodes",
)
for v in versions:
    print(f"Version: {v.version}")

# Rename a table
client.catalog.rename_table(
    namespace="robotics",
    table="episodes",
    new_name="episodes_v1",
)

# Rollback to a prior point in time (nanosecond timestamp)
client.catalog.rollback_table(
    namespace="robotics",
    table="episodes_v1",
    as_of=1711497600000000000,  # nanosecond epoch
)

# Truncate all data (keeps table metadata)
client.catalog.truncate_table(namespace="robotics", table="episodes_v1")

# Drop a table (purge=True also deletes data files)
client.catalog.drop_table(namespace="robotics", table="episodes_v1", purge=True)

# Drop a namespace
client.catalog.drop_namespace(namespace="robotics", purge=True)
```
### Data Operations

Direct reads, optional read plans, inline writes, retention, and compaction status.

```python
# Read directly through the client
dataset = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
)

# Build a reusable plan when you want file-level metadata
plan = client.catalog.plan(namespace="robotics", table="episodes")
print(f"Files: {plan.total_files}, Records: {plan.total_records}")

# The full deltacat package can also execute the same thin plan
import deltacat as dc
thick_dataset = dc.read_table("episodes", namespace="robotics", plan=plan)

# Small inline write (< 10k rows, agent-generated data)
client.catalog.write_data(
    namespace="robotics",
    table="predictions",
    data=[{"episode_id": 1, "score": 0.95}],
)

# Check compaction status
status = client.catalog.compaction_status(
    namespace="robotics", table="episodes",
)

# Run retention cleanup (dry_run=True previews without deleting)
result = client.catalog.langolier_cleanup(
    namespace="robotics",
    table="episodes",
    dry_run=True,
)

# Get effective retention policy
policy = client.catalog.get_retention_policy(
    namespace="robotics", table="episodes",
)
```
### Auth

Inspect your identity and manage permissions.

```python
# Check your identity
me = client.auth.whoami()
print(f"User: {me.user_id}, Auth configured: {me.auth_configured}")

# Grant a role (admin only)
client.auth.grant(user_id="bob@example.com", role="WRITER")

# Grant scoped to a namespace
client.auth.grant(
    user_id="bob@example.com",
    role="READER",
    resource_type="namespace",
    resource_name="robotics",
)

# List grants
grants = client.auth.list_grants(user_id="bob@example.com")
for g in grants:
    print(f"{g.user_id}: {g.role} on {g.scope_type}/{g.scope_name}")

# The convenience facade returns only grant items. The raw REST API may also
# include informational metadata when auth is not configured.

# Revoke
client.auth.revoke(
    user_id="bob@example.com",
    resource_type="namespace",
    resource_name="robotics",
)
```
### Jobs

This workflow is intended for external workers. A worker claims a queued job, keeps the job alive with heartbeats while processing, and reports the result back to the DeltaCAT API server.

```python
from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    worker_id="gpu-01",
    worker_tags=["gpu", "arm64"],
    bearer_token="worker-token-1",  # Required in Direct Auth mode
)

job = client.jobs.claim()
if job is not None:
    # The claim response includes a job_token (when DELTACAT_JOB_SECRET is set
    # on the server). The client passes it automatically on subsequent calls.
    print(f"Claimed job {job.job_id}, token: {job.job_token}")

    # Keep the job lease alive while processing.
    client.jobs.start_heartbeat(job.job_id, interval_seconds=30)

    # Read source files from the job context, perform work, then report success.
    # Source-consuming jobs must include the updated watermark on completion.
    client.jobs.complete(
        job.job_id,
        records_processed=50000,
        watermark={"partition_watermarks": {"slot-a": 123}, "known_partitions": ["slot-a"]},
    )
```
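Since source-consuming jobs must report an updated watermark on completion, workers typically fold newly processed partitions into the payload shape shown above. Here is a minimal sketch; the helper name `merge_watermark` is my own, only the `partition_watermarks` / `known_partitions` field names come from the API example:

```python
def merge_watermark(previous: dict, partition: str, value: int) -> dict:
    """Hypothetical worker-side helper that folds a new partition watermark
    into the completion payload shape used by client.jobs.complete(...)."""
    marks = dict(previous.get("partition_watermarks", {}))
    # Watermarks only move forward for a given partition.
    marks[partition] = max(marks.get(partition, value), value)
    known = sorted(set(previous.get("known_partitions", [])) | {partition})
    return {"partition_watermarks": marks, "known_partitions": known}

watermark = merge_watermark({}, "slot-a", 123)
# watermark == {"partition_watermarks": {"slot-a": 123}, "known_partitions": ["slot-a"]}
```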
If you need a blocking poller instead of a background heartbeat loop, use `client.jobs.wait(job_id)`.

For richer progress inspection, use `client.jobs.get_progress(job_id)`. The response separates worker heartbeat progress from structured server-side progress events:

```python
progress = client.jobs.get_progress(job.job_id)
print(progress.progress_fraction)  # 0.5
print(progress.progress_message)   # "halfway"
print(progress.progress_events)    # {"source_node_run": JobProgressEntry(...)}
```
#### How jobs are created

Workers do not create jobs themselves. A job is the universal work primitive in DeltaCAT: a typed work unit with a claim/heartbeat/complete lifecycle. Jobs enter the shared claim pool through two submission paths:

1. **Pipeline-scheduled jobs (declarative, incremental):** Subscriptions, transforms, and publications define repeatable workflows that watch for data changes and process them incrementally. When a pipeline action (trigger, run, redrive) fires, it creates a job. Crawler, diff, relay, and custom subscriber jobs all use this path.

2. **Ad-hoc submitted jobs (imperative, one-shot):** `submit_preprocessing_job()` creates a one-shot job for a specific snapshot with specific parameters. Episode index builds are auto-submitted on PackDS table commits. Future system operations (compaction, GC, langolier) will also use ad-hoc submission.

Both paths produce jobs in the same claim pool. Workers see all pending jobs and claim based on worker tags and dispatch config. The worker-side lifecycle is always the same: claim, heartbeat, process, then complete or fail.

**Target architecture:** Jobs are the foundational primitive. Pipelines are orchestration over jobs. Every work unit -- whether from a pipeline trigger, an ad-hoc submit, a compaction dispatch, or a GC scan -- is a job with full lifecycle tracking. Some execution paths bypass the job store, including inline local dispatch and fire-and-forget queue messages.
#### Routing jobs to the right workers

For `DispatchMode.CUSTOM`, set `dispatch_config` when you create the resource and pass matching tags when workers claim jobs. The job system uses a shared claim pool, so these filters are the main way to separate CPU-only work from GPU-heavy work.

```python
import json

from deltacat_client import DispatchMode

transform = client.transforms.create(
    transform_id="annotations-transform",
    name="Annotations Transform",
    source_tables=[{"namespace": "vision", "table": "annotations_raw"}],
    sink_tables=[{"namespace": "vision", "table": "annotations_curated"}],
    subscriber_class="example.transforms.AnnotationTransform",
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps(
        {
            "required_worker_tags": ["gpu", "arm64"],
            "allowed_worker_ids": ["trainer-usw2-pod-17"],
        }
    ),
)

job = client.jobs.claim(worker_tags=["gpu", "arm64"])
```
Supported affinity keys today:

- `required_worker_tags`
- `allowed_worker_ids`
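The effect of these two keys can be sketched client-side; the `worker_matches` helper is illustrative only, since the server performs the authoritative routing when workers call `client.jobs.claim()`:

```python
import json

# dispatch_config is passed to the API as a JSON string.
dispatch_config = json.dumps({
    "required_worker_tags": ["gpu", "arm64"],
    "allowed_worker_ids": ["trainer-usw2-pod-17"],
})

def worker_matches(config_json: str, worker_id: str, worker_tags: list) -> bool:
    """Illustrative sketch of tag/id affinity semantics. The DeltaCAT server
    performs the authoritative matching at claim time."""
    config = json.loads(config_json)
    required = set(config.get("required_worker_tags", []))
    allowed = config.get("allowed_worker_ids")
    # Every required tag must be present on the worker.
    if not required.issubset(worker_tags):
        return False
    # If an allow-list is set, the worker_id must be on it.
    if allowed is not None and worker_id not in allowed:
        return False
    return True
```

A worker with extra tags still matches; a worker missing any required tag, or absent from the allow-list, does not.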
#### Job Store Durability

The server defaults to a durable job store (`DELTACAT_JOB_STORE_BACKEND=deltacat`) that persists job state across process restarts and supports multi-instance HA behind a load balancer. Jobs go through five states: `PENDING`, `RUNNING`, `FINALIZING`, `COMPLETED`, `FAILED`. The `FINALIZING` state is a crash-recovery checkpoint for server-side catalog mutations after worker completion.

Custom worker jobs use a shared claim pool with `required_worker_tags` and `allowed_worker_ids` as routing filters. This is sufficient for the initial deployment but is not a full multi-queue scheduler.
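The five states above can be pictured as a small state machine; the transition map below is an illustrative assumption for orientation, not the authoritative server state machine:

```python
# The five job states come from the durable job store description above.
# Which transitions are legal is my assumption for illustration only.
TRANSITIONS = {
    "PENDING": {"RUNNING"},                  # claimed by a worker
    "RUNNING": {"FINALIZING", "FAILED"},     # worker completes or fails
    "FINALIZING": {"COMPLETED", "FAILED"},   # server-side catalog mutations
    "COMPLETED": set(),
    "FAILED": set(),
}

def advance(state: str, target: str) -> str:
    if target not in TRANSITIONS[state]:
        raise ValueError(f"illegal transition {state} -> {target}")
    return target

state = "PENDING"
for step in ("RUNNING", "FINALIZING", "COMPLETED"):
    state = advance(state, step)
```

The point of the intermediate `FINALIZING` checkpoint is that a server crash between worker completion and catalog mutation leaves a recoverable state rather than a half-applied commit.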
#### Choosing a worker_id

`worker_id` is a stable identifier for the worker process claiming jobs. It is not pre-registered with the server and it does not reserve work ahead of time. A good value is something stable for the lifetime of the worker, such as:

- a Kubernetes pod name
- a VM or instance identifier
- a hostname plus process role
- a generated UUID persisted for the life of the process

Example:

```python
client = Client(
    "https://deltacat-api.example.com",
    worker_id="trainer-usw2-pod-17",
)
```
**Can multiple workers use the same worker_id?** They should not.
Jobs are not associated with any worker until they are claimed. After a worker
claims a job, the server records the worker_id and expects subsequent
heartbeats, completion, and failure reports to come from that same identifier.
Using the same worker_id from multiple concurrent workers makes ownership and
recovery behavior ambiguous.
#### Job Authentication

When auth is enabled on the server, claiming a job requires WRITER permission. In the default Direct Auth mode, workers satisfy that with a bearer token. In Trusted Proxy mode, the upstream proxy supplies the authenticated identity header and the client does not need to send `bearer_token`. The claim response includes a per-job HMAC token (`job_token`) that is required on all subsequent heartbeat, complete, and fail calls. The client handles this automatically: the token is stored on the `ClaimedJob` object and passed as an `X-Job-Token` header by the transport.

Tokens are bound to the specific claim instance (`job_id`, `worker_id`, `claimed_at`) and auto-invalidate if the job is reclaimed (e.g., after heartbeat expiry).

```python
client = Client(
    "https://deltacat-api.example.com",
    worker_id="gpu-01",
    bearer_token="worker-token-1",  # Authenticates the claim
)
job = client.jobs.claim()
# job.job_token is automatically passed on heartbeat/complete/fail
```
#### Current limitation: no generic query-job submit API

`deltacat-client` does not yet expose a first-class `submit_query_job()` or `submit_dataframe_script()` API. Supported paths are:

- `plan(...)` for scalable reads that your code executes client-side
- `stage(...)` + `commit(...)` for scalable writes
- subscriptions/publications/transforms/pipelines for repeatable managed flows

If you need to run arbitrary remote Python or dataframe code on a managed cluster, use a dedicated server-side job contract.
### Catalog Read Planning

`plan` is the scalable path for reading table data. It resolves metadata server-side and returns a typed `Plan` describing the files, schema, and partition layout needed to execute the read client-side.

```python
from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

plan = client.catalog.plan(
    namespace="robotics",
    table="episodes",
    filter_predicate={
        "and": [
            {"eq": ["task", "pick_screwdriver"]},
            {"gte": ["episode_length", 32]},
        ]
    },
    limit=10000,
    include_stats=False,
)

all_files = plan.all_files()
image_files = plan.files_with_extension(".jpg", ".png")
partitions = plan.partition_summaries()
```
The structured `filter_predicate` format is a JSON AST. Supported operators:

- comparison: `eq`, `neq`, `gt`, `gte`, `lt`, `lte`
- membership: `in`
- range: `between`
- boolean: `and`, `or`, `not`
- null checks: `is_null`

Examples:

```json
{"eq": ["task", "pick"]}
{"in": ["embodiment", ["gr1", "franka"]]}
{"between": ["episode_length", 32, 128]}
{"and": [{"eq": ["task", "pick"]}, {"gte": ["success_rate", 0.9]}]}
```
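Because the AST is plain JSON, predicates compose naturally as Python dicts. The tiny helpers below are local conveniences of my own, not part of the client API; only the operator shapes come from the examples above:

```python
# Hypothetical composition helpers for the filter_predicate JSON AST.
# Only the {"op": [args...]} shapes are from the API; the names are local.
def eq(col, val):
    return {"eq": [col, val]}

def gte(col, val):
    return {"gte": [col, val]}

def between(col, lo, hi):
    return {"between": [col, lo, hi]}

def and_(*preds):
    return {"and": list(preds)}

predicate = and_(eq("task", "pick"), gte("success_rate", 0.9))
# predicate == {"and": [{"eq": ["task", "pick"]}, {"gte": ["success_rate", 0.9]}]}
```

The resulting dict can be passed directly as `filter_predicate=` to `plan(...)` or `read(...)`.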
Most client code should call `client.catalog.read(...)` directly and let the client generate the plan inline:

```python
from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

dataset = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
)
```
This path plans and executes through the shared deltacat-io-core
execution layer, including MOR, lazy Parquet/Lance reads, schema alignment,
sort-aware ordering, and distributed reader outputs where supported.
Use an explicit ReadPlan when you want to inspect or reuse the metadata:
plan = client.catalog.plan(namespace="robotics", table="episodes")
dataset = client.catalog.read(plan=plan, read_as="pandas")
The thick deltacat package can also execute the same thin plan:
import deltacat as dc
dataset = dc.read_table("episodes", namespace="robotics", plan=plan)
[!NOTE]
`deltacat-client` intentionally does not depend on the full `deltacat` runtime. `client.catalog.read(...)` does not require the thick `deltacat` package to be installed.
Typical uses for a ReadPlan:
- pass the plan into `client.catalog.read(plan=plan)` for direct execution
- keep planning separate from execution when metadata and data scans happen in different stages of your workflow
- pass the same plan into `dc.read_table(plan=plan)` when the full `deltacat` package is available and you want to stay on the thick public API
- iterate over file paths from `plan.all_files()`
- group work by logical partition with `plan.partition_summaries()`
- inspect modality or file type via `DataFile.content_type` and `DataFile.extension`
- hand the plan to a downstream reader that performs the actual data scan
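For the downstream-reader case, a plan's file list can be routed by suffix before any data is scanned. A small sketch, assuming the file paths from `plan.all_files()` are available as plain path strings (the `group_by_extension` helper is our own, not a client API):

```python
from collections import defaultdict
from pathlib import PurePosixPath

def group_by_extension(paths):
    """Bucket data-file paths by suffix so each modality gets its own reader."""
    groups = defaultdict(list)
    for path in paths:
        groups[PurePosixPath(path).suffix].append(path)
    return dict(groups)

paths = ["ep1/frame0.jpg", "ep1/steps.parquet", "ep2/frame0.png"]
groups = group_by_extension(paths)
# groups[".jpg"] == ["ep1/frame0.jpg"]
```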
client.catalog.read(...) executes through the shared thin-client execution
layer whether the plan was generated inline or passed explicitly. The plan
contract carries the metadata needed for direct execution. There is no separate
runtime bridge back into thick DeltaCAT for thin plan execution.
[!IMPORTANT] The shared thin-client data execution/materialization path currently supports local paths and `s3://` paths only. Plans or staged writes that require `gs://` or `az://` on the shared IO path fail deterministically until dedicated backend support lands.
For very large tables:
- request plans in pages with `limit` and `offset`
- leave `include_stats=False` unless the client needs them
- keep the API server on the metadata path only and execute the read client-side
For PackDS/training tables at scale (1PB+):
- use `include_files=False` to skip file enumeration entirely; returns only `snapshot_id`, `packds_uri`, `episode_index`, `shard_manifest`, and `replication_status` (O(deltas), not O(files))
- use `preferred_root` to resolve file paths through a replicated root (see Data Placement and Replication)
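The limit/offset advice above can be wrapped in a simple paging loop. A sketch under the assumption that your planner call accepts `limit` and `offset` keywords and that an empty or short page marks the end; the `fetch_page` callable stands in for your own wrapper around `client.catalog.plan`:

```python
def iter_pages(fetch_page, page_size=10000):
    """Yield successive pages from a limit/offset planner until exhausted."""
    offset = 0
    while True:
        page = fetch_page(limit=page_size, offset=offset)
        if not page:
            return
        yield page
        if len(page) < page_size:
            return  # short page: no more data behind it
        offset += page_size
```

Stopping on a short page avoids one extra round trip for tables whose size is not a multiple of the page size.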
Staged Writes
Most applications should use client.catalog.write(...) and let the client
manage staging and commit automatically.
Use stage(...) when you need explicit control over where files are written,
when they are committed, or how prewritten files are registered. It returns a
typed WriteSession with the destination directory and write mode.
from deltacat_client import Client
client = Client("https://deltacat-api.example.com", catalog="robotics-prod")
session = client.catalog.stage(
namespace="robotics",
table="embeddings",
mode="merge",
format="parquet",
)
entry = session.entry_for(
"embeddings/output.parquet",
records=1024,
)
result = client.catalog.commit(session=session, entries=[entry])
In practice, an application or worker using staged writes will:
- call `stage(...)`
- write one or more output files under `session.data_dir`
- build `WriteEntry` values for those files
- call `commit(...)`
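The second step -- materializing files under `session.data_dir` -- is ordinary file IO. A hedged sketch (the `write_staged_file` helper is hypothetical; a real writer would emit Parquet via PyArrow or similar rather than raw bytes):

```python
from pathlib import Path

def write_staged_file(data_dir, rel_path, payload: bytes):
    """Write one staged file and return the details a WriteEntry needs."""
    out = Path(data_dir) / rel_path
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_bytes(payload)
    return {"path": str(out), "content_length": out.stat().st_size}
```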
client.catalog.write(...) is the higher-level convenience path. It accepts:
- PyArrow tables
- Pandas DataFrames
- Polars DataFrames
- local Parquet, Feather, ORC, or AVRO file paths or lists of those file paths
- local Lance dataset directories
It also manages session liveness automatically while local files are being materialized, and temporarily heartbeats a bound transaction during the same window when needed.
Data Placement and Replication
DeltaCAT supports multi-root catalogs where data files are stored across multiple storage backends (e.g., S3 in us-east-2, OpenStack Swift in PRIMARY, local Lustre). The placement API lets you declare which roots should have copies of a table's data and triggers replication automatically.
Core concepts:
- Data Root: A named storage backend in the catalog's `DataRootConfig` (e.g., `"aws_s3_iad"`, `"openstack_swift_primary"`). Each root has a URI prefix and optional connection configuration.
- Placement Policy: A table property that declares which data roots should have copies. Once set, new writes are automatically replicated by a background daemon. Stored on the table, applies to all table versions.
- Backfill: One-shot replication of existing data to a new root. Runs as a chunked, checkpointed orchestration job on dedicated planner/relay workers.
- Replication Status: Per-root progress tracking. Each delta (unit of data) is tracked individually in the Replica Ledger system table.
- Preferred Root: When reading via `plan`, you can request file paths resolved through a specific root. For training tables (PackDS), this uses a whole-snapshot gate: either ALL data + sidecars are available on the preferred root, or everything stays on the primary root.
from deltacat_client import Client
client = Client("https://deltacat-api.example.com", catalog="robotics-prod")
# Set a placement policy: replicate to S3 IAD and backfill existing data
result = client.catalog.place(
namespace="vision",
table="annotations",
roots=["aws_s3_iad"],
backfill=True, # replicate existing data (latest active version)
set_policy=True, # set ongoing policy for new writes
)
print(result)
# {"table": "annotations", "namespace": "vision", "roots": ["aws_s3_iad"],
# "policy_set": true, "subscriptions_created": 1, "backfill_jobs": ["replica_backfill:..."]}
Checking replication progress:
status = client.catalog.replication_status(
namespace="vision",
table="annotations",
)
print(status)
# {"table_ref": "vision.annotations",
# "by_root": {"aws_s3_iad": {"complete": 95, "pending": 5, "failed": 0, "total": 100}},
# "replicated_roots": [],
# "policy": {"target_roots": ["aws_s3_iad"], "include_sidecars": true},
# "backfill_status_by_root": {
# "aws_s3_iad": {
# "planner_job": {"job_id": "replica_backfill:...", "state": "completed"},
# "transfer_shards": {"pending": 0, "running": 0, "finalizing": 0,
# "completed": 100, "failed": 0, "total": 100},
# "checkpoint": {"last_partition_id": "...", "last_stream_position": 100},
# "scope_total_deltas": 100,
# "completed_deltas": 100,
# "contiguous_completed_deltas": 100,
# "blocked_on_gap": false
# }
# }}
# Scope to a specific table version:
status = client.catalog.replication_status(
namespace="vision",
table="annotations",
table_version="42",
)
Reading from the closest root:
# plan with preferred_root resolves file paths through the replicated root
plan = client.catalog.plan(
namespace="vision",
table="annotations",
preferred_root="aws_s3_iad",
include_files=False, # PackDS training: skip file enumeration, O(deltas) only
)
# plan.packds_uri -> "s3://training-data-iad/.../v42.packds" (if fully replicated)
# plan.preferred_root_used -> true/false (whether the gate passed)
Targeted backfill with selectors:
# Backfill a specific table version without setting ongoing policy:
client.catalog.place(
namespace="vision",
table="annotations",
roots=["aws_s3_iad"],
table_version="42",
backfill=True,
set_policy=False, # one-shot replication only
)
# Backfill specific partitions:
client.catalog.place(
namespace="vision",
table="annotations",
roots=["aws_s3_iad"],
partition_filter=[
{"partition_id": "uuid-abc-123"},
{"partition_values": ["2026-03-01"]},
],
)
Removing a root from the policy:
result = client.catalog.unplace(
namespace="gear",
table="annotations",
root="aws_s3_iad",
)
# Does NOT delete replicated data — only stops future replication
For architecture details (Replica Ledger, ReplicaTriggerScanner daemon, ReplicaSubscriber, backfill orchestration, worker pool routing), see the DeltaCAT Server README and CLAUDE.md sections on Multi-Root Data Replication.
Subscriptions, Publications, Transforms, and Pipelines
These resources define named, repeatable data workflows. They connect to the
job system: when a subscription or transform is configured with
DispatchMode.CUSTOM, triggering it creates a claimable job that external
workers pick up via client.jobs.claim(). (See Managed workflows → Dispatch modes for a short overview.)
How the pieces fit together
Subscription (watches a source table for new data)
→ Transform (defines processing: custom code or column projection)
→ Publication (defines where output goes)
→ Pipeline (groups transforms into a DAG)
- Subscription: Tracks per-partition watermarks on a source table. When triggered, it detects new data since the last checkpoint. Configured with a `dispatch_mode`: `DispatchMode.LOCAL` (server runs it inline), `DispatchMode.CUSTOM` (creates a job for external workers), or `DispatchMode.OSMO` (OSMO compute pool).
- Transform: Binds a subscription (input) to a publication (output) with processing logic. Processing can be:
  - `select_columns`: Simple column projection (built-in, no custom code)
  - `subscriber_class`: Fully-qualified Python class implementing the `Subscriber` ABC (e.g., `"my_app.transforms.ConvertXDOF"`). The class receives the full DeltaCAT runtime context and can use any framework (Polars, Daft, PyArrow, Pandas, custom code).
- Publication: Declares where transform output lands (one or more sink tables) with provenance tracking.
- Pipeline: Groups transforms into a named DAG with topological execution order for coordinated redrive and status monitoring.
How pipelines create jobs
When using DispatchMode.CUSTOM:
- You call `client.subscriptions.trigger(...)` or `client.subscriptions.run(...)`
- The server detects new data and creates a `Job` in the durable job store
- The job context includes source file paths, target staging paths, and credentials
- An external worker calls `client.jobs.claim(...)` to receive the job
- The worker processes data and calls `client.jobs.complete(...)` with the updated watermark for source-consuming jobs
- The server commits the output, advances the watermark, and triggers downstream subscriptions automatically
Example: Annotations Pipeline
This example mirrors a real annotations pipeline design: raw annotation files land in OpenStack Swift, a crawler indexes them, a diff subscriber detects changes, and a custom worker converts them to the curated annotations table format.
Steady-state orchestration: the root crawl publication uses trigger={"mode": "schedule", ...} and runs on the server-managed schedule manager (see Periodic triggers). The crawl uses CUSTOM dispatch so worker processes claim crawl jobs from the job store. Downstream transforms omit trigger (default event): when the crawl commits new data to annotations_crawled, the server triggers the diff transform, then the convert transform.
import json
from deltacat_client import Client, DispatchMode
client = Client(
"https://deltacat-api.example.com",
catalog="vision-prod",
bearer_token="pipeline-admin-token",
)
# ── Stage 1: Crawl publication ──────────────────────────────────────
# Scheduled root crawl: the schedule manager dispatches jobs for each cron
# slot. CUSTOM dispatch → workers claim.
crawl_pub = client.publications.create_crawler(
publication_id="annotations-crawl",
name="Annotations Crawl",
sink_namespace="vision",
sink_table="annotations_crawled",
crawl_params={"seed_paths": [{"path": "s3://annotation-bucket/annotations"}]},
trigger={
"mode": "schedule",
"schedule": {
"kind": "cron",
"cron": "30 8,12 * * *",
"timezone": "America/Los_Angeles",
},
},
dispatch_mode=DispatchMode.CUSTOM,
dispatch_config=json.dumps({"required_worker_tags": ["cpu"]}),
)
# ── Stage 2: Diff transform ─────────────────────────────────────────
# Compares consecutive crawler table versions to produce ADD/UPDATE/DELETE
# deltas. Diff jobs are memory-hungry, so pin them to dedicated workers.
# Default trigger is event-driven when annotations_crawled changes.
diff_transform = client.transforms.create(
transform_id="annotations-diff",
name="Annotations Diff",
source_tables=[{"namespace": "vision", "table": "annotations_crawled"}],
sink_tables=[{"namespace": "vision", "table": "annotations_diff"}],
subscriber_class=(
"deltacat.compute.pipelines.subscriber.diff_subscriber.DiffSubscriber"
),
subscriber_mode="version",
dispatch_mode=DispatchMode.CUSTOM,
dispatch_config=json.dumps({
"required_worker_tags": ["highmem"],
"allowed_worker_ids": [
"diff-worker-useast2-1",
"diff-worker-useast2-2",
],
}),
config_json=json.dumps(
{
"subscriber_kwargs": {
"primary_key": "episode_id",
"version_key": "modified_timestamp_ns",
}
}
),
)
# ── Stage 3: Convert + Curate transform ─────────────────────────────
# Custom worker converts raw XDOF/LeRobot annotations to the canonical
# schema and writes curated output.
convert_transform = client.transforms.create(
transform_id="annotations-convert-xform",
name="XDOF/LeRobot to Canonical Annotations",
source_tables=[{"namespace": "vision", "table": "annotations_diff"}],
sink_tables=[{"namespace": "vision", "table": "annotations"}],
subscriber_class="example.transforms.AnnotationConverter",
dispatch_mode=DispatchMode.CUSTOM,
dispatch_config=json.dumps({"required_worker_tags": ["cpu", "us-east-2"]}),
)
# ── Pipeline (groups all stages for coordinated management) ─────────
pipeline = client.pipelines.create(
pipeline_id="annotations-pipeline",
name="Annotations Pipeline",
node_ids=["annotations-convert-xform"],
)
# No manual run()/trigger() in steady state: the publication schedule scanner
# dispatches crawl jobs; downstream transforms run on source-table updates.
# Optional one-shot bootstrap before the first scheduled bucket:
# client.publications.run(crawl_pub.publication_id)
Worker side (runs on a separate CPU fleet):
import time

from deltacat_client import Client
worker = Client(
"https://deltacat-api.example.com",
worker_id="cpu-worker-us-east-2-pod-7",
worker_tags=["cpu", "us-east-2"],
bearer_token="worker-token-1",
)
while True:
job = worker.jobs.claim()
if job is None:
time.sleep(5)
continue
worker.jobs.start_heartbeat(job.job_id, interval_seconds=30)
# The job context tells the worker what to read and where to write.
# example.transforms.AnnotationConverter.process_deltas() handles the
# actual conversion logic using any Python framework.
for table_ref, files in job.data_access.items():
for file_info in files.get("files", []):
print(f"Processing: {file_info['path']}")
worker.jobs.complete(
job.job_id,
records_processed=1000,
watermark={"partition_watermarks": {"slot-a": 1}, "known_partitions": ["slot-a"]},
)
Pipeline operations
# Check pipeline health
status = client.pipelines.status("annotations-pipeline")
print(f"Active: {status.raw.get('active_count')}")
# Inspect a subscription's watermark (how far it has processed)
wm = client.subscriptions.get_watermark("annotations-convert")
# Pause a subscription (stops processing until resumed)
client.subscriptions.pause("annotations-convert")
client.subscriptions.resume("annotations-convert")
# Redrive: reset the watermark and reprocess from scratch
client.subscriptions.redrive(
"annotations-convert",
rewind_watermark="reset",
)
# Cascade redrive: redrive this subscription and all downstream ones
client.subscriptions.redrive(
"annotations-crawler",
cascade=True,
dry_run=True, # Preview the plan first
)
Creating a subscription, transform, publication, or pipeline only registers the
resource definition. Work is not executed until a control-plane action such as
trigger, run, or redrive is invoked.
Training Data / PackDS Workflow
DeltaCAT manages training datasets stored in PackDS format -- an
episode-oriented training data layout backed by
Lance columnar storage. A PackDS
dataset contains a steps.lance directory where each row is a training
step, grouped into episodes (e.g., robot manipulation trajectories).
The training workflow lets you register PackDS datasets, get training
metadata via plan, and submit preprocessing jobs for episode
filtering and shard assignment.
Key terms:
- Episode index sidecar: A Parquet file placed next to the PackDS data that materializes per-episode metadata (frame counts, task labels, annotation sources). Avoids scanning the full Lance table for every training prep step.
- Snapshot ID: A 32-hex content-addressed fingerprint of the exact data files in a resolved read plan. Used to key all training sidecars so that the same data always produces the same artifacts.
- Shard manifest: A Parquet file mapping `(episode, step)` pairs to GPU shard indices for deterministic distributed data loading.
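To make the content-addressing idea concrete, here is an illustrative fingerprint over a resolved file set. This is not DeltaCAT's actual `snapshot_id` algorithm (which is internal to the server); it only demonstrates the key property the docs describe: an order-independent, content-derived, 32-hex identifier:

```python
import hashlib

def snapshot_fingerprint(file_entries):
    """Illustrative 32-hex content fingerprint over a resolved file set.

    Not DeltaCAT's real algorithm. Sorting first makes the digest
    independent of enumeration order, so the same files always map
    to the same ID.
    """
    canon = "\n".join(
        sorted(f"{e['path']}:{e['content_length']}" for e in file_entries)
    )
    return hashlib.md5(canon.encode()).hexdigest()

files = [{"path": "x", "content_length": 1}, {"path": "y", "content_length": 2}]
# snapshot_fingerprint(files) == snapshot_fingerprint(list(reversed(files)))
```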
Register a PackDS dataset
Register an existing PackDS dataset by creating a table with packds_uri
in its properties, then committing the pre-written Lance files.
For DeltaCAT to recognize the table as PackDS-backed for training workflows, both conditions must be true:
- the committed files have Lance content type and `.packds` in their URLs
- the table metadata carries `packds_uri`
from deltacat_client import Client
client = Client("http://localhost:8080")
# 1. Create namespace (if needed) and table with packds_uri property.
client.catalog.create_namespace(namespace="training")
client.catalog.create_table(
namespace="training",
table="episodes_v42",
content_type="lance",
table_properties={
"packds_uri": "s3://training-data/training/episodes_v42.packds",
},
)
# If the table already exists, set packds_uri via alter_table:
# client.catalog.alter_table(
# namespace="training",
# table="episodes_v42",
# table_properties={"packds_uri": "s3://training-data/training/episodes_v42.packds"},
# )
# 2. Build manifest entries describing the pre-written .packds/steps.lance files.
entries = [
{
"path": "s3://training-data/training/episodes_v42.packds/steps.lance/_versions/1.manifest",
"record_count": 0,
"content_length": 5321,
},
{
"path": "s3://training-data/training/episodes_v42.packds/steps.lance/data/0000.lance",
"record_count": 125000,
"content_length": 18350211,
},
{
"path": "s3://training-data/training/episodes_v42.packds/steps.lance/_indices/uuid.idx",
"record_count": 0,
"content_length": 81244,
},
]
# For the full manifest-entry construction logic, see:
# deltacat.utils.lance_utils.build_manifest_entries()
# Commit the pre-written Lance files.
result = client.catalog.commit(
namespace="training",
table="episodes_v42",
entries=entries,
format="lance",
mode="merge",
)
After a write that DeltaCAT recognizes as PackDS-backed, DeltaCAT
automatically submits an index_build job to build the episode-index
sidecar next to the PackDS data. Recognition depends on the two rules
above: Lance-backed committed files plus packds_uri/.packds hints.
Get training metadata via plan
plan = client.catalog.plan(
namespace="training",
table="episodes_v42",
)
print(plan.snapshot_id) # "a1b2c3d4..." (32-hex content-addressed ID)
print(plan.packds_uri) # "s3://bucket/data/v42.packds"
print(plan.episode_index) # {"uri": "s3://.../__episode_index/.../index.parquet",
# "status": "ready"} # or "pending", "missing", "failed"
The snapshot_id is a deterministic fingerprint of the exact data files
backing this table version. Two calls to plan that resolve the same
files always return the same snapshot_id. The episode_index.status
tells you whether the sidecar Parquet file has been built:
- `ready` -- sidecar exists, ready for use
- `pending` -- build job is running
- `missing` -- no job submitted yet (sidecar will be built on next write or reconciliation)
- `failed` -- build job failed (check server logs)
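A training-prep script typically blocks until the sidecar is `ready`. A hedged sketch of such a poll loop; the `fetch_status` callable stands in for your own code that re-plans and reads the episode-index status, and the helper name is ours, not a client API:

```python
import time

def wait_for_episode_index(fetch_status, timeout_s=600, poll_s=5, sleep=time.sleep):
    """Poll an episode-index status callable until 'ready', 'failed', or timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = fetch_status()
        if status == "ready":
            return True
        if status == "failed":
            raise RuntimeError("episode index build failed; check server logs")
        sleep(poll_s)  # "pending" or "missing": keep waiting
    return False
```

Injecting `sleep` keeps the loop testable without real delays.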
Submit a preprocessing job
response = client.jobs.submit_preprocessing_job(
source_table={
"namespace": "training",
"table": "episodes_v42",
},
filter_predicate={"and": [
{"eq": ["task", "pick_and_place"]},
{"gte": ["total_frames", 50]},
]},
shard_recipe={
"shard_size": 64,
"episode_sampling_rate": 1.0,
"action_horizon": 8,
"seed": 42,
},
)
print(response.job_id) # "preprocess:..."
print(response.snapshot_id) # Frozen at submit time
print(response.packds_uri) # "s3://bucket/data/v42.packds"
print(response.episode_index_exists) # True/False
The shard recipe parameters control how episodes are distributed across
GPU shards: shard_size is the target number of training steps per shard,
episode_sampling_rate controls what fraction of episodes to include,
action_horizon determines the effective length of each episode (steps
that can start a complete action sequence), and seed ensures
reproducibility.
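As a rough illustration of how these parameters interact, the sketch below estimates a shard count from a recipe. This arithmetic is an approximation for sizing only; the server's actual assignment also respects episode boundaries and `action_horizon`:

```python
import math

def estimate_shard_count(total_steps, shard_size, episode_sampling_rate=1.0):
    """Back-of-envelope shard count for a given recipe (sizing only)."""
    sampled_steps = int(total_steps * episode_sampling_rate)
    return math.ceil(sampled_steps / shard_size)

# e.g. 1,000,000 steps at full sampling with shard_size=64 -> 15625 shards
```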
The job is submitted to the server's job queue. A preprocessing worker claims it, builds/loads the episode index, applies the filter, computes shard assignments, and writes a shard manifest.
Two preprocessing modes are supported:
- Mode A: omit `output_table` to compute only a shard manifest under the frozen source `snapshot_id`.
- Mode B: provide `output_table` to materialize a filtered PackDS snapshot as a DeltaCAT output, commit it with a deterministic `preprocessing_job_id`, write the shard manifest under the committed output `snapshot_id`, and enqueue the output episode-index build.
Retrieve the shard manifest
After the preprocessing job completes, get the shard manifest via
plan with a recipe_hash:
plan = client.catalog.plan(
namespace="training",
table="episodes_v42",
recipe_hash="abc123",
)
print(plan.shard_manifest) # {"uri": "s3://.../__shard_manifests/.../abc123.parquet"}
MCP tools for training
When using DeltaCAT via MCP (e.g., from Claude Code), the relevant tools are:
- `deltacat_plan_read` -- returns `snapshot_id`, `packds_uri`, `episode_index` status, and optional `shard_manifest`. Use `include_files=false` for PackDS tables at scale (returns metadata only, O(deltas) not O(files)). Use `preferred_root` to resolve paths through a replicated root.
- `deltacat_submit_preprocessing_job` -- submits a preprocessing job with filter predicates and shard recipes
- `deltacat_manage_placement` -- set data placement policies to replicate training data across storage roots (e.g., OpenStack Swift to S3 for GPU access)
Further reading
For the full architectural reference -- including the episode index schema, shard manifest algorithm, structured filter predicate format, the relationship between the episode index sidecar and Lance native indexes, the row-layout contiguity invariant, and reconciliation internals -- see deltacat/compute/training/README.md.
Configuration
The root Client can carry common deployment settings:
- `catalog`
- `headers`
- `user_id`
- `bearer_token`
- `verify_ssl`
- `follow_redirects`
- `timeout`
import os
from deltacat_client import Client
client = Client(
"https://deltacat-api.example.com",
catalog="robotics-prod",
bearer_token=os.environ["DELTACAT_TOKEN"],
user_id="robotics-trainer@example.com",
timeout=60,
verify_ssl="/etc/ssl/certs/internal-ca.pem",
)
When catalog is configured on the root client, resource methods use it by
default unless an explicit catalog= override is provided.
Custom headers can be attached for tracing, correlation, or proxy deployments:
client = Client(
"https://deltacat-api.example.com",
bearer_token="my-token",
headers={
"x-trace-id": "run-12345",
"x-correlation-id": "pipeline-abc",
},
)
[!NOTE] Identity headers like `X-DeltaCAT-User` are only trusted when the server is configured with `DELTACAT_TRUST_IDENTITY_HEADERS=true` (Trusted Proxy mode). For direct auth, use `bearer_token` instead.
The client supports both synchronous and asynchronous lifecycle management:
from deltacat_client import Client
with Client("https://deltacat-api.example.com") as client:
tables = client.catalog.list_tables(namespace="robotics")
The context manager form is useful for long-lived services and scripts that want explicit control over the underlying HTTP client lifecycle.
Data Model Highlights
Client exposes typed models for the most common server responses, including:
- `ClaimedJob` -- claimed job with `job_id`, `job_token`, `data_access`, `sink_upload`, `credentials`
- `JobStatus` -- job lifecycle state, progress, error
- `ReadPlan` -- file manifest with schema, partitions, and content types
- `ScanTask` -- individual scan unit within a read plan
- `DataFile` -- file path, size, content type, record count
- `WriteSession` -- durable write session with staged data path
- `WriteEntry` -- explicit file metadata for commit
- `SubscriptionInfo`, `SubscriptionWatermark`, `WatermarkState` -- subscription lifecycle
- `PublicationInfo`, `TransformInfo`, `PipelineInfo` -- orchestration resources
- `PipelineActionResult` -- pipeline operation results
These models are designed to make file and manifest-oriented workflows easier to work with in multimodal environments.
Supported Read Paths
DeltaCAT distinguishes between:
- Preview: bounded server-side samples via `preview_data`
- Plan: lightweight metadata-only read planning via `plan`
For scalable reads, use plan and execute the resulting plan client-side.
Package Architecture
This distribution contains:
- `deltacat_client`: the public Python facade
- `deltacat_generated_client`: bundled generated REST bindings used internally
The generated package is treated as an implementation detail. Consumers should
import from deltacat_client.
Regenerating the Bundled Client
From the DeltaCAT repository root:
make export-openapi
make generate-deltacat-client
This exports the current REST OpenAPI schema and regenerates the bundled low-level client bindings.
Development Notes
The standalone deltacat-client package is tested, built, and release-hardened
independently from the main deltacat package:
- dedicated `make test-client` target
- generated-client drift check
- wheel build check
- fresh-environment install smoke test
See README-development.md in the repository root for the current development
targets and CI workflow.