DeltaCAT Client

deltacat-client is the Python REST client for DeltaCAT.

It provides a lightweight way to interact with a DeltaCAT API server from external applications, workers, agents, and orchestration systems without installing the full DeltaCAT storage, compute, or server runtime stack.

Overview

The client is organized around a small root Client object with resource-oriented subclients:

  • client.jobs — job submission, claiming, lifecycle
  • client.catalog — namespace/table CRUD, read planning, data placement
  • client.subscriptions — incremental data processing pipelines
  • client.publications — crawler and data publishing workflows
  • client.transforms — source-to-sink data transformations
  • client.pipelines — multi-stage pipeline orchestration

This Client facade wraps the lower-level REST bindings generated from DeltaCAT's OpenAPI schema.

Installation

uv pip install deltacat-client

deltacat-client depends on the shared deltacat-io-core package for local plan execution and local file materialization. You normally install the client extras below rather than installing deltacat-io-core directly.

Package boundary:

  • deltacat-client depends on deltacat-io-core
  • deltacat-client and deltacat-io-core are intended to work without the thick deltacat package being installed
  • use the thick deltacat package only when you explicitly want the native catalog/storage/compute runtime or want to execute thin plans through the thick public API

Naming convention:

  • package/distribution name: deltacat-io-core
  • Python import module: deltacat_io_core

Optional extras:

  • deltacat-client[io] for local plan execution and local file-based read/write helpers
  • deltacat-client[pandas] for Pandas dataframe input to client.catalog.write(...)
  • deltacat-client[polars] for Polars dataframe input and AVRO materialization
  • deltacat-client[lance] for local Lance dataset read/write support
  • deltacat-client[all] for the full client-side read/write UX stack
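
For example, to pull in dataframe support, or the full client-side stack (extras as listed above):

uv pip install "deltacat-client[pandas,polars]"
uv pip install "deltacat-client[all]"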

High-level client.catalog.write(...) supports:

  • PyArrow tables
  • Pandas DataFrames
  • Polars DataFrames
  • Daft DataFrames
  • NumPy arrays
  • Ray Datasets
  • local Parquet / CSV / TSV / PSV / Feather / JSON / ORC / AVRO files
  • local Lance dataset directories

Quick Start

First-Run Setup

Before using the client, ensure a DeltaCAT API server is running. For local development:

pip install deltacat
python -m deltacat.server --catalog-root /tmp/my-catalog --transport http --port 8080

For an existing production server, get the base URL from your team (e.g., https://deltacat-api.example.com).

Connect and Explore

from deltacat_client import Client

# Local development
client = Client("http://localhost:8080")

# Production (with auth)
client = Client(
    "https://deltacat-api.example.com",
    catalog="robotics-prod",
    user_id="researcher@example.com",
    bearer_token="...",
)

# Discover what's available
catalogs = client.catalog.list_catalogs()
namespaces = client.catalog.list_namespaces()
tables = client.catalog.list_tables(namespace="robotics")

# Inspect a table
desc = client.catalog.describe_table(namespace="robotics", table="episodes")
print(f"Schema: {desc.schema}")
print(f"Rows: {desc.total_records}")

Read Data

# Direct read: the client plans and executes inline
pandas_df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
    offset=0,
)

# Advanced: build a reusable read plan when you want to inspect or reuse it
plan = client.catalog.plan(namespace="robotics", table="episodes")
print(f"Files: {len(plan.all_files())}")
print(f"Partitions: {len(plan.partition_summaries())}")

# Execute the reusable plan directly through the client
reused_df = client.catalog.read(plan=plan, read_as="pandas")

# The thick deltacat package can also execute the same thin plan
import deltacat as dc
dataset = dc.read_table("episodes", namespace="robotics", plan=plan)

For schemaless tables, client.catalog.read(...) follows the same contract as DeltaCAT itself and returns a flattened manifest table rather than the file contents. The manifest rows include file paths and metadata such as content type, sizes, and record counts. Materializing actual rows from a schemaless table remains an explicit second step via client.catalog.from_manifest_table(...).
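
A minimal sketch of that two-step flow, assuming a hypothetical schemaless table named raw_logs and that from_manifest_table(...) accepts the manifest table plus a read_as target:

# Step 1: read the flattened manifest table (file paths plus metadata).
manifest = client.catalog.read(
    namespace="robotics",
    table="raw_logs",  # hypothetical schemaless table
)

# Step 2: explicitly materialize the referenced files into a structured dataset.
dataset = client.catalog.from_manifest_table(manifest, read_as="pandas")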

Write Data

import pyarrow as pa

# High-level write: the client manages staging and commit internally
result = client.catalog.write(
    pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]}),
    namespace="robotics",
    table="predictions",
    mode="add",
    format="parquet",
)

Transactions

Within a transaction context, supported catalog operations bind to the active transaction automatically. You only need to pass transaction=... when you want to override the ambient context explicitly.

import pyarrow as pa

with client.transaction(commit_message="Create and backfill predictions") as tx:
    client.catalog.create_namespace(namespace="robotics_staging")

    client.catalog.create_table(
        namespace="robotics_staging",
        table="predictions",
        schema_def=[
            {"name": "episode_id", "type": "int64", "is_merge_key": True},
            {"name": "score", "type": "float64"},
        ],
        content_type="parquet",
        auto_create_namespace=False,
    )

    client.catalog.write(
        pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]}),
        namespace="robotics_staging",
        table="predictions",
        mode="add",
        format="parquet",
    )

    preview = client.catalog.read(
        namespace="robotics_staging",
        table="predictions",
        read_as="pandas",
    )

The transaction context manager heartbeats while it is open, commits on normal exit, and aborts on exception.

Use an explicit transaction handle when you want to control commit and abort yourself:

import pyarrow as pa

tx = client.transaction(commit_message="Backfill prediction scores")
try:
    client.catalog.write(
        pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
        transaction=tx,
    )

    preview = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
        transaction=tx,
    )

    tx.commit()
except Exception:
    tx.abort(reason="backfill failed")
    raise

Historic transactions are also supported for time-travel reads:

checkpoint_ns = 1712697600000000000

with client.transaction(as_of=checkpoint_ns) as historic_tx:
    historic_df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
        transaction=historic_tx,
    )

An as_of transaction reads a historical snapshot and is read-only. Write operations against a historic transaction are rejected.

Transaction-aware catalog APIs include:

  • namespace operations:
    • create_namespace(...)
    • get_namespace(...)
    • namespace_exists(...)
    • alter_namespace(...)
    • drop_namespace(..., purge=False)
    • list_namespaces(...)
  • table operations:
    • create_table(...)
    • describe_table(...)
    • table_exists(...)
    • alter_table(...)
    • rename_table(...)
    • drop_table(..., purge=False)
    • list_tables(...)
    • list_table_versions(...)
  • data operations:
    • plan(...)
    • read(...)
    • stage(...)
    • write(...)
    • commit(...)

Transaction-bound mutation results include transaction_id and transaction_version so callers can observe the authoritative server-side version when needed.
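
A sketch, assuming the mutation result exposes those fields as attributes:

import pyarrow as pa

with client.transaction(commit_message="observe server-side version") as tx:
    result = client.catalog.write(
        pa.table({"episode_id": [5], "score": [0.93]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )
    # Transaction-bound mutation results carry the transaction identity.
    print(result.transaction_id, result.transaction_version)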

Fetched transactions and write sessions remain bound to their originating catalog and transaction metadata:

tx = client.transaction(catalog="robotics-prod", commit_message="stage files")
session = client.catalog.stage(
    namespace="robotics",
    table="predictions",
    mode="merge",
    format="parquet",
    transaction=tx,
)

fetched_session = client.catalog.get_write_session(session.session_id)
fetched_tx = client.catalog.get_transaction(tx.id)

attached = fetched_session.commit(
    entries=[{"path": "/tmp/output.parquet", "record_count": 1024}]
)

latest_tx = fetched_tx.get()

Important transaction rules:

  • within with client.transaction(...):, supported catalog operations bind to the ambient transaction automatically
  • explicit transaction=... overrides the ambient transaction, but it must belong to the same Client instance
  • catalog=... is not allowed when an active transaction context is already in scope
  • fetched write sessions and fetched transactions keep their bound catalog; a conflicting explicit catalog=... is rejected
  • some catalog APIs intentionally remain non-transactional and raise if called inside a transaction context, including write_data(...), truncate_table(...), rollback_table(...), placement APIs, retention, and langolier / compaction status helpers

Typical next steps after discovery are:

  • inspect a specific table with client.catalog.describe_table(...)
  • read directly with client.catalog.read(...)
  • build a read plan with client.catalog.plan(...) when you want to inspect, page, cache, or reuse it
  • create or manage orchestration resources such as subscriptions and pipelines

When used as a context manager, client.transaction(...) automatically heartbeats in the background while the transaction is open and idle.

Likewise, a standalone WriteSession returned by client.catalog.stage(...) can be used as a context manager to auto-heartbeat while files are being written, and it will abort the session on exception.
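
A sketch of the session context-manager form, assuming output files are materialized under session.data_dir before commit:

with client.catalog.stage(
    namespace="robotics",
    table="embeddings",
    mode="merge",
    format="parquet",
) as session:
    # The session auto-heartbeats while this block runs; an exception
    # aborts the session instead of leaving it half-staged.
    # ... write files under session.data_dir here ...
    entry = session.entry_for("embeddings/output.parquet", records=1024)
    client.catalog.commit(session=session, entries=[entry])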

When you want an explicit reusable plan, plan(...) supports:

  • filter_predicate: structured JSON AST pushed into DeltaCAT partition pruning, file-stat pruning, and Polars row filtering
  • max_files / file_offset: page very large plans instead of returning every file path in one response
  • include_column_stats: opt in only when the client actually needs per-file statistics

client.catalog.read(...) returns a PyArrow table by default and also accepts:

  • read_as="pandas"
  • read_as="polars"
  • read_as="numpy"
  • read_as="lance" for a lazy Lance dataset when the plan resolves to a single Lance dataset path
  • read_as="pyarrow_parquet" for a lazy pyarrow.parquet.ParquetFile when the plan resolves to a single Parquet file
  • read_as="daft"
  • read_as="ray_dataset"

For schemaless tables, those supported read_as conversions apply to the flattened manifest table, not to the underlying file contents.

Use client.catalog.from_manifest_table(...) when you explicitly want to materialize the referenced files into a structured dataset.

Supported Orchestration Path

The DeltaCAT client is the supported control plane for running managed work.

  • client.jobs.submit_compaction(...)
  • client.jobs.submit_langolier(...)
  • client.jobs.submit_retention(...)
  • client.jobs.submit_gc_scan(...)
  • client.jobs.submit_gc_full(...)
  • client.jobs.submit_gc_outbox(...)
  • client.jobs.submit_gc_archive(...)
  • client.jobs.submit_gc_purge(...)
  • client.jobs.submit_gc_restore(...)
  • client.jobs.recover_pending_delete(...)
  • client.jobs.audit_pending_delete(...)
  • client.jobs.get_gc_stats(...)
  • client.jobs.list_gc_exempt(...)
  • client.jobs.submit_janitor(...)
  • client.publications.create_crawler(...) + client.publications.run(...)
  • client.transforms.create(...) with DiffSubscriber + client.transforms.run(...) or trigger(...)
  • client.catalog.place(...) + client.catalog.replication_status(...) + client.catalog.unplace(...)

Managed Workflows

Dispatch modes

Use DispatchMode for dispatch_mode: LOCAL (run on the API host), CUSTOM (enqueue a job for external workers; the usual production choice), or OSMO (NVIDIA OSMO). For CUSTOM, workers pull work with client.jobs.claim(). See Subscriptions, Publications, Transforms, and Pipelines for the full lifecycle.

Periodic triggers

Trigger configuration is part of each subscription, publication, and transform.

Create or update each subscription, publication, or transform with:

  • trigger={"mode": ...}
  • trigger={"mode": "schedule", "schedule": {...}} for scheduled resources

Schedule shapes are strict:

  • {"kind": "interval", "interval_seconds": N} only accepts interval_seconds
  • {"kind": "cron", "cron": "...", "timezone": "Area/City"} accepts cron plus optional timezone
  • mixed interval/cron fields are rejected instead of being silently ignored

Use the trigger types as follows:

  • manual: never auto-trigger; only run(...) / trigger(...) starts work
  • event: auto-trigger on upstream table updates (subscriptions and transforms)
  • schedule: auto-trigger on the server-managed periodic scheduler
  • custom: publication-only; intended for external orchestrators

Defaults:

  • subscriptions default to event
  • transforms default to event
  • root publications default to manual

The API server runs the built-in pipeline schedule manager. Scheduled subscriptions, publications, and transforms do not require scheduler enable flags on the server, and each resource controls its own schedule independently. Scheduled work uses the same durable job path as manual triggers, so DispatchMode.CUSTOM produces claimable jobs for external workers.

You can change trigger policy after creation:

client.subscriptions.update(
    "events-sub",
    trigger={
        "mode": "schedule",
        "schedule": {"kind": "interval", "interval_seconds": 300},
    },
)

client.publications.update(
    "crawl-root",
    trigger={
        "mode": "schedule",
        "schedule": {
            "kind": "cron",
            "cron": "30 8,12 * * *",
            "timezone": "America/Los_Angeles",
        },
    },
)

client.transforms.update(
    "events-diff",
    trigger={"mode": "manual"},
)

Auth for trigger-policy changes follows the resource auth model:

  • subscriptions: creator, managers, or ADMIN
  • publications: creator, managers, or ADMIN
  • transforms: creator, managers, or ADMIN

The annotations example below uses a scheduled root crawl publication plus event transforms so each crawl completion updates the catalog and downstream stages fire automatically.

Crawl as a Publication

Use a crawler publication when the crawl should be a named, managed DeltaCAT workflow:

from deltacat_client import DispatchMode

pub = client.publications.create_crawler(
    publication_id="crawl-openstack-swift-primary",
    name="OpenStack Swift PRIMARY Crawl",
    sink_namespace="crawled",
    sink_table="openstack_swift_primary",
    crawl_params={
        "seed_paths": [{"path": "s3://data-home"}],
        "filesystem_id": "openstack_swift_fs",
        "storage_backend": "openstack_swift",
        "crawl_strategy": "adaptive",
        "max_parallelism": 32,
    },
    dispatch_mode=DispatchMode.CUSTOM,
)

run = client.publications.run(pub.publication_id)
print(run.success, run.job_id if hasattr(run, "job_id") else None)

The declared publication sink is authoritative. The client/server force the crawler to write to sink_namespace.sink_table, even if the embedded crawl_params contains a different namespace or table name.

Diff as a Transform

Use a first-class transform when the diff should be part of a managed CDC pipeline:

import json

from deltacat_client import DispatchMode

xform = client.transforms.create(
    transform_id="openstack-swift-primary-diff",
    name="OpenStack Swift PRIMARY Diff",
    source_tables=[{"namespace": "crawled", "table": "openstack_swift_primary"}],
    sink_tables=[{"namespace": "changes", "table": "openstack_swift_primary_events"}],
    subscriber_class=(
        "deltacat.compute.pipelines.subscriber.diff_subscriber.DiffSubscriber"
    ),
    subscriber_mode="version",
    config_json=json.dumps(
        {
            "subscriber_kwargs": {
                "primary_key": "file_path",
                "version_key": "modified_at",
            }
        }
    ),
    dispatch_mode=DispatchMode.CUSTOM,
)

run = client.transforms.run(xform.transform_id)
print(run.success)

Maintenance Jobs

Use client.jobs for maintenance and operational workflows:

client.jobs.submit_gc_scan(
    seed_paths=["s3://bucket-a", "s3://bucket-b"],
    filesystem_id="amlfs01",
    max_depth=5,
)

client.jobs.submit_gc_full(
    pending_delete_root="s3://bucket/pending-delete",
    dry_run=True,
    source_types=["gc_scanner"],
)

client.jobs.submit_gc_outbox(
    outbox_root="s3://bucket/outbox",
    dry_run=True,
)

client.jobs.submit_gc_purge(
    pending_delete_root="s3://bucket/pending-delete",
    dry_run=True,
)

recoverable = client.jobs.recover_pending_delete(
    pending_delete_root="s3://bucket/pending-delete",
)

audit = client.jobs.audit_pending_delete(
    pending_delete_root="s3://bucket/pending-delete",
)

stats = client.jobs.get_gc_stats(filesystem_id="amlfs01")
exempt_dirs = client.jobs.list_gc_exempt()

client.jobs.submit_langolier(namespace="robotics", table="episodes")
client.jobs.submit_retention(namespace="robotics", table="episodes")
client.jobs.submit_janitor(dry_run=True)

AI Agent / MCP Integration

DeltaCAT exposes an MCP (Model Context Protocol) server that enables AI agents like Claude, Cursor, and other LLM-powered tools to interact with the catalog through natural language. The MCP server exposes 46+ tools for catalog management, data operations, and pipeline orchestration.

Starting the MCP Server

# For Claude Code / Cursor:
claude mcp add deltacat -- python -m deltacat.server --catalog-root /path/to/catalog

# For HTTP-based MCP clients:
python -m deltacat.server --catalog-root /path/to/catalog --transport http --port 8080

Key MCP Tools for Agents

Discovery and inspection:

  • deltacat_list_catalogs: List available catalogs
  • deltacat_list_namespaces: List namespaces in a catalog
  • deltacat_list_tables: Discover tables in a namespace
  • deltacat_describe_table: Inspect schema, row count, size, properties
  • deltacat_list_table_versions: List version history for a table
  • deltacat_preview_data: Bounded server-side data sample
  • deltacat_guide: Get contextual help about DeltaCAT concepts

Catalog management:

  • deltacat_create_namespace: Create a new namespace
  • deltacat_create_table: Create a table with schema, properties, content type
  • deltacat_alter_table: Update table properties or description
  • deltacat_rename_table: Rename a table
  • deltacat_drop_table: Delete a table (with optional data purge)
  • deltacat_drop_namespace: Delete a namespace
  • deltacat_rollback: Restore a table to a prior point in time
  • deltacat_truncate_table: Remove all data from a table

Reading and writing data:

  • deltacat_plan_read: Build a read plan for client-side execution
  • deltacat_write_data: Small inline writes (< 10k rows)
  • deltacat_stage_write: Start a staged write session (large writes)
  • deltacat_commit_write: Commit staged files to the catalog

Placement and replication:

  • deltacat_manage_placement: Set, query, or remove data placement policies across roots

Pipelines and orchestration:

  • deltacat_create_subscription: Set up incremental data processing
  • deltacat_create_transform: Define a data transformation
  • deltacat_create_publication: Define sink tables
  • deltacat_create_pipeline: Group pipeline nodes into a named DAG
  • deltacat_trigger_subscription: Trigger async processing
  • deltacat_run_subscription: Run processing synchronously

Jobs and workers:

  • deltacat_claim_job: Claim a pending job (with worker tag affinity)
  • deltacat_submit_preprocessing_job: Submit a PackDS preprocessing job

Example: Building the Annotations Pipeline via MCP

An AI agent can build a complete annotations pipeline described in lab design docs using MCP tools alone:

  1. Create the raw annotations table using deltacat_stage_write + deltacat_commit_write (the agent writes schema-conformant Parquet files)

  2. Create a crawler subscription to watch for new annotation files:

    deltacat_create_subscription(
        subscriber_id="annotations-crawler",
        source_tables=[{"namespace": "vision", "table": "crawler_root"}],
        subscriber_type="custom",
        subscriber_mode="version",
        dispatch_mode="custom",
        dispatch_config='{"required_worker_tags": ["cpu"]}'
    )
    
  3. Create a diff subscription for change detection:

    deltacat_create_subscription(
        subscriber_id="annotations-diff",
        source_tables=[{"namespace": "vision", "table": "crawler_output"}],
        subscriber_type="diff",
        primary_key="episode_id",
        version_key="modified_timestamp_ns"
    )
    
  4. Create a transform for curated output:

    deltacat_create_transform(
        transform_id="annotations-curate",
        name="Curate Annotations",
        source_tables=[{"namespace": "vision", "table": "annotations_diff"}],
        sink_tables=[{"namespace": "vision", "table": "annotations"}],
        select_columns=["episode_id", "key", "value", "start_time_ns", "end_time_ns"]
    )
    
  5. Wire into a pipeline and trigger:

    deltacat_create_pipeline(
        pipeline_id="annotations-pipeline",
        name="Annotations Pipeline",
        node_ids=["annotations-curate"]
    )
    deltacat_trigger_subscription("annotations-crawler")
    

For the complete MCP tool reference and configuration guide, see deltacat/docs/mcp/README.md.

Common Workflows

Catalog Management

Create, inspect, and manage namespaces and tables.

from deltacat_client import Client

client = Client("http://localhost:8080")

# --- Namespaces ---

# Create a namespace
client.catalog.create_namespace(namespace="robotics")

# Check existence
if client.catalog.namespace_exists(namespace="robotics"):
    print("Namespace exists")

# Get namespace details
ns = client.catalog.get_namespace(namespace="robotics")
print(ns.properties)

# Update namespace properties
client.catalog.alter_namespace(
    namespace="robotics",
    properties={"team": "manipulation", "env": "prod"},
)

# --- Tables ---

# Create a table with properties
client.catalog.create_table(
    namespace="robotics",
    table="episodes",
    content_type="parquet",
    table_properties={"owner": "alice", "retention_days": "90"},
)

# Check existence
if client.catalog.table_exists(namespace="robotics", table="episodes"):
    print("Table exists")

# Inspect a table
desc = client.catalog.describe_table(namespace="robotics", table="episodes")
print(f"Schema: {desc.schema}")
print(f"Rows: {desc.total_records}")
print(f"Properties: {desc.properties}")

# Update table properties
client.catalog.alter_table(
    namespace="robotics",
    table="episodes",
    table_properties={"retention_days": "180"},
)

# List table version history
versions = client.catalog.list_table_versions(
    namespace="robotics", table="episodes",
)
for v in versions:
    print(f"Version: {v.version}")

# Rename a table
client.catalog.rename_table(
    namespace="robotics",
    table="episodes",
    new_name="episodes_v1",
)

# Rollback to a prior point in time (nanosecond timestamp)
client.catalog.rollback_table(
    namespace="robotics",
    table="episodes_v1",
    as_of=1711497600000000000,  # nanosecond epoch
)

# Truncate all data (keeps table metadata)
client.catalog.truncate_table(namespace="robotics", table="episodes_v1")

# Drop a table (purge=True also deletes data files)
client.catalog.drop_table(namespace="robotics", table="episodes_v1", purge=True)

# Drop a namespace
client.catalog.drop_namespace(namespace="robotics", purge=True)

Data Operations

Direct reads, optional read plans, inline writes, retention, and compaction status.

# Read directly through the client
dataset = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
)

# Build a reusable plan when you want file-level metadata
plan = client.catalog.plan(namespace="robotics", table="episodes")
print(f"Files: {plan.total_files}, Records: {plan.total_records}")

# The full deltacat package can also execute the same thin plan
import deltacat as dc
thick_dataset = dc.read_table("episodes", namespace="robotics", plan=plan)

# Small inline write (< 10k rows, agent-generated data)
client.catalog.write_data(
    namespace="robotics",
    table="predictions",
    data=[{"episode_id": 1, "score": 0.95}],
)

# Check compaction status
status = client.catalog.compaction_status(
    namespace="robotics", table="episodes",
)

# Run retention cleanup (dry_run=True previews without deleting)
result = client.catalog.langolier_cleanup(
    namespace="robotics",
    table="episodes",
    dry_run=True,
)

# Get effective retention policy
policy = client.catalog.get_retention_policy(
    namespace="robotics", table="episodes",
)

Auth

Inspect your identity and manage permissions.

# Check your identity
me = client.auth.whoami()
print(f"User: {me.user_id}, Auth configured: {me.auth_configured}")

# Grant a role (admin only)
client.auth.grant(user_id="bob@example.com", role="WRITER")

# Grant scoped to a namespace
client.auth.grant(
    user_id="bob@example.com",
    role="READER",
    resource_type="namespace",
    resource_name="robotics",
)

# List grants
grants = client.auth.list_grants(user_id="bob@example.com")
for g in grants:
    print(f"{g.user_id}: {g.role} on {g.scope_type}/{g.scope_name}")

# The convenience facade returns only grant items. The raw REST API may also
# include informational metadata when auth is not configured.

# Revoke
client.auth.revoke(
    user_id="bob@example.com",
    resource_type="namespace",
    resource_name="robotics",
)

Jobs

This workflow is intended for external workers. A worker claims a queued job, keeps the job alive with heartbeats while processing, and reports the result back to the DeltaCAT API server.

from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    worker_id="gpu-01",
    worker_tags=["gpu", "arm64"],
    bearer_token="worker-token-1",  # Required in Direct Auth mode
)

job = client.jobs.claim()
if job is not None:
    # The claim response includes a job_token (when DELTACAT_JOB_SECRET is set
    # on the server). The client passes it automatically on subsequent calls.
    print(f"Claimed job {job.job_id}, token: {job.job_token}")

    # Keep the job lease alive while processing.
    client.jobs.start_heartbeat(job.job_id, interval_seconds=30)

    # Read source files from the job context, perform work, then report success.
    # Source-consuming jobs must include the updated watermark on completion.
    client.jobs.complete(
        job.job_id,
        records_processed=50000,
        watermark={"partition_watermarks": {"slot-a": 123}, "known_partitions": ["slot-a"]},
    )

If you need a blocking poller instead of a background heartbeat loop, use client.jobs.wait(job_id).

For richer progress inspection, use client.jobs.get_progress(job_id). The response separates worker heartbeat progress from structured server-side progress events:

progress = client.jobs.get_progress(job.job_id)

print(progress.progress_fraction)   # 0.5
print(progress.progress_message)    # "halfway"
print(progress.progress_events)     # {"source_node_run": JobProgressEntry(...)}

How jobs are created

Workers do not create jobs themselves. A job is the universal work primitive in DeltaCAT: a typed work unit with a claim/heartbeat/complete lifecycle. Jobs enter the shared claim pool through two submission paths:

Pipeline-scheduled jobs (declarative, incremental): Subscriptions, transforms, and publications define repeatable workflows that watch for data changes and process them incrementally. When a pipeline action (trigger, run, redrive) fires, it creates a job. Crawler, diff, relay, and custom subscriber jobs all use this path.

Ad-hoc submitted jobs (imperative, one-shot): submit_preprocessing_job() creates a one-shot job for a specific snapshot with specific parameters. Episode index builds are auto-submitted on PackDS table commits. Future system operations (compaction, GC, langolier) will also use ad-hoc submission.

Both paths produce jobs in the same claim pool. Workers see all pending jobs and claim based on worker tags and dispatch config. The worker-side lifecycle is always the same: claim, heartbeat, process, then complete or fail.

Target architecture: Jobs are the foundational primitive. Pipelines are orchestration over jobs. Every work unit -- whether from a pipeline trigger, an ad-hoc submit, a compaction dispatch, or a GC scan -- is a job with full lifecycle tracking. Some execution paths bypass the job store, including inline local dispatch and fire-and-forget queue messages.

Routing jobs to the right workers

For DispatchMode.CUSTOM, set dispatch_config when you create the resource and pass matching tags when workers claim jobs. The job system uses a shared claim pool, so these filters are the main way to separate CPU-only work from GPU-heavy work.

import json

from deltacat_client import DispatchMode

transform = client.transforms.create(
    transform_id="annotations-transform",
    name="Annotations Transform",
    source_tables=[{"namespace": "vision", "table": "annotations_raw"}],
    sink_tables=[{"namespace": "vision", "table": "annotations_curated"}],
    subscriber_class="example.transforms.AnnotationTransform",
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps(
        {
            "required_worker_tags": ["gpu", "arm64"],
            "allowed_worker_ids": ["trainer-usw2-pod-17"],
        }
    ),
)

job = client.jobs.claim(worker_tags=["gpu", "arm64"])

Supported affinity keys today:

  • required_worker_tags
  • allowed_worker_ids

Job Store Durability

The server defaults to a durable job store (DELTACAT_JOB_STORE_BACKEND=deltacat) that persists job state across process restarts and supports multi-instance HA behind a load balancer. Jobs go through five states: PENDING, RUNNING, FINALIZING, COMPLETED, FAILED. The FINALIZING state is a crash-recovery checkpoint for server-side catalog mutations after worker completion.

Custom worker jobs use a shared claim pool with required_worker_tags and allowed_worker_ids as routing filters. This is sufficient for the initial deployment but is not a full multi-queue scheduler.

Choosing a worker_id

worker_id is a stable identifier for the worker process claiming jobs. It is not pre-registered with the server and it does not reserve work ahead of time. A good value is something stable for the lifetime of the worker, such as:

  • a Kubernetes pod name
  • a VM or instance identifier
  • a hostname plus process role
  • a generated UUID persisted for the life of the process

Example:

client = Client(
    "https://deltacat-api.example.com",
    worker_id="trainer-usw2-pod-17",
)

Can multiple workers use the same worker_id?

They should not.

Jobs are not associated with any worker until they are claimed. After a worker claims a job, the server records the worker_id and expects subsequent heartbeats, completion, and failure reports to come from that same identifier. Using the same worker_id from multiple concurrent workers makes ownership and recovery behavior ambiguous.

Job Authentication

When auth is enabled on the server, claiming a job requires WRITER permission. In the default Direct Auth mode, workers satisfy that with a bearer token. In Trusted Proxy mode, the upstream proxy supplies the authenticated identity header and the client does not need to send bearer_token. The claim response includes a per-job HMAC token (job_token) that is required on all subsequent heartbeat, complete, and fail calls. The client handles this automatically: the token is stored on the ClaimedJob object and passed as an X-Job-Token header by the transport.

Tokens are bound to the specific claim instance (job_id, worker_id, claimed_at) and auto-invalidate if the job is reclaimed (e.g., after heartbeat expiry).

client = Client(
    "https://deltacat-api.example.com",
    worker_id="gpu-01",
    bearer_token="worker-token-1",  # Authenticates the claim
)

job = client.jobs.claim()
# job.job_token is automatically passed on heartbeat/complete/fail

Current limitation: no generic query-job submit API

deltacat-client does not yet expose a first-class submit_query_job() or submit_dataframe_script() API. Supported paths are:

  • plan(...) for scalable reads that your code executes client-side
  • stage(...) + commit(...) for scalable writes
  • subscriptions/publications/transforms/pipelines for repeatable managed flows

If you need to run arbitrary remote Python or dataframe code on a managed cluster, use a dedicated server-side job contract.

Catalog Read Planning

plan is the scalable path for reading table data. It resolves metadata server-side and returns a typed Plan describing the files, schema, and partition layout needed to execute the read client-side.

from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

plan = client.catalog.plan(
    namespace="robotics",
    table="episodes",
    filter_predicate={
        "and": [
            {"eq": ["task", "pick_screwdriver"]},
            {"gte": ["episode_length", 32]},
        ]
    },
    limit=10000,
    include_stats=False,
)
all_files = plan.all_files()
image_files = plan.files_with_extension(".jpg", ".png")
partitions = plan.partition_summaries()

The structured filter_predicate format is a JSON AST. Supported operators:

  • eq, neq, gt, gte, lt, lte
  • in
  • between
  • and, or, not
  • is_null

Examples:

{"eq": ["task", "pick"]}
{"in": ["embodiment", ["gr1", "franka"]]}
{"between": ["episode_length", 32, 128]}
{"and": [{"eq": ["task", "pick"]}, {"gte": ["success_rate", 0.9]}]}

Most client code should call client.catalog.read(...) directly and let the client generate the plan inline:

from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")
dataset = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
)

This path plans and executes through the shared deltacat-io-core execution layer, including merge-on-read (MOR), lazy Parquet/Lance reads, schema alignment, sort-aware ordering, and distributed reader outputs where supported.

Use an explicit ReadPlan when you want to inspect or reuse the metadata:

plan = client.catalog.plan(namespace="robotics", table="episodes")
dataset = client.catalog.read(plan=plan, read_as="pandas")

The thick deltacat package can also execute the same thin plan:

import deltacat as dc

dataset = dc.read_table("episodes", namespace="robotics", plan=plan)

[!NOTE] deltacat-client intentionally does not depend on the full deltacat runtime. client.catalog.read(...) does not require the thick deltacat package to be installed.

Typical uses for a ReadPlan:

  • pass the plan into client.catalog.read(plan=plan) for direct execution
  • keep planning separate from execution when metadata and data scans happen in different stages of your workflow
  • pass the same plan into dc.read_table(plan=plan) when the full deltacat package is available and you want to stay on the thick public API
  • iterate over file paths from plan.all_files()
  • group work by logical partition with plan.partition_summaries()
  • inspect modality or file type via DataFile.content_type and DataFile.extension
  • hand the plan to a downstream reader that performs the actual data scan
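
For example (a sketch; the DataFile attribute names such as path follow the Data Model Highlights section):

plan = client.catalog.plan(namespace="robotics", table="episodes")

# Group work by logical partition.
for summary in plan.partition_summaries():
    print(summary)

# Route files by modality via DataFile.content_type / DataFile.extension.
for data_file in plan.all_files():
    if data_file.extension in (".jpg", ".png"):
        print(data_file.path, data_file.content_type)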

client.catalog.read(...) executes through the shared thin-client execution layer whether the plan was generated inline or passed explicitly. The plan contract carries the metadata needed for direct execution. There is no separate runtime bridge back into thick DeltaCAT for thin plan execution.

[!IMPORTANT] The shared thin-client data execution/materialization path currently supports local paths and s3:// paths only. Plans or staged writes that require gs:// or az:// on the shared IO path fail deterministically until dedicated backend support lands.

For very large tables:

  • request plans in pages with limit and offset
  • leave include_stats=False unless the client needs them
  • keep the API server on the metadata path only and execute the read client-side

For PackDS/training tables at scale (1PB+):

  • use include_files=False to skip file enumeration entirely — returns only snapshot_id, packds_uri, episode_index, shard_manifest, and replication_status (O(deltas), not O(files))
  • use preferred_root to resolve file paths through a replicated root (see Data Placement and Replication)

Staged Writes

Most applications should use client.catalog.write(...) and let the client manage staging and commit automatically.

Use stage(...) when you need explicit control over where files are written, when they are committed, or how prewritten files are registered. It returns a typed WriteSession with the destination directory and write mode.

from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

session = client.catalog.stage(
    namespace="robotics",
    table="embeddings",
    mode="merge",
    format="parquet",
)

entry = session.entry_for(
    "embeddings/output.parquet",
    records=1024,
)

result = client.catalog.commit(session=session, entries=[entry])

In practice, an application or worker using staged writes will:

  1. call stage(...)
  2. write one or more output files under session.data_dir
  3. build WriteEntry values for those files
  4. call commit(...)
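
Putting those steps together (a sketch, assuming session.data_dir is a locally writable path and that entry_for(...) takes a session-relative path as in the example above):

import pyarrow as pa
import pyarrow.parquet as pq

# 1. Stage a write session.
session = client.catalog.stage(
    namespace="robotics",
    table="embeddings",
    mode="merge",
    format="parquet",
)

# 2. Materialize an output file under the session's staging directory.
pq.write_table(
    pa.table({"episode_id": [1, 2], "vec_norm": [0.52, 0.48]}),
    f"{session.data_dir}/output.parquet",
)

# 3-4. Build the entry and commit it.
entry = session.entry_for("output.parquet", records=2)
result = client.catalog.commit(session=session, entries=[entry])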

client.catalog.write(...) is the higher-level convenience path. It accepts:

  • PyArrow tables
  • Pandas DataFrames
  • Polars DataFrames
  • local Parquet, Feather, ORC, or AVRO file paths or lists of those file paths
  • local Lance dataset directories

It also manages session liveness automatically while local files are being materialized, and temporarily heartbeats a bound transaction during the same window when needed.

Data Placement and Replication

DeltaCAT supports multi-root catalogs where data files are stored across multiple storage backends (e.g., S3 in us-east-2, OpenStack Swift in PRIMARY, local Lustre). The placement API lets you declare which roots should have copies of a table's data and triggers replication automatically.

Core concepts:

  • Data Root: A named storage backend in the catalog's DataRootConfig (e.g., "aws_s3_iad", "openstack_swift_primary"). Each root has a URI prefix and optional connection configuration.
  • Placement Policy: A table property that declares which data roots should have copies. Once set, new writes are automatically replicated by a background daemon. Stored on the table, applies to all table versions.
  • Backfill: One-shot replication of existing data to a new root. Runs as a chunked, checkpointed orchestration job on dedicated planner/relay workers.
  • Replication Status: Per-root progress tracking. Each delta (unit of data) is tracked individually in the Replica Ledger system table.
  • Preferred Root: When reading via plan, you can request file paths resolved through a specific root. For training tables (PackDS), this uses a whole-snapshot gate: either ALL data + sidecars are available on the preferred root, or everything stays on the primary root.

from deltacat_client import Client

client = Client("https://deltacat-api.example.com", catalog="robotics-prod")

# Set a placement policy: replicate to S3 IAD and backfill existing data
result = client.catalog.place(
    namespace="vision",
    table="annotations",
    roots=["aws_s3_iad"],
    backfill=True,       # replicate existing data (latest active version)
    set_policy=True,     # set ongoing policy for new writes
)
print(result)
# {"table": "annotations", "namespace": "vision", "roots": ["aws_s3_iad"],
#  "policy_set": true, "subscriptions_created": 1, "backfill_jobs": ["replica_backfill:..."]}

Checking replication progress:

status = client.catalog.replication_status(
    namespace="vision",
    table="annotations",
)
print(status)
# {"table_ref": "vision.annotations",
#  "by_root": {"aws_s3_iad": {"complete": 95, "pending": 5, "failed": 0, "total": 100}},
#  "replicated_roots": [],
#  "policy": {"target_roots": ["aws_s3_iad"], "include_sidecars": true},
#  "backfill_status_by_root": {
#    "aws_s3_iad": {
#      "planner_job": {"job_id": "replica_backfill:...", "state": "completed"},
#      "transfer_shards": {"pending": 0, "running": 0, "finalizing": 0,
#                           "completed": 100, "failed": 0, "total": 100},
#      "checkpoint": {"last_partition_id": "...", "last_stream_position": 100},
#      "scope_total_deltas": 100,
#      "completed_deltas": 100,
#      "contiguous_completed_deltas": 100,
#      "blocked_on_gap": false
#    }
#  }}

# Scope to a specific table version:
status = client.catalog.replication_status(
    namespace="vision",
    table="annotations",
    table_version="42",
)

Reading from the closest root:

# plan with preferred_root resolves file paths through the replicated root
plan = client.catalog.plan(
    namespace="vision",
    table="annotations",
    preferred_root="aws_s3_iad",
    include_files=False,  # PackDS training: skip file enumeration, O(deltas) only
)
# plan.packds_uri -> "s3://training-data-iad/.../v42.packds" (if fully replicated)
# plan.preferred_root_used -> true/false (whether the gate passed)

Targeted backfill with selectors:

# Backfill a specific table version without setting ongoing policy:
client.catalog.place(
    namespace="vision",
    table="annotations",
    roots=["aws_s3_iad"],
    table_version="42",
    backfill=True,
    set_policy=False,  # one-shot replication only
)

# Backfill specific partitions:
client.catalog.place(
    namespace="vision",
    table="annotations",
    roots=["aws_s3_iad"],
    partition_filter=[
        {"partition_id": "uuid-abc-123"},
        {"partition_values": ["2026-03-01"]},
    ],
)

Removing a root from the policy:

result = client.catalog.unplace(
    namespace="gear",
    table="annotations",
    root="aws_s3_iad",
)
# Does NOT delete replicated data — only stops future replication

For architecture details (Replica Ledger, ReplicaTriggerScanner daemon, ReplicaSubscriber, backfill orchestration, worker pool routing), see the DeltaCAT Server README and CLAUDE.md sections on Multi-Root Data Replication.

Subscriptions, Publications, Transforms, and Pipelines

These resources define named, repeatable data workflows. They connect to the job system: when a subscription or transform is configured with DispatchMode.CUSTOM, triggering it creates a claimable job that external workers pick up via client.jobs.claim(). (See Managed workflows → Dispatch modes for a short overview.)

How the pieces fit together

Subscription (watches a source table for new data)
    → Transform (defines processing: custom code or column projection)
        → Publication (defines where output goes)
            → Pipeline (groups transforms into a DAG)

  • Subscription: Tracks per-partition watermarks on a source table. When triggered, it detects new data since the last checkpoint. Configured with a dispatch_mode: DispatchMode.LOCAL (server runs it inline), DispatchMode.CUSTOM (creates a job for external workers), or DispatchMode.OSMO (OSMO compute pool).

  • Transform: Binds a subscription (input) to a publication (output) with processing logic. Processing can be:

    • select_columns: Simple column projection (built-in, no custom code)
    • subscriber_class: Fully-qualified Python class implementing the Subscriber ABC (e.g., "my_app.transforms.ConvertXDOF"). The class receives the full DeltaCAT runtime context and can use any framework (Polars, Daft, PyArrow, Pandas, custom code).
  • Publication: Declares where transform output lands (one or more sink tables) with provenance tracking.

  • Pipeline: Groups transforms into a named DAG with topological execution order for coordinated redrive and status monitoring.

How pipelines create jobs

When using DispatchMode.CUSTOM:

  1. You call client.subscriptions.trigger(...) or client.subscriptions.run(...)
  2. The server detects new data and creates a Job in the durable job store
  3. The job context includes source file paths, target staging paths, and credentials
  4. An external worker calls client.jobs.claim(...) to receive the job
  5. The worker processes data and calls client.jobs.complete(...) with the updated watermark for source-consuming jobs
  6. The server commits the output, advances the watermark, and triggers downstream subscriptions automatically

Example: Annotations Pipeline

This example mirrors a real annotations pipeline design: raw annotation files land in OpenStack Swift, a crawler indexes them, a diff subscriber detects changes, and a custom worker converts them to the curated annotations table format.

Steady-state orchestration: the root crawl publication uses trigger={"mode": "schedule", ...} and runs on the server-managed schedule manager (see Periodic triggers). The crawl uses CUSTOM dispatch so worker processes claim crawl jobs from the job store. Downstream transforms omit trigger (default event): when the crawl commits new data to annotations_crawled, the server triggers the diff transform, then the convert transform.

import json

from deltacat_client import Client, DispatchMode

client = Client(
    "https://deltacat-api.example.com",
    catalog="vision-prod",
    bearer_token="pipeline-admin-token",
)

# ── Stage 1: Crawl publication ──────────────────────────────────────
# Scheduled root crawl: the schedule manager dispatches jobs for each cron
# slot. CUSTOM dispatch → workers claim.

crawl_pub = client.publications.create_crawler(
    publication_id="annotations-crawl",
    name="Annotations Crawl",
    sink_namespace="vision",
    sink_table="annotations_crawled",
    crawl_params={"seed_paths": [{"path": "s3://annotation-bucket/annotations"}]},
    trigger={
        "mode": "schedule",
        "schedule": {
            "kind": "cron",
            "cron": "30 8,12 * * *",
            "timezone": "America/Los_Angeles",
        },
    },
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps({"required_worker_tags": ["cpu"]}),
)

# ── Stage 2: Diff transform ─────────────────────────────────────────
# Compares consecutive crawler table versions to produce ADD/UPDATE/DELETE
# deltas. Diff jobs are memory-hungry, so pin them to dedicated workers.
# Default trigger is event-driven when annotations_crawled changes.

diff_transform = client.transforms.create(
    transform_id="annotations-diff",
    name="Annotations Diff",
    source_tables=[{"namespace": "vision", "table": "annotations_crawled"}],
    sink_tables=[{"namespace": "vision", "table": "annotations_diff"}],
    subscriber_class=(
        "deltacat.compute.pipelines.subscriber.diff_subscriber.DiffSubscriber"
    ),
    subscriber_mode="version",
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps({
        "required_worker_tags": ["highmem"],
        "allowed_worker_ids": [
            "diff-worker-useast2-1",
            "diff-worker-useast2-2",
        ],
    }),
    config_json=json.dumps(
        {
            "subscriber_kwargs": {
                "primary_key": "episode_id",
                "version_key": "modified_timestamp_ns",
            }
        }
    ),
)

# ── Stage 3: Convert + Curate transform ─────────────────────────────
# Custom worker converts raw XDOF/LeRobot annotations to the canonical
# schema and writes curated output.

convert_transform = client.transforms.create(
    transform_id="annotations-convert-xform",
    name="XDOF/LeRobot to Canonical Annotations",
    source_tables=[{"namespace": "vision", "table": "annotations_diff"}],
    sink_tables=[{"namespace": "vision", "table": "annotations"}],
    subscriber_class="example.transforms.AnnotationConverter",
    dispatch_mode=DispatchMode.CUSTOM,
    dispatch_config=json.dumps({"required_worker_tags": ["cpu", "us-east-2"]}),
)

# ── Pipeline (groups all stages for coordinated management) ─────────

pipeline = client.pipelines.create(
    pipeline_id="annotations-pipeline",
    name="Annotations Pipeline",
    node_ids=["annotations-convert-xform"],
)

# No manual run()/trigger() in steady state: the publication schedule scanner
# dispatches crawl jobs; downstream transforms run on source-table updates.
# Optional one-shot bootstrap before the first scheduled bucket:
# client.publications.run(crawl_pub.publication_id)

Worker side (runs on a separate CPU fleet):

import time

from deltacat_client import Client

worker = Client(
    "https://deltacat-api.example.com",
    worker_id="cpu-worker-us-east-2-pod-7",
    worker_tags=["cpu", "us-east-2"],
    bearer_token="worker-token-1",
)

while True:
    job = worker.jobs.claim()
    if job is None:
        time.sleep(5)
        continue

    worker.jobs.start_heartbeat(job.job_id, interval_seconds=30)

    # The job context tells the worker what to read and where to write.
    # example.transforms.AnnotationConverter.process_deltas() handles the
    # actual conversion logic using any Python framework.
    for table_ref, files in job.data_access.items():
        for file_info in files.get("files", []):
            print(f"Processing: {file_info['path']}")

    worker.jobs.complete(
        job.job_id,
        records_processed=1000,
        watermark={"partition_watermarks": {"slot-a": 1}, "known_partitions": ["slot-a"]},
    )

Pipeline operations

# Check pipeline health
status = client.pipelines.status("annotations-pipeline")
print(f"Active: {status.raw.get('active_count')}")

# Inspect a subscription's watermark (how far it has processed)
wm = client.subscriptions.get_watermark("annotations-convert")

# Pause a subscription (stops processing until resumed)
client.subscriptions.pause("annotations-convert")
client.subscriptions.resume("annotations-convert")

# Redrive: reset the watermark and reprocess from scratch
client.subscriptions.redrive(
    "annotations-convert",
    rewind_watermark="reset",
)

# Cascade redrive: redrive this subscription and all downstream ones
client.subscriptions.redrive(
    "annotations-crawler",
    cascade=True,
    dry_run=True,  # Preview the plan first
)

Creating a subscription, transform, publication, or pipeline only registers the resource definition. Work is not executed until a control-plane action such as trigger, run, or redrive is invoked.

Training Data / PackDS Workflow

DeltaCAT manages training datasets stored in PackDS format -- an episode-oriented training data layout backed by Lance columnar storage. A PackDS dataset contains a steps.lance directory where each row is a training step, grouped into episodes (e.g., robot manipulation trajectories).

The training workflow lets you register PackDS datasets, get training metadata via plan, and submit preprocessing jobs for episode filtering and shard assignment.

Key terms:

  • Episode index sidecar: A Parquet file placed next to the PackDS data that materializes per-episode metadata (frame counts, task labels, annotation sources). Avoids scanning the full Lance table for every training prep step.
  • Snapshot ID: A 32-hex content-addressed fingerprint of the exact data files in a resolved read plan. Used to key all training sidecars so that the same data always produces the same artifacts.
  • Shard manifest: A Parquet file mapping (episode, step) pairs to GPU shard indices for deterministic distributed data loading.

Register a PackDS dataset

Register an existing PackDS dataset by creating a table with packds_uri in its properties, then committing the pre-written Lance files.

For DeltaCAT to recognize the table as PackDS-backed for training workflows, both conditions must be true:

  1. the committed files have Lance content type and .packds in their URLs
  2. the table metadata carries packds_uri

from deltacat_client import Client

client = Client("http://localhost:8080")

# 1. Create namespace (if needed) and table with packds_uri property.
client.catalog.create_namespace(namespace="training")
client.catalog.create_table(
    namespace="training",
    table="episodes_v42",
    content_type="lance",
    table_properties={
        "packds_uri": "s3://training-data/training/episodes_v42.packds",
    },
)

# If the table already exists, set packds_uri via alter_table:
# client.catalog.alter_table(
#     namespace="training",
#     table="episodes_v42",
#     table_properties={"packds_uri": "s3://training-data/training/episodes_v42.packds"},
# )

# 2. Build manifest entries describing the pre-written .packds/steps.lance files.
entries = [
    {
        "path": "s3://training-data/training/episodes_v42.packds/steps.lance/_versions/1.manifest",
        "record_count": 0,
        "content_length": 5321,
    },
    {
        "path": "s3://training-data/training/episodes_v42.packds/steps.lance/data/0000.lance",
        "record_count": 125000,
        "content_length": 18350211,
    },
    {
        "path": "s3://training-data/training/episodes_v42.packds/steps.lance/_indices/uuid.idx",
        "record_count": 0,
        "content_length": 81244,
    },
]

# For the full manifest-entry construction logic, see:
# deltacat.utils.lance_utils.build_manifest_entries()

# Commit the pre-written Lance files.
result = client.catalog.commit(
    namespace="training",
    table="episodes_v42",
    entries=entries,
    format="lance",
    mode="merge",
)

After a write that DeltaCAT recognizes as PackDS-backed, DeltaCAT automatically submits an index_build job to build the episode-index sidecar next to the PackDS data. Recognition depends on the two rules above: Lance-backed committed files plus packds_uri/.packds hints.

Get training metadata via plan

plan = client.catalog.plan(
    namespace="training",
    table="episodes_v42",
)

print(plan.snapshot_id)        # "a1b2c3d4..." (32-hex content-addressed ID)
print(plan.packds_uri)         # "s3://bucket/data/v42.packds"
print(plan.episode_index)      # {"uri": "s3://.../__episode_index/.../index.parquet",
                                #  "status": "ready"}  # or "pending", "missing", "failed"

The snapshot_id is a deterministic fingerprint of the exact data files backing this table version. Two calls to plan that resolve the same files always return the same snapshot_id. The episode_index.status tells you whether the sidecar Parquet file has been built:

  • ready -- sidecar exists, ready for use
  • pending -- build job is running
  • missing -- no job submitted yet (sidecar will be built on next write or reconciliation)
  • failed -- build job failed (check server logs)
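
A polling sketch, assuming episode_index is a mapping with a "status" key as shown above:

import time

while True:
    plan = client.catalog.plan(namespace="training", table="episodes_v42")
    status = plan.episode_index["status"]
    if status == "ready":
        break  # sidecar exists; safe to use for training prep
    if status == "failed":
        raise RuntimeError("episode index build failed; check server logs")
    time.sleep(10)  # "pending" or "missing": wait for the build job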

Submit a preprocessing job

response = client.jobs.submit_preprocessing_job(
    source_table={
        "namespace": "training",
        "table": "episodes_v42",
    },
    filter_predicate={"and": [
        {"eq": ["task", "pick_and_place"]},
        {"gte": ["total_frames", 50]},
    ]},
    shard_recipe={
        "shard_size": 64,
        "episode_sampling_rate": 1.0,
        "action_horizon": 8,
        "seed": 42,
    },
)

print(response.job_id)              # "preprocess:..."
print(response.snapshot_id)         # Frozen at submit time
print(response.packds_uri)          # "s3://bucket/data/v42.packds"
print(response.episode_index_exists)  # True/False

The shard recipe parameters control how episodes are distributed across GPU shards: shard_size is the target number of training steps per shard, episode_sampling_rate controls what fraction of episodes to include, action_horizon determines the effective length of each episode (steps that can start a complete action sequence), and seed ensures reproducibility.

The job is submitted to the server's job queue. A preprocessing worker claims it, builds/loads the episode index, applies the filter, computes shard assignments, and writes a shard manifest.
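
To block until the job reaches a terminal state, the generic job APIs apply (a sketch, assuming client.jobs.wait(...) returns the final JobStatus):

status = client.jobs.wait(response.job_id)
print(status)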

Two preprocessing modes are supported:

  • Mode A: omit output_table to compute only a shard manifest under the frozen source snapshot_id.
  • Mode B: provide output_table to materialize a filtered PackDS snapshot as a DeltaCAT output, commit it with a deterministic preprocessing_job_id, write the shard manifest under the committed output snapshot_id, and enqueue the output episode-index build.
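
A Mode B sketch (the output table name is hypothetical; output_table is assumed to take the same namespace/table shape as source_table):

response = client.jobs.submit_preprocessing_job(
    source_table={"namespace": "training", "table": "episodes_v42"},
    output_table={"namespace": "training", "table": "episodes_v42_filtered"},  # hypothetical sink
    filter_predicate={"gte": ["total_frames", 50]},
    shard_recipe={"shard_size": 64, "seed": 42},
)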

Retrieve the shard manifest

After the preprocessing job completes, get the shard manifest via plan with a recipe_hash:

plan = client.catalog.plan(
    namespace="training",
    table="episodes_v42",
    recipe_hash="abc123",
)

print(plan.shard_manifest)  # {"uri": "s3://.../__shard_manifests/.../abc123.parquet"}

MCP tools for training

When using DeltaCAT via MCP (e.g., from Claude Code), the relevant tools are:

  • deltacat_plan_read -- returns snapshot_id, packds_uri, episode_index status, and optional shard_manifest. Use include_files=false for PackDS tables at scale (returns metadata only, O(deltas) not O(files)). Use preferred_root to resolve paths through a replicated root.
  • deltacat_submit_preprocessing_job -- submits a preprocessing job with filter predicates and shard recipes
  • deltacat_manage_placement -- set data placement policies to replicate training data across storage roots (e.g., OpenStack Swift to S3 for GPU access)

Further reading

For the full architectural reference -- including the episode index schema, shard manifest algorithm, structured filter predicate format, the relationship between the episode index sidecar and Lance native indexes, the row-layout contiguity invariant, and reconciliation internals -- see deltacat/compute/training/README.md.

Configuration

The root Client can carry common deployment settings:

  • catalog
  • headers
  • user_id
  • bearer_token
  • verify_ssl
  • follow_redirects
  • timeout

import os

from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    catalog="robotics-prod",
    bearer_token=os.environ["DELTACAT_TOKEN"],
    user_id="robotics-trainer@example.com",
    timeout=60,
    verify_ssl="/etc/ssl/certs/internal-ca.pem",
)

When catalog is configured on the root client, resource methods use it by default unless an explicit catalog= override is provided.
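
For example (the override catalog name is hypothetical):

# Uses the client-level catalog ("robotics-prod").
tables = client.catalog.list_tables(namespace="robotics")

# Explicit per-call override.
dev_tables = client.catalog.list_tables(namespace="robotics", catalog="robotics-dev")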

Custom headers can be attached for tracing, correlation, or proxy deployments:

client = Client(
    "https://deltacat-api.example.com",
    bearer_token="my-token",
    headers={
        "x-trace-id": "run-12345",
        "x-correlation-id": "pipeline-abc",
    },
)

[!NOTE] Identity headers like X-DeltaCAT-User are only trusted when the server is configured with DELTACAT_TRUST_IDENTITY_HEADERS=true (Trusted Proxy mode). For direct auth, use bearer_token instead.

The client supports both synchronous and asynchronous lifecycle management:

from deltacat_client import Client

with Client("https://deltacat-api.example.com") as client:
    tables = client.catalog.list_tables(namespace="robotics")

The context manager form is useful for long-lived services and scripts that want explicit control over the underlying HTTP client lifecycle.

Data Model Highlights

Client exposes typed models for the most common server responses, including:

  • ClaimedJob -- claimed job with job_id, job_token, data_access, sink_upload, credentials
  • JobStatus -- job lifecycle state, progress, error
  • ReadPlan -- file manifest with schema, partitions, and content types
  • ScanTask -- individual scan unit within a read plan
  • DataFile -- file path, size, content type, record count
  • WriteSession -- durable write session with staged data path
  • WriteEntry -- explicit file metadata for commit
  • SubscriptionInfo, SubscriptionWatermark, WatermarkState -- subscription lifecycle
  • PublicationInfo, TransformInfo, PipelineInfo -- orchestration resources
  • PipelineActionResult -- pipeline operation results

These models are designed to make file and manifest-oriented workflows easier to work with in multimodal environments.

Supported Read Paths

DeltaCAT distinguishes between:

  • Preview: bounded server-side samples via preview_data
  • Plan: lightweight metadata-only read planning via plan

For scalable reads, use plan and execute the resulting plan client-side.

Package Architecture

This distribution contains:

  • deltacat_client: the public Python facade
  • deltacat_generated_client: bundled generated REST bindings used internally

The generated package is treated as an implementation detail. Consumers should import from deltacat_client.

Regenerating the Bundled Client

From the DeltaCAT repository root:

make export-openapi
make generate-deltacat-client

This exports the current REST OpenAPI schema and regenerates the bundled low-level client bindings.

Development Notes

The standalone deltacat-client package is tested, built, and release-hardened independently from the main deltacat package:

  • dedicated make test-client target
  • generated-client drift check
  • wheel build check
  • fresh-environment install smoke test

See README-development.md in the repository root for the current development targets and CI workflow.
