Skip to main content

Community Python client for InfluxDB 3.0

Project description

Your Image

PyPI version PyPI downloads CodeQL analysis CircleCI Code Cov Community Slack

InfluxDB 3.0 Python Client

Introduction

influxdb_client_3 is a Python module that provides a simple and convenient way to interact with InfluxDB 3.0. This module supports both writing data to InfluxDB and querying data using the Flight client, which allows you to execute SQL and InfluxQL queries on InfluxDB 3.0.

We offer a "Getting Started: InfluxDB 3.0 Python Client Library" video that goes over how to use the library and goes over the examples.

Dependencies

  • pyarrow (automatically installed)
  • pandas (optional)

Installation

You can install 'influxdb3-python' using pip:

pip install influxdb3-python

Note: This does not include Pandas support. If you would like to use key features such as to_pandas() and write_file() you will need to install pandas separately.

Note: Please make sure you are using 3.6 or above. For the best performance use 3.11+

Usage

One of the easiest ways to get started is to checkout the "Pokemon Trainer Cookbook". This scenario takes you through the basics of both the client library and Pyarrow.

Importing the Module

from influxdb_client_3 import InfluxDBClient3, Point

Initialization

If you are using InfluxDB Cloud, then you should note that:

  1. Use bucket name for database or bucket in function argument.
client = InfluxDBClient3(token="your-token",
                         host="your-host",
                         database="your-database")

Writing Data

You can write data using the Point class, or supplying line protocol.

Using Points

point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)

Using Line Protocol

point = "measurement fieldname=0"
client.write(point)

Write from file

Users can import data from CSV, JSON, Feather, ORC, Parquet

import influxdb_client_3 as InfluxDBClient3
import pandas as pd
import numpy as np
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError


class BatchingCallback(object):

    def __init__(self):
        self.write_count = 0

    def success(self, conf, data: str):
        self.write_count += 1
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf, data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")

callback = BatchingCallback()

write_options = WriteOptions(batch_size=100,
                                        flush_interval=10_000,
                                        jitter_interval=2_000,
                                        retry_interval=5_000,
                                        max_retries=5,
                                        max_retry_delay=30_000,
                                        exponential_base=2)

wco = write_client_options(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry,
                          write_options=write_options
                        )

with  InfluxDBClient3.InfluxDBClient3(
    token="INSERT_TOKEN",
    host="eu-central-1-1.aws.cloud2.influxdata.com",
    database="python", write_client_options=wco) as client:


    client.write_file(
        file='./out.csv',
        timestamp_column='time', tag_columns=["provider", "machineID"])

print(f'DONE writing from csv in {callback.write_count} batch(es)')

Pandas DataFrame

import pandas as pd

# Create a DataFrame with a timestamp column
df = pd.DataFrame({
    'time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'trainer': ['Ash', 'Misty', 'Brock'],
    'pokemon_id': [25, 120, 74],
    'pokemon_name': ['Pikachu', 'Staryu', 'Geodude']
})

# Write the DataFrame - timestamp_column is required for consistency
client.write_dataframe(
    df,
    measurement='caught',
    timestamp_column='time',
    tags=['trainer', 'pokemon_id']
)

Polars DataFrame

import polars as pl

# Create a DataFrame with a timestamp column
df = pl.DataFrame({
    'time': ['2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z'],
    'trainer': ['Ash', 'Misty'],
    'pokemon_id': [25, 120],
    'pokemon_name': ['Pikachu', 'Staryu']
})

# Write the DataFrame - same API works for both pandas and polars
client.write_dataframe(
    df,
    measurement='caught',
    timestamp_column='time',
    tags=['trainer', 'pokemon_id']
)

Querying

Querying with SQL

query = "select * from measurement"
reader = client.query(query=query, language="sql")
table = reader.read_all()
print(table.to_pandas().to_markdown())

Querying to DataFrame

# Query directly to a pandas DataFrame (default)
df = client.query_dataframe("SELECT * FROM caught WHERE trainer = 'Ash'")

# Query to a polars DataFrame
df = client.query_dataframe("SELECT * FROM caught", frame_type="polars")

Querying with influxql

query = "select * from measurement"
reader = client.query(query=query, language="influxql")
table = reader.read_all()
print(table.to_pandas().to_markdown())

gRPC compression

Request compression

Request compression is not supported by InfluxDB 3 — the client sends uncompressed requests.

Response compression

Response compression is enabled by default. The client sends the grpc-accept-encoding: identity, deflate, gzip header, and the server returns gzip-compressed responses (if supported). The client automatically decompresses them — no configuration required.

To disable response compression:

# Via constructor parameter
client = InfluxDBClient3(
    host="your-host",
    token="your-token",
    database="your-database",
    disable_grpc_compression=True
)

# Or via environment variable
# INFLUX_DISABLE_GRPC_COMPRESSION=true
client = InfluxDBClient3.from_env()

Windows Users

Currently, Windows users require an extra installation when querying via Flight natively. This is due to the fact gRPC cannot locate Windows root certificates. To work around this please follow these steps: Install certifi

pip install certifi

Next include certifi within the flight client options:

import certifi

import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import flight_client_options

fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()

client = InfluxDBClient3.InfluxDBClient3(
    token="",
    host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
    database="flightdemo",
    flight_client_options=flight_client_options(
        tls_root_certs=cert))

table = client.query(
    query="SELECT * FROM flight WHERE time > now() - 4h",
    language="influxql")

print(table.to_pandas())

You may include your own root certificate in this manner as well.

If connecting to InfluxDB fails with error DNS resolution failed when using domain name, example www.mydomain.com, then try to set environment variable GRPC_DNS_RESOLVER=native to see if it works.

Contributing

Tests are run using pytest.

# Clone the repository
git clone https://github.com/InfluxCommunity/influxdb3-python
cd influxdb3-python

# Create a virtual environment and activate it
python3 -m venv .venv
source .venv/bin/activate

# Install the package and its dependencies
pip install -e .[pandas,polars,dataframe,test]

# Run the tests
python -m pytest .

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

influxdb3_python-0.17.0.tar.gz (96.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

influxdb3_python-0.17.0-py3-none-any.whl (94.1 kB view details)

Uploaded Python 3

File details

Details for the file influxdb3_python-0.17.0.tar.gz.

File metadata

  • Download URL: influxdb3_python-0.17.0.tar.gz
  • Upload date:
  • Size: 96.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.25

File hashes

Hashes for influxdb3_python-0.17.0.tar.gz
Algorithm Hash digest
SHA256 676358f4d8f08d02bbb94b13f03a871f7834175e70f45811bf40a56b39f38769
MD5 8355e1add7b3cfa1870e8c57b4243cf7
BLAKE2b-256 e5704dadc9740f63701ebc1a33df22db1e1c19cea037deee211ba6e2d1b10b98

See more details on using hashes here.

File details

Details for the file influxdb3_python-0.17.0-py3-none-any.whl.

File metadata

File hashes

Hashes for influxdb3_python-0.17.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4847a20529872a66ccddd4d14387737767694c02d4b45031f768b04a72d2d2df
MD5 dc1ca4533d4f6e2e7d364393e8fbad24
BLAKE2b-256 a4164a5199810ba0072c9bbd258adea716a6d162a0990e2b560e6748fd3a22ce

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page