Skip to main content

Inlfuxed influx query language and orm

Project description

Source code and development can be followed in this repository

Feel free to open issues or share your experience :)

Installation

Navigate to folder in terminal

Run

python setup.py install

Or

pip install influxed

and your good to go.

Connect to server:

from influxed.orm import engine
engine.add_server('http://influxserverurlorip:port', 'username', 'password', reflect=True)

Or secure:

from influxed.orm import engine
engine.add_server('https://influxserverurlorip:port', 'username', 'password', reflect=True)

Or with asyncio:

from influxed.orm import engine
await engine.add_server('https://influxserverurlorip:port', 'username', 'password', reflect=True, isAsync=True)

Example explorer

Show databases

engine.server.ls()

Show measurements

engine.server.[database_name].ls()

Show fields

engine.server.[database_name].[measurement_name].ls()

In the case that a name of a measurement or a database is not complient with python,

then they can be accessed by using indexing

Example

engine.server['1Db']['measurement one'].ls()

Select data from a measurement:

engine.server.database.meter_usage.query.select('field1', 'field2').all()

Another way is to select it from the measurement itself

engine.server.database.meter_usage.field1.query.all()

Limit selection:

engine.server.database.meter_usage.field1.query.first(5) OR
engine.server.database.meter_usage.field1.query.last(7) OR
engine.server.database.meter_usage.field1.query.limit(2).all()

Filtering:

col = engine.server.database.meter_usage.field1
col.query.filter( # Everything between 5 and 6
    col > 5,
    col <= 6
).all()

Filtering by time

import datetime as dt
from influxed.ifql import time

engine.server.database.meter_usage.field1.query.filter(
    time > dt.datetime.now()
).all()

Filtering using OR statement

from influxed.ifql import OPERATORS

col = engine.server.database.meter_usage.field1
col.query.filter(
    OPERATORS.or_(
        time > dt.datetime.now(), # or
        col > 2,
    )

).all()

Aggregators:

col = engine.server.database.meter_usage.field1
col.query.min.all() # Minimum value
col.query.max.all() # Maximum value
col.query.mean.all() # Mean value
col.query.sum.all() # Summed Value
col.query.std.all() # Standard deviation

Group by function

from influxed.ifql import time

col = engine.server.database.meter_usage.field1

col.query.min.group_by(
    time('2d') # Group into buckets of 2 days and take the minimum value
).all()

# Available interval selectors:
# week = 'w'
# day = 'd'
# hour = 'h'
# minute = 'm'
# sec = 's'
# milisec = 'ms'
# microsec ='u'
# nanosec = 'ns'

Lets spice things up:


engine.server.dap.meterusage.query.select(
    '871694831000088656',
    '871690910000005079'
).filter(
    time > dt.datetime(2016,1,1),
    time > dt.datetime(2016,2,1),
).group_by(
    time('1d')
).sum().fill(0).all()

# Will give you:
# 'SELECT SUM("871694831000088656"), SUM("871690910000005079") FROM meterusage WHERE time > \'2016-01-01 00:00:00.000\' AND time > \'2016-02-01 00:00:00.000\'  GROUP BY time(1d) FILL(1)'

With algebraic aritmatic:

a = engine.server.database.meter_usage.field1
b = engine.server.database.meter_usage.field2
engine.query.select((a.sum()+2)/b)*42).filter( # Everything between 5 and 6
    a > 5,
    b <= 6
).all()
# Will give you:
# SELECT ((SUM(field1) + 2) / field2) * 42 WHERE field1 > 5 AND field2 < 6

With alegebaic regex:

from influxed import like, nlike
engine.query.select('a').filter('b' |like|  '/_percent/').all()
engine.query.select('a').filter('b' |nlike|  '/_percent/').all()
``` Or

from influxed import like, nlike a = engine.server.database.meter_usage.field1 b = engine.server.database.meter_usage.field2

engine.query.select(a).filter(b |like| '/_percent/').all() engine.query.select(a).filter(b.like('/_percent/')).all()




## Test
Run
python -m unittest discover -p '*_test.py' -s src -t .

# Debugging -------
One can always call
    .format()
on any statement after the .query
in order to see the raw sql-produced by a given statement
Additional debug information can be obtained by attaching a logger:

import logging logger = logging.getLogger() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ch = logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(ch) logger.setLevel('DEBUG')


Available logging channels:
    - Transport layer = InfluxedClient
    - Transport layer Retry logic = InfluxedRetryStrat


Code coverage:

coverage run --source=src/influxed -m xmlrunner -o test-reports discover -s ./src -p *_test.py coverage xml sonar-scanner


### Lastly, if you find bugs or have feature requests feel free to open issues

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

influxed-0.1.5.2-py3-none-any.whl (56.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page