Lightweight REST client for the DeltaCAT API server.

deltacat-client is the Python REST client for DeltaCAT. It lets you read and write tables, manage jobs, and build data pipelines on a DeltaCAT API server -- without installing the full storage, compute, or server runtime stack.

The client talks to a DeltaCAT server over HTTP. Data files are read from and written to cloud storage (S3, GCS, Azure) directly by the client using server-vended credentials. Metadata operations (schema validation, transaction management, compaction triggers) stay server-side.

Overview

The client is organized around a root Client object with resource-oriented subclients:

| Subclient | Purpose |
| --- | --- |
| client.catalog | Namespaces, tables, read/write, transactions |
| client.jobs | Job submission, claiming, lifecycle, progress |
| client.subscriptions | Incremental data processing pipelines |
| client.publications | Data publishing workflows |
| client.transforms | Source-to-sink data transformations |
| client.pipelines | Multi-stage pipeline orchestration |
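
For example, each subclient hangs off a single root Client (construction mirrors the Quick Start below):

from deltacat_client import Client

client = Client("http://localhost:8080")

# Each subclient is an attribute on the root Client.
client.catalog        # namespaces, tables, reads/writes, transactions
client.jobs           # job submission and lifecycle
client.subscriptions  # incremental processing pipelines
client.pipelines      # multi-stage orchestration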

Installation

pip install deltacat-client

Optional extras for local data materialization:

pip install "deltacat-client[all]"    # Full client-side read/write stack
pip install "deltacat-client[pandas]" # Pandas DataFrame support
pip install "deltacat-client[polars]" # Polars DataFrame support
pip install "deltacat-client[lance]"  # Lance dataset support

Getting Started

Before using the client, you need a running DeltaCAT API server. See Server Setup for instructions.

Quick Start

from deltacat_client import Client
import pyarrow as pa

# Connect to a DeltaCAT server
client = Client("http://localhost:8080")

# Write data to a table (table is created automatically)
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Cheshire", "Dinah", "Felix"],
    "age": [3, 7, 5],
})
client.catalog.write(data, namespace="default", table="cool_cats", mode="auto", format="parquet")

# Read the data back
df = client.catalog.read(namespace="default", table="cool_cats", read_as="pandas")
print(df)

Core Concepts

DeltaCAT lets you manage Tables across one or more Catalogs. A Table is a named collection of data files. A Catalog is a named data lake that contains tables. For more on DeltaCAT's data model, see the DeltaCAT README.

The sections below show examples of core client operations.

Authentication

When connecting to a production server with auth enabled, provide a bearer token:

from deltacat_client import Client

client = Client(
    "https://deltacat-api.example.com",
    bearer_token="your-api-token",
)

The server validates your token and maps it to a user identity for permission checks (read, write, admin). The client automatically includes the token on every request. For data file access, the server vends short-lived STS credentials that the client uses to read from and write to cloud storage directly.
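
In practice, the token usually comes from the environment rather than being hard-coded. A minimal sketch; DELTACAT_API_TOKEN is an illustrative variable name, not one the client reads automatically:

import os
from deltacat_client import Client

# DELTACAT_API_TOKEN is an illustrative name; export it yourself before running.
client = Client(
    "https://deltacat-api.example.com",
    bearer_token=os.environ["DELTACAT_API_TOKEN"],
)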

This is different from thick DeltaCAT (import deltacat as dc) where authentication is handled entirely by the filesystem layer (e.g., AWS IAM roles, boto3 profiles).

See Configuration for bearer token setup, trusted identity headers, and role-based access control.

Reading Data
# Read a full table as a Pandas DataFrame
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas")

# Read as PyArrow, Polars, or NumPy
arrow_table = client.catalog.read(namespace="robotics", table="episodes", read_as="pyarrow")
polars_df = client.catalog.read(namespace="robotics", table="episodes", read_as="polars")

# Filter and limit
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    filter_predicate={"eq": ["task", "pick_screwdriver"]},
    limit=5000,
)

# Read a specific table version (time travel)
df = client.catalog.read(namespace="robotics", table="episodes", read_as="pandas", as_of=1712697600000000000)
Writing Data

The client supports writing PyArrow tables, Pandas DataFrames, Polars DataFrames, Daft DataFrames, NumPy arrays, Ray Datasets, and local files (Parquet, CSV, TSV, PSV, Feather, JSON, ORC, AVRO, Lance).

import pyarrow as pa

# Write data (creates the table if it doesn't exist)
data = pa.table({"episode_id": [1, 2], "score": [0.95, 0.87]})
client.catalog.write(data, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Append more data
data2 = pa.table({"episode_id": [3, 4], "score": [0.91, 0.89]})
client.catalog.write(data2, namespace="robotics", table="predictions", mode="auto", format="parquet")

# Write a Pandas DataFrame
import pandas as pd
df = pd.DataFrame({"episode_id": [5], "score": [0.93]})
client.catalog.write(df, namespace="robotics", table="predictions", mode="auto", format="parquet")
Transactions

Transactions provide atomic multi-step operations with automatic heartbeating and rollback on failure.

import pyarrow as pa

with client.transaction(commit_message="Backfill predictions") as tx:
    client.catalog.write(
        pa.table({"episode_id": [10, 11], "score": [0.88, 0.92]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )

    # Reads within the transaction see uncommitted writes
    df = client.catalog.read(
        namespace="robotics",
        table="predictions",
        read_as="pandas",
    )
    print(f"Rows visible in transaction: {len(df)}")
# Transaction commits automatically on exit; aborts on exception

See Transactions for time-travel reads, manual commit/abort, and transaction rules.
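
The with-block above commits and aborts for you. For explicit control, something like the following should be possible; the commit()/abort() method names here are assumptions based on the manual commit/abort feature mentioned above, so check the Transactions guide for the exact API:

import pyarrow as pa

# Hypothetical explicit-commit flow; commit()/abort() names are assumptions.
tx = client.transaction(commit_message="Manual backfill")
try:
    client.catalog.write(
        pa.table({"episode_id": [12], "score": [0.90]}),
        namespace="robotics",
        table="predictions",
        mode="add",
        format="parquet",
    )
    tx.commit()
except Exception:
    tx.abort()
    raise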

Jobs

DeltaCAT uses a durable job system for background work (compaction, data relay, subscription processing). The client can submit, monitor, and execute jobs.

# List all jobs
jobs = client.jobs.list()
for job in jobs:
    print(f"{job.job_id}: {job.state}")

# Submit a compaction job
result = client.jobs.submit_compaction(namespace="default", table="predictions")
print(f"Submitted: {result.job_id}")

# Wait for it to complete
status = client.jobs.wait(result.job_id, timeout_seconds=120)
print(f"Final state: {status.state}")

Workers claim and execute jobs. Any process can act as a worker:

# Claim the next pending job matching our worker tags
job = client.jobs.claim(worker_tags=["subscriber"])
if job:
    print(f"Claimed {job.job_id} (type: {job.context.get('job_type')})")

    # Report progress
    client.jobs.heartbeat(job.job_id, progress=0.5, message="Processing...")

    # Do the work...

    # Mark complete
    client.jobs.complete(job.job_id, records_processed=1000)
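
Putting claim, heartbeat, and complete together, a minimal polling worker might look like this (the poll interval and loop structure are illustrative choices, not part of the API):

import time

# Minimal worker loop built from the calls shown above.
while True:
    job = client.jobs.claim(worker_tags=["subscriber"])
    if job is None:
        time.sleep(5)  # illustrative back-off when no work is pending
        continue
    client.jobs.heartbeat(job.job_id, progress=0.0, message="Starting")
    # ... do the work for this job type ...
    client.jobs.complete(job.job_id, records_processed=1000)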

See Jobs and Workers for job types, worker routing, authentication, and dispatch modes.

Subscriptions

Subscriptions track changes to source tables and process new data incrementally. They maintain a watermark so each run picks up only new data.

# Create a subscription that watches for new data in "raw_episodes"
client.subscriptions.create(
    subscriber_id="episode_processor",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)

# Trigger processing (dispatches a job to a subscriber worker)
client.subscriptions.trigger("episode_processor")

# Check watermark state
wm = client.subscriptions.get_watermark("episode_processor")
print(f"Watermark: {wm.watermark}")

# Pause / resume / delete
client.subscriptions.pause("episode_processor")
client.subscriptions.resume("episode_processor")
client.subscriptions.delete("episode_processor")

See Pipelines for subscription modes (delta vs. version), triggers, and redrive.

Publications

Publications write processed data to sink tables. They can be triggered manually or wired into a pipeline after a subscription.

# Create a publication that writes to a sink table
client.publications.create(
    publication_id="episode_publisher",
    name="Episode Publisher",
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="local",
)

# Run the publication
result = client.publications.run("episode_publisher")
print(f"Published: {result}")

See Pipelines for connecting publications to subscriptions.

Transforms

Transforms read from source tables, apply processing logic, and write to sink tables. They combine subscription (source tracking) with publication (sink writing) in a single node.

# Create a transform: raw_episodes -> clean_episodes
client.transforms.create(
    transform_id="episode_cleaner",
    name="Episode Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_episodes"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_episodes"}],
    dispatch_mode="custom",
)

# Trigger transform processing
client.subscriptions.trigger("episode_cleaner")

# Pause / resume
client.transforms.pause("episode_cleaner")
client.transforms.resume("episode_cleaner")

See Pipelines for transform configuration, redrive, and rollback.

Pipelines

Pipelines wire subscriptions, transforms, and publications into a connected DAG. When an upstream node completes, downstream nodes are triggered automatically.

# First, create the individual nodes
client.subscriptions.create(
    subscriber_id="ingest",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    subscriber_type="custom",
    dispatch_mode="custom",
)
client.transforms.create(
    transform_id="clean",
    name="Data Cleaner",
    source_tables=[{"namespace": "robotics", "table": "raw_data"}],
    sink_tables=[{"namespace": "robotics", "table": "clean_data"}],
    dispatch_mode="custom",
)
client.publications.create(
    publication_id="publish",
    name="Data Publisher",
    sink_tables=[{"namespace": "robotics", "table": "final_data"}],
    dispatch_mode="local",
)

# Wire them into a pipeline (nodes are connected in DAG order)
client.pipelines.create(
    pipeline_id="etl_pipeline",
    name="ETL Pipeline",
    node_ids=["ingest", "clean", "publish"],
)

# Check pipeline status
status = client.pipelines.status("etl_pipeline")

# Pause / resume all nodes at once
client.pipelines.pause("etl_pipeline")
client.pipelines.resume("etl_pipeline")

See Pipelines for DAG construction, redrive, and pipeline operations.

Data Placement and Replication

DeltaCAT catalogs can span multiple storage backends (S3, SwiftStack, Lustre). Data placement lets you replicate tables across roots so readers access data from the closest location.

# Place a table on an additional storage root for replication
client.catalog.place(
    namespace="robotics",
    table="episodes",
    roots=["aws_s3_iad"],       # Replicate to this root
    backfill=True,               # Copy existing data too
)

# Check replication status
status = client.catalog.replication_status(namespace="robotics", table="episodes")
print(f"Roots: {status}")

# Read from the closest root (server resolves automatically)
df = client.catalog.read(
    namespace="robotics",
    table="episodes",
    read_as="pandas",
    root="aws_s3_iad",          # Prefer this root for file paths
)

# Remove a replication target
client.catalog.unplace(namespace="robotics", table="episodes", roots=["aws_s3_iad"])

New writes are automatically replicated to all placed roots via a background subscriber. Reads with root= get file paths rewritten through the preferred root when data is available there.

See Configuration for data root setup and multi-root catalog configuration.

Scheduled Processing

Subscriptions and transforms can run on a schedule instead of being triggered manually. DeltaCAT supports interval-based and cron-based scheduling.

# Process new data every 5 minutes
client.subscriptions.create(
    subscriber_id="metrics_ingester",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"interval_seconds": 300}},
)

# Process at 2am UTC daily using a cron expression
client.subscriptions.create(
    subscriber_id="nightly_aggregator",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "schedule", "schedule": {"cron": "0 2 * * *", "timezone": "UTC"}},
)

# Event-driven: only runs when triggered manually or by an upstream pipeline node
client.subscriptions.create(
    subscriber_id="on_demand_processor",
    source_tables=[{"namespace": "telemetry", "table": "raw_metrics"}],
    subscriber_type="custom",
    dispatch_mode="custom",
    trigger={"mode": "event"},
)

The DeltaCAT server's built-in scheduler evaluates triggers periodically and dispatches jobs for subscriptions and transforms whose schedules are due. Paused nodes are skipped.

See Configuration for trigger options and scheduling details.

Server Setup

The DeltaCAT client connects to a DeltaCAT API server. For setup instructions, see the DeltaCAT documentation.

Additional Resources

| Guide | Description |
| --- | --- |
| Reading and Writing | Read plans, write modes, staged writes, supported formats |
| Transactions | Transaction lifecycle, time travel, rules and limitations |
| Jobs and Workers | Job types, claiming, heartbeat, worker routing, authentication |
| Pipelines | Subscriptions, publications, transforms, DAGs, redrive |
| Configuration | Auth, dispatch modes, triggers, data placement |
| Architecture | Package boundary, generated client, development notes |

For the core DeltaCAT data model, storage architecture, and catalog APIs, see the DeltaCAT documentation.
