
arc-tsdb-client

Python SDK for Arc time-series database.

Requires Python 3.9+. Licensed under MIT.

Installation

pip install arc-tsdb-client

# With pandas support
pip install arc-tsdb-client[pandas]

# With polars support
pip install arc-tsdb-client[polars]

# With all optional dependencies
pip install arc-tsdb-client[all]

Or with uv:

uv add arc-tsdb-client
uv add arc-tsdb-client --extra pandas
uv add arc-tsdb-client --extra all

Quick Start

from arc_client import ArcClient

with ArcClient(host="localhost", token="your-token") as client:
    # Write data (columnar format - fastest)
    client.write.write_columnar(
        measurement="cpu",
        columns={
            "time": [1633024800000000, 1633024801000000],
            "host": ["server01", "server01"],
            "usage_idle": [95.0, 94.5],
        },
    )

    # Query to pandas DataFrame
    df = client.query.query_pandas("SELECT * FROM default.cpu WHERE host = 'server01'")
    print(df)

Features

  • High-performance ingestion: MessagePack columnar format (9M+ records/sec)
  • Multiple formats: MessagePack columnar/row, InfluxDB Line Protocol
  • DataFrame integration: Pandas, Polars, PyArrow
  • Sync and async APIs: Full async support with httpx
  • Buffered writes: Automatic batching with size and time thresholds
  • Query support: SQL queries with JSON or Arrow IPC streaming
  • Management: Retention policies, continuous queries, delete operations
  • Authentication: Token management (create, rotate, revoke)

Data Ingestion

Columnar Format (Recommended)

The fastest way to write data, using MessagePack with gzip compression:

client.write.write_columnar(
    measurement="cpu",
    columns={
        "time": [1633024800000000, 1633024801000000],  # microseconds
        "host": ["server01", "server02"],
        "region": ["us-east", "us-west"],
        "usage_idle": [95.0, 87.3],
        "usage_system": [3.2, 8.1],
    },
    database="default",  # optional
)
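The `time` column above is integer microseconds since the Unix epoch. A small stdlib helper for building such a column from Python datetimes (`to_epoch_micros` is this example's name, not part of the SDK):

```python
from datetime import datetime, timezone

def to_epoch_micros(dt: datetime) -> int:
    """Convert a datetime to integer microseconds since the Unix epoch (naive = UTC)."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1_000_000)

# Build a "time" column matching the example above
times = [
    to_epoch_micros(datetime(2021, 9, 30, 18, 0, 0, tzinfo=timezone.utc)),
    to_epoch_micros(datetime(2021, 9, 30, 18, 0, 1, tzinfo=timezone.utc)),
]
```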

DataFrame Ingestion

Write directly from pandas or polars DataFrames:

import pandas as pd

df = pd.DataFrame({
    "time": pd.to_datetime(["2024-01-01", "2024-01-02"]),
    "host": ["server01", "server02"],
    "value": [42.0, 43.5],
})

client.write.write_dataframe(
    df,
    measurement="metrics",
    time_column="time",
    tag_columns=["host"],
)

Buffered Writes

For high-throughput scenarios, use buffered writes with automatic batching:

with client.write.buffered(batch_size=10000, flush_interval=5.0) as buffer:
    for record in records:
        buffer.write(
            measurement="events",
            tags={"source": record.source},
            fields={"value": record.value},
            timestamp=record.timestamp,
        )
    # Auto-flushes on exit or when batch_size/flush_interval reached
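The batching behavior can be pictured roughly as follows, a simplified stand-in rather than the SDK's actual implementation: the buffer flushes whenever it holds `batch_size` records or `flush_interval` seconds have passed since the last flush.

```python
import time

class SketchBuffer:
    """Illustrative batcher: flushes on a size or time threshold (not the real SDK class)."""

    def __init__(self, flush_fn, batch_size=10000, flush_interval=5.0):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.records = []
        self.last_flush = time.monotonic()

    def write(self, record):
        self.records.append(record)
        if (len(self.records) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.flush_interval):
            self.flush()

    def flush(self):
        if self.records:
            self.flush_fn(self.records)
            self.records = []
        self.last_flush = time.monotonic()

batches = []
buf = SketchBuffer(batches.append, batch_size=3, flush_interval=60.0)
for i in range(7):
    buf.write({"value": i})
buf.flush()  # mirror the context manager's flush-on-exit
# batches now holds three flushes of 3, 3, and 1 records
```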

Line Protocol

For compatibility with InfluxDB tooling:

# Single line
client.write.write_line_protocol("cpu,host=server01 usage=45.2 1633024800000000000")

# Multiple lines
lines = [
    "cpu,host=server01 usage=45.2",
    "cpu,host=server02 usage=67.8",
]
client.write.write_line_protocol(lines)
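If you need to assemble lines yourself, the InfluxDB line-protocol shape is `measurement,tag=val field=val timestamp`, with commas, spaces, and equals signs escaped in tag keys/values and string field values quoted. A minimal builder (`to_line_protocol` is a hypothetical helper, not part of the SDK, and omits some escaping corner cases):

```python
def to_line_protocol(measurement, tags, fields, timestamp=None):
    """Build one InfluxDB line-protocol line (sketch; string fields are quoted)."""
    def esc(s):  # escape commas, spaces, and equals signs
        return s.replace(",", r"\,").replace(" ", r"\ ").replace("=", r"\=")

    tag_part = "".join(f",{esc(k)}={esc(v)}" for k, v in sorted(tags.items()))
    field_part = ",".join(
        f'{esc(k)}="{v}"' if isinstance(v, str) else f"{esc(k)}={v}"
        for k, v in fields.items()
    )
    line = f"{esc(measurement)}{tag_part} {field_part}"
    return f"{line} {timestamp}" if timestamp is not None else line

line = to_line_protocol("cpu", {"host": "server01"}, {"usage": 45.2}, 1633024800000000000)
# → "cpu,host=server01 usage=45.2 1633024800000000000"
```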

Querying Data

JSON Response

result = client.query.query("SELECT * FROM default.cpu WHERE time > now() - INTERVAL '1 hour'")
print(result.columns)  # ['time', 'host', 'usage']
print(result.data)     # [[1633024800000000, 'server01', 45.2], ...]
print(result.row_count)

pandas DataFrame

df = client.query.query_pandas("SELECT * FROM default.cpu LIMIT 1000")

Polars DataFrame

pl_df = client.query.query_polars("SELECT * FROM default.cpu LIMIT 1000")

PyArrow Table (Zero-Copy)

table = client.query.query_arrow("SELECT * FROM default.cpu LIMIT 1000")

Query Estimation

Preview query cost before execution:

estimate = client.query.estimate("SELECT * FROM default.cpu")
print(f"Estimated rows: {estimate.estimated_rows}")
print(f"Warning level: {estimate.warning_level}")  # none, low, medium, high

List Measurements

measurements = client.query.list_measurements(database="default")
for m in measurements:
    print(f"{m.measurement}: {m.file_count} files, {m.total_size_mb:.1f} MB")

Async Support

All operations have async equivalents:

import asyncio
from arc_client import AsyncArcClient

async def main():
    async with AsyncArcClient(host="localhost", token="your-token") as client:
        # Write
        await client.write.write_columnar(
            measurement="cpu",
            columns={"time": [...], "host": [...], "usage": [...]},
        )

        # Query
        df = await client.query.query_pandas("SELECT * FROM default.cpu")

        # Buffered writes
        async with client.write.buffered(batch_size=5000) as buffer:
            for record in records:
                await buffer.write(measurement="events", fields={"v": record.v})

asyncio.run(main())

Management Operations

Retention Policies

Automatically delete old data:

# Create policy
policy = client.retention.create(
    name="30-day-retention",
    database="default",
    retention_days=30,
    measurement="logs",  # optional, applies to all if not specified
)

# List policies
policies = client.retention.list()

# Execute (with dry_run first)
result = client.retention.execute(policy.id, dry_run=True)
print(f"Would delete {result.deleted_count} rows")

# Execute for real
result = client.retention.execute(policy.id, dry_run=False, confirm=True)

Continuous Queries

Aggregate and downsample data automatically:

# Create a CQ to downsample CPU metrics to 1-hour averages
cq = client.continuous_queries.create(
    name="cpu-hourly",
    database="default",
    source_measurement="cpu",
    destination_measurement="cpu_1h",
    query="SELECT time_bucket('1 hour', time) as time, host, avg(usage) as usage FROM default.cpu GROUP BY 1, 2",
    interval="1h",
)

# Manual execution with time range
result = client.continuous_queries.execute(
    cq.id,
    start_time="2024-01-01T00:00:00Z",
    end_time="2024-01-02T00:00:00Z",
    dry_run=True,
)

# List CQs
cqs = client.continuous_queries.list(database="default")
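The `time_bucket('1 hour', time)` call in the CQ floors each timestamp to the start of its hour (assuming it behaves like the TimescaleDB/DuckDB function of the same name). In terms of the microsecond timestamps used elsewhere in this SDK, the arithmetic is:

```python
MICROS_PER_HOUR = 3_600 * 1_000_000

def time_bucket_1h(ts_micros: int) -> int:
    """Floor a microsecond epoch timestamp to the start of its hour."""
    return ts_micros - (ts_micros % MICROS_PER_HOUR)

bucket = time_bucket_1h(1633026000000000)  # 2021-09-30T18:20:00Z → start of that hour
```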

Delete Operations

Delete data with WHERE clause:

# Always dry_run first
result = client.delete.delete(
    database="default",
    measurement="logs",
    where="time < '2024-01-01'",
    dry_run=True,
)
print(f"Would delete {result.deleted_count} rows from {result.affected_files} files")

# Execute deletion
result = client.delete.delete(
    database="default",
    measurement="logs",
    where="time < '2024-01-01'",
    dry_run=False,
    confirm=True,  # Required for large deletes
)

Authentication

# Verify current token
verify = client.auth.verify()
if verify.valid:
    print(f"Token: {verify.token_info.name}")
    print(f"Permissions: {verify.permissions}")

# Create new token
result = client.auth.create_token(
    name="my-app-token",
    description="Token for my application",
    permissions=["read", "write"],
)
print(f"New token: {result.token}")  # Save this - shown only once!

# List tokens
tokens = client.auth.list_tokens()

# Rotate token
rotated = client.auth.rotate_token(token_id=123)
print(f"New token: {rotated.new_token}")

# Revoke token
client.auth.revoke_token(token_id=123)

Configuration

client = ArcClient(
    host="localhost",       # Arc server hostname
    port=8000,              # Arc server port (default: 8000)
    token="your-token",     # API token
    database="default",     # Default database
    timeout=30.0,           # Request timeout in seconds
    compression=True,       # Enable gzip compression for writes
    ssl=False,              # Use HTTPS
    verify_ssl=True,        # Verify SSL certificates
)
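In deployments these settings are often sourced from the environment rather than hard-coded. A minimal sketch; the `ARC_*` variable names are this example's convention, not anything the SDK reads itself:

```python
import os

def arc_config_from_env():
    """Collect ArcClient keyword arguments from environment variables (hypothetical convention)."""
    return {
        "host": os.environ.get("ARC_HOST", "localhost"),
        "port": int(os.environ.get("ARC_PORT", "8000")),
        "token": os.environ["ARC_TOKEN"],  # required; fail fast if missing
        "database": os.environ.get("ARC_DATABASE", "default"),
        "ssl": os.environ.get("ARC_SSL", "false").lower() == "true",
    }

# client = ArcClient(**arc_config_from_env())
```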

Error Handling

from arc_client.exceptions import (
    ArcError,              # Base exception
    ArcConnectionError,    # Connection failures
    ArcAuthenticationError,  # Auth failures (401)
    ArcQueryError,         # Query execution errors
    ArcIngestionError,     # Write failures
    ArcValidationError,    # Invalid input
    ArcNotFoundError,      # Resource not found (404)
    ArcRateLimitError,     # Rate limited (429)
    ArcServerError,        # Server errors (5xx)
)

try:
    client.query.query("INVALID SQL")
except ArcQueryError as e:
    print(f"Query failed: {e}")
except ArcConnectionError as e:
    print(f"Connection failed: {e}")
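`ArcRateLimitError` (429) and transient connection failures usually warrant a retry with backoff. A generic sketch, written against a stand-in callable since it is not SDK code:

```python
import time

def with_retries(fn, retryable, attempts=3, base_delay=0.5):
    """Call fn, retrying on the given exception types with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retryable:
            if attempt == attempts - 1:
                raise  # out of retries; propagate the last error
            time.sleep(base_delay * 2 ** attempt)

# Against the SDK this would look like:
# with_retries(lambda: client.query.query("SELECT ..."),
#              (ArcRateLimitError, ArcConnectionError))
```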

Documentation

See the full documentation for detailed guides and API reference.

License

MIT License - see LICENSE for details.
