Veli Kafka Client
Project description
Kafka Client for VELI.STORE
Description
This module is used for producing kafka events, subscribing to kafka topics and consuming kafka events.
To remember:
Each kafka topic event has a predefined structure which you can see in velikafkaclient/eventregistration, producer
and consumer both have built in validation, meaning that if incorrect object structure is sent to a topic then producer
will raise an exception as well as consumer.
How to use:
- Setup Producer In a separate file (for example:
kafka.py)
from velikafkaclient.producer import AsyncKafkaEventProducer
producer: AsyncKafkaEventProducer = None
async def init_producer(bootstrap_servers):
global producer
producer = AsyncKafkaEventProducer(bootstrap_servers)
return producer
def get_client() -> AsyncKafkaEventProducer:
global producer
return producer
- Initialize Producer:
from kafka import init_producer
producer = await init_producer(bootstrap_servers)
await producer.start()
- Produce events to a topic:
from kafka import get_client
from events.base import KafkaEvent
from topics.base import BaseTopic
kafka_event = KafkaEvent()
get_client().produce_event(BaseTopic, kafka_event)
- Setup and start consumer
from velikafkaclient.consumer import AsyncKafkaConsumer
from config import BOOTSTRAP_SERVER, GROUP_ID
from velikafkaclient.decorators import tracing
from velikafkaclient.events.base import KafkaEvent
from velikafkaclient.topics.base import BaseTopic
class KafkaClientGracefulKiller:
def __init__(self, client):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
self.client = client
def exit_gracefully(self, signum, frame):
print("Gracefully killing kafka client <3")
asyncio.create_task(self.client.stop())
@tracing
async def base_handler(kafka_event: KafkaEvent):
print(str(kafka_event))
bootstrap_servers = BOOTSTRAP_SERVER
consumer = AsyncKafkaConsumer(bootstrap_servers, group_id=GROUP_ID)
KafkaClientGracefulKiller(consumer)
consumer.subscribe(BaseTopic, base_handler)
await consumer.start()
await consumer.consume()
How to add topics:
To add a new topic you need three things:
- Topic name
- Topic event structure
- Topic event structure registration
All changes are done on
velikafkaclientlibrary level (in the repoveli_libs/kafka-client)
- To add a topic go to
kafka-client/velikafkaclient/topicsand add topic like this:
from enum import Enum
class KafkaTopic(str, Enum):
USER_REGISTRATIONS = 'user_registrations'
- To add an event structure go to
kafka-client/velikafkaclient/eventsand create pydantic model for event structure:
class UserRegistrationEvent(KafkaEvent):
id: int
username: str
password: str
name: str
surname: str
- To register event structure to a topic go to
kafka-client/velikafkaclient/eventregistration.pyand add:
kafka_topic_events.register_topic_event_model(KafkaTopic.USER_REGISTRATIONS, UserRegistrationEvent)
Once done follow the instructions in the veli_libs readme to update the library on pyip
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file velikafkaclient-1.0.3.tar.gz.
File metadata
- Download URL: velikafkaclient-1.0.3.tar.gz
- Upload date:
- Size: 6.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
110f06fa291e8e6cb9d932de1dd50c099a3a82d9bd28b968ec2cdb881b4d5c3e
|
|
| MD5 |
6dde118cb2d0fcdab91d746a5e4af7dd
|
|
| BLAKE2b-256 |
71fc03f715ad225ba3afb7c6adcfd2432756623bc101e2d8e8384e80cfcbe276
|
File details
Details for the file velikafkaclient-1.0.3-py3-none-any.whl.
File metadata
- Download URL: velikafkaclient-1.0.3-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.9.6
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
3d4932a1b5ae97250ac74ff0b1bdc619923974726c6d861bb4ed039c537f4d41
|
|
| MD5 |
9e5884f78165641e4fa4bf3a18d0c4bf
|
|
| BLAKE2b-256 |
f7ec7e52dd49dd11de24b243a6356caaff0fbb5cdce29af7979afb42f483411e
|