Kafka utilities for TapHealth Python services

TapHealth Kafka

A Python library for working with Kafka in TapHealth services. This library provides abstractions over confluent-kafka to simplify producing and consuming Kafka messages.

Requirements

  • Python 3.8+
  • confluent-kafka >= 2.0.0

Installation

pip install taphealth-kafka

Usage

Connecting to Kafka

from taphealth_kafka import KafkaClient

# Initialize the client
kafka_client = KafkaClient()

# Connect to Kafka brokers
kafka_client.connect(["localhost:9092"])
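
In a deployed service the broker list usually comes from configuration rather than a hard-coded literal. A minimal sketch, assuming a hypothetical KAFKA_BOOTSTRAP_SERVERS environment variable that holds a comma-separated list (the variable name and the helper are illustrative and not part of this library):

```python
import os
from typing import List


def bootstrap_servers_from_env(
    var_name: str = "KAFKA_BOOTSTRAP_SERVERS",  # hypothetical variable name
    default: str = "localhost:9092",
) -> List[str]:
    """Split a comma-separated broker list into the list form connect() expects."""
    raw = os.environ.get(var_name, default)
    # Drop surrounding whitespace and empty entries such as a trailing comma.
    return [server.strip() for server in raw.split(",") if server.strip()]


# Usage with the client shown above:
# kafka_client.connect(bootstrap_servers_from_env())
```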

Producing Messages

from taphealth_kafka import KafkaProducer, Topics

# Create a producer class
class WeeklyPlanProducer(KafkaProducer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

# Initialize the producer
producer = WeeklyPlanProducer(kafka_client)

# Send a message
producer.send({
    "planId": "plan-123",
    "userId": "user-456",
    "createdAt": "2025-01-15T10:30:00Z"
})

Consuming Messages

from taphealth_kafka import KafkaConsumer, Topics

# Create a consumer class
class WeeklyPlanConsumer(KafkaConsumer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

    @property
    def group_id(self):
        return "plan-processor-group"

    def on_message(self, data, message):
        # Process the message
        print(f"Received plan: {data}")
        # Implement your business logic here

# Initialize and start the consumer
consumer = WeeklyPlanConsumer(kafka_client)
consumer.consume()  # Blocks the calling thread while listening for messages
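
Because on_message runs for every received message, it can help to check the payload before acting on it. The sketch below assumes that data arrives as the decoded JSON object and that a weekly plan carries the three fields from the producer example; validate_plan and REQUIRED_PLAN_FIELDS are hypothetical names, not part of the library:

```python
from typing import Any, Dict

# Field names taken from the producer example; adjust to the real schema.
REQUIRED_PLAN_FIELDS = ("planId", "userId", "createdAt")


def validate_plan(data: Any) -> Dict[str, Any]:
    """Return the payload unchanged if every required field is present, else raise ValueError."""
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object, got {type(data).__name__}")
    missing = [field for field in REQUIRED_PLAN_FIELDS if field not in data]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    return data


# Inside a consumer subclass, on_message could then start with:
#     plan = validate_plan(data)
```

Raising on a malformed payload keeps the business logic free of per-field checks; whether to log and skip such messages or let the exception propagate depends on the service.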

Disconnecting

# Clean up when done
kafka_client.disconnect()
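
To make sure disconnect() runs even when an exception interrupts the work in between, the connect/disconnect pair can be wrapped in a context manager. This is a sketch built only on the connect() and disconnect() methods documented below; kafka_session is a hypothetical helper, not something the library provides:

```python
from contextlib import contextmanager
from typing import Any, Iterator, List


@contextmanager
def kafka_session(client: Any, bootstrap_servers: List[str]) -> Iterator[Any]:
    """Connect on entry and always disconnect on exit, even after an exception."""
    client.connect(bootstrap_servers)
    try:
        yield client
    finally:
        client.disconnect()


# Usage:
# with kafka_session(KafkaClient(), ["localhost:9092"]) as kafka_client:
#     WeeklyPlanProducer(kafka_client).send({"planId": "plan-123"})
```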

API Reference

KafkaClient

The main client for managing Kafka connections.

Method/Property                        Description
connect(bootstrap_servers: List[str])  Connect to Kafka brokers
disconnect()                           Disconnect and clean up resources
create_consumer(group_id, **kwargs)    Create a consumer with the given group ID
create_topics(topics: List[str])       Create topics if they don't exist
producer                               Property to access the producer instance
admin                                  Property to access the admin client
bootstrap_servers                      Property to access the list of bootstrap servers

KafkaProducer (Abstract)

Base class for producers. Subclasses must implement topic; send() is inherited from the base class.

Property/Method  Description
topic            Abstract property returning the Topics enum value
send(data)       Send a message (data is JSON-serialized automatically)
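
Since send() JSON-serializes its argument, the payload should contain only JSON-compatible values. How the library treats values such as datetime objects is not stated here, so the sketch below converts them up front; to_json_safe is a hypothetical helper and json.dumps stands in for whatever serializer the library uses internally:

```python
import json
from datetime import datetime, timezone
from typing import Any


def to_json_safe(value: Any) -> Any:
    """Recursively convert datetimes to ISO 8601 strings; leave other values alone."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, dict):
        return {key: to_json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(item) for item in value]
    return value


payload = to_json_safe({
    "planId": "plan-123",
    "createdAt": datetime(2025, 1, 15, 10, 30, tzinfo=timezone.utc),
})
json.dumps(payload)  # would raise TypeError if a datetime were still present
# producer.send(payload)
```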

KafkaConsumer (Abstract)

Base class for consumers. Subclasses must implement topic, group_id, and on_message; consume() and ensure_topics_exist() are inherited from the base class.

Property/Method            Description
topic                      Abstract property returning the Topics enum value
group_id                   Abstract property returning the consumer group ID
on_message(data, message)  Called for each received message
consume()                  Start consuming messages (blocking)
ensure_topics_exist()      Ensures the topic exists before consuming

Topics

Enum containing available Kafka topics:

Topic                                Value
Topics.WEEKLY_PLAN_CREATED           weekly-plan-created
Topics.UPDATE_WEEEKLY_NUTRTION_PLAN  update-weekly-nutrition-plan
Topics.UPDATE_WEEKLY_PLAN            update-weekly-plan
Topics.CONVERSATION_SUMMARY          conversation-summary
Topics.PROFILE_UPDATED               profile-updated
Topics.DAILY_PLAN_CREATED            daily-plan-created
Topics.DIET_CREATED                  diet-created
Topics.EXERCISE_LOGGED               exercise-logged
Topics.GLUCOSE_LOGGED                glucose-logged
Topics.MEAL_LOGGED                   meal-logged
Topics.METRIC_LOGGED                 metric-logged
Topics.VOICE_ANALYTICS               voice-analytics
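
The table pairs each enum member with the topic name string it stands for. The sketch below uses a small stand-in enum with three entries copied from the table to show both directions of that mapping with standard enum.Enum behaviour; it assumes the library's Topics is a plain Enum with string values, which this README does not confirm:

```python
from enum import Enum


class Topics(Enum):
    """Stand-in with three entries copied from the table; not the library's class."""
    WEEKLY_PLAN_CREATED = "weekly-plan-created"
    MEAL_LOGGED = "meal-logged"
    GLUCOSE_LOGGED = "glucose-logged"


# Member -> topic name string
topic_name = Topics.MEAL_LOGGED.value

# Topic name string -> member (raises ValueError for an unknown name)
member = Topics("glucose-logged")
```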

Example Service Integration

import threading
import logging
from taphealth_kafka import KafkaClient, KafkaProducer, KafkaConsumer, Topics

logger = logging.getLogger(__name__)

# Initialize Kafka
kafka_client = KafkaClient()
kafka_client.connect(["kafka-broker:9092"])

# Producer
class WeeklyPlanProducer(KafkaProducer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

# Consumer
class WeeklyPlanConsumer(KafkaConsumer):
    @property
    def topic(self):
        return Topics.WEEKLY_PLAN_CREATED

    @property
    def group_id(self):
        return "plan-service"

    def on_message(self, data, message):
        # Lazy %-formatting defers building the string until the record is emitted
        logger.info("Processing plan: %s", data)

# Start consumer in a separate thread
def start_consumer():
    consumer = WeeklyPlanConsumer(kafka_client)
    consumer.consume()

# daemon=True lets the process exit without waiting for the blocking consume() loop
consumer_thread = threading.Thread(target=start_consumer, daemon=True)
consumer_thread.start()

# Use producer in your API endpoints
producer = WeeklyPlanProducer(kafka_client)

def create_plan(plan_data):
    producer.send(plan_data)
    return {"status": "plan created"}

Configuration

The library uses confluent-kafka's configuration format internally. When creating consumers, you can pass additional configuration options:

consumer = kafka_client.create_consumer(
    group_id="my-group",
    auto_offset_reset="earliest",      # or "latest"
    enable_auto_commit=True,
    session_timeout_ms=6000,
    max_poll_interval_ms=300000
)
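
confluent-kafka itself takes dotted configuration keys such as auto.offset.reset, while the keyword arguments above are their snake_case equivalents. Assuming the library translates underscores to dots (an assumption; this README does not describe the translation), the mapping would look like the following, where to_confluent_config is a hypothetical helper:

```python
from typing import Any, Dict


def to_confluent_config(**kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: turn snake_case keyword names into dotted config keys."""
    return {name.replace("_", "."): value for name, value in kwargs.items()}


config = to_confluent_config(
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    session_timeout_ms=6000,
    max_poll_interval_ms=300000,
)
# config == {
#     "auto.offset.reset": "earliest",
#     "enable.auto.commit": True,
#     "session.timeout.ms": 6000,
#     "max.poll.interval.ms": 300000,
# }
```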

License

MIT
