Kafka utilities for TapHealth Python services
Project description
TapHealth Kafka
A Python library for working with Kafka in TapHealth services. This library provides abstractions over confluent-kafka to simplify producing and consuming Kafka messages.
Requirements
- Python 3.8+
- confluent-kafka >= 2.0.0
Installation
pip install taphealth-kafka
Usage
Connecting to Kafka
from taphealth_kafka import KafkaClient
# Initialize the client
kafka_client = KafkaClient()
# Connect to Kafka brokers
kafka_client.connect(["localhost:9092"])
Producing Messages
from taphealth_kafka import KafkaProducer, Topics
# Create a producer class
class WeeklyPlanProducer(KafkaProducer):
@property
def topic(self):
return Topics.WEEKLY_PLAN_CREATED
# Initialize the producer
producer = WeeklyPlanProducer(kafka_client)
# Send a message
producer.send({
"planId": "plan-123",
"userId": "user-456",
"createdAt": "2025-01-15T10:30:00Z"
})
Consuming Messages
from taphealth_kafka import KafkaConsumer, Topics
# Create a consumer class
class WeeklyPlanConsumer(KafkaConsumer):
@property
def topic(self):
return Topics.WEEKLY_PLAN_CREATED
@property
def group_id(self):
return "plan-processor-group"
def on_message(self, data, message):
# Process the message
print(f"Received plan: {data}")
# Implement your business logic here
# Initialize and start the consumer
consumer = WeeklyPlanConsumer(kafka_client)
consumer.consume() # This will start listening in a blocking manner
Disconnecting
# Clean up when done
kafka_client.disconnect()
API Reference
KafkaClient
The main client for managing Kafka connections.
| Method/Property | Description |
|---|---|
connect(bootstrap_servers: List[str]) |
Connect to Kafka brokers |
disconnect() |
Disconnect and clean up resources |
create_consumer(group_id, **kwargs) |
Create a consumer with the given group ID |
create_topics(topics: List[str]) |
Create topics if they don't exist |
producer |
Property to access the producer instance |
admin |
Property to access the admin client |
bootstrap_servers |
Property to access the list of bootstrap servers |
KafkaProducer (Abstract)
Base class for producers. Subclasses must implement:
| Property/Method | Description |
|---|---|
topic |
Abstract property returning the Topics enum value |
send(data) |
Send a message (data is JSON-serialized automatically) |
KafkaConsumer (Abstract)
Base class for consumers. Subclasses must implement:
| Property/Method | Description |
|---|---|
topic |
Abstract property returning the Topics enum value |
group_id |
Abstract property returning the consumer group ID |
on_message(data, message) |
Called for each received message |
consume() |
Start consuming messages (blocking) |
ensure_topics_exist() |
Ensures the topic exists before consuming |
Topics
Enum containing available Kafka topics:
| Topic | Value |
|---|---|
Topics.WEEKLY_PLAN_CREATED |
weekly-plan-created |
Topics.UPDATE_WEEEKLY_NUTRTION_PLAN |
update-weekly-nutrition-plan |
Topics.UPDATE_WEEKLY_PLAN |
update-weekly-plan |
Topics.CONVERSATION_SUMMARY |
conversation-summary |
Topics.PROFILE_UPDATED |
profile-updated |
Topics.DAILY_PLAN_CREATED |
daily-plan-created |
Topics.DIET_CREATED |
diet-created |
Topics.EXERCISE_LOGGED |
exercise-logged |
Topics.GLUCOSE_LOGGED |
glucose-logged |
Topics.MEAL_LOGGED |
meal-logged |
Topics.METRIC_LOGGED |
metric-logged |
Topics.VOICE_ANALYTICS |
voice-analytics |
Example Service Integration
import threading
import logging
from taphealth_kafka import KafkaClient, KafkaProducer, KafkaConsumer, Topics
logger = logging.getLogger(__name__)
# Initialize Kafka
kafka_client = KafkaClient()
kafka_client.connect(["kafka-broker:9092"])
# Producer
class WeeklyPlanProducer(KafkaProducer):
@property
def topic(self):
return Topics.WEEKLY_PLAN_CREATED
# Consumer
class WeeklyPlanConsumer(KafkaConsumer):
@property
def topic(self):
return Topics.WEEKLY_PLAN_CREATED
@property
def group_id(self):
return "plan-service"
def on_message(self, data, message):
logger.info(f"Processing plan: {data}")
# Start consumer in a separate thread
def start_consumer():
consumer = WeeklyPlanConsumer(kafka_client)
consumer.consume()
consumer_thread = threading.Thread(target=start_consumer)
consumer_thread.daemon = True
consumer_thread.start()
# Use producer in your API endpoints
producer = WeeklyPlanProducer(kafka_client)
def create_plan(plan_data):
producer.send(plan_data)
return {"status": "plan created"}
Configuration
The library uses confluent-kafka's configuration format internally. When creating consumers, you can pass additional configuration options:
consumer = kafka_client.create_consumer(
group_id="my-group",
auto_offset_reset="earliest", # or "latest"
enable_auto_commit=True,
session_timeout_ms=6000,
max_poll_interval_ms=300000
)
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file taphealth_kafka-0.1.6.tar.gz.
File metadata
- Download URL: taphealth_kafka-0.1.6.tar.gz
- Upload date:
- Size: 23.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
97a8eaceb406bd645beac0fa54b46f9c152a043ee7be032631ce06225b8f69b8
|
|
| MD5 |
3bf82a6e02e890c9abaf8e2c61da3b87
|
|
| BLAKE2b-256 |
197ac2566bf22c70b6ddc00e955af37bdf8430ada42cf5ffb424864f45fdba38
|
Provenance
The following attestation bundles were made for taphealth_kafka-0.1.6.tar.gz:
Publisher:
ci.yml on tap-health/kafka-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taphealth_kafka-0.1.6.tar.gz -
Subject digest:
97a8eaceb406bd645beac0fa54b46f9c152a043ee7be032631ce06225b8f69b8 - Sigstore transparency entry: 836399764
- Sigstore integration time:
-
Permalink:
tap-health/kafka-py@b7ab7bead4ad2df8e0fcfa1ccb8de20dcebeedf4 -
Branch / Tag:
refs/tags/v0.1.6 - Owner: https://github.com/tap-health
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
ci.yml@b7ab7bead4ad2df8e0fcfa1ccb8de20dcebeedf4 -
Trigger Event:
release
-
Statement type:
File details
Details for the file taphealth_kafka-0.1.6-py3-none-any.whl.
File metadata
- Download URL: taphealth_kafka-0.1.6-py3-none-any.whl
- Upload date:
- Size: 7.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c6dc934df46f46f510213d6c58a9794fc9819a264504914575e9fae705d6b6fb
|
|
| MD5 |
d04735984ff3d3db270fffc7f63d0e57
|
|
| BLAKE2b-256 |
9a7c386e870dae79531911a9b2b20d23c96f64ac4f602e072e15692a0ee2d93b
|
Provenance
The following attestation bundles were made for taphealth_kafka-0.1.6-py3-none-any.whl:
Publisher:
ci.yml on tap-health/kafka-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taphealth_kafka-0.1.6-py3-none-any.whl -
Subject digest:
c6dc934df46f46f510213d6c58a9794fc9819a264504914575e9fae705d6b6fb - Sigstore transparency entry: 836399765
- Sigstore integration time:
-
Permalink:
tap-health/kafka-py@b7ab7bead4ad2df8e0fcfa1ccb8de20dcebeedf4 -
Branch / Tag:
refs/tags/v0.1.6 - Owner: https://github.com/tap-health
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
ci.yml@b7ab7bead4ad2df8e0fcfa1ccb8de20dcebeedf4 -
Trigger Event:
release
-
Statement type: