Lightweight REST client and thin MCP scaffolding for the DeltaCAT API server.
deltacat-client is the primary Python client package for
DeltaCAT. It lets you read and
write tables, run jobs, and build data pipelines against a DeltaCAT API
server without installing the full storage, compute, or server runtime
stack.
The client talks to a DeltaCAT server over HTTP. Metadata operations (schema validation, transaction management, compaction) stay server-side, while large data reads and writes go directly between the client and cloud storage using short-lived credentials the server vends on demand.
Overview
The client is organized around a root Client object with resource-oriented subclients:
| Subclient | Purpose |
|---|---|
| client.catalog | Namespaces, tables, read/write, transactions |
| client.jobs | Job submission, claiming, lifecycle, progress |
| client.publications | Incremental producers into the DeltaCAT Lakehouse (pipeline root nodes) |
| client.subscriptions | Incremental consumers from the DeltaCAT Lakehouse (pipeline leaf nodes) |
| client.transforms | Incrementally transform data within the DeltaCAT Lakehouse (pipeline intermediate nodes) |
| client.pipelines | Wire publications, transforms, and subscriptions into a connected DAG |
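Every subclient hangs off a single root Client, so one connection and one set of credentials serve all resources. A minimal sketch using only the attributes listed above:
from deltacat_client import Client
client = Client("http://localhost:8080")
# Each attribute below is a resource-oriented subclient sharing the root
# client's connection and auth.
catalog = client.catalog        # namespaces, tables, read/write, transactions
jobs = client.jobs              # job submission, claiming, lifecycle
pipelines = client.pipelines    # wire nodes into a connected DAG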
Installation
pip install deltacat-client
Optional extras for local data materialization:
pip install "deltacat-client[all]" # Full client-side read/write stack
pip install "deltacat-client[pandas]" # Pandas DataFrame support
pip install "deltacat-client[polars]" # Polars DataFrame support
pip install "deltacat-client[daft]" # Daft DataFrame support
pip install "deltacat-client[lance]" # Lance dataset support
pip install "deltacat-client[mcp]" # Typed async MCP HTTP client
Getting Started
Before using the client, you need a running DeltaCAT API server. See Server Setup for instructions.
DeltaCAT lets you manage Tables across one or more Catalogs. A Table is a named collection of data files. A Catalog is a named data lake that contains tables. For the full data model, see the DeltaCAT README.
Quick Start
from deltacat_client import Client
import pyarrow as pa
# Connect to a DeltaCAT server
client = Client("http://localhost:8080")
# Write data to a table. The table is created automatically — by default
# the client writes in ``mode="auto"`` with ``format="parquet"``.
data = pa.table({
"id": [1, 2, 3],
"name": ["Cheshire", "Dinah", "Felix"],
"age": [3, 7, 5],
})
client.catalog.write(data, table="cool_cats")
# Read the data back
df = client.catalog.read(table="cool_cats", read_as="pandas")
print(df)
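A first session often continues by exploring what the catalog contains. The list_namespaces/list_tables names below are assumptions for illustration, not confirmed API; check the catalog subclient reference for the exact surface:
# HYPOTHETICAL method names, shown only to sketch catalog exploration.
for namespace in client.catalog.list_namespaces():
    for table in client.catalog.list_tables(namespace=namespace):
        print(namespace, table)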
Core Concepts
The sections below walk through examples of core client operations.
Authentication
When connecting to a production server with auth enabled, provide a bearer token:
from deltacat_client import Client
client = Client(
"https://deltacat-api.example.com",
bearer_token="your-api-token",
)
Admins can also onboard a new user, grant their initial access, and receive a one-time token to share over a secure out-of-band channel:
from deltacat_client import Client
admin = Client(
"https://deltacat-api.example.com",
bearer_token="admin-api-token",
)
created = admin.auth.create_user(
user_id="newadmin@example.com",
display_name="New Admin",
email="newadmin@example.com",
initial_role="ADMIN",
resource_type="catalog",
resource_name="*",
issue_token_label="bootstrap",
idempotency_key="create-newadmin-001",
)
bootstrap_token = created.token.token
print("Share this token once via a secure secret channel:", bootstrap_token)
The server validates your token and maps it to a user identity for permission checks (read, write, admin). The client automatically includes the token on every request. For data file access, the server vends short-lived STS credentials that the client uses to read from and write to cloud storage directly.
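Because the client attaches the token to every request, a common pattern is to source it from the environment instead of hard-coding it. This sketch uses only the documented constructor; the environment variable name is arbitrary:
import os
from deltacat_client import Client
# Keeps the bearer token out of source control; raises KeyError if unset.
client = Client(
    "https://deltacat-api.example.com",
    bearer_token=os.environ["DELTACAT_API_TOKEN"],
)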
See Configuration for the detailed auth model, bootstrap tokens, and role-based access control.
Reading Data
# Read a table as a PyArrow table (the default format)
arrow_table = client.catalog.read(namespace="robotics", table="episodes")
# Read as Pandas, Polars, or Daft
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas")
polars_result = client.catalog.read(namespace="robotics", table="episodes", read_as="polars")
daft_df = client.catalog.read(namespace="robotics", table="episodes", read_as="daft")
# Filter rows and limit results
df = client.catalog.read(
namespace="robotics",
table="episodes",
read_as="pandas",
filter_predicate={"eq": ["task", "pick_screwdriver"]},
limit=5000,
)
# Time travel: read the table as it existed at a prior point in time.
# The as_of value is a nanosecond-precision Unix epoch timestamp.
df = client.catalog.read(
namespace="robotics",
table="episodes",
read_as="pandas",
as_of=1712697600000000000,
)
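Nanosecond epoch values are awkward to write by hand, so it can help to derive as_of from a datetime. This uses only the standard library plus the documented as_of parameter:
from datetime import datetime, timezone
# 2024-04-10T00:00:00Z expressed as nanoseconds since the Unix epoch.
as_of_ns = int(datetime(2024, 4, 10, tzinfo=timezone.utc).timestamp()) * 1_000_000_000
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    as_of=as_of_ns,
)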
See Reading and Writing for scalable reads, lazy vs. eager materialization, and format-specific behavior. For PackDS-backed training tables, see Training Data.
Writing Data
The client supports writing PyArrow tables, Pandas DataFrames, Polars DataFrames, Daft DataFrames, NumPy arrays, Ray Datasets, and local files (Parquet, CSV, TSV, PSV, Feather, JSON, ORC, AVRO, Lance).
import pyarrow as pa
# Write data to a table. The defaults are mode="auto" (creates the
# table on first write, appends thereafter) and format="parquet".
data = pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]})
client.catalog.write(data, namespace="robotics", table="predictions")
# Append more data — same call, same defaults.
data2 = pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]})
client.catalog.write(data2, namespace="robotics", table="predictions")
# Write a Pandas DataFrame
import pandas as pd
df = pd.DataFrame({"episode_id": [5], "score": [0.93]})
client.catalog.write(df, namespace="robotics", table="predictions")
# Override the defaults when you need a specific mode or format. Here we
# create a Lance-formatted table with explicit schema, partitioning, and
# sort order.
client.catalog.create_table(
namespace="robotics",
table="scored_episodes",
schema=pa.schema([
pa.field("episode_id", pa.int64()),
pa.field("score", pa.float64()),
pa.field("episode_day", pa.string()),
]),
partition_scheme={"keys": [{"key": ["episode_day"], "transform": "identity"}]},
sort_scheme={"keys": [{"key": ["episode_id"], "sort_order": "descending"}]},
auto_create_namespace=True,
)
data = pa.table({
"episode_id": [1, 2],
"score": [0.95, 0.87],
"episode_day": ["2025-01-15", "2025-01-15"],
})
client.catalog.write(data, namespace="robotics", table="scored_episodes")
# Evolve a table's schema after creation
client.catalog.alter_table(
namespace="robotics",
table="predictions",
schema_updates={
"operations": [
{"op": "add", "field": {"name": "confidence", "type": "float64"}}
]
},
)
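Once the confidence field is added, subsequent writes can populate it; this reuses only the documented write call:
# Writes after the schema change may include the newly added column.
data3 = pa.table({
    "episode_id": [6],
    "score": [0.97],
    "confidence": [0.81],
})
client.catalog.write(data3, namespace="robotics", table="predictions")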
See Reading and Writing for write modes,
staged writes, path-based writes with create_if_missing, and the full
list of supported formats and schema inputs.
For PackDS training data writes, see Training Data.
Live Feature Enrichment
Declare one or more merge keys on the table schema, and subsequent writes update existing records in place.
import pyarrow as pa
import deltacat as dc
from deltacat_client import Client
client = Client("http://localhost:8080")
# Fields tagged with is_merge_key=True identify the key columns used to
# reconcile updates.
schema = dc.Schema.of([
dc.Field.of(pa.field("user_id", pa.int64()), is_merge_key=True),
dc.Field.of(pa.field("name", pa.string())),
dc.Field.of(pa.field("age", pa.int32())),
dc.Field.of(pa.field("job", pa.string())),
])
# First write creates a Lance table with the merge-key schema.
initial = pa.table({
"user_id": pa.array([1, 2, 3], type=pa.int64()),
"name": ["Jim", "Dinah", "Bob"],
"age": pa.array([30, 28, 45], type=pa.int32()),
"job": ["Teacher", "Painter", "Sailor"],
})
client.catalog.write(
initial,
namespace="demo",
table="users",
format="lance",
create_if_missing={"schema": schema, "auto_create_namespace": True},
)
# Upsert: user_id 1 and 3 are updated; 4 and 5 are inserted.
upsert = pa.table({
"user_id": pa.array([1, 3, 4, 5], type=pa.int64()),
"name": ["Cheshire", "Felix", "Tom", "Simpkin"],
"age": pa.array([3, 2, 5, 12], type=pa.int32()),
"job": ["Tour Guide", "Drifter", "Housekeeper", "Mouser"],
})
client.catalog.write(
upsert,
namespace="demo",
table="users",
mode="merge",
format="lance",
)
print(client.catalog.read(namespace="demo", table="users", read_as="pyarrow"))
# Delete: only the merge-key columns need to be supplied.
client.catalog.write(
pa.table({"user_id": pa.array([3, 5], type=pa.int64())}),
namespace="demo",
table="users",
mode="delete",
format="lance",
)
# user_id 3 and 5 are gone; 1, 2, 4 remain.
print(client.catalog.read(namespace="demo", table="users", read_as="pyarrow"))
PackDS v5 tables do not create DeltaCAT merge keys by default. For
canonical PackDS training data, use episode_id identity partitioning
and REPLACE_PARTITION; explicit merge keys remain available when a
caller deliberately wants normal DeltaCAT MERGE/DELETE semantics.
See Training Data.
PackDS Tables Without Merge Keys
PackDS v5 tables are identity-partitioned by episode_id with no merge
keys by default. Live feature enrichment then runs episode-at-a-time:
discover the episodes you want to enrich from the auto-maintained
<table>__episodes companion, then rewrite each target episode with
mode="replace_partition". The replaced partition is committed
atomically; sibling episodes are never touched.
import pyarrow as pa
from deltacat_client import Client
client = Client("http://localhost:8080")
# 1. Initial PackDS write across multiple episodes. DeltaCAT auto-creates
# the table with episode_id identity partitioning, no merge keys, and
# the `<table>__episodes` companion.
initial = pa.table({
"episode_id": [
"ep_0001", "ep_0001", "ep_0001",
"ep_0002", "ep_0002",
"ep_0003", "ep_0003", "ep_0003", "ep_0003",
],
"step_index": [0, 1, 2, 0, 1, 0, 1, 2, 3],
"task": [
"pick", "pick", "pick",
"place", "place",
"stack", "stack", "stack", "stack",
],
"annotation": [None] * 9,
})
client.catalog.write(
initial,
namespace="robotics",
table="episode_steps",
format="lance",
create_if_missing={
"schema_def": [
{"name": "episode_id", "type": "string"},
{"name": "step_index", "type": "int64"},
{"name": "task", "type": "string"},
{"name": "annotation", "type": "string"},
],
"layout": "packds",
"default_content_type": "lance",
"auto_create_namespace": True,
},
)
# 2. Discover which episodes exist by reading the companion table.
# Companion rows are written transactionally with each PackDS commit.
# The companion is a normal Parquet table; it is not subject to the
# PackDS read-scope guard, so an unfiltered read is fine.
companion = client.catalog.read(
namespace="robotics",
table="episode_steps__episodes",
read_as="pyarrow",
columns=["episode_id", "total_frames"],
).sort_by("episode_id")
# Pick the first episode by id. The companion's read order is not
# guaranteed without a sort, so we sort first and pull both the id and
# the row count from the same row.
target_id = companion["episode_id"][0].as_py()
target_len = companion["total_frames"][0].as_py()
# 3. Enrich the target episode end-to-end via REPLACE_PARTITION. No
# merge keys, no merge-on-read. The replacement is per-partition;
# sibling episodes are unaffected. Note that ``layout="packds"`` is
# only required at create time; subsequent writes pick the layout up
# from the table's stored ``dataset_layout`` property.
enriched = pa.table({
"episode_id": [target_id] * target_len,
"step_index": list(range(target_len)),
"task": ["pick"] * target_len,
"annotation": [f"auto-labeled-step-{i}" for i in range(target_len)],
})
client.catalog.write(
enriched,
namespace="robotics",
table="episode_steps",
mode="replace_partition",
format="lance",
)
# 4. Scoped read of the enriched episode. PackDS reads must be scoped
# to a finite set of episode_ids (equality / IN) or a partition
# filter; unscoped full-table PackDS reads are rejected by default
# and require ``allow_full_packds_scan=True`` (or
# ``DELTACAT_ALLOW_FULL_PACKDS_SCAN=true`` server-side) to opt in.
print(client.catalog.read(
namespace="robotics",
table="episode_steps",
read_as="pyarrow",
filter_predicate={"eq": ["episode_id", target_id]},
columns=["episode_id", "step_index", "task", "annotation"],
))
Larger enrichment loops typically batch over a filtered companion read
(e.g. filter_predicate={"in": ["episode_id", batch_ids]}) so each
replace_partition write touches exactly one episode while the worker
pool processes many in parallel. Because each episode is its own
DeltaCAT partition, concurrent replace-partition writes to disjoint
episodes do not contend.
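A sketch of that batched loop, using only the calls shown above; batch_ids and the build_enriched_table helper are caller-supplied placeholders:
# Filtered companion read scopes episode discovery to the current batch.
batch_ids = ["ep_0001", "ep_0002"]
companion = client.catalog.read(
    namespace="robotics",
    table="episode_steps__episodes",
    read_as="pyarrow",
    filter_predicate={"in": ["episode_id", batch_ids]},
    columns=["episode_id", "total_frames"],
)
episode_ids = companion["episode_id"].to_pylist()
frame_counts = companion["total_frames"].to_pylist()
for episode_id, total_frames in zip(episode_ids, frame_counts):
    enriched = build_enriched_table(episode_id, total_frames)  # caller-supplied
    # Each write replaces exactly one episode partition, so disjoint
    # episodes can be processed by parallel workers without contention.
    client.catalog.write(
        enriched,
        namespace="robotics",
        table="episode_steps",
        mode="replace_partition",
        format="lance",
    )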
See Training Data for the companion-table schema and guidance on choosing between this no-merge-key flow and an explicit-merge-key PackDS table.
Transactions
Transactions provide atomic multi-step operations with automatic heartbeating and rollback on failure.
import pyarrow as pa
with client.transaction(commit_message="Backfill predictions") as tx:
client.catalog.write(
pa.table({"episode_id": [10, 11], "score": [0.88, 0.92]}),
namespace="robotics",
table="predictions",
mode="add",
)
# Reads within the transaction see uncommitted writes
df = client.catalog.read(
namespace="robotics",
table="predictions",
read_as="pandas",
)
print(f"Rows visible in transaction: {len(df)}")
# Transaction commits automatically on exit; aborts on exception
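Manual commit/abort (detailed in the Transactions guide) follows the familiar handle pattern; the tx.commit() and tx.abort() method names here are assumptions sketching the shape, not confirmed API:
# HYPOTHETICAL commit/abort surface -- see the Transactions guide.
tx = client.transaction(commit_message="Manual control")
try:
    client.catalog.write(
        pa.table({"episode_id": [12], "score": [0.90]}),
        namespace="robotics",
        table="predictions",
    )
    tx.commit()
except Exception:
    tx.abort()
    raise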
See Transactions for time-travel reads, manual commit/abort, and transaction rules.
Jobs
DeltaCAT uses a durable job system for background work (compaction, data relay, subscription processing). The client can submit, monitor, and execute jobs.
# List all jobs
jobs = client.jobs.list()
for job in jobs:
print(f"{job.job_id}: {job.state}")
# Submit a compaction job
result = client.jobs.submit_compaction(table="predictions")
print(f"Submitted: {result.job_id}")
# Wait for it to complete
status = client.jobs.wait(result.job_id, timeout_seconds=120)
print(f"Final state: {status.state}")
Workers claim and execute jobs. Any process can act as a worker:
# Claim the next pending job matching our worker tags
job = client.jobs.claim(worker_tags=["subscriber"])
if job:
print(f"Claimed {job.job_id} (type: {job.context.get('job_type')})")
# Report fine-grained progress and declare the deadline for the next chunk
client.jobs.emit_event(
job,
event_name="batch_started",
completed=1,
expected=4,
metadata={"batch": 1},
heartbeat_timeout_seconds=300,
)
# Do the work...
# Mark complete. Source-consuming jobs must include the watermark they
# advanced so the server can persist progress correctly.
client.jobs.complete(
job,
records_processed=1000,
watermark={
"partition_watermarks": {"analytics.events": 42},
"known_partitions": ["analytics.events"],
},
)
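A long-lived worker is usually just a claim/execute/complete loop around these calls; this sketch adds only standard-library polling, with process_job as a caller-supplied placeholder:
import time
while True:
    job = client.jobs.claim(worker_tags=["subscriber"])
    if job is None:
        time.sleep(5)  # nothing pending; back off before the next poll
        continue
    records = process_job(job)  # caller-supplied handler
    # Source-consuming jobs must also pass watermark= (see above).
    client.jobs.complete(job, records_processed=records)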
See Jobs and Workers for job types, worker routing, heartbeat rules, retry semantics, and dispatch modes.
Managed Ray Example
For subscription, transform, and publication jobs, dispatch_mode="ray"
submits the work onto DeltaCAT-managed Ray clusters instead of the shared
custom-worker claim pool. For large prebuilt environments, prefer
docker.image as the primary runtime artifact and use payload only for
small code/config overlays. The server still vends the DeltaCAT runtime from
its promoted runtime manifest so the remote cluster gets compatible
deltacat, deltacat_client, and deltacat_io_core packages.
from deltacat_client import Client
client = Client("http://localhost:8080", bearer_token="token")
payload = client.jobs.stage_ray_payload(
py_modules=["./shared_logic"],
)
ray_dispatch_yaml = """
cluster_shutdown_policy: terminate
docker:
image: "registry.example.com/ml/runtime@sha256:0123456789abcdef"
payload:
py_modules:
- {py_module}
head_node_type: ray.head.default
available_node_types:
ray.head.default:
node_config:
InstanceType: m7i.2xlarge
ray.worker.default:
min_workers: 1
max_workers: 4
node_config:
InstanceType: m7i.2xlarge
""".format(
py_module=payload.payload["py_modules"][0],
)
client.subscriptions.create(
subscriber_id="episode_processor",
source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
subscriber_type="custom",
dispatch_mode="ray",
dispatch_config=ray_dispatch_yaml,
trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)
The scheduler now auto-triggers the subscription every five minutes, DeltaCAT
auto-provisions the Ray cluster when a run starts, and
cluster_shutdown_policy: terminate tears the cluster back down after each run
by default. If the deployment advertises a compatible image profile, DeltaCAT
will execute inside the requested docker.image while keeping the promoted
DeltaCAT runtime bundle as the compatibility layer.
For reference, the rendered Ray launcher YAML produced by the example above is:
cluster_shutdown_policy: terminate
docker:
image: registry.example.com/ml/runtime@sha256:0123456789abcdef
payload:
py_modules:
- s3://.../shared_logic.tar.gz
head_node_type: ray.head.default
available_node_types:
ray.head.default:
node_config:
InstanceType: m7i.2xlarge
ray.worker.default:
min_workers: 1
max_workers: 4
node_config:
InstanceType: m7i.2xlarge
Use payload for small overlays only:
- experiment modules
- config bundles
- supplemental wheels
Do not treat payload as the primary transport for a large application image.
After launch, owner/admin callers can inspect cluster metadata and then use the managed-Ray access helpers against a specific job:
# `access` describes the currently discovered cluster shape for the job:
# region, cluster name, head instance id/private IP, worker ids, and the
# launch template metadata DeltaCAT used for the cluster.
access = client.jobs.get_managed_ray_access("job-id")
print(access.head_instance_id, access.head_private_ip)
# By default the SSM session starts on the Ray head node.
# The AWS CLI / Session Manager plugin will open an interactive session in
# your terminal, and the helper asks SSM to launch `bash -l` there so you land
# directly in a login shell on the instance.
client.jobs.start_managed_ray_ssm_session("job-id")
# For non-interactive inspection across the whole cluster, use send-command.
client.jobs.send_managed_ray_ssm_command("job-id", commands=["hostname"], target="all")
Publications
Publications are incremental producers that write new data into the DeltaCAT Lakehouse. They sit at the root of a pipeline DAG and can be triggered manually or fired by an upstream event.
# Create a publication that writes to a sink table
client.publications.create(
publication_id="episode_publisher",
name="Episode Publisher",
sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
dispatch_mode="local",
)
# Run the publication
result = client.publications.run("episode_publisher")
print(f"Published: {result}")
See Pipelines for publication configuration and DAG construction.
Subscriptions
Subscriptions are incremental consumers of DeltaCAT tables. They sit at the leaves of a pipeline DAG, tracking a per-partition watermark so each run picks up only new data.
# Create a subscription that watches for new data in "raw_episodes"
client.subscriptions.create(
subscriber_id="episode_processor",
source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
subscriber_type="custom",
dispatch_mode="custom",
)
# Trigger processing (dispatches a job to a subscriber worker)
client.subscriptions.trigger("episode_processor")
# Check watermark state
wm = client.subscriptions.get_watermark("episode_processor")
print(f"Watermark: {wm.watermark}")
# Pause / resume / delete
client.subscriptions.pause("episode_processor")
client.subscriptions.resume("episode_processor")
client.subscriptions.delete("episode_processor")
See Pipelines for subscription modes (delta vs. version), triggers, and redrive.
Transforms
Transforms are the intermediate nodes of a pipeline DAG. Each transform reads from one or more source tables, applies processing logic, and writes to one or more sink tables.
# Create a transform: raw_episodes -> clean_episodes
client.transforms.create(
transform_id="episode_cleaner",
name="Episode Cleaner",
source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
dispatch_mode="custom",
)
# Trigger transform processing
client.subscriptions.trigger("episode_cleaner")
# Pause / resume
client.transforms.pause("episode_cleaner")
client.transforms.resume("episode_cleaner")
See Pipelines for transform configuration, redrive, and rollback.
Pipelines
Pipelines wire publications, transforms, and subscriptions into a connected DAG. When an upstream node completes, downstream nodes are triggered automatically.
# First, create connected pipeline nodes
client.publications.create(
publication_id="ingest_pub",
name="Raw Ingest Publisher",
sink_tables=[{"namespace": "robotics", "table": "raw_data"}],
dispatch_mode="local",
)
client.transforms.create(
transform_id="clean",
name="Data Cleaner",
source_tables=[{"namespace": "robotics", "table": "raw_data"}],
sink_tables=[{"namespace": "robotics", "table": "clean_data"}],
dispatch_mode="custom",
)
client.subscriptions.create(
subscriber_id="consume_clean",
source_tables=[{"namespace": "robotics", "table": "clean_data"}],
subscriber_type="custom",
dispatch_mode="custom",
)
# Preview the connected pipeline from an interior seed node
preview = client.pipelines.discover(seed_node_ids=["clean"])
print(preview.execution_order)
# Option A: persist exactly the previewed node_ids
client.pipelines.create(
pipeline_id="etl_pipeline_pinned",
name="ETL Pipeline (Pinned)",
node_ids=preview.node_ids,
)
# Option B: create directly from seed node ids
client.pipelines.create(
pipeline_id="etl_pipeline_seeded",
name="ETL Pipeline (Seeded)",
seed_node_ids=["clean"],
)
# Check pipeline status
status = client.pipelines.status("etl_pipeline_seeded")
# Pause / resume all nodes at once
client.pipelines.pause("etl_pipeline_seeded")
client.pipelines.resume("etl_pipeline_seeded")
See Pipelines for DAG construction, discovery semantics, redrive, rollback, and stored-order validation.
Data Placement and Replication
DeltaCAT catalogs can span multiple storage backends (S3, SwiftStack, Lustre). Data placement lets you replicate tables across roots so readers access data from the closest location.
# List available data roots for the catalog. The keys of this dict
# are the valid values to pass as `roots=` and `root=` below.
catalog_info = client.catalog.describe_catalog("default")
available_roots = catalog_info.data_roots or {}
for root_name, root_info in available_roots.items():
print(root_name, root_info)
# Example output (dict[str, DataRootInfoSummary]):
# aws_s3_iad {'root': 's3://bucket-iad/', 'storage_type': 's3',
# 'region': 'us-east-1', 'endpoint_url': None}
# aws_s3_pdx {'root': 's3://bucket-pdx/', 'storage_type': 's3',
# 'region': 'us-west-2', 'endpoint_url': None}
# swiftstack_pdx {'root': 's3://swiftstack-bucket/', 'storage_type': 's3',
# 'region': None, 'endpoint_url': 'https://pdx.swiftstack.example.com'}
# Pick a target root from what the catalog actually advertises.
# In production you'd choose based on region, storage class, or policy.
target_root = next(iter(available_roots))
# Place a table on an additional storage root for replication
client.catalog.place(
namespace="robotics",
table="episodes",
roots=[target_root], # Replicate to this root
backfill=True, # Copy existing data too
)
# Check replication status
status = client.catalog.replication_status(namespace="robotics", table="episodes")
print(f"Roots: {status}")
# Read from the closest root (server resolves automatically)
df = client.catalog.read(
namespace="robotics",
table="episodes",
read_as="pandas",
root=target_root, # Prefer this root for file paths
)
# Remove a replication target
client.catalog.unplace(namespace="robotics", table="episodes", root=target_root)
New writes are automatically replicated to all placed roots via a background subscriber. Reads with root= get file paths rewritten through the preferred root when data is available there.
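Backfills run asynchronously through that background subscriber, so a placement call is often followed by polling. This sketch uses only the documented replication_status call; the readiness predicate is a placeholder because the status shape is not documented here:
import time
for _ in range(30):
    status = client.catalog.replication_status(namespace="robotics", table="episodes")
    print(status)
    if is_backfill_complete(status):  # caller-supplied readiness check
        break
    time.sleep(10)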
See Configuration for data root setup and multi-root catalog configuration.
Discovery Root Administration
Catalog admins can publish read-only discovery roots for system crawler, GC,
relay, replication, and placement jobs. Discovery roots are separate from
managed data roots, so they can describe authority roots such as s3://
without becoming valid table write targets.
client.catalog.create_storage_binding(
"default",
name="swiftstack_pdx_root",
storage_type="s3",
storage_class="discovery",
uri_scheme="s3",
endpoint_url="https://pdx.swiftstack.example.com",
region="us-west-2",
credential_source={"kind": "aws_profile", "profile": "pdx"},
)
client.catalog.create_discovery_root(
"default",
name="swiftstack_pdx",
binding_name="swiftstack_pdx_root",
root_uri="s3://",
authorization={"principals": ["service:system-crawler"]},
)
roots = client.catalog.list_discovery_roots(
"default",
include_disabled=True,
include_history=True,
)
details = client.catalog.describe_discovery_root(
"default",
"swiftstack_pdx",
include_history=True,
)
# Updates create a new immutable discovery-root version.
client.catalog.update_discovery_root(
"default",
"swiftstack_pdx",
binding_name="swiftstack_pdx_root",
root_uri="s3://",
description="PDX authority root",
authorization={"principals": ["service:system-crawler"]},
)
# Deletes are logical disables and remain visible with include_disabled=True.
client.catalog.delete_discovery_root("default", "swiftstack_pdx")
Discovery-root detail responses include the effective binding revision, endpoint, region, authorization summary, lifecycle state, sanitized credential-source metadata, and effective storage snapshot.
Scheduled Processing
Subscriptions and transforms can run on a schedule instead of being triggered manually. DeltaCAT supports interval-based and cron-based scheduling.
# Process new data every 5 minutes
client.subscriptions.create(
subscriber_id="metrics_ingester",
source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
subscriber_type="custom",
dispatch_mode="custom",
trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)
# Process at 2am UTC daily using a cron expression
client.subscriptions.create(
subscriber_id="nightly_aggregator",
source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
subscriber_type="custom",
dispatch_mode="custom",
trigger={"mode": "schedule", "schedule": {"cron": "0 2 * * *", "timezone": "UTC"}},
)
# Event-driven: only runs when triggered manually or by an upstream pipeline node
client.subscriptions.create(
subscriber_id="on_demand_processor",
source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
subscriber_type="custom",
dispatch_mode="custom",
trigger={"mode": "event"},
)
See Configuration for trigger options and scheduling details.
Agentic Access (MCP)
DeltaCAT ships with a built-in Model Context Protocol server so AI agents (for example, Claude Code) can browse catalogs, inspect schemas, plan reads, and stage writes through natural language instead of hand-written Python.
Most hand-written code should just use the REST Client(...) shown above. Reach for MCP when you want to:
- let an agent explore and operate on your catalog conversationally
- embed DeltaCAT as a tool in an agentic application
- use a typed async Python wrapper over the MCP HTTP surface (deltacat-client[mcp]) from code that is already agentic in shape
See the MCP Server guide for the full tool reference, the typed async client, and recipes for agent-driven catalog workflows.
Server Setup
The DeltaCAT client connects to a DeltaCAT API server. For setup instructions, see:
- Server Setup Guide -- start a local or production server
- REST API Reference -- full REST endpoint documentation
- MCP Server Guide -- agentic access via MCP
Additional Resources
| Guide | Description |
|---|---|
| Reading and Writing | Read plans, write modes, staged writes, supported formats |
| Training Data | PackDS tables, episode indexes, shard manifests, distributed training |
| Transactions | Transaction lifecycle, time travel, rules and limitations |
| Jobs and Workers | Job types, claiming, heartbeat, worker routing, authentication |
| Pipelines | Publications, transforms, subscriptions, DAGs, redrive |
| Configuration | Auth, dispatch modes, triggers, data placement |
| Maintainer Workflow | Relationship between REST and MCP, generated bindings, facade updates, and validation guards |
| Architecture | Package boundary, generated client, development notes |
| Client Compatibility Runbook | Promotion-time packaged client/server compatibility validation |
For the core DeltaCAT data model, storage architecture, and catalog APIs, see the DeltaCAT documentation.