Kafka Client with smarter protocol described, support for Gevent and tornado
Project description
a Kafka client which using easy described protocol tool, also be able to be used with Gevent and tornado.
WARNING: Under development, Now only support simple send method. Not support Python3.
USAGE
Install using pypi:
pip install kafkaka
Install from source:
git clone https://github.com/wesdu/kafkaka.git cd kafkaka python setup.py install
EXAMPLE
simple block mode:
# coding: utf8
from kafkaka.client import KafkaClient
import time
if __name__ == "__main__":
c = KafkaClient("tx-storm1:9092")
c.send_message('im-msg', 'hi', str(time.time()))
c.send_message('im-msg', u'你好'.encode('utf8'), str(time.time()))
print 'this will block'
using with Gevent:
# coding: utf8
from kafkaka.gevent_patch import KafkaClient
from gevent import spawn
from gevent import sleep
import time
if __name__ == "__main__":
c = KafkaClient("t-storm1:9092", topic_names=['im-msg'])
print ''
for i in xrange(50):
c.send_message('im-msg', u'你好'.encode('utf8'), str(time.time()), str(i))
c.send_message('im-msg', 'hi', str(time.time()), str(i))
print 'this will not block'
for i in xrange(50):
c.send_message('im-msg', u'你好'.encode('utf8'), str(time.time()), str(i))
c.send_message('im-msg', 'hi', str(time.time()), str(i))
sleep(0.1)
print 'but this will block'
sleep(30)
you can set the number of max parallel connections by using pool_size param:
# coding: utf8
from kafkaka.gevent_patch import KafkaClient
from gevent import joinall
import time
if __name__ == "__main__":
c = KafkaClient("t-storm1:9092",
topic_names=['im-msg'],
pool_size=10 # the number of max parallel connections.
)
start = time.time()
all = []
print ''
for i in xrange(50):
all.append(c.send_message('im-msg', u'你好'.encode('utf8'), str(time.time()), str(i)))
all.append(c.send_message('im-msg', 'hi', str(time.time()), str(i)))
print 'this will not block'
for i in xrange(50):
all.append(c.send_message('im-msg', u'你好'.encode('utf8'), str(time.time()), str(i)))
all.append(c.send_message('im-msg', 'hi', str(time.time()), str(i)))
joinall(all)
print 'but this will block'
print time.time() - start
using with tornado:
# coding: utf8
from kafkaka.tornado_patch import KafkaClient
import tornado.ioloop
import time
if __name__ == "__main__":
c = KafkaClient("t-storm1:9092", topic_names=['im-msg'])
start = time.time()
print ''
for i in xrange(500):
c.send_message('im-msg', u'你好'.encode('utf8'), str(time.time()), str(i))
c.send_message('im-msg', 'hi', str(time.time()), str(i))
for i in xrange(500):
c.send_message('im-msg', u'你好'.encode('utf8'), str(time.time()), str(i))
c.send_message('im-msg', 'hi', str(time.time()), str(i))
print time.time() - start
print 'this will not block'
tornado.ioloop.IOLoop.instance().start()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
kafkaka-0.5.0.tar.gz
(34.4 kB
view details)
File details
Details for the file kafkaka-0.5.0.tar.gz.
File metadata
- Download URL: kafkaka-0.5.0.tar.gz
- Upload date:
- Size: 34.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
06a4cf0ba53178c89178499c33b6d254bd2473825ea0c4f2ee17d74eb34c9e8c
|
|
| MD5 |
c12515befa6dc08fb6f3b1276ac3df9e
|
|
| BLAKE2b-256 |
ecd9e0f77cf99bfdc019fff3f4aeafc5567b2395f61d456e88f018d4f7a9ebee
|