
Community Python client for InfluxDB 3.0

InfluxDB 3.0 Python Client

Introduction

influxdb_client_3 is a Python module that provides a simple and convenient way to interact with InfluxDB 3.0. This module supports both writing data to InfluxDB and querying data using the Flight client, which allows you to execute SQL and InfluxQL queries on InfluxDB 3.0.

The "Getting Started: InfluxDB 3.0 Python Client Library" video walks through how to use the library and covers the examples.

Dependencies

  • pyarrow (automatically installed)
  • pandas (optional)

Installation

You can install 'influxdb3-python' using pip:

pip install influxdb3-python

Note: This does not include Pandas support. If you would like to use key features such as to_pandas() and write_file(), or to use PyArrow data conversion methods with nanosecond timestamp precision, you will need to install pandas separately.

Note: This package requires Python 3.9 or above. For the best performance, use Python 3.11+.
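The two notes above can be checked from a script before installing. The following is a minimal sketch using only the standard library; the helper names python_status and has_module are illustrative and not part of this package.

```python
import importlib.util
import sys

MIN_VERSION = (3, 9)           # minimum version stated above
RECOMMENDED_VERSION = (3, 11)  # version recommended above for best performance


def python_status(version=None):
    """Classify a (major, minor) version against the stated requirements."""
    version = tuple(version or sys.version_info[:2])
    if version < MIN_VERSION:
        return "unsupported"
    if version < RECOMMENDED_VERSION:
        return "supported"
    return "recommended"


def has_module(name):
    """Return True if an optional dependency such as pandas is importable."""
    return importlib.util.find_spec(name) is not None


print("Python:", python_status())
print("pandas available:", has_module("pandas"))
```

If pandas is reported as unavailable, features such as to_pandas() and write_file() will need a separate pandas installation, as noted above.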

CLI (Agent-Friendly Query Tool)

This package includes an influx3 CLI for read/query workflows.

Run a query

influx3 query -d my_database "SELECT * FROM cpu LIMIT 5"

By default, output is JSON to stdout.

Supported formats

  • json (default)
  • jsonl
  • csv
  • pretty

influx3 query -d my_database --format csv "SELECT * FROM cpu LIMIT 5"
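If you call the CLI from a script, the command can be assembled and its jsonl output parsed with the standard library. This is a sketch under assumptions: the helper names are illustrative, the sample rows are made up, and it assumes the jsonl format emits one JSON object per line. Only the -d and --format flags shown above are used.

```python
import json
import subprocess

SUPPORTED_FORMATS = ("json", "jsonl", "csv", "pretty")


def build_query_argv(database, sql, fmt="json"):
    """Build the argument list for `influx3 query` using only the flags shown above."""
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported format: {fmt!r}")
    return ["influx3", "query", "-d", database, "--format", fmt, sql]


def parse_jsonl(text):
    """Parse JSON Lines text: one JSON value per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def run_query(database, sql):
    """Run the CLI and return parsed rows (needs influx3 on PATH and a reachable server)."""
    argv = build_query_argv(database, sql, fmt="jsonl")
    completed = subprocess.run(argv, capture_output=True, text=True, check=True)
    return parse_jsonl(completed.stdout)


# Hypothetical sample output, so the parsing step runs without a server.
sample = '{"host": "a", "usage": 0.42}\n{"host": "b", "usage": 0.17}\n'
print(build_query_argv("my_database", "SELECT * FROM cpu LIMIT 5", fmt="csv"))
print(parse_jsonl(sample))
```

Passing the query as a list element, rather than through a shell string, avoids quoting problems when the SQL itself contains quotes.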

Config precedence

Configuration values are resolved in this order:

  1. CLI flags
  2. INFLUXDB3_* environment variables
  3. legacy INFLUX_* environment variables
  4. built-in defaults (host defaults to http://127.0.0.1:8181)

Relevant environment variables:

  • INFLUXDB3_HOST_URL (legacy fallback: INFLUX_HOST)
  • INFLUXDB3_DATABASE_NAME (legacy fallback: INFLUX_DATABASE)
  • INFLUXDB3_AUTH_TOKEN (legacy fallback: INFLUX_TOKEN)
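The resolution order above can be pictured with a short sketch. This is not the CLI's own code: resolve_setting is an illustrative helper, and treating an empty variable as unset is an assumption.

```python
import os

DEFAULT_HOST = "http://127.0.0.1:8181"  # built-in default stated above


def resolve_setting(cli_value, primary_env, legacy_env, default=None, environ=None):
    """Return the first value found: CLI flag, INFLUXDB3_* variable, INFLUX_* variable, default."""
    environ = os.environ if environ is None else environ
    if cli_value is not None:
        return cli_value
    for name in (primary_env, legacy_env):
        value = environ.get(name)
        if value:  # assumption: an empty string counts as unset
            return value
    return default


# A hypothetical environment: only the legacy host variable is set.
env = {"INFLUX_HOST": "http://legacy:8181", "INFLUXDB3_DATABASE_NAME": "metrics"}

host = resolve_setting(None, "INFLUXDB3_HOST_URL", "INFLUX_HOST", DEFAULT_HOST, env)
database = resolve_setting("from_flag", "INFLUXDB3_DATABASE_NAME", "INFLUX_DATABASE", None, env)
print(host)      # http://legacy:8181
print(database)  # from_flag
```

Here the host falls through to the legacy variable because neither a flag nor INFLUXDB3_HOST_URL is present, while the database flag wins over the environment.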

Usage

One of the easiest ways to get started is the "Influxdb3 Python Basic Usage" notebook, which walks you through the core write and read APIs of the client library.

Additional examples that do not depend on Jupyter are available in the ./examples directory.

Importing the Module

from influxdb_client_3 import InfluxDBClient3, Point

Initialization

If you are using InfluxDB Cloud, pass the bucket name as the database (or bucket) argument:

client = InfluxDBClient3(token="your-token",
                         host="your-host",
                         database="your-database")

Writing Data

You can write data using the Point class or by supplying line protocol.

Using Points

point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)

Control tag order for first-write column order (InfluxDB 3 Enterprise)

from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, WriteType, write_client_options

point = Point("cpu") \
    .tag("host", "server-a") \
    .tag("region", "us-east") \
    .tag("rack", "r1") \
    .field("usage", 0.42)

write_options = WriteOptions(
    write_type=WriteType.synchronous,
    tag_order=["region", "host"],
)

client = InfluxDBClient3(
    token="your-token",
    host="your-host",
    database="your-database",
    write_client_options=write_client_options(write_options=write_options),
)
client.write(point)
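To picture what tag_order does to the point above, here is a small stand-alone sketch. It is not the client's serializer. It assumes that tags named in tag_order come first in the given order and that any remaining tags (rack in this example) follow in sorted order; that second part is an assumption, so check the library documentation for the exact behavior.

```python
def order_tags(tags, tag_order=None):
    """Return (key, value) pairs: keys named in tag_order first, then the rest sorted by key."""
    # dict.fromkeys drops duplicate names while keeping their first position.
    preferred = [key for key in dict.fromkeys(tag_order or []) if key in tags]
    # Assumption: tags not named in tag_order follow in lexicographical order.
    remaining = sorted(key for key in tags if key not in preferred)
    return [(key, tags[key]) for key in preferred + remaining]


tags = {"host": "server-a", "region": "us-east", "rack": "r1"}

print(order_tags(tags, ["region", "host"]))
# [('region', 'us-east'), ('host', 'server-a'), ('rack', 'r1')]
print(order_tags(tags))
# [('host', 'server-a'), ('rack', 'r1'), ('region', 'us-east')]
```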

Using Line Protocol

point = "measurement fieldname=0"
client.write(point)
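If you build line protocol strings yourself, escaping and field typing are the usual sources of mistakes. The following is a minimal sketch of a formatter based on the general line protocol syntax (integer fields carry an i suffix, string fields are double-quoted, and commas, spaces and equals signs in keys and tag values are backslash-escaped). It is not the serializer used by the Point class, and to_line_protocol is an illustrative name.

```python
def _escape(value, chars):
    """Backslash-escape each character listed in `chars`."""
    text = str(value)
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field(value):
    """Render a field value according to its Python type."""
    if isinstance(value, bool):   # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"        # integer fields carry an `i` suffix
    if isinstance(value, float):
        return repr(value)
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'         # everything else is written as a string field


def to_line_protocol(measurement, tags, fields, timestamp=None):
    """Build one line: measurement[,tag=value...] field=value[,...] [timestamp]."""
    if not fields:
        raise ValueError("at least one field is required")
    head = _escape(measurement, ", ")
    for key in sorted(tags):
        head += f",{_escape(key, ',= ')}={_escape(tags[key], ',= ')}"
    body = ",".join(f"{_escape(k, ',= ')}={_format_field(v)}" for k, v in fields.items())
    line = f"{head} {body}"
    return line if timestamp is None else f"{line} {timestamp}"


print(to_line_protocol("measurement", {"location": "london"}, {"temperature": 42}))
# measurement,location=london temperature=42i
```

The resulting string can be passed to client.write() in the same way as the hand-written line above.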

Write from file

You can import data from CSV, JSON, Feather, ORC, and Parquet files.

from influxdb_client_3 import InfluxDBClient3, InfluxDBError, WriteOptions, write_client_options


class BatchingCallback:
    """Counts successful batches and reports failed or retried ones."""

    def __init__(self):
        self.write_count = 0

    def success(self, conf, data: str):
        self.write_count += 1
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


callback = BatchingCallback()

# Batching and retry settings; intervals and delays are in milliseconds.
write_options = WriteOptions(batch_size=100,
                             flush_interval=10_000,
                             jitter_interval=2_000,
                             retry_interval=5_000,
                             max_retries=5,
                             max_retry_delay=30_000,
                             exponential_base=2)

# Attach the callbacks and the batching options to the client.
wco = write_client_options(success_callback=callback.success,
                           error_callback=callback.error,
                           retry_callback=callback.retry,
                           write_options=write_options)

with InfluxDBClient3(
        token="INSERT_TOKEN",
        host="eu-central-1-1.aws.cloud2.influxdata.com",
        database="python",
        write_client_options=wco) as client:

    # write_file requires pandas to be installed.
    client.write_file(
        file='./out.csv',
        timestamp_column='time', tag_columns=["provider", "machineID"])

print(f'DONE writing from csv in {callback.write_count} batch(es)')
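To get a feel for how the retry settings above interact, the sketch below computes a capped exponential schedule from retry_interval, exponential_base, max_retry_delay and max_retries. It is a simplified illustration, not the client's exact formula; the client may, for example, randomize each delay.

```python
def retry_delays(retry_interval, max_retries, max_retry_delay, exponential_base):
    """Return one delay in milliseconds per retry, growing exponentially up to the cap."""
    return [
        min(retry_interval * exponential_base ** attempt, max_retry_delay)
        for attempt in range(max_retries)
    ]


print(retry_delays(retry_interval=5_000, max_retries=5,
                   max_retry_delay=30_000, exponential_base=2))
# [5000, 10000, 20000, 30000, 30000]
```

With these values the delay doubles from 5 seconds until it reaches the 30 second cap on the fourth retry.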

Pandas DataFrame

import pandas as pd

# Create a DataFrame with a timestamp column
df = pd.DataFrame({
    'time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'trainer': ['Ash', 'Misty', 'Brock'],
    'pokemon_id': [25, 120, 74],
    'pokemon_name': ['Pikachu', 'Staryu', 'Geodude']
})

# Write the DataFrame - timestamp_column is required for consistency
client.write_dataframe(
    df,
    measurement='caught',
    timestamp_column='time',
    tags=['trainer', 'pokemon_id']
)

Polars DataFrame

import polars as pl

# Create a DataFrame with a timestamp column
df = pl.DataFrame({
    'time': ['2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z'],
    'trainer': ['Ash', 'Misty'],
    'pokemon_id': [25, 120],
    'pokemon_name': ['Pikachu', 'Staryu']
})

# Write the DataFrame - same API works for both pandas and polars
client.write_dataframe(
    df,
    measurement='caught',
    timestamp_column='time',
    tags=['trainer', 'pokemon_id']
)

Accept partial writes and inspect failed lines

accept_partial defaults to True and allows partial success when writing through the V3 API endpoint (use_v2_api=False) and a batch contains invalid lines. On partial failure, the client raises InfluxDBPartialWriteError with structured line_errors.

from influxdb_client_3 import InfluxDBClient3
from influxdb_client_3.exceptions import InfluxDBPartialWriteError

client = InfluxDBClient3(
    host="http://localhost:8181",
    token="token",
    database="db",
    write_use_v2_api=False,
)
lp = "home,room=Sunroom temp=96 1735545600\nhome,room=Sunroom temp=\"hi\" 1735549200"

try:
    client.write(lp)  # accept_partial=True by default on V3 API endpoint
except InfluxDBPartialWriteError as e:
    for line_err in e.line_errors:
        print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})")
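Once the exception is caught, the entries in line_errors can be turned into a report or a list of lines to repair. The sketch below uses a hypothetical stand-in, LineError, that carries the same three attributes as the example above (line_number, error_message, original_line), so it runs without a server; the error message text is made up.

```python
from collections import namedtuple

# Hypothetical stand-in with the three attributes used in the example above.
LineError = namedtuple("LineError", ["line_number", "error_message", "original_line"])


def summarize_line_errors(line_errors):
    """Return (report_lines, lines_to_repair), ordered by line number."""
    ordered = sorted(line_errors, key=lambda err: err.line_number)
    report = [f"line {err.line_number}: {err.error_message}" for err in ordered]
    to_repair = [err.original_line for err in ordered]
    return report, to_repair


# Made-up error data for illustration only.
errors = [LineError(2, "invalid field value", 'home,room=Sunroom temp="hi" 1735549200')]
report, to_repair = summarize_line_errors(errors)
print(report)
print(to_repair)
```

In real code you would pass e.line_errors from the except block instead of the made-up list.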

Disable partial writes:

client = InfluxDBClient3(
    host="http://localhost:8181",
    token="token",
    database="db",
    write_use_v2_api=False,
    write_accept_partial=False,
)

Compatibility with InfluxDB Clustered and InfluxDB Cloud Dedicated/Serverless

Writes use the V2 API endpoint by default, so no additional configuration is required for these products.

use_v2_api can be configured by:

  • WriteOptions(use_v2_api=False) (for V3 API endpoint features)
  • constructor kwarg: write_use_v2_api=False
  • env var: INFLUX_WRITE_USE_V2_API=false

When use_v2_api=True:

  • accept_partial is not used
  • no_sync=True is invalid and rejected before dispatch with: invalid write options: no_sync cannot be used with use_v2_api

To use no_sync or accept_partial controls, set use_v2_api=False (for example with InfluxDB 3 Core/Enterprise).
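The rules above can be summarized in a few lines of code. This sketch only illustrates the described behavior; it is not the client's implementation. The function name, the use of ValueError, and the no_sync default of False are assumptions, while the error message is the one quoted above.

```python
def validate_write_options(use_v2_api=True, no_sync=False, accept_partial=True):
    """Return the options that take effect, or raise for an invalid combination."""
    if use_v2_api and no_sync:
        # Message quoted from the rules above; the exception type is an assumption.
        raise ValueError("invalid write options: no_sync cannot be used with use_v2_api")
    effective = {"use_v2_api": use_v2_api, "no_sync": no_sync}
    if not use_v2_api:
        # accept_partial is only used on the V3 API endpoint.
        effective["accept_partial"] = accept_partial
    return effective


print(validate_write_options())
# {'use_v2_api': True, 'no_sync': False}
print(validate_write_options(use_v2_api=False, no_sync=True))
# {'use_v2_api': False, 'no_sync': True, 'accept_partial': True}
```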

Querying

Querying with SQL

query = "select * from measurement"
reader = client.query(query=query, language="sql")
table = reader.read_all()
print(table.to_pandas().to_markdown())

Querying to DataFrame

# Query directly to a pandas DataFrame (default)
df = client.query_dataframe("SELECT * FROM caught WHERE trainer = 'Ash'")

# Query to a polars DataFrame
df = client.query_dataframe("SELECT * FROM caught", frame_type="polars")

Querying with influxql

query = "select * from measurement"
reader = client.query(query=query, language="influxql")
table = reader.read_all()
print(table.to_pandas().to_markdown())

gRPC compression

Request compression

Request compression is not supported by InfluxDB 3 — the client sends uncompressed requests.

Response compression

Response compression is enabled by default. The client sends the grpc-accept-encoding: identity, deflate, gzip header, and the server returns gzip-compressed responses (if supported). The client automatically decompresses them — no configuration required.

To disable response compression:

# Via constructor parameter
client = InfluxDBClient3(
    host="your-host",
    token="your-token",
    database="your-database",
    disable_grpc_compression=True
)

# Or via environment variable
# INFLUX_DISABLE_GRPC_COMPRESSION=true
client = InfluxDBClient3.from_env()

Windows Users

Windows users currently need an extra step when querying via Flight natively, because gRPC cannot locate the Windows root certificates. To work around this, first install certifi:

pip install certifi

Next, pass the certifi certificate bundle in the flight client options:

import certifi

import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import flight_client_options

# Read the certifi CA bundle; the with block closes the file even on error.
with open(certifi.where(), "r") as fh:
    cert = fh.read()

client = InfluxDBClient3.InfluxDBClient3(
    token="",
    host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
    database="flightdemo",
    flight_client_options=flight_client_options(
        tls_root_certs=cert))

table = client.query(
    query="SELECT * FROM flight WHERE time > now() - 4h",
    language="influxql")

print(table.to_pandas())

You may include your own root certificate in this manner as well.
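One way to add your own root certificate is to join several PEM files into a single bundle string and pass that as tls_root_certs. The sketch below uses only the standard library; load_root_certs is an illustrative helper, and the demo files contain placeholder text rather than real certificates.

```python
import tempfile
from pathlib import Path


def load_root_certs(*pem_paths):
    """Read one or more PEM files and join them into a single bundle string."""
    if not pem_paths:
        raise ValueError("at least one PEM file is required")
    parts = [Path(path).read_text(encoding="utf-8").strip() for path in pem_paths]
    return "\n".join(parts) + "\n"


# Demo with two placeholder files (not real certificates).
with tempfile.TemporaryDirectory() as tmp:
    first = Path(tmp, "bundle.pem")
    second = Path(tmp, "my-ca.pem")
    first.write_text("-----BEGIN CERTIFICATE-----\nAAAA\n-----END CERTIFICATE-----\n", encoding="utf-8")
    second.write_text("-----BEGIN CERTIFICATE-----\nBBBB\n-----END CERTIFICATE-----\n", encoding="utf-8")
    cert = load_root_certs(first, second)

print(cert.count("BEGIN CERTIFICATE"))  # 2
```

In practice you might call load_root_certs(certifi.where(), "my-ca.pem"), where my-ca.pem is a hypothetical path to your own certificate, and pass the result as tls_root_certs.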

If connecting to InfluxDB by domain name (for example, www.mydomain.com) fails with the error "DNS resolution failed", try setting the environment variable GRPC_DNS_RESOLVER=native.
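The variable can be set in your shell or from Python itself. A minimal sketch of the Python route follows; it assumes gRPC reads the variable when it starts up, so the assignment is placed before the client library is imported.

```python
import os

# Set this before importing influxdb_client_3, so that gRPC can pick it up at start-up.
os.environ["GRPC_DNS_RESOLVER"] = "native"

print(os.environ["GRPC_DNS_RESOLVER"])  # native
```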

Contributing

Tests are run using pytest.

# Clone the repository
git clone https://github.com/InfluxCommunity/influxdb3-python
cd influxdb3-python

# Create a virtual environment and activate it
python3 -m venv .venv
source .venv/bin/activate

# Install the package and its dependencies
pip install -e ".[pandas,polars,dataframe,test]"

# Run the tests
python -m pytest .

