Lightweight REST client and thin MCP scaffolding for the DeltaCAT API server.
`deltacat-client` is the primary thin Python client package for DeltaCAT. It lets you read and write tables, manage jobs, and build data pipelines on a DeltaCAT API server without installing the full storage, compute, or server runtime stack.
The client talks to a DeltaCAT server over HTTP. Data files are read from and written to cloud storage (S3, GCS, Azure) directly by the client using server-vended credentials. Metadata operations (schema validation, transaction management, compaction triggers) stay server-side.
Overview
The client is organized around a root Client object with resource-oriented subclients:
| Subclient | Purpose |
|---|---|
| `client.catalog` | Namespaces, tables, read/write, transactions |
| `client.jobs` | Job submission, claiming, lifecycle, progress |
| `client.subscriptions` | Incremental data processing pipelines |
| `client.publications` | Data publishing workflows |
| `client.transforms` | Source-to-sink data transformations |
| `client.pipelines` | Multi-stage pipeline orchestration |
Installation
```bash
pip install deltacat-client
```
Package roles:
```bash
pip install "deltacat[server]"       # Host the DeltaCAT REST/MCP server
pip install "deltacat-client"        # Thin REST client
pip install "deltacat-io-core[all]"  # Shared local execution only
pip install "deltacat-client[mcp]"   # Add the thin MCP HTTP transport dependency set
```
Optional extras for local data materialization:
```bash
pip install "deltacat-client[all]"     # Full client-side read/write stack
pip install "deltacat-client[pandas]"  # Pandas DataFrame support
pip install "deltacat-client[polars]"  # Polars DataFrame support
pip install "deltacat-client[lance]"   # Lance dataset support
pip install "deltacat-client[mcp]"     # Thin MCP HTTP transport dependency set
```
The `deltacat_client.mcp` namespace ships in the base `deltacat-client` wheel. The optional `[mcp]` extra adds the thin MCP HTTP transport dependency set.
Today the typed MCP client supports:
- catalog/data reads
- staged writes
- typed job observation
- typed subscription, publication, transform, and pipeline observation
- typed control/execution actions where the server contract is now promoted: `pause`, `resume`, `run`, `trigger`, and `redrive`, as applicable

Worker lifecycle and admin/system-job submission flows remain raw MCP or REST surfaces.
Getting Started
Before using the client, you need a running DeltaCAT API server. See Server Setup for instructions.
Quick Start
```python
from deltacat_client import Client
import pyarrow as pa

# Connect to a DeltaCAT server
client = Client("http://localhost:8080")

# Write data to a table (the table is created automatically)
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Cheshire", "Dinah", "Felix"],
    "age": [3, 7, 5],
})
client.catalog.write(data, namespace="default", table="cool_cats", mode="auto", format="parquet")

# Read the data back
df = client.catalog.read(namespace="default", table="cool_cats", read_as="pandas")
print(df)
```
Core Concepts
DeltaCAT lets you manage Tables across one or more Catalogs. A Table is a named collection of data files. A Catalog is a named data lake that contains tables. For more on DeltaCAT's data model, see the DeltaCAT README.
The sections below show examples of core client operations.
Authentication
When connecting to a production server with auth enabled, provide a bearer token:
```python
from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    bearer_token="your-api-token",
)
```
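Rather than hardcoding the token, it can be read from the environment. A minimal sketch — the `DELTACAT_API_TOKEN` variable name and the helper itself are illustrative, not part of the client API:

```python
import os


def bearer_token_from_env(var: str = "DELTACAT_API_TOKEN") -> str:
    """Read the API token from an environment variable (name is illustrative)."""
    token = os.environ.get(var)
    if not token:
        raise RuntimeError(f"{var} is not set; export it before creating the Client")
    return token


# client = Client("https://deltacat-api.example.com", bearer_token=bearer_token_from_env())
```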
Admins can also onboard a new user, grant their initial access, and receive a one-time token to share over a secure out-of-band channel:
```python
from deltacat_client import Client

admin = Client(
    "https://deltacat-api.example.com",
    bearer_token="admin-api-token",
)
created = admin.auth.create_user(
    user_id="newadmin@example.com",
    display_name="New Admin",
    email="newadmin@example.com",
    initial_role="ADMIN",
    resource_type="catalog",
    resource_name="*",
    issue_token_label="bootstrap",
    idempotency_key="create-newadmin-001",
)
bootstrap_token = created.token.token
print("Share this token once via a secure secret channel:", bootstrap_token)
```
The server validates your token and maps it to a user identity for permission checks (read, write, admin). The client automatically includes the token on every request. For data file access, the server vends short-lived STS credentials that the client uses to read from and write to cloud storage directly.
See Configuration for the detailed auth model, bootstrap tokens, and role-based access control.
Reading Data
```python
# Read a full table as a Pandas DataFrame
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas")

# Read as PyArrow, Polars, or NumPy
arrow_table = client.catalog.read(namespace="robotics", table="episodes", read_as="pyarrow")
polars_df = client.catalog.read(namespace="robotics", table="episodes", read_as="polars")

# Filter and limit
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
)

# Read a specific table version (time travel)
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas", as_of=1712697600000000000)
```
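The `as_of` value in the time-travel read above is a nanosecond-precision epoch timestamp. A small helper for deriving one from a Python `datetime` — the helper name is illustrative, not part of the client API:

```python
from datetime import datetime, timezone


def to_epoch_ns(dt: datetime) -> int:
    """Convert an aware datetime to a nanosecond epoch timestamp for as_of."""
    return int(dt.timestamp()) * 1_000_000_000 + dt.microsecond * 1_000


# Midnight UTC on 2024-04-10 as an as_of value
as_of = to_epoch_ns(datetime(2024, 4, 10, tzinfo=timezone.utc))
```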
Writing Data
The client supports writing PyArrow tables, Pandas DataFrames, Polars DataFrames, Daft DataFrames, NumPy arrays, Ray Datasets, and local files (Parquet, CSV, TSV, PSV, Feather, JSON, ORC, AVRO, Lance).
```python
import pyarrow as pa

# Explicit table creation with layout configuration
client.catalog.create_table(
    namespace="robotics",
    table="predictions",
    schema_def=[
        {"name": "episode_id", "type": "int64", "is_merge_key": True},
        {"name": "score", "type": "float64"},
        {"name": "episode_day", "type": "string"},
    ],
    partition_scheme={"keys": [{"key": ["episode_day"], "transform": "identity"}]},
    sort_scheme={"keys": [{"key": ["episode_id"], "sort_order": "descending"}]},
    table_version_properties={"schema_evolution_mode": "manual"},
    namespace_properties={"owner": "robotics-team"},
    auto_create_namespace=True,
    content_type="parquet",
)

# Write data (creates the table if it doesn't exist)
data = pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]})
client.catalog.write(data, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Bootstrap a missing table during a staged or high-level write
client.catalog.write(
    data,
    namespace="robotics",
    table="predictions_v2",
    mode="auto",
    format="parquet",
    create_if_missing={
        "schema_def": [
            {"name": "episode_id", "type": "int64"},
            {"name": "episode_day", "type": "string"},
        ],
        "partition_scheme": {"keys": [{"key": ["episode_day"], "transform": "identity"}]},
        "auto_create_namespace": True,
    },
)

# Append more data
data2 = pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]})
client.catalog.write(data2, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Write a Pandas DataFrame
import pandas as pd

df = pd.DataFrame({"episode_id": [5], "score": [0.93]})
client.catalog.write(df, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Explicit alter_table evolution
client.catalog.alter_table(
    namespace="robotics",
    table="predictions",
    table_version="1",
    sort_scheme={
        "keys": [
            {
                "key": ["episode_id"],
                "sort_order": "descending",
                "null_order": "at_start",
                "transform": "identity",
            }
        ]
    },
    schema_updates={
        "operations": [
            {"op": "add", "field": {"name": "count", "type": "int64"}}
        ]
    },
)
```
Transactions
Transactions provide atomic multi-step operations with automatic heartbeating and rollback on failure.
```python
import pyarrow as pa

with client.transaction(commit_message="Backfill predictions") as tx:
    client.catalog.write(
        pa.table({"episode_id": [10, 11], "score": [0.88, 0.92]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )

    # Reads within the transaction see uncommitted writes
    df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
    )
    print(f"Rows visible in transaction: {len(df)}")

# The transaction commits automatically on exit; aborts on exception
```
See Transactions for time-travel reads, manual commit/abort, and transaction rules.
Jobs
DeltaCAT uses a durable job system for background work (compaction, data relay, subscription processing). The client can submit, monitor, and execute jobs.
```python
# List all jobs
jobs = client.jobs.list()
for job in jobs:
    print(f"{job.job_id}: {job.state}")

# Submit a compaction job
result = client.jobs.submit_compaction(namespace="default", table="predictions")
print(f"Submitted: {result.job_id}")

# Wait for it to complete
status = client.jobs.wait(result.job_id, timeout_seconds=120)
print(f"Final state: {status.state}")
```
Workers claim and execute jobs. Any process can act as a worker:
```python
# Claim the next pending job matching our worker tags
job = client.jobs.claim(worker_tags=["subscriber"])
if job:
    print(f"Claimed {job.job_id} (type: {job.context.get('job_type')})")

    # Report fine-grained progress and declare the deadline for the next chunk
    client.jobs.emit_event(
        job,
        event_name="batch_started",
        completed=1,
        expected=4,
        metadata={"batch": 1},
        heartbeat_timeout_seconds=300,
    )

    # Do the work...

    # Mark complete. Source-consuming jobs must include the advanced
    # watermark so the server can persist progress correctly.
    client.jobs.complete(
        job,
        records_processed=1000,
        watermark={
            "partition_watermarks": {"analytics.events": 42},
            "known_partitions": ["analytics.events"],
        },
    )
```
The server can also respond with retryable post-commit repair errors after the job is already terminal in the durable store. Workers should retry the same `complete()` or terminal `fail()` call on the same claimed attempt when they see retryable `job_completion_*` or `job_failure_*` repair errors, and stop immediately only on non-retryable ownership-loss responses such as `job_zombie_owner`.
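That retry policy can be wrapped in a small helper. This is a sketch under assumptions: it presumes the repair errors surface as exceptions carrying a string error code, and the `code` attribute, helper name, and defaults are illustrative rather than actual client types:

```python
import time

RETRYABLE_PREFIXES = ("job_completion_", "job_failure_")  # retryable repair errors
FATAL_CODES = {"job_zombie_owner"}                        # ownership loss: never retry


def retry_terminal_call(fn, max_attempts=5, base_delay=0.5,
                        get_code=lambda exc: getattr(exc, "code", "")):
    """Retry a terminal complete()/fail() call on retryable repair errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            code = get_code(exc)
            if code in FATAL_CODES or not code.startswith(RETRYABLE_PREFIXES):
                raise  # ownership loss or an unrelated error: stop immediately
            if attempt == max_attempts:
                raise  # retries exhausted
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff


# Usage sketch:
# retry_terminal_call(lambda: client.jobs.complete(job, records_processed=1000))
```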
See Jobs and Workers for job types, worker routing, authentication, and dispatch modes.
Subscriptions
Subscriptions track changes to source tables and process new data incrementally. They maintain a watermark so each run picks up only new data.
```python
# Create a subscription that watches for new data in "raw_episodes"
client.subscriptions.create(
    subscriber_id="episode_processor",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Trigger processing (dispatches a job to a subscriber worker)
client.subscriptions.trigger("episode_processor")

# Check watermark state
wm = client.subscriptions.get_watermark("episode_processor")
print(f"Watermark: {wm.watermark}")

# Pause / resume / delete
client.subscriptions.pause("episode_processor")
client.subscriptions.resume("episode_processor")
client.subscriptions.delete("episode_processor")
```
See Pipelines for subscription modes (delta vs. version), triggers, and redrive.
Publications
Publications write processed data to sink tables. They can be triggered manually or wired into a pipeline after a subscription.
```python
# Create a publication that writes to a sink table
client.publications.create(
    publication_id="episode_publisher",
    name="Episode Publisher",
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="local",
)

# Run the publication
result = client.publications.run("episode_publisher")
print(f"Published: {result}")
```
See Pipelines for connecting publications to subscriptions.
Transforms
Transforms read from source tables, apply processing logic, and write to sink tables. They combine subscription (source tracking) with publication (sink writing) in a single node.
```python
# Create a transform: raw_episodes -> clean_episodes
client.transforms.create(
    transform_id="episode_cleaner",
    name="Episode Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="custom",
)

# Trigger transform processing
client.subscriptions.trigger("episode_cleaner")

# Pause / resume
client.transforms.pause("episode_cleaner")
client.transforms.resume("episode_cleaner")
```
See Pipelines for transform configuration, redrive, and rollback.
Pipelines
Pipelines wire subscriptions, transforms, and publications into a connected DAG. When an upstream node completes, downstream nodes are triggered automatically.
```python
# First, create the individual nodes
client.subscriptions.create(
    subscriber_id="ingest",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)
client.transforms.create(
    transform_id="clean",
    name="Data Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_data"}],
    dispatch_mode="custom",
)
client.publications.create(
    publication_id="publish",
    name="Data Publisher",
    sink_tables=[{"namespace": "robotics", "table": "final_data"}],
    dispatch_mode="local",
)

# Wire them into a pipeline (nodes are connected in DAG order)
client.pipelines.create(
    pipeline_id="etl_pipeline",
    name="ETL Pipeline",
    node_ids=["ingest", "clean", "publish"],
)

# Check pipeline status
status = client.pipelines.status("etl_pipeline")

# Pause / resume all nodes at once
client.pipelines.pause("etl_pipeline")
client.pipelines.resume("etl_pipeline")
```
See Pipelines for DAG construction, redrive, and pipeline operations.
Data Placement and Replication
DeltaCAT catalogs can span multiple storage backends (S3, SwiftStack, Lustre). Data placement lets you replicate tables across roots so readers access data from the closest location.
```python
# Place a table on an additional storage root for replication
client.catalog.place(
    namespace="robotics",
    table="episodes",
    roots=["aws_s3_iad"],  # Replicate to this root
    backfill=True,         # Copy existing data too
)

# Check replication status
status = client.catalog.replication_status(namespace="robotics", table="episodes")
print(f"Roots: {status}")

# Read from the closest root (the server resolves automatically)
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    root="aws_s3_iad",  # Prefer this root for file paths
)

# Remove a replication target
client.catalog.unplace(namespace="robotics", table="episodes", roots=["aws_s3_iad"])
```
New writes are automatically replicated to all placed roots via a background subscriber. Reads with `root=` get file paths rewritten through the preferred root when the data is available there.
See Configuration for data root setup and multi-root catalog configuration.
Scheduled Processing
Subscriptions and transforms can run on a schedule instead of being triggered manually. DeltaCAT supports interval-based and cron-based scheduling.
```python
# Process new data every 5 minutes
client.subscriptions.create(
    subscriber_id="metrics_ingester",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)

# Process at 2am UTC daily using a cron expression
client.subscriptions.create(
    subscriber_id="nightly_aggregator",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"cron": "0 2 * * *", "timezone": "UTC"}},
)

# Event-driven: only runs when triggered manually or by an upstream pipeline node
client.subscriptions.create(
    subscriber_id="on_demand_processor",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "event"},
)
```
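The three trigger shapes above can be captured in tiny builder functions. The dict shapes come straight from the examples; the function names themselves are illustrative, not part of the client API:

```python
def interval_trigger(seconds: int) -> dict:
    """Schedule trigger that fires every `seconds` seconds."""
    return {"mode": "schedule", "schedule": {"interval_seconds": seconds}}


def cron_trigger(expr: str, timezone: str = "UTC") -> dict:
    """Schedule trigger driven by a cron expression."""
    return {"mode": "schedule", "schedule": {"cron": expr, "timezone": timezone}}


def event_trigger() -> dict:
    """Event-driven trigger: runs only when fired manually or by an upstream node."""
    return {"mode": "event"}
```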
The DeltaCAT server's scheduler evaluates triggers periodically and dispatches jobs for subscriptions and transforms whose schedules are due. In single-process deployments this can run in the server process; in multi-worker production it runs as a separate `--transport scheduler` process. Paused nodes are skipped.
See Configuration for trigger options and scheduling details.
Server Setup
The DeltaCAT client connects to a DeltaCAT API server. For setup instructions, see:
- Server Setup Guide -- start a local or production server
- MCP Server for Claude Code -- use DeltaCAT from Claude Code via MCP
- REST API Reference -- full REST endpoint documentation
- Client Compatibility Runbook -- promotion-time packaged client/server compatibility validation
Current MCP status for Python users:
- use `deltacat[server]` to host the MCP server
- use `deltacat-client` for the thin REST client
- use `deltacat-client[mcp]` for the thin typed MCP HTTP client
- `deltacat_client.mcp` now covers catalog/data, staged writes, typed job observation, and typed orchestration observation/control/execution wrappers for subscriptions, publications, transforms, and pipelines
Additional Resources
| Guide | Description |
|---|---|
| Reading and Writing | Read plans, write modes, staged writes, supported formats |
| Transactions | Transaction lifecycle, time travel, rules and limitations |
| Jobs and Workers | Job types, claiming, heartbeat, worker routing, authentication |
| Pipelines | Subscriptions, publications, transforms, DAGs, redrive |
| Configuration | Auth, dispatch modes, triggers, data placement |
| Maintainer Workflow | Relationship between REST and MCP, generated bindings, facade updates, and validation guards |
| Architecture | Package boundary, generated client, development notes |
| Client Compatibility Runbook | Promotion-time packaged client/server compatibility validation |
For the core DeltaCAT data model, storage architecture, and catalog APIs, see the DeltaCAT documentation.