# Influxed: InfluxDB query language and ORM
Source code and development can be followed in this repository
Feel free to open issues or share your experience :)
## Installation

Navigate to the project folder in a terminal and run

```shell
python setup.py install
```

or install from PyPI:

```shell
pip install influxed
```

and you're good to go.
Connect to a server:

```python
from influxed.orm import engine
engine.add_server('http://influxserverurlorip:port', 'username', 'password', reflect=True)
```

Or secure:

```python
from influxed.orm import engine
engine.add_server('https://influxserverurlorip:port', 'username', 'password', reflect=True)
```

Or with asyncio:

```python
from influxed.orm import engine
await engine.add_server('https://influxserverurlorip:port', 'username', 'password', reflect=True, isAsync=True)
```
## Example explorer

Show databases:

```python
engine.server.ls()
```

Show measurements (where `database_name` is one of the names listed above):

```python
engine.server.database_name.ls()
```

Show fields:

```python
engine.server.database_name.measurement_name.ls()
```

In case the name of a measurement or database is not a valid Python identifier, it can be accessed by indexing instead:

```python
engine.server['1Db']['measurement one'].ls()
```
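The dual attribute/index access works because the ORM exposes databases and measurements as dynamic children. A minimal sketch of the pattern (not influxed's actual implementation) shows why indexing is needed for names like `1Db`:

```python
class Namespace:
    """Sketch: expose child objects both as attributes and via indexing."""
    def __init__(self, children):
        self._children = children

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails
        try:
            return self._children[name]
        except KeyError:
            raise AttributeError(name)

    def __getitem__(self, name):
        return self._children[name]


server = Namespace({'1Db': Namespace({'measurement one': 'fields...'})})

# '1Db' is not a valid Python identifier, so attribute access is impossible,
# but indexing works for any name:
print(server['1Db']['measurement one'])  # fields...
```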
Select data from a measurement:

```python
engine.server.database.meter_usage.query.select('field1', 'field2').all()
```

Another way is to start from the field itself:

```python
engine.server.database.meter_usage.field1.query.all()
```

Limit the selection:

```python
engine.server.database.meter_usage.field1.query.first(5)   # or
engine.server.database.meter_usage.field1.query.last(7)    # or
engine.server.database.meter_usage.field1.query.limit(2).all()
```
Filtering:

```python
col = engine.server.database.meter_usage.field1
col.query.filter(  # everything between 5 and 6
    col > 5,
    col <= 6
).all()
```

Filtering by time:

```python
import datetime as dt
from influxed.ifql import time

engine.server.database.meter_usage.field1.query.filter(
    time > dt.datetime.now()
).all()
```
Filtering using an OR statement:

```python
import datetime as dt
from influxed.ifql import time, OPERATORS

col = engine.server.database.meter_usage.field1
col.query.filter(
    OPERATORS.or_(
        time > dt.datetime.now(),  # or
        col > 2,
    )
).all()
```
Aggregators:

```python
col = engine.server.database.meter_usage.field1

col.query.min.all()   # minimum value
col.query.max.all()   # maximum value
col.query.mean.all()  # mean value
col.query.sum.all()   # summed value
col.query.std.all()   # standard deviation
```
Group by function:

```python
from influxed.ifql import time

col = engine.server.database.meter_usage.field1
col.query.min.group_by(
    time('2d')  # group into buckets of 2 days and take the minimum value
).all()
```

Available interval selectors: `w` (week), `d` (day), `h` (hour), `m` (minute), `s` (second), `ms` (millisecond), `u` (microsecond), `ns` (nanosecond).
Let's spice things up:

```python
import datetime as dt
from influxed.ifql import time

engine.server.dap.meterusage.query.select(
    '871694831000088656',
    '871690910000005079'
).filter(
    time > dt.datetime(2016, 1, 1),
    time < dt.datetime(2016, 2, 1),
).group_by(
    time('1d')
).sum().fill(0).all()

# Will give you:
# 'SELECT SUM("871694831000088656"), SUM("871690910000005079") FROM meterusage WHERE time > \'2016-01-01 00:00:00.000\' AND time < \'2016-02-01 00:00:00.000\' GROUP BY time(1d) FILL(0)'
```
With algebraic arithmetic:

```python
a = engine.server.database.meter_usage.field1
b = engine.server.database.meter_usage.field2

engine.query.select(((a.sum() + 2) / b) * 42).filter(
    a > 5,
    b <= 6
).all()

# Will give you:
# SELECT ((SUM(field1) + 2) / field2) * 42 WHERE field1 > 5 AND field2 <= 6
```
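This style rests on Python operator overloading: each field object intercepts `+`, `/`, `*`, etc. and builds an expression string instead of computing a value. A minimal sketch of the idea, assuming nothing about influxed's internals:

```python
class Expr:
    """Sketch: an SQL expression built up via operator overloading."""
    def __init__(self, sql):
        self.sql = sql

    def _combine(self, op, other):
        # Accept plain numbers as well as other expressions
        other_sql = other.sql if isinstance(other, Expr) else str(other)
        return Expr(f'({self.sql} {op} {other_sql})')

    def __add__(self, other):
        return self._combine('+', other)

    def __truediv__(self, other):
        return self._combine('/', other)

    def __mul__(self, other):
        return self._combine('*', other)

    def sum(self):
        return Expr(f'SUM({self.sql})')


a = Expr('field1')
b = Expr('field2')
print(((a.sum() + 2) / b * 42).sql)
# prints: (((SUM(field1) + 2) / field2) * 42)
```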
With algebraic regex:

```python
from influxed import like, nlike

engine.query.select('a').filter('b' |like| '/_percent/').all()
engine.query.select('a').filter('b' |nlike| '/_percent/').all()
```

Or with field objects:

```python
from influxed import like

a = engine.server.database.meter_usage.field1
b = engine.server.database.meter_usage.field2

engine.query.select(a).filter(b |like| '/_percent/').all()
engine.query.select(a).filter(b.like('/_percent/')).all()
```
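The `|like|` notation is a known Python trick for faking infix operators by overloading `__or__`/`__ror__`. A minimal sketch of how it can work (not influxed's actual code; the rendered `=~`/`!~` operators are assumptions):

```python
class Infix:
    """Sketch: make  left |op| right  call  func(left, right)."""
    def __init__(self, func):
        self.func = func

    def __ror__(self, left):
        # Handles  left | self : capture the left operand
        return Infix(lambda right: self.func(left, right))

    def __or__(self, right):
        # Handles  self | right : apply the captured function
        return self.func(right)


like = Infix(lambda field, pattern: f'{field} =~ {pattern}')
nlike = Infix(lambda field, pattern: f'{field} !~ {pattern}')

print('b' |like| '/_percent/')   # b =~ /_percent/
print('b' |nlike| '/_percent/')  # b !~ /_percent/
```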
## Test

Run

```shell
python -m unittest discover -p '*_test.py' -s src -t .
```
## Debugging

One can always call `.format()` on any statement after `.query` in order to see the raw SQL produced by a given statement.
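Conceptually, each chained call just accumulates clauses, and `.format()` renders them into the final query string. A minimal sketch of such a fluent builder (an illustration of the pattern, not influxed's implementation):

```python
class Query:
    """Sketch: a fluent query builder that renders SQL on .format()."""
    def __init__(self, measurement):
        self.measurement = measurement
        self.fields = []
        self.conditions = []

    def select(self, *fields):
        self.fields.extend(fields)
        return self  # return self so calls can be chained

    def filter(self, *conditions):
        self.conditions.extend(conditions)
        return self

    def format(self):
        sql = f'SELECT {", ".join(self.fields)} FROM {self.measurement}'
        if self.conditions:
            sql += ' WHERE ' + ' AND '.join(self.conditions)
        return sql


q = Query('meter_usage').select('field1', 'field2').filter('field1 > 5')
print(q.format())  # SELECT field1, field2 FROM meter_usage WHERE field1 > 5
```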
Additional debug information can be obtained by attaching a logger:

```python
import logging

logger = logging.getLogger()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel('DEBUG')
```
Available logging channels:

- Transport layer: `InfluxedClient`
- Transport-layer retry logic: `InfluxedRetryStrat`
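To make only one channel verbose instead of the root logger, attach the handler to the named logger directly (channel names taken from the list above; this is a plain stdlib `logging` sketch):

```python
import logging

# 'InfluxedClient' is the transport-layer channel listed above
client_log = logging.getLogger('InfluxedClient')

handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
client_log.addHandler(handler)
client_log.setLevel(logging.DEBUG)  # only this channel becomes verbose
```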
Code coverage:

```shell
coverage run --source=src/influxed -m xmlrunner -o test-reports discover -s ./src -p '*_test.py'
coverage xml
sonar-scanner
```
### Lastly, if you find bugs or have feature requests, feel free to open issues
Hashes for influxed-0.1.5.2-py3-none-any.whl:

| Algorithm | Hash digest |
|---|---|
| SHA256 | 97e1af7834acf60541b4ae153e98e71def4f35cba78aa44ad65ed1c3bdcb6fd0 |
| MD5 | f596319f489ca1689373d0687f7a7f43 |
| BLAKE2b-256 | fa73cb869a6cacec76a66d4e1e189d1b409291c9eea589766466f49d4df9bdcd |