easy to use kafka(kb)
Project description
An easy-to-use Python Kafka client (for kb first).
As a message queue, Kafka has the following characteristics:
- Stable, with no data loss.
- Persists sent messages.
- Allows larger payloads.
- Tolerates slower processing speed.
Quick Start
Installation: pip install easy_kafka
1.config
Edit conf/conf.yml
kafka:  # kafka config
  bootstrap_servers: 127.0.0.1:9092  # kafka servers; for multiple: 172.1.0.2:9092,172.1.0.1:9092
  group_id: group  # consumer group
  topic_subscribe:  # topics to subscribe to (one or more)
    - topic1
    - topic2
  topic_produce: topic1_result  # default producer topic
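The `bootstrap_servers` value above is a single comma-separated string. As a minimal sketch of how such a string could be split and checked before it reaches a client, the helper below uses only the standard library. `parse_bootstrap_servers` is a hypothetical name for illustration and is not part of easy_kafka.

```python
def parse_bootstrap_servers(value):
    """Split 'host:port,host:port' into a list of (host, port) tuples.

    Hypothetical helper for illustration; not part of easy_kafka.
    """
    servers = []
    for item in value.split(','):
        item = item.strip()
        if not item:
            continue  # tolerate trailing commas and stray spaces
        # rpartition splits on the last ':' so the port is always the tail
        host, sep, port = item.rpartition(':')
        if not sep or not host or not port.isdigit():
            raise ValueError('expected host:port, got %r' % item)
        servers.append((host, int(port)))
    if not servers:
        raise ValueError('no servers configured')
    return servers
```

Calling it with `'172.1.0.2:9092,172.1.0.1:9092'` yields two `(host, port)` pairs, while an entry without a port raises `ValueError` early instead of failing later at connection time.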
2.demo-consumer
import json

from easy_kafka.kafka_consumer import EasyKafkaConsumer


def start_consumer():
    kafka_consumer = EasyKafkaConsumer('conf/conf.yml')
    print('consumer iterator started')
    # Iterate over incoming records as they arrive.
    for record in kafka_consumer:
        print('record', record.value)
        print('json', json.loads(record.value))


if __name__ == "__main__":
    start_consumer()
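The loop above calls `json.loads` on every `record.value`, so one malformed message would raise and end the iteration. The sketch below shows one way to decode defensively. It assumes `record.value` is `bytes` or `str` holding UTF-8 JSON, which the demo suggests but does not state; `decode_record_value` and `FakeRecord` are hypothetical names, not part of easy_kafka.

```python
import json
from collections import namedtuple

# Stand-in for a consumed record; carries only the attributes the demos use.
FakeRecord = namedtuple('FakeRecord', ['topic', 'value'])


def decode_record_value(record, default=None):
    """Return the JSON-decoded value, or `default` if decoding fails."""
    raw = record.value
    try:
        if isinstance(raw, (bytes, bytearray)):
            raw = raw.decode('utf-8')
        return json.loads(raw)
    except (UnicodeDecodeError, ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers None values
        return default
```

Inside the consumer loop, a record whose decoded value is `None` could then be logged and skipped rather than stopping the consumer.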
3.demo-consumer-callback
import json

from easy_kafka.kafka_consumer import EasyKafkaConsumer
from easy_kafka.kafka_producer import EasyKafkaProducer

# Shared producer used by the callback to publish results.
kafka_producer = EasyKafkaProducer('conf/conf.yml')


def consumer_task(record):
    """
    Consumer callback.

    :param record: the consumed record object
    :return: None
    """
    print('consumer_task', record.value)
    print('json', json.loads(record.value))
    # Compare strings with ==; `is` tests object identity, not equality.
    if record.topic == 'topic1':
        kafka_producer.produce_msg({'type': 'task result'})


def start_consumer():
    kafka_consumer = EasyKafkaConsumer('../conf/conf.yml')
    print('consumer task started')
    kafka_consumer.subscribe(fn=consumer_task)


if __name__ == "__main__":
    start_consumer()
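When a consumer subscribes to several topics, the callback tends to accumulate per-topic branches. One possible way to keep it flat is a small dispatcher that maps topic names to handlers. This is a sketch, not something easy_kafka provides: `make_dispatcher` and `FakeRecord` are hypothetical names, and the only thing assumed about records is the `topic` attribute used in the demo above.

```python
from collections import namedtuple

# Stand-in for a consumed record; carries only the attributes the demos use.
FakeRecord = namedtuple('FakeRecord', ['topic', 'value'])


def make_dispatcher(handlers, fallback=None):
    """Build one callback that forwards each record to its topic's handler.

    `handlers` maps topic name -> callable(record).
    """
    def dispatch(record):
        handler = handlers.get(record.topic, fallback)
        if handler is None:
            return None  # no handler and no fallback: ignore the record
        return handler(record)
    return dispatch
```

The returned function takes a single record, so it could be passed as `kafka_consumer.subscribe(fn=make_dispatcher({...}))` in place of a hand-written callback.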
4.demo-producer
from easy_kafka.kafka_config import EasyKafkaConfig
from easy_kafka.kafka_producer import EasyKafkaProducer


def demo_produce_msg():
    # Build the producer directly from the config file path.
    kafka_producer = EasyKafkaProducer('conf/conf.yml')
    kafka_producer.produce_msg({'name': 'ksust'})


def demo_produce_msg_with_config():
    # Load the config first, then pass the config object to the producer.
    config = EasyKafkaConfig('conf/conf.yml')
    print('config', config.__dict__)
    kafka_producer = EasyKafkaProducer(config)
    # Send to the default topic (topic_produce in conf.yml).
    kafka_producer.produce_msg({'name': 'ksust'})
    # Send to an explicitly named topic.
    kafka_producer.produce_msg_topic('topic1', {'name': 'ksust'})


if __name__ == "__main__":
    demo_produce_msg()
    demo_produce_msg_with_config()
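The producer demos pass plain dicts, and the consumer demos call `json.loads(record.value)`, which suggests JSON on the wire. The source does not show how `produce_msg` serializes its argument, so the following is only a sketch of one plausible encoding; `encode_message` and `decode_message` are hypothetical names, not easy_kafka functions.

```python
import json


def encode_message(message):
    """Serialize a dict to UTF-8 JSON bytes (assumed wire format)."""
    return json.dumps(message, ensure_ascii=False).encode('utf-8')


def decode_message(payload):
    """Inverse of encode_message: UTF-8 JSON bytes back to a Python object."""
    return json.loads(payload.decode('utf-8'))
```

A round trip such as `decode_message(encode_message({'name': 'ksust'}))` returns the original dict, which is a quick way to check that a message contains only JSON-serializable values before sending it.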
Download files
Source Distribution
easy_kafka-0.0.4.tar.gz (3.4 kB)
Built Distribution
easy_kafka-0.0.4-py3-none-any.whl (4.6 kB)
File details
Details for the file easy_kafka-0.0.4.tar.gz.
File metadata
- Download URL: easy_kafka-0.0.4.tar.gz
- Upload date:
- Size: 3.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/45.2.0.post20200210 requests-toolbelt/0.9.1 tqdm/4.42.1 CPython/3.7.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 36275bbc21c0d0d76be70de3e2abd4d007be5e77699c1cf11a066bb851d0f8f5 |
| MD5 | 7daaa478813b6556415488f485614f9c |
| BLAKE2b-256 | a23cc1366b52f23ed2ebe7e5e1c6b8254876a4e516e0c775edd7c84b18d96cca |
File details
Details for the file easy_kafka-0.0.4-py3-none-any.whl.
File metadata
- Download URL: easy_kafka-0.0.4-py3-none-any.whl
- Upload date:
- Size: 4.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/45.2.0.post20200210 requests-toolbelt/0.9.1 tqdm/4.42.1 CPython/3.7.6
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 65ea552b797485ea4a9448f057ad501f61b4efab3d82c94be95e04c7bf4e9e1a |
| MD5 | a06988bfdac512e8924e4f9eac0442c2 |
| BLAKE2b-256 | 89c93539425592bb96df541dcfa56a3a7a33ad887680e51dc0bec8d99f8ccdd2 |