Veli Kafka Client
Kafka Client for VELI.STORE
Description
This module is used for producing Kafka events, subscribing to Kafka topics, and consuming Kafka events.
To remember:
Each Kafka topic event has a predefined structure, which you can see in velikafkaclient/eventregistration. The producer
and the consumer both have built-in validation: if an object with an incorrect structure is sent to a topic, the producer
raises an exception, and so does the consumer.
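To make the validation idea concrete, here is a minimal sketch using only the standard library. It is not the library's implementation (the library uses pydantic models, as described further down); `SketchEvent` and `EventValidationError` are hypothetical names introduced only for this illustration, and the type check is deliberately simple.

```python
from dataclasses import dataclass, fields


class EventValidationError(ValueError):
    """Hypothetical error raised when an event does not match its structure."""


@dataclass
class SketchEvent:
    """Hypothetical stand-in base class that checks each field's type."""

    def __post_init__(self) -> None:
        # Compare every value against the type declared in its annotation.
        for field in fields(self):
            value = getattr(self, field.name)
            if not isinstance(value, field.type):
                raise EventValidationError(
                    f"{type(self).__name__}.{field.name}: expected "
                    f"{field.type.__name__}, got {type(value).__name__}"
                )


@dataclass
class UserRegistrationEvent(SketchEvent):
    id: int
    username: str
    password: str


# A well-formed event is accepted.
valid_event = UserRegistrationEvent(id=1, username="alice", password="s3cret")

# A malformed event is rejected before it could reach a topic.
try:
    UserRegistrationEvent(id="1", username="alice", password="s3cret")
except EventValidationError as exc:
    print(f"rejected: {exc}")
```

In this sketch the check runs at construction time, so an invalid object never exists; the real producer and consumer may validate at a different point.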
How to use Producer/Consumer:
- Initialize kafka producer:
BOOTSTRAP_SERVERS = "kafka bootstrap servers uri"
kafka_producer: AsyncKafkaEventProducer = AsyncKafkaEventProducer(BOOTSTRAP_SERVERS)
await kafka_producer.start()
- Produce events to a topic:
from events.base import KafkaEvent
from topics.base import BaseTopic
kafka_event = KafkaEvent()
await kafka_producer.produce_event(BaseTopic, kafka_event)
- Set up and start the consumer:
async def base_handler(kafka_event: KafkaEvent):
    print(str(kafka_event))
bootstrap_servers = BOOTSTRAP_SERVERS
consumer = AsyncKafkaConsumer(bootstrap_servers, group_id=GROUP_ID)
consumer.subscribe(BaseTopic, base_handler)
await consumer.start()
await consumer.consume()
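The subscribe-then-consume pattern above can be pictured as a mapping from topics to async handlers. The following is a small in-memory sketch of that idea, with no Kafka involved; `InMemoryDispatcher` and its methods are hypothetical and are not part of velikafkaclient.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[dict], Awaitable[None]]


class InMemoryDispatcher:
    """Hypothetical stand-in for a consumer: routes events to handlers by topic."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        # Several handlers may be attached to the same topic.
        self._handlers[topic].append(handler)

    async def dispatch(self, topic: str, event: dict) -> int:
        """Await each handler registered for the topic; return how many ran."""
        handlers = self._handlers.get(topic, [])
        for handler in handlers:
            await handler(event)
        return len(handlers)


async def demo() -> list:
    seen = []

    async def base_handler(event: dict) -> None:
        seen.append(event)

    dispatcher = InMemoryDispatcher()
    dispatcher.subscribe("user_registrations", base_handler)
    await dispatcher.dispatch("user_registrations", {"id": 1})
    # No handler is subscribed to this topic, so the event is ignored.
    await dispatcher.dispatch("unknown_topic", {"id": 2})
    return seen
```

Running `asyncio.run(demo())` collects only the event sent to the subscribed topic. A real consumer would read events from the broker in a loop instead of receiving them through direct `dispatch` calls.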
How to add topics:
To add a new topic you need three things:
- The topic itself
- The topic's event structure
- A registration of the event structure to the topic
All changes are made in the velikafkaclient library (in the veli_libs repo).
- To add a topic, go to kafka-client/velikafkaclient/topics and add a topic like this:
class SomeTopicCollection(KafkaTopic):
    USER_REGISTRATIONS = 'user_registrations'
- To add an event structure, go to kafka-client/velikafkaclient/events and create a pydantic model for the event structure:
class UserRegistrationEvent(KafkaEvent):
    id: int
    username: str
    password: str
- To register the event structure to a topic, go to kafka-client/velikafkaclient/eventregistration.py and add:
kafka_topic_events.register_topic_event_model(SomeTopicCollection.USER_REGISTRATIONS, UserRegistrationEvent)
Once done, follow the instructions in the veli_libs README to update the library on PyPI.
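The three pieces (topic, event structure, registration) fit together roughly as in the sketch below. It is a standard-library approximation, not the library's code: only the method name `register_topic_event_model` is taken from the call shown above, while `TopicEventRegistry`, `check`, and `OrderCreatedEvent` are hypothetical names used for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Type

# The topic itself (a plain string constant in this sketch).
USER_REGISTRATIONS = "user_registrations"


# The topic's event structure.
@dataclass
class UserRegistrationEvent:
    id: int
    username: str
    password: str


# A second, unrelated structure used to show a mismatch (hypothetical).
@dataclass
class OrderCreatedEvent:
    order_id: int


class TopicEventRegistry:
    """Hypothetical registry that pairs each topic with one event model."""

    def __init__(self) -> None:
        self._models: Dict[str, Type] = {}

    def register_topic_event_model(self, topic: str, model: Type) -> None:
        if topic in self._models:
            raise ValueError(f"topic {topic!r} already has an event model")
        self._models[topic] = model

    def check(self, topic: str, event: object) -> None:
        """Raise if the topic is unknown or the event has the wrong model."""
        model = self._models.get(topic)
        if model is None:
            raise KeyError(f"no event model registered for topic {topic!r}")
        if not isinstance(event, model):
            raise TypeError(
                f"topic {topic!r} expects {model.__name__}, "
                f"got {type(event).__name__}"
            )


# The registration step.
registry = TopicEventRegistry()
registry.register_topic_event_model(USER_REGISTRATIONS, UserRegistrationEvent)
```

With this in place, `registry.check(USER_REGISTRATIONS, UserRegistrationEvent(1, "alice", "s3cret"))` passes silently, while passing an `OrderCreatedEvent` to the same topic raises `TypeError`.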
How to manage topics with TopicManager:
from velikafkaclient.topicmanager import KafkaTopicManager
client = KafkaTopicManager(BOOTSTRAP_SERVERS)
# Create Topic
client.create_topic('topic_name')
# Create Multiple Topics
topics = ['topic_name_1', 'topic_name_2']
client.create_topics(topics)
# List Topics
print(client.list_topics())
# Delete Topic
client.delete_topic('topic_name')
# Delete Multiple Topics
topics = ['topic_name_1', 'topic_name_2']
client.delete_topics(topics)
# Check if topic exists
print(client.topic_exists('topic_name'))
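A common use of these calls is creating topics only when they are missing. The helper below is a sketch that relies solely on the `topic_exists` and `create_topic` method names shown above; it assumes `topic_exists` returns a truthy or falsy value. `ensure_topics` and `FakeTopicManager` are hypothetical, and the fake exists only so the sketch can run without a broker.

```python
from typing import Iterable, List


def ensure_topics(client, topic_names: Iterable[str]) -> List[str]:
    """Create each topic that does not exist yet; return the names created."""
    created = []
    for name in topic_names:
        if not client.topic_exists(name):
            client.create_topic(name)
            created.append(name)
    return created


class FakeTopicManager:
    """Hypothetical in-memory stand-in with the same method names."""

    def __init__(self, existing: Iterable[str] = ()) -> None:
        self._topics = set(existing)

    def topic_exists(self, name: str) -> bool:
        return name in self._topics

    def create_topic(self, name: str) -> None:
        self._topics.add(name)

    def list_topics(self) -> List[str]:
        return sorted(self._topics)


fake = FakeTopicManager(existing=["topic_name_1"])
newly_created = ensure_topics(fake, ["topic_name_1", "topic_name_2"])
```

Because the helper checks before creating, calling it a second time with the same names creates nothing, which makes it safe to run at application startup.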