Community Python client for InfluxDB 3.0

InfluxDB 3.0 Python Client

Introduction

influxdb_client_3 is a Python module that provides a simple and convenient way to interact with InfluxDB 3.0. This module supports both writing data to InfluxDB and querying data using the Flight client, which allows you to execute SQL and InfluxQL queries on InfluxDB 3.0.

We offer a "Getting Started: InfluxDB 3.0 Python Client Library" video that walks through how to use the library and covers the examples.

Dependencies

  • pyarrow (automatically installed)
  • pandas (optional)

Installation

You can install 'influxdb3-python' using pip:

pip install influxdb3-python

Note: This does not include pandas support. To use key features such as to_pandas() and write_file(), install pandas separately.
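Because pandas is optional, it can help to check for it before calling the features that depend on it. The following is a minimal sketch using only the standard library; the helper name has_module is hypothetical and is not part of influxdb3-python.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if the top-level module `name` can be found for import."""
    return importlib.util.find_spec(name) is not None


if not has_module("pandas"):
    # Features such as to_pandas() and write_file() need pandas
    print("pandas is not installed; run: pip install pandas")
```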

Note: Please make sure you are using Python 3.9 or above. For the best performance, use Python 3.11+.

Usage

One of the easiest ways to get started is to check out the "Pokemon Trainer Cookbook". This scenario takes you through the basics of both the client library and PyArrow.

Importing the Module

from influxdb_client_3 import InfluxDBClient3, Point

Initialization

If you are using InfluxDB Cloud, note the following:

  1. Use the bucket name as the value of the database (or bucket) function argument.
client = InfluxDBClient3(token="your-token",
                         host="your-host",
                         database="your-database")

Writing Data

You can write data using the Point class or by supplying line protocol.

Using Points

point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)

Control tag order for first-write column order (InfluxDB 3 Enterprise)

from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, WriteType, write_client_options

point = Point("cpu") \
    .tag("host", "server-a") \
    .tag("region", "us-east") \
    .tag("rack", "r1") \
    .field("usage", 0.42)

write_options = WriteOptions(
    write_type=WriteType.synchronous,
    tag_order=["region", "host"],
)

client = InfluxDBClient3(
    token="your-token",
    host="your-host",
    database="your-database",
    write_client_options=write_client_options(write_options=write_options),
)
client.write(point)

Using Line Protocol

point = "measurement fieldname=0"
client.write(point)
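For readers new to line protocol, the sketch below assembles a line from a measurement, tags, and fields using only the standard library. It is a simplified illustration and not the client's own serializer: the helper to_line_protocol is hypothetical, it escapes only commas, spaces, and equals signs in names and tag values, and it supports only bool, int, float, and str field values.

```python
def _escape(value: str, chars: str) -> str:
    """Backslash-escape each character of `chars` found in `value`."""
    for ch in chars:
        value = value.replace(ch, "\\" + ch)
    return value


def _format_field(value) -> str:
    # bool must be tested before int because bool is a subclass of int
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integer fields carry an "i" suffix
    if isinstance(value, float):
        return repr(value)
    # Anything else is written as a double-quoted string field
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line_protocol(measurement, fields, tags=None, timestamp_ns=None):
    """Build one line: measurement[,tag=value...] field=value[,...] [timestamp]."""
    if not fields:
        raise ValueError("line protocol requires at least one field")
    line = _escape(measurement, ", ")
    for key, val in sorted((tags or {}).items()):  # tags sorted by key
        line += f",{_escape(key, ',= ')}={_escape(str(val), ',= ')}"
    line += " " + ",".join(
        f"{_escape(k, ',= ')}={_format_field(v)}" for k, v in fields.items()
    )
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


line = to_line_protocol("measurement", {"temperature": 42}, tags={"location": "london"})
print(line)  # measurement,location=london temperature=42i
```

The resulting string can be passed to client.write() in the same way as the literal string above.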

Write from file

You can import data from CSV, JSON, Feather, ORC, and Parquet files.
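To try the CSV path without existing data, a small input file can be generated with the standard library. This is a sketch under stated assumptions: the column names time, provider, and machineID match those used in the write_file call in this section, while the temperature column and all row values are made up for illustration.

```python
import csv
from datetime import datetime, timedelta, timezone

csv_path = "out.csv"  # same relative path as in the write_file example

# Hypothetical sample rows, one per minute starting at 2024-01-01 UTC
start = datetime(2024, 1, 1, tzinfo=timezone.utc)
rows = [
    {
        "time": (start + timedelta(minutes=i)).isoformat(),
        "provider": "aws",
        "machineID": f"machine-{i % 2}",
        "temperature": 20.0 + i,
    }
    for i in range(4)
]

# Write a header row followed by the sample rows
with open(csv_path, "w", newline="") as fh:
    writer = csv.DictWriter(
        fh, fieldnames=["time", "provider", "machineID", "temperature"]
    )
    writer.writeheader()
    writer.writerows(rows)
```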

from influxdb_client_3 import (InfluxDBClient3, InfluxDBError, WriteOptions,
                               write_client_options)


class BatchingCallback(object):
    """Counts successful batches and logs failed or retried ones."""

    def __init__(self):
        self.write_count = 0

    def success(self, conf, data: str):
        self.write_count += 1
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


callback = BatchingCallback()

# Batching and retry settings
write_options = WriteOptions(batch_size=100,
                             flush_interval=10_000,
                             jitter_interval=2_000,
                             retry_interval=5_000,
                             max_retries=5,
                             max_retry_delay=30_000,
                             exponential_base=2)

# Attach the callbacks and the batching options to the client configuration
wco = write_client_options(success_callback=callback.success,
                           error_callback=callback.error,
                           retry_callback=callback.retry,
                           write_options=write_options)

with InfluxDBClient3(
        token="INSERT_TOKEN",
        host="eu-central-1-1.aws.cloud2.influxdata.com",
        database="python",
        write_client_options=wco) as client:

    client.write_file(
        file='./out.csv',
        timestamp_column='time',
        tag_columns=["provider", "machineID"])

print(f'DONE writing from csv in {callback.write_count} batch(es)')
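To get a feel for how the retry settings in this example interact, the sketch below computes an illustrative backoff schedule. It assumes a plain capped exponential formula, retry_interval * exponential_base ** attempt limited to max_retry_delay, and ignores jitter. The client's actual retry strategy may differ (for example, by randomizing delays), so treat the numbers as a rough guide only. The function backoff_schedule is hypothetical.

```python
def backoff_schedule(retry_interval, exponential_base, max_retry_delay, max_retries):
    """Return the capped delay before each retry, in the same unit as the inputs."""
    return [
        min(retry_interval * exponential_base ** attempt, max_retry_delay)
        for attempt in range(max_retries)
    ]


# Same values as the WriteOptions in the example
delays = backoff_schedule(retry_interval=5_000, exponential_base=2,
                          max_retry_delay=30_000, max_retries=5)
print(delays)  # [5000, 10000, 20000, 30000, 30000]
```

Under this assumption, the cap set by max_retry_delay is reached on the fourth retry, so raising max_retries beyond that point adds attempts without lengthening the individual waits.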

Pandas DataFrame

import pandas as pd

# Create a DataFrame with a timestamp column
df = pd.DataFrame({
    'time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'trainer': ['Ash', 'Misty', 'Brock'],
    'pokemon_id': [25, 120, 74],
    'pokemon_name': ['Pikachu', 'Staryu', 'Geodude']
})

# Write the DataFrame - timestamp_column is required for consistency
client.write_dataframe(
    df,
    measurement='caught',
    timestamp_column='time',
    tags=['trainer', 'pokemon_id']
)

Polars DataFrame

import polars as pl

# Create a DataFrame with a timestamp column
df = pl.DataFrame({
    'time': ['2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z'],
    'trainer': ['Ash', 'Misty'],
    'pokemon_id': [25, 120],
    'pokemon_name': ['Pikachu', 'Staryu']
})

# Write the DataFrame - same API works for both pandas and polars
client.write_dataframe(
    df,
    measurement='caught',
    timestamp_column='time',
    tags=['trainer', 'pokemon_id']
)

Querying

Querying with SQL

query = "select * from measurement"
# query() returns a PyArrow Table by default
table = client.query(query=query, language="sql")
print(table.to_pandas().to_markdown())

Querying to DataFrame

# Query directly to a pandas DataFrame (default)
df = client.query_dataframe("SELECT * FROM caught WHERE trainer = 'Ash'")

# Query to a polars DataFrame
df = client.query_dataframe("SELECT * FROM caught", frame_type="polars")

Querying with InfluxQL

query = "select * from measurement"
# query() returns a PyArrow Table by default
table = client.query(query=query, language="influxql")
print(table.to_pandas().to_markdown())

gRPC compression

Request compression

Request compression is not supported by InfluxDB 3 — the client sends uncompressed requests.

Response compression

Response compression is enabled by default. The client sends the grpc-accept-encoding: identity, deflate, gzip header, and the server returns gzip-compressed responses (if supported). The client automatically decompresses them — no configuration required.

To disable response compression:

# Via constructor parameter
client = InfluxDBClient3(
    host="your-host",
    token="your-token",
    database="your-database",
    disable_grpc_compression=True
)

# Or via environment variable
# INFLUX_DISABLE_GRPC_COMPRESSION=true
client = InfluxDBClient3.from_env()

Windows Users

Currently, Windows users need an extra installation step when querying via Flight natively, because gRPC cannot locate the Windows root certificates. To work around this, follow these steps.

First, install certifi:

pip install certifi

Next, include certifi within the flight client options:

import certifi

from influxdb_client_3 import InfluxDBClient3, flight_client_options

# Read the certifi CA bundle so gRPC can verify the server certificate
with open(certifi.where(), "r") as fh:
    cert = fh.read()

client = InfluxDBClient3(
    token="",
    host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
    database="flightdemo",
    flight_client_options=flight_client_options(
        tls_root_certs=cert))

table = client.query(
    query="SELECT * FROM flight WHERE time > now() - 4h",
    language="influxql")

print(table.to_pandas())

You may include your own root certificate in this manner as well.
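One possible way to do this is to append your certificate to another PEM bundle and pass the combined text as tls_root_certs. The sketch below only concatenates PEM files with the standard library; the helper combine_pem_files is hypothetical, and its check for a certificate header is a basic sanity test rather than real validation.

```python
from pathlib import Path


def combine_pem_files(*paths) -> str:
    """Concatenate PEM files into one string, one certificate block after another."""
    parts = []
    for path in paths:
        text = Path(path).read_text(encoding="utf-8").strip()
        if "-----BEGIN CERTIFICATE-----" not in text:
            raise ValueError(f"{path} does not look like a PEM certificate file")
        parts.append(text)
    # Separate the files with newlines and end with a trailing newline
    return "\n".join(parts) + "\n"
```

For example, cert = combine_pem_files(certifi.where(), "my-root-ca.pem") could stand in for the file read in the example above, where "my-root-ca.pem" is a placeholder file name.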

If connecting to InfluxDB fails with the error "DNS resolution failed" when using a domain name (for example, www.mydomain.com), try setting the environment variable GRPC_DNS_RESOLVER=native to see whether that resolves it.
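If the variable cannot be exported from the shell, it can also be set from Python. This is a minimal sketch, assuming gRPC reads GRPC_DNS_RESOLVER when it initializes, so the assignment should run before the client is imported or created.

```python
import os

# setdefault keeps any value that was already exported in the shell
os.environ.setdefault("GRPC_DNS_RESOLVER", "native")

# Import and create the client only after this point, for example:
# from influxdb_client_3 import InfluxDBClient3
print(os.environ["GRPC_DNS_RESOLVER"])
```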

Contributing

Tests are run using pytest.

# Clone the repository
git clone https://github.com/InfluxCommunity/influxdb3-python
cd influxdb3-python

# Create a virtual environment and activate it
python3 -m venv .venv
source .venv/bin/activate

# Install the package and its dependencies
pip install -e .[pandas,polars,dataframe,test]

# Run the tests
python -m pytest .
