# TapHealth Kafka

Kafka utilities for TapHealth Python services.

A Python library for working with Kafka in TapHealth services. It provides abstractions over confluent-kafka to simplify producing and consuming Kafka messages.
## Requirements

- Python 3.8+
- confluent-kafka >= 2.0.0

## Installation

```shell
pip install taphealth-kafka
```
## Usage

This package is designed to be installed and used across TapHealth services. The pattern is:

- Install the package in your project
- Extend the abstract classes (`KafkaProducer`/`KafkaConsumer`) and implement the required abstract methods
- Use the provided event dataclasses to structure your messages
### Connecting to Kafka

```python
from taphealth_kafka import KafkaClient

# Using a context manager (recommended; disconnects automatically)
with KafkaClient() as client:
    client.connect(["localhost:9092"])
    # ... use client ...

# Or manage the lifecycle manually
client = KafkaClient()
client.connect(["localhost:9092"])
# ... use client ...
client.disconnect()

# Disable automatic topic creation on connect
client.connect(["localhost:9092"], auto_create_topics=False)
```
### Producing Messages

Extend `KafkaProducer` and implement the `topic` property:

```python
from taphealth_kafka import KafkaProducer, Topics

class GlucoseLoggedProducer(KafkaProducer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

producer = GlucoseLoggedProducer(kafka_client)
```
Use event dataclasses to structure your messages:

```python
from datetime import datetime

from taphealth_kafka.events import GlucoseLoggedData, GlucoseType, GlucoseRange

glucose_data = GlucoseLoggedData(
    user_id="user-12345",
    date=datetime.now().strftime("%Y-%m-%d"),
    glucose_type=GlucoseType.FASTING,
    glucose_reading=105.0,
    glucose_range=GlucoseRange.NORMAL,
    is_critical=False,
    timezone="UTC",
)

# to_dict() produces a camelCase dict; send() serializes it to JSON
producer.send(glucose_data.to_dict())
```
### Consuming Messages

Extend `KafkaConsumer` and implement the required abstract methods:

```python
from taphealth_kafka import KafkaConsumer, Topics
from taphealth_kafka.events import GlucoseLoggedData

class GlucoseLoggedConsumer(KafkaConsumer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

    @property
    def group_id(self) -> str:
        return "glucose-processor-group"

    def on_message(self, data: dict, message) -> None:
        # Deserialize back into a dataclass
        glucose = GlucoseLoggedData.from_dict(data)
        print(f"User: {glucose.user_id}, Reading: {glucose.glucose_reading} mg/dL")

consumer = GlucoseLoggedConsumer(kafka_client)
consumer.consume()  # Blocking call
```
Override `consumer_config` to customize consumer settings:

```python
class LatestOffsetConsumer(KafkaConsumer):
    @property
    def consumer_config(self) -> dict:
        return {
            "auto_offset_reset": "latest",
            "enable_auto_commit": False,
        }

    # ... implement topic, group_id, on_message ...
```
### Serialization

All event dataclasses support bidirectional serialization between Python snake_case and Kafka camelCase. For the common TapHealth contracts, enum fields follow the numeric enum encoding used on the Kafka side.
```python
from taphealth_kafka.events import GlucoseLoggedData, GlucoseType, GlucoseRange

# Serialize to a camelCase dict
glucose_data = GlucoseLoggedData(
    user_id="user-12345",
    date="2026-01-15",
    glucose_type=GlucoseType.FASTING,
    glucose_reading=105.0,
    glucose_range=GlucoseRange.NORMAL,
    is_critical=False,
    timezone="UTC",
)
data_dict = glucose_data.to_dict()
# {"userId": "user-12345", "date": "2026-01-15", "type": 0, "glucoseRange": 1, ...}

# Deserialize from a camelCase dict
glucose = GlucoseLoggedData.from_dict(data_dict)
assert glucose.user_id == "user-12345"
```
For advanced use cases, the serialization utilities are available directly:

```python
from taphealth_kafka import dataclass_to_dict, dataclass_from_dict

# Convert any dataclass to a camelCase dict
data_dict = dataclass_to_dict(my_dataclass)

# Reconstruct from a camelCase dict with full type awareness
obj = dataclass_from_dict(MyDataclass, data_dict)
```
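The snake_case-to-camelCase key mapping these helpers perform can be illustrated with a tiny stand-alone converter. This is only a sketch of the naming convention, not the library's implementation (which also handles nested dataclasses, enums, and lists):

```python
def snake_to_camel(key: str) -> str:
    """Convert a snake_case key to camelCase (e.g. user_id -> userId)."""
    head, *rest = key.split("_")
    return head + "".join(part.capitalize() for part in rest)

def camelize_keys(data: dict) -> dict:
    """Return a copy of the dict with top-level keys converted to camelCase."""
    return {snake_to_camel(k): v for k, v in data.items()}

payload = camelize_keys({"user_id": "user-12345", "is_critical": False})
assert payload == {"userId": "user-12345", "isCritical": False}
```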
### Disconnecting

```python
# Preferred: use the context manager (see "Connecting to Kafka")

# Manual: call disconnect() to flush pending messages and close consumers
kafka_client.disconnect()
```
### Complete Example

Here's a complete example showing a producer and consumer for glucose logging:

```python
"""
Glucose Logging Example

Usage:
    # Terminal 1 - start the consumer
    python example.py consumer

    # Terminal 2 - send a message with the producer
    python example.py producer
"""
import sys
from datetime import datetime

from taphealth_kafka import (
    KafkaClient,
    KafkaConsumer,
    KafkaProducer,
    Topics,
)
from taphealth_kafka.events import (
    GlucoseLoggedData,
    GlucoseRange,
    GlucoseType,
)


class GlucoseLoggedProducer(KafkaProducer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED


class GlucoseLoggedConsumer(KafkaConsumer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

    @property
    def group_id(self) -> str:
        return "glucose-example-group"

    def on_message(self, data: dict, message) -> None:
        glucose = GlucoseLoggedData.from_dict(data)
        print(f"Received: user={glucose.user_id}, reading={glucose.glucose_reading}")


def run_producer(client: KafkaClient) -> None:
    producer = GlucoseLoggedProducer(client)
    glucose_data = GlucoseLoggedData(
        user_id="user-12345",
        date=datetime.now().strftime("%Y-%m-%d"),
        glucose_type=GlucoseType.FASTING,
        glucose_reading=105.0,
        glucose_range=GlucoseRange.NORMAL,
        is_critical=False,
        timezone="UTC",
    )
    producer.send(glucose_data.to_dict())
    print("Event sent!")


def run_consumer(client: KafkaClient) -> None:
    consumer = GlucoseLoggedConsumer(client)
    print("Listening for events... (Ctrl+C to stop)")
    consumer.consume()


def main() -> None:
    with KafkaClient() as client:
        client.connect(["localhost:9092"])
        mode = sys.argv[1] if len(sys.argv) > 1 else "consumer"
        if mode == "producer":
            run_producer(client)
        else:
            run_consumer(client)


if __name__ == "__main__":
    main()
```
## API Reference

### KafkaClient

The main client for managing Kafka connections. Supports the context manager protocol.

| Method/Property | Description |
|---|---|
| `connect(bootstrap_servers, *, auto_create_topics=True)` | Connect to Kafka brokers. Auto-creates all topics by default. |
| `disconnect()` | Disconnect and clean up resources. |
| `create_consumer(group_id, **kwargs)` | Create a consumer with the given group ID. |
| `create_topics(topics, *, num_partitions=1, replication_factor=1)` | Create topics if they don't exist. |
| `producer` | Property to access the producer instance. |
| `admin` | Property to access the admin client. |
| `bootstrap_servers` | Property to access the list of bootstrap servers. |
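The connect/disconnect lifecycle and context-manager behavior described above can be sketched with a minimal stand-in class. This is illustrative only (no broker, not the library's implementation); it shows why the context-manager form is preferred: cleanup runs even if the body raises.

```python
class SketchKafkaClient:
    """Illustrative stand-in for KafkaClient's lifecycle; not the real client."""

    def __init__(self):
        self.bootstrap_servers = None
        self.connected = False

    def connect(self, bootstrap_servers, *, auto_create_topics=True):
        # The real client would also create topics here when auto_create_topics=True
        self.bootstrap_servers = bootstrap_servers
        self.connected = True

    def disconnect(self):
        # The real client flushes pending messages and closes consumers
        self.connected = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Cleanup runs even if the with-body raises
        self.disconnect()
        return False

with SketchKafkaClient() as client:
    client.connect(["localhost:9092"])
    assert client.connected

# After the with-block, the client has been disconnected
assert not client.connected
```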
### KafkaProducer (Abstract)

Base class for producers. Subclasses must implement:

| Property/Method | Description |
|---|---|
| `topic` | Abstract property returning the `Topics` enum value. |
| `send(data)` | Send a message (`data` is JSON-serialized automatically). |
### KafkaConsumer (Abstract)

Base class for consumers. Subclasses must implement `topic`, `group_id`, and `on_message`:

| Property/Method | Description |
|---|---|
| `topic` | Abstract property returning the `Topics` enum value. |
| `group_id` | Abstract property returning the consumer group ID. |
| `on_message(data, message)` | Called for each received message. |
| `consumer_config` | Override to customize consumer settings (default: `auto_offset_reset="earliest"`, `enable_auto_commit=True`). |
| `consume()` | Start consuming messages (blocking). |
| `ensure_topics_exist()` | Ensures the topic exists before consuming. |
### Topics

Enum containing the available Kafka topics:

| Topic | Value |
|---|---|
| `Topics.WEEKLY_PLAN_CREATED` | `weekly-plan-created` |
| `Topics.UPDATE_WEEKLY_NUTRITION_PLAN` | `update-weekly-nutrition-plan` |
| `Topics.UPDATE_WEEKLY_PLAN` | `update-weekly-plan` |
| `Topics.CONVERSATION_SUMMARY` | `conversation-summary` |
| `Topics.PROFILE_UPDATED` | `profile-updated` |
| `Topics.DAILY_PLAN_CREATED` | `daily-plan-created` |
| `Topics.DIET_CREATED` | `diet-created` |
| `Topics.EXERCISE_LOGGED` | `exercise-logged` |
| `Topics.GLUCOSE_LOGGED` | `glucose-logged` |
| `Topics.MEAL_LOGGED` | `meal-logged` |
| `Topics.METRIC_LOGGED` | `metric-logged` |
| `Topics.PLAN_UPDATE` | `plan-update` |
| `Topics.PLAN_UPDATED` | `plan-updated` |
| `Topics.VOICE_ANALYTICS` | `voice-analytics` |

Deprecated: `Topics.UPDATE_WEEEKLY_NUTRTION_PLAN` still works but emits a `DeprecationWarning`. Use `Topics.UPDATE_WEEKLY_NUTRITION_PLAN` instead.

Use `Topics.get_all_topics()` to get all topic names as a list of strings.
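A `get_all_topics()` helper like this is easy to sketch on a plain `Enum`. The two members below are illustrative stand-ins; the real `Topics` enum with the full value set above ships with the package:

```python
from enum import Enum
from typing import List

class SketchTopics(Enum):
    """Two-member stand-in for the package's Topics enum."""
    GLUCOSE_LOGGED = "glucose-logged"
    MEAL_LOGGED = "meal-logged"

    @classmethod
    def get_all_topics(cls) -> List[str]:
        # One string per topic, e.g. for bulk topic creation on connect
        return [member.value for member in cls]

assert SketchTopics.get_all_topics() == ["glucose-logged", "meal-logged"]
```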
### Serialization Utilities

| Function | Description |
|---|---|
| `dataclass_to_dict(obj, key_overrides=None)` | Convert a dataclass to a camelCase dict. Recursively handles nested dataclasses, enums, and lists. Skips `None` fields. |
| `dataclass_from_dict(cls, data, key_overrides=None)` | Construct a dataclass from a camelCase dict. Recursively deserializes nested types. |
### Event Dataclasses

The package provides typed dataclasses for structuring event messages. All dataclasses have `to_dict()` and `from_dict()` methods for bidirectional camelCase serialization:

| Event | Description |
|---|---|
| `ConversationSummaryData` | AI conversation summary |
| `VoiceCallAnalyticsData` | Voice call analytics |
| `GlucoseLoggedData` | Glucose reading log |
| `MetricLoggedData` | Health metric log |
| `ExerciseLoggedData` | Exercise log |
| `MealLoggedData` | Logged meal with feedback |
| `DietCreatedData` | Generated diet plan |
| `ProfileUpdatedData` | User profile update |
| `PlanUpdateData` | Plan update command (request to update a plan) |
| `PlanUpdatedData` | Plan updated event (confirms a plan was updated) |
| `DailyPlanCreatedData` | Daily plan with nutrition, glucose, exercise, and education plans |
Supporting types and enums:

| Type | Values / Description |
|---|---|
| `GlucoseType` | `FASTING`, `PRE_LUNCH`, `PRE_DINNER`, `POST_BREAKFAST`, `POST_LUNCH`, `POST_DINNER`, `RANDOM` |
| `GlucoseRange` | `LOW`, `NORMAL`, `HIGH`, `VERY_HIGH`, `VERY_LOW` |
| `GlucoseSchedule` | Dataclass with `fasting`, `post_lunch`, `bedtime`, `post_exercise` fields |
| `MealType` | `BREAKFAST`, `LUNCH`, `DINNER`, `SNACK` |
| `MealVariation` | `EASY`, `MODERATE`, `CHALLENGING` |
| `MetricType` | Various health metrics (weight, blood pressure, HbA1c, etc.) |
| `WorkoutStatus` | `COMPLETE`, `INCOMPLETE`, `INCOMPLETE_TOO_EASY`, `INCOMPLETE_TOO_HARD` |
| `WorkoutDifficulty` | Workout difficulty levels |
| `DailyTaskType` | Types for daily plan tasks |
| `PlanUpdatedTrigger` | `ROUTINE`, `LAB_REPORT`, `SYMPTOM` (trigger type for plan update events) |
| `HabitKey` | Habit tracking keys |
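As noted under Serialization, enum fields in the common contracts are encoded as numbers on the wire. The mechanism can be sketched with a stand-alone `IntEnum`; the members and numeric values below are illustrative only, since the actual values are fixed by the shared TapHealth contract:

```python
from dataclasses import dataclass
from enum import IntEnum

class SketchGlucoseType(IntEnum):
    """Illustrative stand-in; real contract values may differ."""
    FASTING = 0
    POST_LUNCH = 1

@dataclass
class SketchReading:
    glucose_type: SketchGlucoseType

    def to_dict(self) -> dict:
        # Enum fields serialize to their numeric value, as on the Kafka side
        return {"glucoseType": int(self.glucose_type)}

    @classmethod
    def from_dict(cls, data: dict) -> "SketchReading":
        # Numeric wire value decodes back into the enum member
        return cls(glucose_type=SketchGlucoseType(data["glucoseType"]))

reading = SketchReading(glucose_type=SketchGlucoseType.FASTING)
assert reading.to_dict() == {"glucoseType": 0}
assert SketchReading.from_dict({"glucoseType": 1}).glucose_type is SketchGlucoseType.POST_LUNCH
```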
## Configuration

When creating consumers, you can pass additional configuration options:

```python
consumer = kafka_client.create_consumer(
    group_id="my-group",
    auto_offset_reset="earliest",  # or "latest"
    enable_auto_commit=True,
    session_timeout_ms=6000,
    max_poll_interval_ms=300000,
)
```
Or override consumer_config in your KafkaConsumer subclass (see Consuming Messages).
## License

MIT
## File details

Details for the file `taphealth_kafka-0.2.4.tar.gz`.

### File metadata

- Download URL: taphealth_kafka-0.2.4.tar.gz
- Upload date:
- Size: 69.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `5d724f576586a923555e1a0a10b66824a829908717623d3df31b31ec21df6c97` |
| MD5 | `f9d58601c6ef37ec90c8144ca9944a9a` |
| BLAKE2b-256 | `007c70d71aebb7dd600a45171f2841d35a424959be1dcd576e9fa2f22fdd8e87` |
### Provenance

The following attestation bundles were made for `taphealth_kafka-0.2.4.tar.gz`:

Publisher: `ci.yml` on `tap-health/kafka-py`

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: taphealth_kafka-0.2.4.tar.gz
- Subject digest: `5d724f576586a923555e1a0a10b66824a829908717623d3df31b31ec21df6c97`
- Sigstore transparency entry: 1205265511
- Sigstore integration time:
- Permalink: tap-health/kafka-py@6ac664eb49d66eb185fdfd60443d444124e3c328
- Branch / Tag: refs/tags/v0.2.04
- Owner: https://github.com/tap-health
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: self-hosted
- Publication workflow: ci.yml@6ac664eb49d66eb185fdfd60443d444124e3c328
- Trigger Event: release
## File details

Details for the file `taphealth_kafka-0.2.4-py3-none-any.whl`.

### File metadata

- Download URL: taphealth_kafka-0.2.4-py3-none-any.whl
- Upload date:
- Size: 35.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7

### File hashes

| Algorithm | Hash digest |
|---|---|
| SHA256 | `ac3611018abe438f190e406872abda568b82c8fa5f3b4664c76a4185f5e58cdc` |
| MD5 | `9f881527c8602b1edcb477abe94b71b9` |
| BLAKE2b-256 | `2bee6f0cca8aac6bc87b75291ce9b974bcf3abdaf68e1165ceba22a77b0d0884` |
### Provenance

The following attestation bundles were made for `taphealth_kafka-0.2.4-py3-none-any.whl`:

Publisher: `ci.yml` on `tap-health/kafka-py`

- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: taphealth_kafka-0.2.4-py3-none-any.whl
- Subject digest: `ac3611018abe438f190e406872abda568b82c8fa5f3b4664c76a4185f5e58cdc`
- Sigstore transparency entry: 1205265513
- Sigstore integration time:
- Permalink: tap-health/kafka-py@6ac664eb49d66eb185fdfd60443d444124e3c328
- Branch / Tag: refs/tags/v0.2.04
- Owner: https://github.com/tap-health
- Access: private
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: self-hosted
- Publication workflow: ci.yml@6ac664eb49d66eb185fdfd60443d444124e3c328
- Trigger Event: release