# TensorWatch API (tensorwatch-api)

Interactive Kafka stream visualization API for Jupyter using TensorWatch.
tensorwatch-api is a Python library designed to simplify real-time streaming data visualization from Kafka directly inside Jupyter notebooks.
It wraps TensorWatch and provides an interactive, high-level API with ipywidgets for controlling visualization parameters on the fly.
This package is ideal for data scientists, engineers, and analysts who want to inspect, monitor, and visualize streaming data without leaving JupyterLab.
## Features
- Interactive UI: Control plot parameters like window size, colors, and history in real-time.
- Chainable API: Fluent API for connecting to Kafka, defining streams, and drawing visualizations.
- Dual Kafka Backends: Supports `confluent-kafka` (default) and `pykafka`.
- Live Metrics: Display real-time throughput and latency metrics alongside visualizations.
- Custom Processing: Apply Python lambda expressions for aggregation or preprocessing.
- Efficient Updates: Debouncing ensures smooth UI updates without overloading the kernel.
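The debouncing mentioned above can be pictured with a small sketch of the general pattern; this is an illustration only, not the library's internal code (the `Debouncer` name is invented here):

```python
import time

class Debouncer:
    """Suppress events that arrive within `interval` seconds of the last accepted one."""

    def __init__(self, interval: float):
        self.interval = interval
        self._last = 0.0

    def should_fire(self) -> bool:
        now = time.monotonic()
        if now - self._last >= self.interval:
            self._last = now
            return True
        return False

# With a 0.5 s interval, back-to-back events coalesce into one update
deb = Debouncer(0.5)
first = deb.should_fire()   # True: the first event always fires
second = deb.should_fire()  # False: too soon after the first
```

In a widget-driven UI, a mechanism like this keeps a slider drag from triggering a full plot redraw on every intermediate value.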
## Installation
Install the package and its dependencies:

```bash
pip install tensorwatch-api
pip install tensorwatch ipywidgets matplotlib
```

Install a Kafka client:

```bash
# Default Kafka connector
pip install confluent-kafka

# Or the pykafka connector
pip install pykafka
```
## Quick Start

Enable the interactive matplotlib backend, import the library, and initialize the API:

```python
%matplotlib widget

from tensorwatch_api import twapi as tw

test = tw()
```

Create a connector to a Kafka topic:

```python
test.connector(
    topic='mytopic',
    host='localhost:9092',
    cluster_size=5,
    queue_length=100000,
    parsetype="protobuf",                 # optional: json, pickle, avro, protobuf
    parser_extra="benchmark_pb2",         # module name for protobuf
    protobuf_message="BenchmarkMessage",  # class name for protobuf
    schema_path=r"C:\path\to\protobuf"    # path to the protobuf module
)
```

Define the stream processing logic and draw the UI with metrics:

```python
test.stream(expr='lambda d: sum(msg["seq"] for msg in d.data) if d.data else 0')

test.draw_with_metrics()
```
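The `queue_length` argument used above caps how many messages the connector keeps in memory. Conceptually this behaves like a bounded deque that discards the oldest entries once full (a sketch of the concept, not the library's implementation):

```python
from collections import deque

# queue_length=5 for illustration; the Quick Start above uses 100000
buffer = deque(maxlen=5)
for seq in range(8):
    buffer.append({"seq": seq})

print(len(buffer))       # 5: capped at maxlen
print(buffer[0]["seq"])  # 3: the three oldest messages were dropped
```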
## API Reference

### twapi()

Initializes the API wrapper and UI components.
### .connector(...)

Creates a Kafka consumer in a background thread, fetching data and reporting metrics.

Arguments:

- `topic` (str): Kafka topic to consume.
- `host` (str): Broker host (e.g. localhost:9092).
- `conn_type` (str): Connector type (`'kafka'` or `'pykafka'`). Default `'kafka'`.
- `parsetype` (str): Message format (`'json'`, `'pickle'`, `'avro'`, `'protobuf'`). Default `'json'`.
- `cluster_size` (int): Number of consumer threads. Default 1.
- `queue_length` (int): Maximum number of messages held in memory. Default 50000.
- Other parameters: `schema_path`, `protobuf_message`, `parser_extra`, `random_sampling`, `countmin_width`, `countmin_depth`, `ordering_field`.
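To illustrate what the `parsetype` option implies, here is a minimal sketch of format dispatch for the two stdlib-backed formats, json and pickle. This is an illustration of the concept only, not the library's actual decoding code; avro and protobuf decoding would additionally need the schema modules referenced by `schema_path` and `protobuf_message`:

```python
import json
import pickle

def parse_message(raw: bytes, parsetype: str = "json"):
    """Decode a raw Kafka message payload according to parsetype.

    Illustrative sketch: avro and protobuf are omitted because they
    require external schema modules.
    """
    if parsetype == "json":
        return json.loads(raw.decode("utf-8"))
    if parsetype == "pickle":
        return pickle.loads(raw)
    raise ValueError(f"unsupported parsetype: {parsetype}")

# A JSON-encoded message carrying a sequence number
msg = parse_message(b'{"seq": 7}', parsetype="json")  # {"seq": 7}
```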
### .stream(expr)

Defines the data processing logic for the stream.

- `expr` (str): A Python lambda expression. Receives `d` (the connector instance) and returns a single numerical value to plot.
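Because `expr` is passed as a string, it is turned into a callable and applied to the connector on each update. Its behavior can be checked outside the library with a mock connector object (a sketch; `SimpleNamespace` stands in for the real connector, whose `data` attribute holds the buffered messages):

```python
from types import SimpleNamespace

# The same expression used in the Quick Start
expr = 'lambda d: sum(msg["seq"] for msg in d.data) if d.data else 0'
fn = eval(expr)  # turn the string into a callable

# Mock connector: d.data is the buffered list of parsed messages
d = SimpleNamespace(data=[{"seq": 1}, {"seq": 2}, {"seq": 3}])
print(fn(d))                         # 6
print(fn(SimpleNamespace(data=[])))  # 0 when the buffer is empty
```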
### .draw() / .draw_with_metrics()

Renders the interactive UI widgets and plot area.

- `.draw()`: Standard UI.
- `.draw_with_metrics()`: Also displays real-time benchmark metrics.
## UI Controls
- Reset Button: Resets all options and clears the plot.
- Start / Apply Changes Button: Initially labeled "Start"; activates on the first Kafka message, then applies UI changes.
- Accordion Options:
  - Window Size: Number of points shown.
  - Window Width: Width of the plot.
  - Pick a Color: Plot line color.
  - Date / Use Offset / Dim History: TensorWatch visualization options.
## License
This project is licensed under the MIT License. See LICENSE for details.
## File details

Details for the file tensorwatch_api-1.1.0-py3-none-any.whl.

- Download URL: tensorwatch_api-1.1.0-py3-none-any.whl
- Upload date:
- Size: 7.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.10.1

File hashes:

| Algorithm | Hash digest |
|---|---|
| SHA256 | 04b7aaa16945798baeca00c64caaac302d096536e8bd8487b38c1c53a366e721 |
| MD5 | 32f80fded599f45153e7d3832b1c6288 |
| BLAKE2b-256 | f690a4626ff68a1d58c9c9b8a18e9bf7cec873c2bd620659744f63be45a86e63 |