Explore Data Pipelines in Apache Kafka.
Project description
Streams Explorer
Explore Apache Kafka data pipelines in Kubernetes.
Note: We are participating in the annual Hacktoberfest. If you're looking to contribute, please see our open issues and use the standalone installation for development.
Features
- Visualization of streaming applications, topics, and connectors
- Monitor all or individual pipelines from multiple namespaces
- Inspection of Avro schema from schema registry
- Integration with streams-bootstrap and faust-bootstrap, or custom streaming app config parsing from Kubernetes deployments using plugins
- Real-time metrics from Prometheus (consumer lag & read rate, replicas, topic size, messages in & out per second, connector tasks)
- Linking to external services for logging and analysis, such as Kibana, Grafana, Loki, AKHQ, Redpanda Console, and Elasticsearch
- Customizable through Python plugins
Overview
Visit our introduction blog post for a complete overview and demo of Streams Explorer.
Installation
Prerequisites: access to a Kubernetes cluster where streaming apps and services are deployed.
Docker Compose
- Forward the ports to Prometheus (Kafka Connect, Schema Registry, and other integrations are optional).
- Start the container
docker compose up
Once the container is started, visit http://localhost:8080
Deploying to Kubernetes cluster
- Add the Helm chart repository
helm repo add streams-explorer https://bakdata.github.io/streams-explorer
- Install
helm upgrade --install --values helm-chart/values.yaml streams-explorer streams-explorer/streams-explorer
Standalone
Backend
- Install dependencies using Poetry
poetry install
- Forward the ports to Prometheus (Kafka Connect, Schema Registry, and other integrations are optional).
- Configure the backend in settings.yaml.
- Start the backend server
poetry run start
Frontend
- Install dependencies
npm ci
- Start the frontend server
npm run build && npm run prod
Visit http://localhost:3000
Configuration
Depending on your type of installation, set the configuration for the backend server in the corresponding file:
- Docker Compose: docker-compose.yaml
- Kubernetes: helm-chart/values.yaml
- standalone: backend/settings.yaml
In helm-chart/values.yaml, configuration is done either through the config section using double underscore notation, e.g. K8S__consumer_group_annotation: consumerGroup, or by pasting the content of backend/settings.yaml under the settings section. Alternatively, all configuration options can be written as environment variables using double underscore notation and the prefix SE, e.g. SE_K8S__deployment__cluster=false.
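For example, a minimal sketch of backend/settings.yaml, assuming dotted option names nest as YAML keys (values are the documented defaults; adjust to your setup):

graph:
  update_interval: 30
prometheus:
  url: "http://localhost:9090"
k8s:
  deployment:
    cluster: false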
The following configuration options are available:
General
- graph.update_interval: Render the graph every x seconds (int, required, default: 30)
- graph.layout_arguments: Arguments passed to the graphviz layout (string, required, default: -Grankdir=LR -Gnodesep=0.8 -Gpad=10)
- graph.pipeline_distance: Increase/decrease the vertical space between pipeline graphs by x pixels (int, required, default: 500)
- graph.resolve.input_pattern_topics.all: If true, topics that match (extra) input pattern(s) are connected to the streaming app in the graph containing all pipelines (bool, required, default: false)
- graph.resolve.input_pattern_topics.pipelines: If true, topics that match (extra) input pattern(s) are connected to the streaming app in pipeline graphs (bool, required, default: false)
Kafka
- kafka.enable: Enable Kafka (bool, default: false)
- kafka.config: librdkafka configuration properties (reference) (dict, default: {"bootstrap.servers": "localhost:9092"})
- kafka.displayed_information: Configuration options of Kafka topics displayed in the frontend (list of dict)
- kafka.topic_names_cache.ttl: TTL of the cache for retrieving all topic names, used when input topic patterns are resolved (int, default: 3600)
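As an illustration, a kafka section in settings.yaml might look like the sketch below; the displayed_information entry is hypothetical and simply mirrors the name/key pattern documented for k8s.displayed_information further down:

kafka:
  enable: true
  config:
    bootstrap.servers: "localhost:9092"
  displayed_information:
    # hypothetical entry: display name for the frontend plus a topic config key
    - name: "Cleanup policy"
      key: "cleanup.policy"
  topic_names_cache:
    ttl: 3600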
Kafka Connect
- kafkaconnect.url: URL of the Kafka Connect server (string, default: None)
- kafkaconnect.update_interval: Fetch connectors every x seconds (int, default: 300)
- kafkaconnect.displayed_information: Configuration options of Kafka connectors displayed in the frontend (list of dict)
Kubernetes
- k8s.deployment.cluster: Whether Streams Explorer is deployed to a Kubernetes cluster (bool, required, default: false)
- k8s.deployment.context: Name of the cluster (string, optional if running in cluster, default: kubernetes-cluster)
- k8s.deployment.namespaces: Kubernetes namespaces (list of string, required, default: ['kubernetes-namespace'])
- k8s.containers.ignore: Names of containers that should be ignored/hidden (list of string, default: ['prometheus-jmx-exporter'])
- k8s.displayed_information: Details of the pod that should be displayed (list of dict, default: [{'name': 'Labels', 'key': 'metadata.labels'}])
- k8s.labels: Labels used to set attributes of nodes (list of string, required, default: ['pipeline'])
- k8s.pipeline.label: Attribute of nodes the pipeline name should be extracted from (string, required, default: pipeline)
- k8s.consumer_group_annotation: Annotation the consumer group name should be extracted from (string, required, default: consumerGroup)
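Put together, a k8s section assembled entirely from the defaults above could look like this sketch in settings.yaml:

k8s:
  deployment:
    cluster: false
    context: "kubernetes-cluster"
    namespaces:
      - "kubernetes-namespace"
  containers:
    ignore:
      - "prometheus-jmx-exporter"
  displayed_information:
    - name: "Labels"
      key: "metadata.labels"
  labels:
    - "pipeline"
  pipeline:
    label: "pipeline"
  consumer_group_annotation: "consumerGroup"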
Schema Registry / Karapace
- schemaregistry.url: URL of Confluent Schema Registry or Karapace (string, default: None)
Prometheus
- prometheus.url: URL of Prometheus (string, required, default: http://localhost:9090)
Note that additional exporters are required to collect Kafka metrics for Prometheus.
AKHQ
- akhq.enable: Enable AKHQ (bool, default: false)
- akhq.url: URL of AKHQ (string, default: http://localhost:8080)
- akhq.cluster: Name of the cluster (string, default: kubernetes-cluster)
- akhq.connect: Name of the Kafka Connect instance (string, default: None)
Redpanda Console
Redpanda Console can be used instead of AKHQ; the two are mutually exclusive.
- redpanda_console.enable: Enable Redpanda Console (bool, default: false)
- redpanda_console.url: URL of Redpanda Console (string, default: http://localhost:8080)
Grafana
- grafana.enable: Enable Grafana (bool, default: false)
- grafana.url: URL of Grafana (string, default: http://localhost:3000)
- grafana.dashboards.topics: Path to the topics dashboard (string); sample dashboards for topics and consumer groups are included in the ./grafana subfolder
- grafana.dashboards.consumergroups: Path to the consumer groups dashboard (string)
Kibana
- kibanalogs.enable: Enable Kibana logs (bool, default: false)
- kibanalogs.url: URL of Kibana logs (string, default: http://localhost:5601)
Loki
Loki can be used instead of Kibana; the two are mutually exclusive.
- loki.enable: Enable Loki logs (bool, default: false)
- loki.url: URL of Loki logs (string, default: http://localhost:3000)
Elasticsearch
For the Kafka Connect Elasticsearch connector.
- esindex.url: URL of the Elasticsearch index (string, default: http://localhost:5601/app/kibana#/dev_tools/console)
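As a sketch, enabling several of the linking services above together (here AKHQ and Kibana rather than their Redpanda Console and Loki alternatives) with the documented default URLs:

akhq:
  enable: true
  url: "http://localhost:8080"
  cluster: "kubernetes-cluster"
grafana:
  enable: true
  url: "http://localhost:3000"
kibanalogs:
  enable: true
  url: "http://localhost:5601"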
Plugins
- plugins.path: Path to the folder containing plugins, relative to the backend (string, required, default: ./plugins)
- plugins.extractors.default: Whether to load the default extractors (bool, required, default: true)
Demo pipeline
ATM Fraud detection with streams-bootstrap
Plugin customization
It is possible to create your own config parser, linker, metric provider, and extractors in Python by implementing the K8sConfigParser, LinkingService, MetricProvider, or Extractor classes. This way you can customize Streams Explorer to your specific setup and services. As an example we provide the DefaultLinker as LinkingService. The default MetricProvider supports Prometheus. Furthermore, a set of default Extractor plugins is included.
If your streaming application deployments are configured through environment variables, following the schema of streams-bootstrap or faust-bootstrap, the Streams Explorer works out of the box with the default deployment parser.
For streams-bootstrap deployments configured through CLI arguments, a separate parser can be loaded by creating a Python file (e.g. config_parser.py) in the plugins folder with the following import statement:
from streams_explorer.core.k8s_config_parser import StreamsBootstrapArgsParser
For other setups, a custom config parser plugin can be created by inheriting from the K8sConfigParser class and implementing the parse method. In this example we're retrieving the streaming app configurations from an external REST API. In order for a deployment to be identified as a streaming app, input and output topics are required.
import httpx

from streams_explorer.core.k8s_config_parser import K8sConfigParser
from streams_explorer.models.k8s import K8sConfig


class CustomConfigParser(K8sConfigParser):
    def get_name(self) -> str:
        name = self.k8s_app.metadata.name
        if not name:
            raise TypeError(f"Name is required for {self.k8s_app.class_name}")
        return name

    def parse(self) -> K8sConfig:
        """Retrieve app config from REST endpoint."""
        name = self.get_name()
        data = httpx.get(f"url/config/{name}").json()
        return K8sConfig(**data)
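Save the file in the folder configured under plugins.path (./plugins by default) so the backend can load it; url/config/{name} above is a placeholder for your actual REST endpoint.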
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file streams_explorer-2.4.0.tar.gz.
File metadata
- Download URL: streams_explorer-2.4.0.tar.gz
- Upload date:
- Size: 34.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.10.12 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest
---|---
SHA256 | 34a8e0d116c8ebaee91040fc8cb35576e8ec02616139dbc9b0e664347a803f1c
MD5 | 7fdf5c7773fe6066e7dd1cf08ab60486
BLAKE2b-256 | 6aacd0c421f173460c802f075886fe71a7ab111463fdf15cc9fe2bc14ace566f
File details
Details for the file streams_explorer-2.4.0-py3-none-any.whl.
File metadata
- Download URL: streams_explorer-2.4.0-py3-none-any.whl
- Upload date:
- Size: 47.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.2 CPython/3.10.12 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest
---|---
SHA256 | d15bd8675cc49ba2dc53da0e0cbf20b06c324a07fc77141c6ddbe552ae869246
MD5 | 37c1d21a156e2b1bbfefc7276a793228
BLAKE2b-256 | 204b441750fa33b489cb57cf6f437a356efe470ba7bc951d1560d0ba951309f3