Lightweight REST client and thin MCP scaffolding for the DeltaCAT API server.

deltacat-client is the primary Python client package for DeltaCAT. It lets you read and write tables, run jobs, and build data pipelines against a DeltaCAT API server without installing the full storage, compute, or server runtime stack.

The client talks to a DeltaCAT server over HTTP. Metadata operations (schema validation, transaction management, compaction) stay server-side, while large data reads and writes go directly between the client and cloud storage using short-lived credentials the server vends on demand.

Overview

The client is organized around a root Client object with resource-oriented subclients:

Subclient             Purpose
client.catalog        Namespaces, tables, read/write, transactions
client.jobs           Job submission, claiming, lifecycle, progress
client.publications   Incremental producers into the DeltaCAT Lakehouse (pipeline root nodes)
client.subscriptions  Incremental consumers from the DeltaCAT Lakehouse (pipeline leaf nodes)
client.transforms     Incremental transforms within the DeltaCAT Lakehouse (pipeline intermediate nodes)
client.pipelines      Wires publications, transforms, and subscriptions into a connected DAG
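
All of these hang off a single connected Client instance. As a quick orientation (connection details are covered under Quick Start below):

from deltacat_client import Client

client = Client("http://localhost:8080")

# Each resource area is an attribute of the root client; no per-subclient
# construction or configuration is required.
catalog = client.catalog      # namespaces, tables, reads, writes
jobs = client.jobs            # background job submission and execution
pipelines = client.pipelines  # DAG orchestration across pipeline nodes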

Installation

pip install deltacat-client

Optional extras add local data materialization support and the typed MCP client:

pip install "deltacat-client[all]"    # Full client-side read/write stack
pip install "deltacat-client[pandas]" # Pandas DataFrame support
pip install "deltacat-client[polars]" # Polars DataFrame support
pip install "deltacat-client[daft]"   # Daft DataFrame support
pip install "deltacat-client[lance]"  # Lance dataset support
pip install "deltacat-client[mcp]"    # Typed async MCP HTTP client

Getting Started

Before using the client, you need a running DeltaCAT API server. See Server Setup for instructions.

DeltaCAT lets you manage Tables across one or more Catalogs. A Table is a named collection of data files. A Catalog is a named data lake that contains tables. For the full data model, see the DeltaCAT README.

Quick Start

from deltacat_client import Client
import pyarrow as pa

# Connect to a DeltaCAT server
client = Client("http://localhost:8080")

# Write data to a table (table is created automatically)
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Cheshire", "Dinah", "Felix"],
    "age": [3, 7, 5],
})
client.catalog.write(data, table="cool_cats", mode="auto", format="parquet")

# Read the data back
df = client.catalog.read(table="cool_cats", read_as="pandas")
print(df)

Core Concepts

The sections below walk through examples of core client operations.

Authentication

When connecting to a production server with auth enabled, provide a bearer token:

from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    bearer_token="your-api-token",
)

Admins can also onboard a new user, grant their initial access, and receive a one-time token to share over a secure out-of-band channel:

from deltacat_client import Client

admin = Client(
    "https://deltacat-api.example.com",
    bearer_token="admin-api-token",
)

created = admin.auth.create_user(
    user_id="newadmin@example.com",
    display_name="New Admin",
    email="newadmin@example.com",
    initial_role="ADMIN",
    resource_type="catalog",
    resource_name="*",
    issue_token_label="bootstrap",
    idempotency_key="create-newadmin-001",
)

bootstrap_token = created.token.token
print("Share this token once via a secure secret channel:", bootstrap_token)

The server validates your token and maps it to a user identity for permission checks (read, write, admin). The client automatically includes the token on every request. For data file access, the server vends short-lived STS credentials that the client uses to read from and write to cloud storage directly.
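
In practice, load the token from the environment rather than hard-coding it. A minimal sketch (the DELTACAT_API_TOKEN variable name is an arbitrary example here, not something the client reads on its own):

import os

from deltacat_client import Client

# Keep the bearer token out of source control by reading it from an
# environment variable; the variable name is this example's choice.
client = Client(
    "https://deltacat-api.example.com",
    bearer_token=os.environ["DELTACAT_API_TOKEN"],
)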

See Configuration for the detailed auth model, bootstrap tokens, and role-based access control.

Reading Data

# Read a table as a PyArrow table (the default format)
arrow_table = client.catalog.read(namespace="robotics", table="episodes")

# Read as Pandas, Polars, or Daft
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas")
polars_result = client.catalog.read(namespace="robotics", table="episodes", read_as="polars")
daft_df = client.catalog.read(namespace="robotics", table="episodes", read_as="daft")

# Filter rows and limit results
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
)

# Time travel: read the table as it existed at a prior point in time.
# The as_of value is a nanosecond-precision Unix epoch timestamp.
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    as_of=1712697600000000000,
)
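
Because as_of is a raw nanosecond timestamp, it is often easiest to derive it from a datetime. A small sketch using only the standard library:

from datetime import datetime, timezone

# Convert a wall-clock instant to the nanosecond-precision epoch value
# that as_of expects.
as_of_ns = int(datetime(2024, 4, 10, tzinfo=timezone.utc).timestamp()) * 1_000_000_000

df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    as_of=as_of_ns,
)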

See Reading and Writing for scalable reads, lazy vs. eager materialization, and format-specific behavior. For PackDS-backed training tables, see Training Data.

Writing Data

The client supports writing PyArrow tables, Pandas DataFrames, Polars DataFrames, Daft DataFrames, NumPy arrays, Ray Datasets, and local files (Parquet, CSV, TSV, PSV, Feather, JSON, ORC, AVRO, Lance).

import pyarrow as pa

# Write data to a table.
# The table and namespace are created automatically when mode is "auto".
data = pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]})
client.catalog.write(data, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Append more data
data2 = pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]})
client.catalog.write(data2, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Write a Pandas DataFrame
import pandas as pd
df = pd.DataFrame({"episode_id": [5], "score": [0.93]})
client.catalog.write(df, namespace="robotics", table="predictions", mode="auto", format="parquet")

# For precise control over schema, partitioning, and sort order,
# create the table explicitly before writing.
client.catalog.create_table(
    namespace="robotics",
    table="scored_episodes",
    schema=pa.schema([
        pa.field("episode_id", pa.int64()),
        pa.field("score", pa.float64()),
        pa.field("episode_day", pa.string()),
    ]),
    partition_scheme={"keys": [{"key": ["episode_day"], "transform": "identity"}]},
    sort_scheme={"keys": [{"key": ["episode_id"], "sort_order": "descending"}]},
    auto_create_namespace=True,
)

data = pa.table({
    "episode_id": [1, 2],
    "score": [0.95, 0.87],
    "episode_day": ["2025-01-15", "2025-01-15"],
})
client.catalog.write(data, namespace="robotics", table="scored_episodes", mode="auto", format="parquet")

# Evolve a table's schema after creation
client.catalog.alter_table(
    namespace="robotics",
    table="predictions",
    schema_updates={
        "operations": [
            {"op": "add", "field": {"name": "confidence", "type": "float64"}}
        ]
    },
)

See Reading and Writing for write modes, staged writes, path-based writes with create_if_missing, and the full list of supported formats and schema inputs. For PackDS training data writes, see Training Data.

Transactions

Transactions provide atomic multi-step operations with automatic heartbeating and rollback on failure.

import pyarrow as pa

with client.transaction(commit_message="Backfill predictions") as tx:
    client.catalog.write(
        pa.table({"episode_id": [10, 11], "score": [0.88, 0.92]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )

    # Reads within the transaction see uncommitted writes
    df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
    )
    print(f"Rows visible in transaction: {len(df)}")
# Transaction commits automatically on exit; aborts on exception
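
If you need to decide the outcome yourself rather than relying on the context manager, the transaction handle can in principle be driven manually. A hedged sketch, assuming the handle exposes explicit commit()/abort() methods (see Transactions for the actual API):

import pyarrow as pa

tx = client.transaction(commit_message="Manual backfill")
try:
    client.catalog.write(
        pa.table({"episode_id": [12], "score": [0.90]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )
    tx.commit()  # assumed method name
except Exception:
    tx.abort()   # assumed method name
    raise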

See Transactions for time-travel reads, manual commit/abort, and transaction rules.

Jobs

DeltaCAT uses a durable job system for background work (compaction, data relay, subscription processing). The client can submit, monitor, and execute jobs.

# List all jobs
jobs = client.jobs.list()
for job in jobs:
    print(f"{job.job_id}: {job.state}")

# Submit a compaction job
result = client.jobs.submit_compaction(table="predictions")
print(f"Submitted: {result.job_id}")

# Wait for it to complete
status = client.jobs.wait(result.job_id, timeout_seconds=120)
print(f"Final state: {status.state}")

Workers claim and execute jobs. Any process can act as a worker:

# Claim the next pending job matching our worker tags
job = client.jobs.claim(worker_tags=["subscriber"])
if job:
    print(f"Claimed {job.job_id} (type: {job.context.get('job_type')})")

    # Report fine-grained progress and declare the deadline for the next chunk
    client.jobs.emit_event(
        job,
        event_name="batch_started",
        completed=1,
        expected=4,
        metadata={"batch": 1},
        heartbeat_timeout_seconds=300,
    )

    # Do the work...

    # Mark complete. Source-consuming jobs must report the watermark they
    # advanced to so the server can persist their progress correctly.
    client.jobs.complete(
        job,
        records_processed=1000,
        watermark={
            "partition_watermarks": {"analytics.events": 42},
            "known_partitions": ["analytics.events"],
        },
    )

See Jobs and Workers for job types, worker routing, heartbeat rules, retry semantics, and dispatch modes.

Publications

Publications are incremental producers that write new data into the DeltaCAT Lakehouse. They sit at the root of a pipeline DAG and can be triggered manually or fired by an upstream event.

# Create a publication that writes to a sink table
client.publications.create(
    publication_id="episode_publisher",
    name="Episode Publisher",
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="local",
)

# Run the publication
result = client.publications.run("episode_publisher")
print(f"Published: {result}")

See Pipelines for publication configuration and DAG construction.

Subscriptions

Subscriptions are incremental consumers of DeltaCAT tables. They sit at the leaves of a pipeline DAG, tracking a per-partition watermark so each run picks up only new data.

# Create a subscription that watches for new data in "raw_episodes"
client.subscriptions.create(
    subscriber_id="episode_processor",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Trigger processing (dispatches a job to a subscriber worker)
client.subscriptions.trigger("episode_processor")

# Check watermark state
wm = client.subscriptions.get_watermark("episode_processor")
print(f"Watermark: {wm.watermark}")

# Pause / resume / delete
client.subscriptions.pause("episode_processor")
client.subscriptions.resume("episode_processor")
client.subscriptions.delete("episode_processor")

See Pipelines for subscription modes (delta vs. version), triggers, and redrive.

Transforms

Transforms are the intermediate nodes of a pipeline DAG. Each transform reads from one or more source tables, applies processing logic, and writes to one or more sink tables.

# Create a transform: raw_episodes -> clean_episodes
client.transforms.create(
    transform_id="episode_cleaner",
    name="Episode Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="custom",
)

# Trigger transform processing
client.transforms.trigger("episode_cleaner")

# Pause / resume
client.transforms.pause("episode_cleaner")
client.transforms.resume("episode_cleaner")

See Pipelines for transform configuration, redrive, and rollback.

Pipelines

Pipelines wire publications, transforms, and subscriptions into a connected DAG. When an upstream node completes, downstream nodes are triggered automatically.

# First, create connected pipeline nodes
client.publications.create(
    publication_id="ingest_pub",
    name="Raw Ingest Publisher",
    sink_tables=[{"namespace": "robotics", "table": "raw_data"}],
    dispatch_mode="local",
)
client.transforms.create(
    transform_id="clean",
    name="Data Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_data"}],
    dispatch_mode="custom",
)
client.subscriptions.create(
    subscriber_id="consume_clean",
    source_tables=[{"namespace": "robotics", "table": "clean_data"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Preview the connected pipeline from an interior seed node
preview = client.pipelines.discover(seed_node_ids=["clean"])
print(preview.execution_order)

# Option A: persist exactly the previewed node_ids
client.pipelines.create(
    pipeline_id="etl_pipeline_pinned",
    name="ETL Pipeline (Pinned)",
    node_ids=preview.node_ids,
)

# Option B: create directly from seed node ids
client.pipelines.create(
    pipeline_id="etl_pipeline_seeded",
    name="ETL Pipeline (Seeded)",
    seed_node_ids=["clean"],
)

# Check pipeline status
status = client.pipelines.status("etl_pipeline_seeded")

# Pause / resume all nodes at once
client.pipelines.pause("etl_pipeline_seeded")
client.pipelines.resume("etl_pipeline_seeded")

See Pipelines for DAG construction, discovery semantics, redrive, rollback, and stored-order validation.

Data Placement and Replication

DeltaCAT catalogs can span multiple storage backends (S3, SwiftStack, Lustre). Data placement lets you replicate tables across roots so readers access data from the closest location.

# Place a table on an additional storage root for replication
client.catalog.place(
    namespace="robotics",
    table="episodes",
    roots=["aws_s3_iad"],       # Replicate to this root
    backfill=True,               # Copy existing data too
)

# Check replication status
status = client.catalog.replication_status(namespace="robotics", table="episodes")
print(f"Roots: {status}")

# Read from the closest root (server resolves automatically)
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    root="aws_s3_iad",          # Prefer this root for file paths
)

# Remove a replication target
client.catalog.unplace(namespace="robotics", table="episodes", root="aws_s3_iad")

New writes are automatically replicated to all placed roots via a background subscriber. Reads with root= get file paths rewritten through the preferred root when data is available there.

See Configuration for data root setup and multi-root catalog configuration.

Scheduled Processing

Subscriptions and transforms can run on a schedule instead of being triggered manually. DeltaCAT supports interval-based and cron-based scheduling.

# Process new data every 5 minutes
client.subscriptions.create(
    subscriber_id="metrics_ingester",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)

# Process at 2am UTC daily using a cron expression
client.subscriptions.create(
    subscriber_id="nightly_aggregator",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"cron": "0 2 * * *", "timezone": "UTC"}},
)

# Event-driven: only runs when triggered manually or by an upstream pipeline node
client.subscriptions.create(
    subscriber_id="on_demand_processor",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "event"},
)

See Configuration for trigger options and scheduling details.

Agentic Access (MCP)

DeltaCAT ships with a built-in Model Context Protocol server so AI agents (for example, Claude Code) can browse catalogs, inspect schemas, plan reads, and stage writes through natural language instead of hand-written Python.

Most hand-written code should just use the REST Client(...) shown above. Reach for MCP when you want to:

  • let an agent explore and operate on your catalog conversationally
  • embed DeltaCAT as a tool in an agentic application
  • use a typed async Python wrapper over the MCP HTTP surface (deltacat-client[mcp]) from code that is already agentic in shape
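
As a taste of the last option, a hypothetical sketch of the typed async wrapper (the import path, class name, endpoint, and tool method below are illustrative assumptions, not the documented surface):

import asyncio

# Hypothetical names throughout: consult the MCP Server guide for the real
# import path, client class, and available tools.
from deltacat_client.mcp import AsyncMcpClient

async def main():
    async with AsyncMcpClient("http://localhost:8080/mcp") as mcp:
        tables = await mcp.list_tables(namespace="robotics")
        print(tables)

asyncio.run(main())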

See the MCP Server guide for the full tool reference, the typed async client, and recipes for agent-driven catalog workflows.

Server Setup

The DeltaCAT client connects to a DeltaCAT API server. For setup instructions, see the core DeltaCAT documentation.

Additional Resources

Guide                          Description
Reading and Writing            Read plans, write modes, staged writes, supported formats
Training Data                  PackDS tables, episode indexes, shard manifests, distributed training
Transactions                   Transaction lifecycle, time travel, rules and limitations
Jobs and Workers               Job types, claiming, heartbeat, worker routing, authentication
Pipelines                      Publications, transforms, subscriptions, DAGs, redrive
Configuration                  Auth, dispatch modes, triggers, data placement
Maintainer Workflow            Relationship between REST and MCP, generated bindings, facade updates, and validation guards
Architecture                   Package boundary, generated client, development notes
Client Compatibility Runbook   Promotion-time packaged client/server compatibility validation

For the core DeltaCAT data model, storage architecture, and catalog APIs, see the DeltaCAT documentation.
