arc-client

Python SDK for Arc time-series database.

Requires Python 3.9+. License: MIT.

Installation

pip install arc-tsdb-client

# With pandas support (quotes keep shells such as zsh from expanding the brackets)
pip install "arc-tsdb-client[pandas]"

# With polars support
pip install "arc-tsdb-client[polars]"

# With all optional dependencies
pip install "arc-tsdb-client[all]"

Or with uv:

uv add arc-tsdb-client
uv add arc-tsdb-client --extra pandas
uv add arc-tsdb-client --extra all

Quick Start

from arc_client import ArcClient

with ArcClient(host="localhost", token="your-token") as client:
    # Write data (columnar format - fastest)
    client.write.write_columnar(
        measurement="cpu",
        columns={
            "time": [1633024800000000, 1633024801000000],
            "host": ["server01", "server01"],
            "usage_idle": [95.0, 94.5],
        },
    )

    # Query to pandas DataFrame
    df = client.query.query_pandas("SELECT * FROM default.cpu WHERE host = 'server01'")
    print(df)

Features

  • High-performance ingestion: MessagePack columnar format (9M+ records/sec)
  • Multiple formats: MessagePack columnar/row, InfluxDB Line Protocol
  • DataFrame integration: Pandas, Polars, PyArrow
  • Sync and async APIs: Full async support with httpx
  • Buffered writes: Automatic batching with size and time thresholds
  • Query support: SQL queries with JSON or Arrow IPC streaming
  • Management: Retention policies, continuous queries, delete operations
  • Authentication: Token management (create, rotate, revoke)

Data Ingestion

Columnar Format (Recommended)

The fastest way to write data. Payloads are encoded as MessagePack and compressed with gzip:

client.write.write_columnar(
    measurement="cpu",
    columns={
        "time": [1633024800000000, 1633024801000000],  # microseconds
        "host": ["server01", "server02"],
        "region": ["us-east", "us-west"],
        "usage_idle": [95.0, 87.3],
        "usage_system": [3.2, 8.1],
    },
    database="default",  # optional
)
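
Application data often arrives as one dict per record rather than as columns. The sketch below shows one way to reshape such rows into the columnar layout used above and to produce integer microsecond timestamps. The helper names `to_epoch_micros` and `rows_to_columns` are hypothetical and not part of arc-client.

```python
from datetime import datetime, timezone


def to_epoch_micros(ts: datetime) -> int:
    """Convert a timezone-aware datetime to integer microseconds since the Unix epoch."""
    if ts.tzinfo is None:
        raise ValueError("timestamp must be timezone-aware")
    delta = ts - datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Integer arithmetic avoids the rounding that float seconds would introduce.
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def rows_to_columns(rows: list) -> dict:
    """Turn a list of row dicts into a dict of equal-length column lists."""
    if not rows:
        return {}
    keys = list(rows[0])
    columns = {key: [] for key in keys}
    for index, row in enumerate(rows):
        if set(row) != set(keys):
            raise ValueError(f"row {index} has keys {sorted(row)}, expected {sorted(keys)}")
        for key in keys:
            columns[key].append(row[key])
    return columns


rows = [
    {"time": to_epoch_micros(datetime(2021, 9, 30, 18, 0, 0, tzinfo=timezone.utc)),
     "host": "server01", "usage_idle": 95.0},
    {"time": to_epoch_micros(datetime(2021, 9, 30, 18, 0, 1, tzinfo=timezone.utc)),
     "host": "server02", "usage_idle": 87.3},
]
columns = rows_to_columns(rows)
# The result can then be passed on, for example:
# client.write.write_columnar(measurement="cpu", columns=columns)
```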

DataFrame Ingestion

Write directly from pandas or polars DataFrames:

import pandas as pd

df = pd.DataFrame({
    "time": pd.to_datetime(["2024-01-01", "2024-01-02"]),
    "host": ["server01", "server02"],
    "value": [42.0, 43.5],
})

client.write.write_dataframe(
    df,
    measurement="metrics",
    time_column="time",
    tag_columns=["host"],
)

Buffered Writes

For high-throughput scenarios, use buffered writes with automatic batching:

with client.write.buffered(batch_size=10000, flush_interval=5.0) as buffer:
    for record in records:
        buffer.write(
            measurement="events",
            tags={"source": record.source},
            fields={"value": record.value},
            timestamp=record.timestamp,
        )
    # Auto-flushes on exit or when batch_size/flush_interval reached
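
To make the size and time thresholds concrete, here is a minimal standalone sketch of a batching buffer. It is not arc-client's implementation: `SimpleBatcher`, `sink`, and `clock` are hypothetical names, and this version checks the time threshold only when a record is written, whereas a real buffer might also flush from a background timer.

```python
import time


class SimpleBatcher:
    """Collect records and hand them to `sink` in batches (illustrative only)."""

    def __init__(self, sink, batch_size=10_000, flush_interval=5.0, clock=time.monotonic):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._sink = sink                      # callable that receives a list of records
        self._batch_size = batch_size
        self._flush_interval = flush_interval  # seconds
        self._clock = clock
        self._pending = []
        self._last_flush = clock()

    def write(self, record):
        self._pending.append(record)
        too_big = len(self._pending) >= self._batch_size
        too_old = self._clock() - self._last_flush >= self._flush_interval
        if too_big or too_old:
            self.flush()

    def flush(self):
        if self._pending:
            batch, self._pending = self._pending, []
            self._sink(batch)
        self._last_flush = self._clock()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Flush whatever is left when the with-block ends.
        self.flush()
```

Passing `clock` as a parameter keeps the time threshold testable without real sleeps.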

Line Protocol

For compatibility with InfluxDB tooling:

# Single line
client.write.write_line_protocol("cpu,host=server01 usage=45.2 1633024800000000000")

# Multiple lines
lines = [
    "cpu,host=server01 usage=45.2",
    "cpu,host=server02 usage=67.8",
]
client.write.write_line_protocol(lines)
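
When lines are built from application data rather than typed by hand, tag and field values need escaping. The following is a sketch of a formatter that follows the InfluxDB line protocol conventions as commonly documented (escaped commas, spaces, and equals signs, an `i` suffix for integers, quoted strings). `to_line` is a hypothetical helper, and whether Arc accepts every one of these forms is an assumption to verify against the server.

```python
MEASUREMENT_CHARS = ", "
KEY_CHARS = ",= "


def _escape(value: str, chars: str) -> str:
    """Backslash-escape every character of `chars` that occurs in `value`."""
    for ch in chars:
        value = value.replace(ch, "\\" + ch)
    return value


def _format_field(value) -> str:
    if isinstance(value, bool):  # bool is a subclass of int, so test it first
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    raise TypeError(f"unsupported field type: {type(value).__name__}")


def to_line(measurement, fields, tags=None, timestamp_ns=None) -> str:
    """Format one line: measurement[,tag=value...] field=value[,...] [timestamp]."""
    if not fields:
        raise ValueError("at least one field is required")
    head = _escape(measurement, MEASUREMENT_CHARS)
    for key, val in sorted((tags or {}).items()):
        head += "," + _escape(key, KEY_CHARS) + "=" + _escape(str(val), KEY_CHARS)
    body = ",".join(
        _escape(key, KEY_CHARS) + "=" + _format_field(val) for key, val in fields.items()
    )
    line = head + " " + body
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


line = to_line("cpu", {"usage": 45.2}, tags={"host": "server01"},
               timestamp_ns=1633024800000000000)
# client.write.write_line_protocol(line)
```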

Querying Data

JSON Response

result = client.query.query("SELECT * FROM default.cpu WHERE time > now() - INTERVAL '1 hour'")
print(result.columns)  # ['time', 'host', 'usage']
print(result.data)     # [[1633024800000000, 'server01', 45.2], ...]
print(result.row_count)
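
Because the result carries column names and row values separately, it can be convenient to zip them into one dict per row. A small sketch, using a `SimpleNamespace` stand-in with the same `columns` and `data` attributes shown above so that it runs without a server; `rows_as_dicts` is a hypothetical helper.

```python
from types import SimpleNamespace


def rows_as_dicts(result) -> list:
    """Pair each row's values with the column names."""
    return [dict(zip(result.columns, row)) for row in result.data]


# Stand-in for the object returned by client.query.query(...).
result = SimpleNamespace(
    columns=["time", "host", "usage"],
    data=[
        [1633024800000000, "server01", 45.2],
        [1633024801000000, "server02", 67.8],
    ],
)

for row in rows_as_dicts(result):
    print(row["host"], row["usage"])
```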

pandas DataFrame

df = client.query.query_pandas("SELECT * FROM default.cpu LIMIT 1000")

Polars DataFrame

pl_df = client.query.query_polars("SELECT * FROM default.cpu LIMIT 1000")

PyArrow Table (Zero-Copy)

table = client.query.query_arrow("SELECT * FROM default.cpu LIMIT 1000")

Query Estimation

Preview query cost before execution:

estimate = client.query.estimate("SELECT * FROM default.cpu")
print(f"Estimated rows: {estimate.estimated_rows}")
print(f"Warning level: {estimate.warning_level}")  # none, low, medium, high
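
One possible use of the estimate is to refuse queries above a chosen warning level. The sketch below assumes that `warning_level` is one of the strings listed in the comment above and that `client.query.query` accepts the same SQL; `query_if_cheap` is a hypothetical helper, not part of the SDK.

```python
WARNING_LEVELS = ("none", "low", "medium", "high")


def query_if_cheap(client, sql: str, max_level: str = "low"):
    """Run `sql` only if its estimated warning level does not exceed `max_level`."""
    estimate = client.query.estimate(sql)
    # Assumption: warning_level is a plain string from WARNING_LEVELS.
    if WARNING_LEVELS.index(estimate.warning_level) > WARNING_LEVELS.index(max_level):
        raise RuntimeError(
            f"query refused: warning level {estimate.warning_level!r}, "
            f"about {estimate.estimated_rows} rows"
        )
    return client.query.query(sql)
```

Calling `query_if_cheap(client, "SELECT * FROM default.cpu", max_level="medium")` would then run the query unless the estimate comes back as `high`.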

List Measurements

measurements = client.query.list_measurements(database="default")
for m in measurements:
    print(f"{m.measurement}: {m.file_count} files, {m.total_size_mb:.1f} MB")

Async Support

All operations have async equivalents:

import asyncio
from arc_client import AsyncArcClient

async def main():
    async with AsyncArcClient(host="localhost", token="your-token") as client:
        # Write
        await client.write.write_columnar(
            measurement="cpu",
            columns={"time": [...], "host": [...], "usage": [...]},
        )

        # Query
        df = await client.query.query_pandas("SELECT * FROM default.cpu")

        # Buffered writes
        async with client.write.buffered(batch_size=5000) as buffer:
            for record in records:
                await buffer.write(measurement="events", fields={"v": record.v})

asyncio.run(main())
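
A common reason to choose the async client is to run several queries at once. The sketch below bounds concurrency with a semaphore and collects results in input order. It assumes `await client.query.query(sql)` is the async counterpart of the sync call and that one client instance can serve concurrent requests; `run_queries` is a hypothetical helper.

```python
import asyncio


async def run_queries(client, queries, max_concurrency: int = 4) -> list:
    """Run `queries` concurrently, at most `max_concurrency` at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(sql):
        async with semaphore:
            return await client.query.query(sql)

    # gather() returns results in the same order as the input queries.
    return await asyncio.gather(*(run_one(sql) for sql in queries))
```

Inside `main()` this could be called as `results = await run_queries(client, ["SELECT ...", "SELECT ..."])`.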

Management Operations

Retention Policies

Automatically delete old data:

# Create policy
policy = client.retention.create(
    name="30-day-retention",
    database="default",
    retention_days=30,
    measurement="logs",  # optional, applies to all if not specified
)

# List policies
policies = client.retention.list()

# Execute (with dry_run first)
result = client.retention.execute(policy.id, dry_run=True)
print(f"Would delete {result.deleted_count} rows")

# Execute for real
result = client.retention.execute(policy.id, dry_run=False, confirm=True)

Continuous Queries

Aggregate and downsample data automatically:

# Create a CQ to downsample CPU metrics to 1-hour averages
cq = client.continuous_queries.create(
    name="cpu-hourly",
    database="default",
    source_measurement="cpu",
    destination_measurement="cpu_1h",
    query="SELECT time_bucket('1 hour', time) as time, host, avg(usage) as usage FROM default.cpu GROUP BY 1, 2",
    interval="1h",
)

# Manual execution with time range
result = client.continuous_queries.execute(
    cq.id,
    start_time="2024-01-01T00:00:00Z",
    end_time="2024-01-02T00:00:00Z",
    dry_run=True,
)

# List CQs
cqs = client.continuous_queries.list(database="default")
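
Backfilling a long period through manual execution may be easier to manage in smaller windows. The sketch below only generates the windows and formats them like the `start_time` and `end_time` strings above; `time_windows` and `to_rfc3339` are hypothetical helpers, and whether Arc treats `end_time` as inclusive or exclusive is not stated here, so check for overlap at window boundaries.

```python
from datetime import datetime, timedelta, timezone


def time_windows(start: datetime, end: datetime, step: timedelta) -> list:
    """Split [start, end) into consecutive (window_start, window_end) pairs."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    windows = []
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)
        windows.append((cursor, upper))
        cursor = upper
    return windows


def to_rfc3339(ts: datetime) -> str:
    """Format a timezone-aware datetime as e.g. 2024-01-01T00:00:00Z."""
    if ts.tzinfo is None:
        raise ValueError("timestamp must be timezone-aware")
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


windows = time_windows(
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 3, 12, tzinfo=timezone.utc),
    timedelta(days=1),
)
for window_start, window_end in windows:
    print(to_rfc3339(window_start), to_rfc3339(window_end))
    # client.continuous_queries.execute(
    #     cq.id,
    #     start_time=to_rfc3339(window_start),
    #     end_time=to_rfc3339(window_end),
    #     dry_run=True,
    # )
```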

Delete Operations

Delete data matching a WHERE clause:

# Always dry_run first
result = client.delete.delete(
    database="default",
    measurement="logs",
    where="time < '2024-01-01'",
    dry_run=True,
)
print(f"Would delete {result.deleted_count} rows from {result.affected_files} files")

# Execute deletion
result = client.delete.delete(
    database="default",
    measurement="logs",
    where="time < '2024-01-01'",
    dry_run=False,
    confirm=True,  # Required for large deletes
)
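
The dry-run and the real deletion can be tied together so that the second step only happens when the first looks reasonable. A sketch, using only the parameters and the `deleted_count` attribute shown above; `delete_with_guard` and `max_rows` are hypothetical names.

```python
def delete_with_guard(client, database: str, measurement: str, where: str, max_rows: int):
    """Dry-run a delete, then execute it only if at most `max_rows` rows are affected."""
    preview = client.delete.delete(
        database=database,
        measurement=measurement,
        where=where,
        dry_run=True,
    )
    if preview.deleted_count > max_rows:
        raise RuntimeError(
            f"refusing to delete {preview.deleted_count} rows (limit is {max_rows})"
        )
    return client.delete.delete(
        database=database,
        measurement=measurement,
        where=where,
        dry_run=False,
        confirm=True,
    )
```

Note that rows written between the two calls can change the count, so the limit is a safeguard rather than a guarantee.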

Authentication

# Verify current token
verify = client.auth.verify()
if verify.valid:
    print(f"Token: {verify.token_info.name}")
    print(f"Permissions: {verify.permissions}")

# Create new token
result = client.auth.create_token(
    name="my-app-token",
    description="Token for my application",
    permissions=["read", "write"],
)
print(f"New token: {result.token}")  # Save this - shown only once!

# List tokens
tokens = client.auth.list_tokens()

# Rotate token
rotated = client.auth.rotate_token(token_id=123)
print(f"New token: {rotated.new_token}")

# Revoke token
client.auth.revoke_token(token_id=123)

Configuration

client = ArcClient(
    host="localhost",       # Arc server hostname
    port=8000,              # Arc server port (default: 8000)
    token="your-token",     # API token
    database="default",     # Default database
    timeout=30.0,           # Request timeout in seconds
    compression=True,       # Enable gzip compression for writes
    ssl=False,              # Use HTTPS
    verify_ssl=True,        # Verify SSL certificates
)
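
Tokens are usually better kept out of source code. The sketch below builds the keyword arguments shown above from environment variables. The variable names (`ARC_HOST`, `ARC_PORT`, `ARC_TOKEN`, `ARC_DATABASE`, `ARC_SSL`) are a hypothetical convention chosen for this example; the SDK is not known to read them itself.

```python
import os


def client_kwargs_from_env(env=None) -> dict:
    """Build ArcClient keyword arguments from environment variables."""
    env = os.environ if env is None else env
    token = env.get("ARC_TOKEN")
    if not token:
        raise RuntimeError("ARC_TOKEN is not set")
    return {
        "host": env.get("ARC_HOST", "localhost"),
        "port": int(env.get("ARC_PORT", "8000")),
        "token": token,
        "database": env.get("ARC_DATABASE", "default"),
        "ssl": env.get("ARC_SSL", "false").lower() in ("1", "true", "yes"),
    }


# client = ArcClient(**client_kwargs_from_env())
```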

Error Handling

from arc_client.exceptions import (
    ArcError,              # Base exception
    ArcConnectionError,    # Connection failures
    ArcAuthenticationError,  # Auth failures (401)
    ArcQueryError,         # Query execution errors
    ArcIngestionError,     # Write failures
    ArcValidationError,    # Invalid input
    ArcNotFoundError,      # Resource not found (404)
    ArcRateLimitError,     # Rate limited (429)
    ArcServerError,        # Server errors (5xx)
)

try:
    client.query.query("INVALID SQL")
except ArcQueryError as e:
    print(f"Query failed: {e}")
except ArcConnectionError as e:
    print(f"Connection failed: {e}")
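
Transient failures such as rate limiting or a dropped connection are often worth retrying. The following is a generic sketch of retry with exponential backoff and jitter; `call_with_retry` is a hypothetical helper, and which exceptions are safe to retry is a judgment call for each application.

```python
import random
import time


def call_with_retry(func, retry_on, attempts: int = 4, base_delay: float = 0.5,
                    sleep=time.sleep):
    """Call `func()`; on an exception in `retry_on`, wait and try again.

    The delay doubles after each failed attempt, plus up to 50% random jitter.
    The last failure is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))


# Possible usage with the exceptions listed above:
# df = call_with_retry(
#     lambda: client.query.query_pandas("SELECT * FROM default.cpu LIMIT 1000"),
#     retry_on=(ArcRateLimitError, ArcConnectionError, ArcServerError),
# )
```

Errors such as `ArcQueryError` or `ArcValidationError` are left out of `retry_on` in the usage comment because repeating an invalid request is unlikely to help.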

Documentation

See the full documentation for detailed guides and API reference.

License

MIT License - see LICENSE for details.
