TapHealth Kafka

A Python library for working with Kafka in TapHealth services. It provides abstractions over confluent-kafka that simplify producing and consuming Kafka messages.

Requirements

  • Python 3.8+
  • confluent-kafka >= 2.0.0

Installation

pip install taphealth-kafka

Usage

This package is designed to be installed and used across TapHealth services. The pattern is:

  1. Install the package in your project
  2. Extend the abstract classes (KafkaProducer/KafkaConsumer) and implement the required abstract methods
  3. Use the provided event dataclasses to structure your messages

Connecting to Kafka

from taphealth_kafka import KafkaClient

# Using context manager (recommended — auto-disconnects)
with KafkaClient() as client:
    client.connect(["localhost:9092"])
    # ... use client ...

# Or manually manage the lifecycle
client = KafkaClient()
client.connect(["localhost:9092"])
# ... use client ...
client.disconnect()

# Disable automatic topic creation on connect
client.connect(["localhost:9092"], auto_create_topics=False)

Producing Messages

Extend KafkaProducer and implement the topic property:

from taphealth_kafka import KafkaProducer, Topics

class GlucoseLoggedProducer(KafkaProducer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

producer = GlucoseLoggedProducer(kafka_client)

Use event dataclasses to structure your messages:

from datetime import datetime
from taphealth_kafka.events import GlucoseLoggedData, GlucoseType, GlucoseRange

glucose_data = GlucoseLoggedData(
    user_id="user-12345",
    date=datetime.now().strftime("%Y-%m-%d"),
    glucose_type=GlucoseType.FASTING,
    glucose_reading=105.0,
    glucose_range=GlucoseRange.NORMAL,
    is_critical=False,
    timezone="UTC",
)

# Serialize to a camelCase dict and send; the producer JSON-encodes it
producer.send(glucose_data.to_dict())

Consuming Messages

Extend KafkaConsumer and implement the required abstract methods:

from taphealth_kafka import KafkaConsumer, Topics

class GlucoseLoggedConsumer(KafkaConsumer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

    @property
    def group_id(self) -> str:
        return "glucose-processor-group"

    def on_message(self, data: dict, message) -> None:
        # Deserialize back into a dataclass
        from taphealth_kafka.events import GlucoseLoggedData
        glucose = GlucoseLoggedData.from_dict(data)
        print(f"User: {glucose.user_id}, Reading: {glucose.glucose_reading} mg/dL")

consumer = GlucoseLoggedConsumer(kafka_client)
consumer.consume()  # Blocking call
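
Because consume() blocks, a common pattern is to stop with Ctrl+C and release resources in a finally block. A minimal sketch using only the calls shown above:

try:
    consumer.consume()
except KeyboardInterrupt:
    print("Stopping consumer...")
finally:
    # disconnect() flushes pending messages and closes consumers
    kafka_client.disconnect()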

Override consumer_config to customize consumer settings:

class LatestOffsetConsumer(KafkaConsumer):
    @property
    def consumer_config(self) -> dict:
        return {
            "auto_offset_reset": "latest",
            "enable_auto_commit": False,
        }

    # ... implement topic, group_id, on_message ...

Serialization

All event dataclasses support bidirectional serialization between Python snake_case fields and the camelCase keys used on the Kafka wire. Enum fields are encoded as numeric values, matching the shared TapHealth event contracts:

from taphealth_kafka.events import GlucoseLoggedData, GlucoseType, GlucoseRange

# Serialize to camelCase dict
glucose_data = GlucoseLoggedData(
    user_id="user-12345",
    date="2026-01-15",
    glucose_type=GlucoseType.FASTING,
    glucose_reading=105.0,
    glucose_range=GlucoseRange.NORMAL,
    is_critical=False,
    timezone="UTC",
)
data_dict = glucose_data.to_dict()
# {"userId": "user-12345", "date": "2026-01-15", "type": 0, "glucoseRange": 1, ...}

# Deserialize from camelCase dict
glucose = GlucoseLoggedData.from_dict(data_dict)
assert glucose.user_id == "user-12345"

For advanced use cases, the serialization utilities are available directly:

from taphealth_kafka import dataclass_to_dict, dataclass_from_dict

# Convert any dataclass to camelCase dict
data_dict = dataclass_to_dict(my_dataclass)

# Reconstruct from camelCase dict with full type awareness
obj = dataclass_from_dict(MyDataclass, data_dict)
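
Nested dataclasses, enums, and lists are handled recursively. A sketch with hypothetical Patient and Address dataclasses (not part of the package; any dataclass works the same way):

from dataclasses import dataclass

from taphealth_kafka import dataclass_to_dict, dataclass_from_dict

@dataclass
class Address:
    zip_code: str

@dataclass
class Patient:
    user_id: str
    home_address: Address

patient = Patient(user_id="user-12345", home_address=Address(zip_code="560001"))

data_dict = dataclass_to_dict(patient)
# {"userId": "user-12345", "homeAddress": {"zipCode": "560001"}}

restored = dataclass_from_dict(Patient, data_dict)
assert restored == patient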

Disconnecting

# Preferred: use the context manager (see "Connecting to Kafka")
# Manual: call disconnect() to flush pending messages and close consumers
kafka_client.disconnect()

Complete Example

Here's a complete example showing a producer and consumer for glucose logging:

"""
Glucose Logging Example

Usage:
    # Terminal 1 - Start the consumer
    python example.py consumer

    # Terminal 2 - Send a message with the producer
    python example.py producer
"""

import sys
from datetime import datetime

from taphealth_kafka import (
    KafkaClient,
    KafkaConsumer,
    KafkaProducer,
    Topics,
)
from taphealth_kafka.events import (
    GlucoseLoggedData,
    GlucoseRange,
    GlucoseType,
)


class GlucoseLoggedProducer(KafkaProducer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED


class GlucoseLoggedConsumer(KafkaConsumer):
    @property
    def topic(self) -> Topics:
        return Topics.GLUCOSE_LOGGED

    @property
    def group_id(self) -> str:
        return "glucose-example-group"

    def on_message(self, data: dict, message) -> None:
        glucose = GlucoseLoggedData.from_dict(data)
        print(f"Received: user={glucose.user_id}, reading={glucose.glucose_reading}")


def run_producer(client: KafkaClient) -> None:
    producer = GlucoseLoggedProducer(client)

    glucose_data = GlucoseLoggedData(
        user_id="user-12345",
        date=datetime.now().strftime("%Y-%m-%d"),
        glucose_type=GlucoseType.FASTING,
        glucose_reading=105.0,
        glucose_range=GlucoseRange.NORMAL,
        is_critical=False,
        timezone="UTC",
    )

    producer.send(glucose_data.to_dict())
    print("Event sent!")


def run_consumer(client: KafkaClient) -> None:
    consumer = GlucoseLoggedConsumer(client)
    print("Listening for events... (Ctrl+C to stop)")
    consumer.consume()


def main() -> None:
    with KafkaClient() as client:
        client.connect(["localhost:9092"])

        mode = sys.argv[1] if len(sys.argv) > 1 else "consumer"

        if mode == "producer":
            run_producer(client)
        else:
            run_consumer(client)


if __name__ == "__main__":
    main()

API Reference

KafkaClient

The main client for managing Kafka connections. Supports the context manager protocol.

  • connect(bootstrap_servers, *, auto_create_topics=True): Connect to Kafka brokers. Auto-creates all topics by default.
  • disconnect(): Disconnect and clean up resources.
  • create_consumer(group_id, **kwargs): Create a consumer with the given group ID.
  • create_topics(topics, *, num_partitions=1, replication_factor=1): Create topics if they don't exist.
  • producer: Property to access the producer instance.
  • admin: Property to access the admin client.
  • bootstrap_servers: Property to access the list of bootstrap servers.
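
For example, topics can be created explicitly instead of relying on auto-creation. A sketch, assuming create_topics accepts Topics members as implied by the signature above:

from taphealth_kafka import KafkaClient, Topics

with KafkaClient() as client:
    client.connect(["localhost:9092"], auto_create_topics=False)
    # Create only what this service needs, with explicit partitioning
    client.create_topics([Topics.GLUCOSE_LOGGED], num_partitions=3, replication_factor=1)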

KafkaProducer (Abstract)

Base class for producers. Subclasses implement the abstract topic property; send is inherited:

  • topic: Abstract property returning the Topics enum value.
  • send(data): Send a message; data is JSON-serialized automatically.

KafkaConsumer (Abstract)

Base class for consumers. Subclasses implement the abstract topic, group_id, and on_message members; the rest are inherited:

  • topic: Abstract property returning the Topics enum value.
  • group_id: Abstract property returning the consumer group ID.
  • on_message(data, message): Called for each received message.
  • consumer_config: Override to customize consumer settings (default: auto_offset_reset="earliest", enable_auto_commit=True).
  • consume(): Start consuming messages (blocking).
  • ensure_topics_exist(): Ensures the topic exists before consuming.

Topics

Enum containing available Kafka topics:

  • Topics.WEEKLY_PLAN_CREATED: weekly-plan-created
  • Topics.UPDATE_WEEKLY_NUTRITION_PLAN: update-weekly-nutrition-plan
  • Topics.UPDATE_WEEKLY_PLAN: update-weekly-plan
  • Topics.CONVERSATION_SUMMARY: conversation-summary
  • Topics.PROFILE_UPDATED: profile-updated
  • Topics.DAILY_PLAN_CREATED: daily-plan-created
  • Topics.DIET_CREATED: diet-created
  • Topics.EXERCISE_LOGGED: exercise-logged
  • Topics.GLUCOSE_LOGGED: glucose-logged
  • Topics.MEAL_LOGGED: meal-logged
  • Topics.METRIC_LOGGED: metric-logged
  • Topics.PLAN_UPDATE: plan-update
  • Topics.PLAN_UPDATED: plan-updated
  • Topics.VOICE_ANALYTICS: voice-analytics

Deprecated: Topics.UPDATE_WEEEKLY_NUTRTION_PLAN still works but emits a DeprecationWarning. Use Topics.UPDATE_WEEKLY_NUTRITION_PLAN instead.

Use Topics.get_all_topics() to get all topic names as a list of strings.
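
For example:

from taphealth_kafka import Topics

print(Topics.get_all_topics())
# ['weekly-plan-created', 'update-weekly-nutrition-plan', 'update-weekly-plan', ...]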

Serialization Utilities

  • dataclass_to_dict(obj, key_overrides=None): Convert a dataclass to a camelCase dict. Recursively handles nested dataclasses, enums, and lists. Skips None fields.
  • dataclass_from_dict(cls, data, key_overrides=None): Construct a dataclass from a camelCase dict. Recursively deserializes nested types.
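
Both helpers accept an optional key_overrides mapping for fields whose wire key is not the plain camelCase form. A sketch, assuming key_overrides maps field names to wire keys (consistent with GlucoseLoggedData emitting glucose_type as "type" in the example above):

from dataclasses import dataclass

from taphealth_kafka import dataclass_to_dict, dataclass_from_dict

@dataclass
class Reading:  # hypothetical dataclass for illustration
    user_id: str
    glucose_type: int

overrides = {"glucose_type": "type"}  # assumed shape: field name -> wire key

data_dict = dataclass_to_dict(
    Reading(user_id="user-12345", glucose_type=0),
    key_overrides=overrides,
)
# {"userId": "user-12345", "type": 0}

reading = dataclass_from_dict(Reading, data_dict, key_overrides=overrides)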

Event Dataclasses

The package provides typed dataclasses for structuring event messages. All dataclasses have to_dict() and from_dict() methods for bidirectional camelCase serialization:

  • ConversationSummaryData: AI conversation summary.
  • VoiceCallAnalyticsData: Voice call analytics.
  • GlucoseLoggedData: Glucose reading log.
  • MetricLoggedData: Health metric log.
  • ExerciseLoggedData: Exercise log.
  • MealLoggedData: Logged meal with feedback.
  • DietCreatedData: Generated diet plan.
  • ProfileUpdatedData: User profile update.
  • PlanUpdateData: Plan update command (request to update a plan).
  • PlanUpdatedData: Plan updated event (confirms a plan was updated).
  • DailyPlanCreatedData: Daily plan with nutrition, glucose, exercise, and education plans.

Supporting types and enums:

  • GlucoseType: FASTING, PRE_LUNCH, PRE_DINNER, POST_BREAKFAST, POST_LUNCH, POST_DINNER, RANDOM
  • GlucoseRange: LOW, NORMAL, HIGH, VERY_HIGH, VERY_LOW
  • GlucoseSchedule: Dataclass with fasting, post_lunch, bedtime, post_exercise fields
  • MealType: BREAKFAST, LUNCH, DINNER, SNACK
  • MealVariation: EASY, MODERATE, CHALLENGING
  • MetricType: Various health metrics (weight, blood pressure, HbA1c, etc.)
  • WorkoutStatus: COMPLETE, INCOMPLETE, INCOMPLETE_TOO_EASY, INCOMPLETE_TOO_HARD
  • WorkoutDifficulty: Workout difficulty levels
  • DailyTaskType: Types for daily plan tasks
  • PlanUpdatedTrigger: ROUTINE, LAB_REPORT, SYMPTOM (trigger type for plan update events)
  • HabitKey: Habit tracking keys
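
Because enum fields serialize to numeric values, fields in a raw payload can be converted back with the enum constructor, although from_dict() normally does this for you. A sketch, assuming the numeric codes shown in the to_dict() output above (FASTING as 0, NORMAL as 1):

from taphealth_kafka.events import GlucoseRange, GlucoseType

payload = {"type": 0, "glucoseRange": 1}

glucose_type = GlucoseType(payload["type"])            # GlucoseType.FASTING (assumed code)
glucose_range = GlucoseRange(payload["glucoseRange"])  # GlucoseRange.NORMAL (assumed code)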

Configuration

When creating consumers, you can pass additional configuration options:

consumer = kafka_client.create_consumer(
    group_id="my-group",
    auto_offset_reset="earliest",      # or "latest"
    enable_auto_commit=True,
    session_timeout_ms=6000,
    max_poll_interval_ms=300000
)

Or override consumer_config in your KafkaConsumer subclass (see Consuming Messages).

License

MIT
