Skip to main content

InfluxDB 2.0 Python client library

Project description

CircleCI codecov CI status PyPI package Supported Python versions Documentation status Slack Status

InfluxDB 2.0 python client library.

Note: This library is for use with InfluxDB 2.x. For connecting to InfluxDB 1.x instances, please use the influxdb-python.

InfluxDB 2.0 client features

Installation

InfluxDB python library uses RxPY - The Reactive Extensions for Python (RxPY).

Python 3.6 or later is required.

pip install

The python package is hosted on PyPI, you can install latest version directly:

pip install influxdb-client

Then import the package:

import influxdb_client

Setuptools

Install via Setuptools.

python setup.py install --user

(or sudo python setup.py install to install the package for all users)

Getting Started

Please follow the Installation and then run the following:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "my-bucket"

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)

write_api.write(bucket=bucket, org="my-org", record=p)

## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    print(table)
    for row in table.records:
        print (row.values)


## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1

Client configuration

Via File

A client can be configured via *.ini file in segment influx2.

The following options are supported:

  • url - the url to connect to InfluxDB

  • org - default destination organization for writes and queries

  • token - the token to use for the authorization

  • timeout - socket timeout in ms (default value is 10000)

self.client = InfluxDBClient.from_config_file("config.ini")
[influx2]
url=http://localhost:9999
org=my-org
token=my-token
timeout=6000

Via Environment Properties

A client can be configured via environment properties.

Supported properties are:

  • INFLUXDB_V2_URL - the url to connect to InfluxDB

  • INFLUXDB_V2_ORG - default destination organization for writes and queries

  • INFLUXDB_V2_TOKEN - the token to use for the authorization

  • INFLUXDB_V2_TIMEOUT - socket timeout in ms (default value is 10000)

self.client = InfluxDBClient.from_env_properties()

How to use

Writes

The WriteApi supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as a InfluxDB Line Protocol, Data Point or Observable stream.

The default instance of WriteApi use batching.

The data could be written as

  1. string or bytes that is formatted as a InfluxDB’s line protocol

  2. Data Point structure

  3. Dictionary style mapping with keys: measurement, tags, fields and time

  4. List of above items

  5. A batching type of write also supports an Observable that produce one of an above item

Batching

The batching is configurable by write_options:

Property

Description

Default Value

batch_size

the number of data pointx to collect in a batch

1000

flush_interval

the number of milliseconds before the batch is written

1000

jitter_interval

the number of milliseconds to increase the batch flush interval by a random amount

0

retry_interval

the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify “Retry-After” header.

1000

import rx
from rx import operators as ops

from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

_client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
_write_client = _client.write_api(write_options=WriteOptions(batch_size=500,
                                                             flush_interval=10_000,
                                                             jitter_interval=2_000,
                                                             retry_interval=5_000))

"""
Write Line Protocol formatted as string
"""
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
                                            "h2o_feet,location=coyote_creek water_level=3.0 3"])

"""
Write Line Protocol formatted as byte array
"""
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
                                            "h2o_feet,location=coyote_creek water_level=3.0 3".encode()])

"""
Write Dictionary-style object
"""
_write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                            "fields": {"water_level": 1.0}, "time": 1})
_write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                            "fields": {"water_level": 2.0}, "time": 2},
                                            {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                            "fields": {"water_level": 3.0}, "time": 3}])

"""
Write Data Point
"""
_write_client.write("my-bucket", "my-org", Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
_write_client.write("my-bucket", "my-org", [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
                                            Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

"""
Write Observable stream
"""
_data = rx \
    .range(7, 11) \
    .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))

_write_client.write("my-bucket", "my-org", _data)


"""
Close client
"""
_write_client.__del__()
_client.__del__()

Default Tags

Sometimes is useful to store same information in every measurement e.g. hostname, location, customer. The client is able to use static value or env property as a tag value.

The expressions:

  • California Miner - static value

  • ${env.hostname} - environment property

Via API
point_settings = PointSettings()
point_settings.add_default_tag("id", "132-987-655")
point_settings.add_default_tag("customer", "California Miner")
point_settings.add_default_tag("data_center", "${env.data_center}")

self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)
self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
                                              point_settings=PointSettings(**{"id": "132-987-655",
                                                                              "customer": "California Miner"}))
Via Configuration file

In a ini configuration file you are able to specify default tags by tags segment.

self.client = InfluxDBClient.from_config_file("config.ini")
[influx2]
url=http://localhost:9999
org=my-org
token=my-token
timeout=6000

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}
Via Environment Properties

You are able to specify default tags by environment properties with prefix INFLUXDB_V2_TAG_.

Examples:

  • INFLUXDB_V2_TAG_ID

  • INFLUXDB_V2_TAG_HOSTNAME

self.client = InfluxDBClient.from_env_properties()

Asynchronous client

Data are writes in an asynchronous HTTP request.

from influxdb_client  import InfluxDBClient
from influxdb_client.client.write_api import ASYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_client = client.write_api(write_options=ASYNCHRONOUS)

...

client.__del__()

Synchronous client

Data are writes in a synchronous HTTP request.

from influxdb_client  import InfluxDBClient
from influxdb_client .client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_client = client.write_api(write_options=SYNCHRONOUS)

...

client.__del__()

Queries

The result retrieved by QueryApi could be formatted as a:

  1. Flux data structure: FluxTable, FluxColumn and FluxRecord

  2. csv.reader which will iterate over CSV lines

  3. Raw unprocessed results as a str iterator

  4. Pandas DataFrame

The API also support streaming FluxRecord via query_stream, see example below:

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""

_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

write_api.write(bucket="my-bucket", org="my-org", record=[_point1, _point2])

"""
Query: using Table structure
"""
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    print(table)
    for record in table.records:
        print(record.values)

print()
print()

"""
Query: using Stream
"""
records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')

for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]}')

"""
Interrupt a stream after retrieve a required data
"""
large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
for record in large_stream:
    if record["location"] == "New York":
        print(f'New York temperature: {record["_value"]}')
        break

large_stream.close()

print()
print()

"""
Query: using csv library
"""
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
                                 dialect=Dialect(header=False, delimiter=",", comment_prefix="#", annotations=[],
                                                 date_time_format="RFC3339"))
for csv_line in csv_result:
    if not len(csv_line) == 0:
        print(f'Temperature in {csv_line[9]} is {csv_line[6]}')

"""
Close client
"""
client.__del__()

Pandas DataFrame

The client is able to retrieve data in Pandas DataFrame format thought query_data_frame:

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

"""
Prepare data
"""

_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)

write_api.write(bucket="my-bucket", org="my-org", record=[_point1, _point2])

"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
                                        '|> range(start: -10m) '
                                        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
                                        '|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())

"""
Close client
"""
client.__del__()

Output:

    result table  location  temperature
0  _result     0  New York         24.3
1  _result     1    Prague         25.3

Examples

How to efficiently import large dataset

The following example shows how to import dataset with dozen megabytes. If you would like to import gigabytes of data then use our multiprocessing example: import_data_set_multiprocessing.py for use a full capability of your hardware.

"""
Import VIX - CBOE Volatility Index - from "vix-daily.csv" file into InfluxDB 2.0

https://datahub.io/core/finance-vix#data
"""

from collections import OrderedDict
from csv import DictReader

import rx
from rx import operators as ops

from influxdb_client import InfluxDBClient, Point, WriteOptions

def parse_row(row: OrderedDict):
    """Parse row of CSV file into Point with structure:

        financial-analysis,type=ily close=18.47,high=19.82,low=18.28,open=19.82 1198195200000000000

    CSV format:
        Date,VIX Open,VIX High,VIX Low,VIX Close\n
        2004-01-02,17.96,18.68,17.54,18.22\n
        2004-01-05,18.45,18.49,17.44,17.49\n
        2004-01-06,17.66,17.67,16.19,16.73\n
        2004-01-07,16.72,16.75,15.5,15.5\n
        2004-01-08,15.42,15.68,15.32,15.61\n
        2004-01-09,16.15,16.88,15.57,16.75\n
        ...

    :param row: the row of CSV file
    :return: Parsed csv row to [Point]
    """

    """
     For better performance is sometimes useful directly create a LineProtocol to avoid unnecessary escaping overhead:
     """
     # from pytz import UTC
     # import ciso8601
     # from influxdb_client.client.write.point import EPOCH
     #
     # time = (UTC.localize(ciso8601.parse_datetime(row["Date"])) - EPOCH).total_seconds() * 1e9
     # return f"financial-analysis,type=vix-daily" \
     #        f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])} " \
     #        f" {int(time)}"

    return Point("financial-analysis") \
        .tag("type", "vix-daily") \
        .field("open", float(row['VIX Open'])) \
        .field("high", float(row['VIX High'])) \
        .field("low", float(row['VIX Low'])) \
        .field("close", float(row['VIX Close'])) \
        .time(row['Date'])


"""
Converts vix-daily.csv into sequence of datad point
"""
data = rx \
    .from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
    .pipe(ops.map(lambda row: parse_row(row)))

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=True)

"""
Create client that writes data in batches with 50_000 items.
"""
write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))

"""
Write data into InfluxDB
"""
write_api.write(org="my-org", bucket="my-bucket", record=data)
write_api.__del__()

"""
Querying max value of CBOE Volatility Index
"""
query = 'from(bucket:"my-bucket")' \
        ' |> range(start: 0, stop: now())' \
        ' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
        ' |> max()'
result = client.query_api().query(org="my-org", query=query)

"""
Processing results
"""
print()
print("=== results ===")
print()
for table in result:
    for record in table.records:
        print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))

"""
Close client
"""
client.__del__()

Efficiency write data from IOT sensor

"""
Efficiency write data from IOT sensor - write changed temperature every minute
"""
import atexit
import platform
from datetime import timedelta

import psutil as psutil
import rx
from rx import operators as ops

from influxdb_client import InfluxDBClient, WriteApi, WriteOptions

def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
    """Close clients after terminate a script.

    :param db_client: InfluxDB client
    :param write_api: WriteApi
    :return: nothing
    """
    write_api.__del__()
    db_client.__del__()


def sensor_temperature():
    """Read a CPU temperature. The [psutil] doesn't support MacOS so we use [sysctl].

    :return: actual CPU temperature
    """
    os_name = platform.system()
    if os_name == 'Darwin':
        from subprocess import check_output
        output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
        import re
        return re.findall(r'\d+', str(output))[0]
    else:
        return psutil.sensors_temperatures()["coretemp"][0]


def line_protocol(temperature):
    """Create a InfluxDB line protocol with structure:

        iot_sensor,hostname=mine_sensor_12,type=temperature value=68

    :param temperature: the sensor temperature
    :return: Line protocol to write into InfluxDB
    """

    import socket
    return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)


"""
Read temperature every minute; distinct_until_changed - produce only if temperature change
"""
data = rx\
    .interval(period=timedelta(seconds=60))\
    .pipe(ops.map(lambda t: sensor_temperature()),
          ops.distinct_until_changed(),
          ops.map(lambda temperature: line_protocol(temperature)))

_db_client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=True)

"""
Create client that writes data into InfluxDB
"""
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
_write_api.write(org="my-org", bucket="my-bucket", record=data)


"""
Call after terminate a script
"""
atexit.register(on_exit, _db_client, _write_api)

input()

Connect to InfluxDB Cloud

The following example demonstrate a simplest way how to write and query date with the InfluxDB Cloud.

At first point you should create an authentication token as is described here.

After that you should configure properties: influx_cloud_url, influx_cloud_token, bucket and org in a influx_cloud.py example.

The last step is run a python script via: python3 influx_cloud.py.

"""
Connect to InfluxDB 2.0 - write data and query them
"""

from datetime import datetime

from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

"""
Configure credentials
"""
influx_cloud_url = 'https://us-west-2-1.aws.cloud2.influxdata.com'
influx_cloud_token = '...'
bucket = '...'
org = '...'

client = InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token)
try:
    kind = 'temperature'
    host = 'host1'
    device = 'opt-123'

    """
    Write data by Point structure
    """
    point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.utcnow())

    print(f'Writing to InfluxDB cloud: {point.to_line_protocol()} ...')

    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket=bucket, org=org, record=point)

    print()
    print('success')
    print()
    print()

    """
    Query written data
    """
    query = f'from(bucket: "{bucket}") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "{kind}")'
    print(f'Querying from InfluxDB cloud: "{query}" ...')
    print()

    query_api = client.query_api()
    tables = query_api.query(query=query, org=org)

    for table in tables:
        for row in table.records:
            print(f'{row.values["_time"]}: host={row.values["host"]},device={row.values["device"]} '
                  f'{row.values["_value"]} °C')

    print()
    print('success')

except Exception as e:
    print(e)
finally:
    client.close()

How to use Jupyter + Pandas + InfluxDB 2

The first example shows how to use client capabilities to predict stock price via Keras, TensorFlow, sklearn:

The example is taken from Kaggle.

https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction.gif

Result:

https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction-results.png

The second example shows how to use client capabilities to realtime visualization via hvPlot, Streamz, RxPY:

https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/realtime-result.gif

Advanced Usage

Gzip support

InfluxDBClient does not enable gzip compression for http requests by default. If you want to enable gzip to reduce transfer data’s size, you can call:

from influxdb_client import InfluxDBClient

_db_client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", enable_gzip=True)

Local tests

# start/restart InfluxDB2 on local machine using docker
./scripts/influxdb-restart.sh

# install requirements
pip install -r requirements.txt --user
pip install -r extra-requirements.txt --user
pip install -r test-requirements.txt --user

# run unit & integration tests
pytest tests

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/influxdata/influxdb-client-python.

License

The gem is available as open source under the terms of the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

influxdb_client-1.5.0.tar.gz (196.6 kB view details)

Uploaded Source

Built Distribution

influxdb_client-1.5.0-py3-none-any.whl (484.2 kB view details)

Uploaded Python 3

File details

Details for the file influxdb_client-1.5.0.tar.gz.

File metadata

  • Download URL: influxdb_client-1.5.0.tar.gz
  • Upload date:
  • Size: 196.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.42.1 CPython/3.7.6

File hashes

Hashes for influxdb_client-1.5.0.tar.gz
Algorithm Hash digest
SHA256 3f84400107dfce9e5ac044468b9bf2c4f185de81162813811f8c74453e4d62c2
MD5 764d7db38121d0643f33133fa2cecb0e
BLAKE2b-256 8cec3fe6a73a636444e0813df0d2713b6a09ffe9ca913015aa95e7ebe364ed78

See more details on using hashes here.

File details

Details for the file influxdb_client-1.5.0-py3-none-any.whl.

File metadata

  • Download URL: influxdb_client-1.5.0-py3-none-any.whl
  • Upload date:
  • Size: 484.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.42.1 CPython/3.7.6

File hashes

Hashes for influxdb_client-1.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1a5ebe0f8d4835a123308fbc23459a9458c7f6a392aaac9fa99f6e1c7794e08b
MD5 e7a687f58d97c002f548180d9716a15d
BLAKE2b-256 cbbb06f064277ba3e8e5a09fa627947d338b7d1663f68a7650fbe5ff9e4a9585

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page