Kafka-InfluxDB
==============
|Build Status| |Coverage Status| |Code Climate| |PyPi Version|
|Scrutinizer|
| A Kafka consumer for InfluxDB written in Python.
| Supports InfluxDB 0.9.x and up. For InfluxDB 0.8.x support, check out
  the `0.3.0 tag <https://github.com/mre/kafka-influxdb/tree/v0.3.0>`__.
Use cases
---------
| Kafka will serve as a buffer for your metric data during high load.
| It is also useful for sending metrics from offshore data centers with
  unreliable connections to your monitoring backend.
.. figure:: https://raw.githubusercontent.com/mre/kafka-influxdb/master/assets/schema-small.png
   :alt:
Quickstart
----------
For a quick test, run kafka-influxdb inside a container alongside Kafka
and InfluxDB. Some sample messages are generated automatically on
startup (using kafkacat).
Python 2:
^^^^^^^^^
::

    make
    docker exec -it kafkainfluxdb
    python -m kafka_influxdb -c config_example.yaml -s

Python 3:
^^^^^^^^^
::

    make RUNTIME=py3
    docker exec -it kafkainfluxdb
    python -m kafka_influxdb -c config_example.yaml -s

PyPy 5.x
^^^^^^^^
::

    make RUNTIME=pypy
    docker exec -it kafkainfluxdb
    pypy3 -m kafka_influxdb -c config_example.yaml -s --kafka_reader=kafka_influxdb.reader.kafka_python

(Note that one additional flag is given:
``--kafka_reader=kafka_influxdb.reader.kafka_python``. This is because
PyPy is incompatible with the confluent kafka consumer, which is a
C extension built on librdkafka. Therefore we use the kafka\_python
library here, which is compatible with PyPy but a bit slower.)

Installation
------------
::

    pip install kafka_influxdb
    kafka_influxdb -c config-example.yaml

Performance
-----------
| The following graph shows the number of messages per second read from
  Kafka for various Python versions and Kafka consumer plugins.
| The test ran against a Kafka topic with 10 partitions and five message
  brokers. The best performance is achieved on Python 3 using the ``-O``
  flag for bytecode optimization in combination with the
  ``confluent-kafka`` reader (the default setup). Note that encoding and
  sending the data to InfluxDB might lower this maximum performance,
  although you should still see a significant performance boost compared
  to logstash.

.. figure:: assets/benchmark.png
   :alt: Benchmark results

   Benchmark results

Benchmark
---------
For a quick benchmark, you can start a complete
``kafkacat -> Kafka -> kafka_influxdb -> InfluxDB`` setup with the
following command:

::

    make

This will immediately start reading messages from Kafka and writing them
into InfluxDB. To see the output, you can use the InfluxDB CLI.

::

    docker exec -it docker_influxdb_1 bash # Double check your container name
    influx
    use metrics
    show measurements

Supported formats
-----------------
You can write a custom encoder to support any input and output format
(even fancy things like Protobuf). Look at the examples inside the
``encoder`` directory to get started. The following formats are
officially supported:
Input formats
~~~~~~~~~~~~~
- `Collectd Graphite ASCII
  format <https://collectd.org/wiki/index.php/Graphite>`__:

  ::

      mydatacenter.myhost.load.load.shortterm 0.45 1436357630

- `Collectd JSON format <https://collectd.org/wiki/index.php/JSON>`__:

  .. code:: json

      [{
          "values": [
              0.6
          ],
          "dstypes": [
              "gauge"
          ],
          "dsnames": [
              "value"
          ],
          "time": 1444745144.824,
          "interval": 10.000,
          "host": "xx.example.internal",
          "plugin": "cpu",
          "plugin_instance": "1",
          "type": "percent",
          "type_instance": "system"
      }]

- `Raw InfluxDB line protocol (e.g. for Telegraf
  support) <https://github.com/mre/kafka-influxdb/issues/40>`__

Output formats
~~~~~~~~~~~~~~
- `InfluxDB 0.9.2+ line protocol
  format <https://influxdb.com/docs/v0.9/write_protocols/line.html>`__:

  ::

      load_load_shortterm,datacenter=mydatacenter,host=myhost value="0.45" 1436357630

- `InfluxDB 0.8.x JSON
  format <https://influxdb.com/docs/v0.8/api/reading_and_writing_data.html#writing-data-through-http>`__
  (*deprecated*)
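
To illustrate how the Graphite input example maps onto the line protocol output example, here is a minimal sketch of an encoder function. The name ``graphite_to_line_protocol`` is hypothetical, and the assumption that the first two path segments are the data center and the host is inferred only from the two sample lines in this section; the encoders shipped in the ``encoder`` directory may work differently.

.. code:: python

    def graphite_to_line_protocol(message):
        """Convert one Collectd Graphite ASCII line to an InfluxDB line.

        Hypothetical helper for illustration; not the project's encoder.
        """
        # A Graphite line has three fields: metric path, value, timestamp
        path, value, timestamp = message.split()
        parts = path.split(".")
        # Assumption: path is <datacenter>.<host>.<metric parts...>
        datacenter, host = parts[0], parts[1]
        measurement = "_".join(parts[2:])
        return '{0},datacenter={1},host={2} value="{3}" {4}'.format(
            measurement, datacenter, host, value, timestamp
        )


    sample = "mydatacenter.myhost.load.load.shortterm 0.45 1436357630"
    print(graphite_to_line_protocol(sample))

Fed the sample input line from this section, the sketch prints the sample output line shown under "Output formats".
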
Configuration
-------------
Take a look at ``config-example.yaml`` to find out how to create a
config file. You can override the settings from the command line. The
following parameters are allowed:

+-----------------------------------------------------------+------------------------------------------------+
| Option                                                    | Description                                    |
+===========================================================+================================================+
| ``-h``, ``--help``                                        | Show help message and exit                     |
+-----------------------------------------------------------+------------------------------------------------+
| ``--kafka_host KAFKA_HOST``                               | Hostname or IP of Kafka message broker         |
|                                                           | (default: localhost)                           |
+-----------------------------------------------------------+------------------------------------------------+
| ``--kafka_port KAFKA_PORT``                               | Port of Kafka message broker (default: 9092)   |
+-----------------------------------------------------------+------------------------------------------------+
| ``--kafka_topic KAFKA_TOPIC``                             | Topic for metrics (default: my_topic)          |
+-----------------------------------------------------------+------------------------------------------------+
| ``--kafka_group KAFKA_GROUP``                             | Kafka consumer group (default: my_group)       |
+-----------------------------------------------------------+------------------------------------------------+
| ``--kafka_reader KAFKA_READER``                           | Kafka client library to use (kafka_python or   |
|                                                           | confluent) (default:                           |
|                                                           | kafka_influxdb.reader.confluent)               |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_host INFLUXDB_HOST``                         | InfluxDB hostname or IP (default: localhost)   |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_port INFLUXDB_PORT``                         | InfluxDB API port (default: 8086)              |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_user INFLUXDB_USER``                         | InfluxDB username (default: root)              |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_password INFLUXDB_PASSWORD``                 | InfluxDB password (default: root)              |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_dbname INFLUXDB_DBNAME``                     | InfluxDB database to write metrics into        |
|                                                           | (default: metrics)                             |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_use_ssl``                                    | Use SSL connection for InfluxDB (default:      |
|                                                           | False)                                         |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_verify_ssl``                                 | Verify the SSL certificate before connecting   |
|                                                           | (default: False)                               |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_timeout INFLUXDB_TIMEOUT``                   | Max number of seconds to establish a           |
|                                                           | connection to InfluxDB (default: 5)            |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_use_udp``                                    | Use UDP connection for InfluxDB (default:      |
|                                                           | False)                                         |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_retention_policy INFLUXDB_RETENTION_POLICY`` | Retention policy for incoming metrics          |
|                                                           | (default: autogen)                             |
+-----------------------------------------------------------+------------------------------------------------+
| ``--influxdb_time_precision INFLUXDB_TIME_PRECISION``     | Precision of incoming metrics. Can be one of   |
|                                                           | 's', 'm', 'ms', 'u' (default: s)               |
+-----------------------------------------------------------+------------------------------------------------+
| ``--encoder ENCODER``                                     | Input encoder which converts an incoming       |
|                                                           | message to dictionary (default:                |
|                                                           | collectd_graphite_encoder)                     |
+-----------------------------------------------------------+------------------------------------------------+
| ``--buffer_size BUFFER_SIZE``                             | Maximum number of messages that will be        |
|                                                           | collected before flushing to the backend       |
|                                                           | (default: 1000)                                |
+-----------------------------------------------------------+------------------------------------------------+
| ``-c CONFIGFILE``, ``--configfile CONFIGFILE``            | Configfile path (default: None)                |
+-----------------------------------------------------------+------------------------------------------------+
| ``-s``, ``--statistics``                                  | Show performance statistics (default: True)    |
+-----------------------------------------------------------+------------------------------------------------+
| ``-v``, ``--verbose``                                     | Set verbosity level. Increase verbosity by     |
|                                                           | adding a v: -v -vv -vvv (default: 0)           |
+-----------------------------------------------------------+------------------------------------------------+
| ``--version``                                             | Show version                                   |
+-----------------------------------------------------------+------------------------------------------------+
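
The idea that command-line options take precedence over values from the config file could be sketched as follows. This is not the project's actual implementation: ``resolve_settings`` and the ``DEFAULTS`` dictionary are hypothetical names, the precedence order (defaults, then config file, then command line) is an assumption, and only three of the options from the table are included.

.. code:: python

    import argparse

    # Defaults copied from the option table (subset, for illustration only)
    DEFAULTS = {"kafka_host": "localhost", "kafka_port": 9092, "buffer_size": 1000}


    def resolve_settings(file_settings, argv):
        """Merge defaults, config-file values and command-line options.

        Hypothetical helper; later sources win over earlier ones.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("--kafka_host")
        parser.add_argument("--kafka_port", type=int)
        parser.add_argument("--buffer_size", type=int)
        args = parser.parse_args(argv)

        settings = dict(DEFAULTS)
        settings.update(file_settings)
        # Only options actually given on the command line override earlier values
        given = {key: value for key, value in vars(args).items() if value is not None}
        settings.update(given)
        return settings


    # Values that would normally come from the parsed YAML config file
    from_file = {"kafka_host": "kafka"}
    print(resolve_settings(from_file, ["--buffer_size", "500"]))

In this sketch ``kafka_host`` comes from the file, ``buffer_size`` from the command line, and ``kafka_port`` keeps its default.
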
Comparison with other tools
---------------------------
There is a Kafka input plugin and an InfluxDB output plugin for
**logstash**. It supports InfluxDB 0.9+. We've achieved a message
throughput of around **5000 messages/second** with that setup. Check out
the configuration at ``docker/logstash/config.conf``. You can run the
benchmark yourself:

::

    make RUNTIME=logstash
    docker exec -it logstash
    logstash -f config.conf

Please send a Pull Request if you know of other tools that can be
mentioned here.
.. |Build Status| image:: https://travis-ci.org/mre/kafka-influxdb.svg?branch=master
   :target: https://travis-ci.org/mre/kafka-influxdb
.. |Coverage Status| image:: https://codecov.io/gh/mre/kafka-influxdb/branch/master/graph/badge.svg
   :target: https://codecov.io/gh/mre/kafka-influxdb
.. |Code Climate| image:: https://codeclimate.com/github/mre/kafka-influxdb/badges/gpa.svg
   :target: https://codeclimate.com/github/mre/kafka-influxdb
.. |PyPi Version| image:: https://badge.fury.io/py/kafka_influxdb.svg
   :target: https://badge.fury.io/py/kafka_influxdb
.. |Scrutinizer| image:: https://scrutinizer-ci.com/g/mre/kafka-influxdb/badges/quality-score.png?b=master
   :target: https://scrutinizer-ci.com/g/mre/kafka-influxdb/?branch=master