Skip to main content

Explore Data Pipelines in Apache Kafka.

Project description

Streams Explorer

Explore Data Pipelines in Apache Kafka.

streams-explorer

Contents

Features

  • Visualization of streaming applications, topics, and connectors
  • Monitor all or individual pipelines from multiple namespaces
  • Inspection of Avro schema from schema registry
  • Integration with streams-bootstrap and faust-bootstrap, or custom streaming app config parsing from Kubernetes deployments using plugins
  • Real-time metrics from Prometheus (consumer lag & read rate, replicas, topic size, messages in & out per second, connector tasks)
  • Linking to external services for logging and analysis, such as Kibana, Grafana, Loki, AKHQ, Kowl, and Elasticsearch
  • Customizable through Python plugins

Overview

Visit our introduction blogpost for a complete overview and demo of Streams Explorer.

Installation

Docker Compose

  1. Forward the ports to Prometheus. (Kafka Connect, Schema Registry, and other integrations are optional)
  2. Start the container
docker compose up

Once the container is started visit http://localhost:3000

Deploying to Kubernetes cluster

  1. Add the Helm chart repository
helm repo add streams-explorer https://raw.githubusercontent.com/bakdata/streams-explorer/master/helm-chart/
  1. Install
helm upgrade --install --values helm-chart/values.yaml streams-explorer

Standalone

Backend

  1. Install dependencies using Poetry
poetry install
  1. Forward the ports to Prometheus. (Kafka Connect, Schema Registry, and other integrations are optional)
  2. Configure the backend in settings.yaml.
  3. Start the backend server
poetry run start

Frontend

  1. Install dependencies
npm ci
  1. Start the frontend server
npm run build && npm run prod

Visit http://localhost:3000

Configuration

Depending on your type of installation set the configuration for the backend server in this file:

In the helm-chart/values.yaml configuration is done either through the config section using double underscore notation, e.g. K8S__consumer_group_annotation: consumerGroup or the content of backend/settings.yaml can be pasted under the settings section. Alternatively all configuration options can be written as environment variables using double underscore notation and the prefix SE, e.g. SE_K8S__deployment__cluster=false.

The following configuration options are available:

General

  • graph.update_interval Update the graph every X seconds (int, required, default: 300)
  • graph.layout_arguments Arguments passed to graphviz layout (string, required, default: -Grankdir=LR -Gnodesep=0.8 -Gpad=10)
  • graph.pipeline_distance Increase/decrease vertical space between pipeline graphs by X pixels (int, required, default: 500)
  • graph.resolve.input_pattern_topics.all If true topics that match (extra) input pattern(s) are connected to the streaming app in the graph containing all pipelines (bool, required, default: false)
  • graph.resolve.input_pattern_topics.pipelines If true topics that match (extra) input pattern(s) are connected to the streaming app in pipeline graphs (bool, required, default: false)

Kafka

  • kafka.enable Enable Kafka (bool, default: false)
  • kafka.config librdkafka configuration properties (reference) (dict, default: {"bootstrap.servers": "localhost:9092"})
  • kafka.displayed_information Configuration options of Kafka topics displayed in the frontend (list of dict)
  • kafka.topic_names_cache.ttl Cache for retrieving all topic names (used when input topic patterns are resolved) (int, default: 3600)

Kafka Connect

  • kafkaconnect.url URL of Kafka Connect server (string, default: None)
  • kafkaconnect.displayed_information Configuration options of Kafka connectors displayed in the frontend (list of dict)

Kubernetes

  • k8s.deployment.cluster Whether streams-explorer is deployed to Kubernetes cluster (bool, required, default: false)
  • k8s.deployment.context Name of cluster (string, optional if running in cluster, default: kubernetes-cluster)
  • k8s.deployment.namespaces Kubernetes namespaces (list of string, required, default: ['kubernetes-namespace'])
  • k8s.containers.ignore Name of containers that should be ignored/hidden (list of string, default: ['prometheus-jmx-exporter'])
  • k8s.displayed_information Details of pod that should be displayed (list of dict, default: [{'name': 'Labels', 'key': 'metadata.labels'}])
  • k8s.labels Labels used to set attributes of nodes (list of string, required, default: ['pipeline'])
  • k8s.pipeline.label Attribute of nodes the pipeline name should be extracted from (string, required, default: pipeline)
  • k8s.consumer_group_annotation Annotation the consumer group name should be extracted from (string, required, default: consumerGroup)

Schema Registry / Karapace

  • schemaregistry.url URL of Confluent Schema Registry or Karapace (string, default: None)

Prometheus

  • prometheus.url URL of Prometheus (string, required, default: http://localhost:9090)

The following exporters are required to collect Kafka metrics for Prometheus:

AKHQ

  • akhq.enable Enable AKHQ (bool, default: false)
  • akhq.url URL of AKHQ (string, default: http://localhost:8080)
  • akhq.cluster Name of cluster (string, default: kubernetes-cluster)
  • akhq.connect Name of connect (string, default: None)

Kowl

Kowl can be used instead of AKHQ. (mutually exclusive)

  • kowl.enable Enable Kowl (bool, default: false)
  • kowl.url URL of Kowl (string, default: http://localhost:8080)

Grafana

  • grafana.enable Enable Grafana (bool, default: false)
  • grafana.url URL of Grafana (string, default: http://localhost:3000)
  • grafana.dashboards.topics Path to topics dashboard (string), sample dashboards for topics and consumer groups are included in the ./grafana subfolder
  • grafana.dashboards.consumergroups Path to consumer groups dashboard (string)

Kibana

  • kibanalogs.enable Enable Kibana logs (bool, default: false)
  • kibanalogs.url URL of Kibana logs (string, default: http://localhost:5601)

Loki

Loki can be used instead of Kibana. (mutually exclusive)

  • loki.enable Enable Loki logs (bool, default: false)
  • loki.url URL of Loki logs (string, default: http://localhost:3000)

Elasticsearch

for Kafka Connect Elasticsearch connector

  • esindex.url URL of Elasticsearch index (string, default: http://localhost:5601/app/kibana#/dev_tools/console)

Plugins

  • plugins.path Path to folder containing plugins relative to backend (string, required, default: ./plugins)
  • plugins.extractors.default Whether to load default extractors (bool, required, default: true)

Demo pipeline

demo-pipeline

ATM Fraud detection with streams-bootstrap

Plugin customization

It is possible to create your own config parser, linker, metric provider, and extractors in Python by implementing the K8sConfigParser, LinkingService, MetricProvider, or Extractor classes. This way you can customize it to your specific setup and services. As an example we provide the DefaultLinker as LinkingService. The default MetricProvider supports Prometheus. Furthermore the following default Extractor plugins are included:

If your streaming application deployments are configured through environment variables, following the schema of streams-bootstrap or faust-bootstrap, the Streams Explorer works out-of-the-box with the default deployment parser. For streams-bootstrap deployments configured through CLI arguments a separate parser can be loaded by creating a Python file (e.g. config_parser.py) in the plugins folder with the following import statement:

from streams_explorer.core.k8s_config_parser import StreamsBootstrapArgsParser

For other setups a custom config parser plugin can be created by inheriting from the K8sConfigParser class and implementing the parse method. In this example we're retrieving the streaming app configurations from an external REST API. In order for a deployment to be indentified as streaming app, input and output topics are required.

import httpx

from streams_explorer.core.k8s_config_parser import K8sConfigParser
from streams_explorer.models.k8s_config import K8sConfig


class CustomConfigParser(K8sConfigParser):
    def get_name(self) -> str:
        name = self.k8s_app.metadata.name
        if not name:
            raise TypeError(f"Name is required for {self.k8s_app.get_class_name()}")
        return name

    def parse(self) -> K8sConfig:
        """Retrieve app config from REST endpoint."""
        name = self.get_name()
        data = httpx.get(f"url/config/{name}").json()
        return K8sConfig(**data)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streams-explorer-1.6.0.tar.gz (29.4 kB view hashes)

Uploaded Source

Built Distribution

streams_explorer-1.6.0-py3-none-any.whl (40.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page