No project description provided
Project description
KAFKA-CLIENT-NEW
Примеры использования:
Через manager
import json
import logging
from kafka_client import (
DefaultBatchKafkaManager,
DefaultBatchProducer,
DefaultBatchConsumer,
)
from kafka_client.handler import EmptyHandler
logging.basicConfig(level=logging.INFO)
if __name__ == '__main__':
batch_size = 10
consumer_settings = {
'bootstrap_servers': '...',
'value_deserializer': lambda v: json.loads(v.decode('utf-8')),
'key_deserializer': lambda k: k.decode('utf-8'),
'client_id': '...',
'group_id': '...',
'enable_auto_commit': False,
'auto_offset_reset': 'earliest'
}
consumer = DefaultBatchConsumer(
topic='...',
batch_size=batch_size,
consumer_settings=consumer_settings
)
producer_settings = {
'bootstrap_servers': '...',
'key_serializer': str.encode,
'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
}
producer = DefaultBatchProducer(topic='...', producer_settings=producer_settings)
manager = DefaultBatchKafkaManager(consumer, producer)
manager.set_handler(handler=EmptyHandler())
manager.run()
Через функцию run
задаем переменные окружения
BATCH_SIZE
[optional] - размер батчаBOOTSTRAP_SERVERS
- брокеры кафкиINPUT_TOPIC
- входной топикOUTPUT_TOPIC
- выходной топикCLIENT_ID
[optional] - имя приложения, как то отсылается в кафкуGROUP_ID
- kafka consumer group
Запускаем python
:
import logging
from kafka_client import run
from kafka_client.handler import EmptyHandler
logging.basicConfig(level=logging.INFO)
if __name__ == '__main__':
run(handler=EmptyHandler())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for kafka_client_new-0.4.dev2.tar.gz
Algorithm | Hash digest | |
---|---|---|
SHA256 | 488f0c9d9290199a45b061545d6522b8235328bf563a20e262fbe935fd94aeb0 |
|
MD5 | 05464c05b702822688848487fe8c03b5 |
|
BLAKE2b-256 | 4712e01647bbc5d82e8f6974521052c05b5107ff75f414917d83009626b220a3 |
Close
Hashes for kafka_client_new-0.4.dev2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | af3765eec46dc15d0fb73ad836b15324c09613f11acc9356e90985d4eac3aaa9 |
|
MD5 | b6e7eb7c9de912f1e259917aa256b6ec |
|
BLAKE2b-256 | 74fcc91d5fc6620b6eff49ba3b236f3ac2752767dda1de684475d9bcfcab81c0 |