Pulsar plugin for Streamz
This is a plugin for Streamz that adds stream nodes for reading data from and writing data to Pulsar.
🛠 Installation
The latest stable version is available on PyPI:
pip install streamz_pulsar
The latest development version can be installed from the git repository:
pip install git+https://github.com/MarekWadinger/streamz_pulsar
⚡️ Quickstart
To start working with streamz_pulsar, follow these 3 steps:
1. Run a standalone Pulsar cluster locally
docker run -it -p 6650:6650 -p 8000:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:latest bin/pulsar standalone
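The standalone broker can take a little while to start accepting connections. As a rough sketch (not part of streamz_pulsar), the hypothetical helper `wait_for_port` below polls the broker port with the standard library until a TCP connection succeeds. It assumes the broker is published on `localhost:6650`, as in the docker command above. An open port only indicates that something is listening, not that Pulsar has finished initializing.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper for illustration; not part of streamz_pulsar.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect shows the port is open, nothing more.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example call (assumes the docker command above is running):
# wait_for_port("localhost", 6650)
```

Calling `wait_for_port("localhost", 6650)` before creating the consumer or producer avoids connecting to a broker that is still starting up.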
2. Create a consumer
The following example creates a consumer with the my-sub subscription name on the my-topic topic, receives incoming messages, prints the decoded content of each message that arrives, and acknowledges each message to the Pulsar broker.
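import pulsar
from streamz import Stream

s = Stream.from_pulsar(
    'pulsar://localhost:6650',
    ['my-topic'],
    subscription_name='my-sub'
    )

# map() returns a new node, so the sink is chained onto it
# to collect decoded strings rather than raw bytes
L = s.map(lambda x: x.decode()).sink_to_list()

s.start()
while True:
    try:
        # Print messages in arrival order as they land in the list
        if L:
            print(L.pop(0))
    except pulsar.Interrupted:
        print("Stop receiving messages")
        break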
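The loop above checks the list continuously, which keeps a CPU core busy while no messages arrive. One possible alternative, sketched here with a hypothetical helper `drain` (not part of streamz_pulsar), sleeps briefly whenever the list is empty. The parameter names `idle_sleep` and `max_idle_polls` are assumptions made for this illustration.

```python
import time


def drain(buffer, handle, idle_sleep=0.1, max_idle_polls=None):
    """Pass items from buffer to handle in order, sleeping while buffer is empty.

    Stops after max_idle_polls consecutive empty polls; None means keep
    polling until interrupted. Returns the number of items handled.
    Hypothetical helper for illustration; not part of streamz_pulsar.
    """
    handled = 0
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        if buffer:
            handle(buffer.pop(0))
            handled += 1
            idle = 0  # reset the idle counter after every item
        else:
            idle += 1
            time.sleep(idle_sleep)
    return handled
```

With the consumer example, `drain(L, print)` would take the place of the `while` loop, where `L` is the list returned by `sink_to_list()`.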
3. Create a producer
The following example creates a Python producer for the my-topic topic and sends 3 messages on that topic:
from streamz import Stream

source = Stream()
producer_ = source.to_pulsar(
    'pulsar://localhost:6650',
    'my-topic',
    )

# Each emitted value is sent as the payload of one message, encoded as bytes
for i in range(3):
    source.emit(('hello-pulsar-%d' % i).encode('utf-8'))

producer_.stop()
producer_.flush()
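The examples send and receive raw bytes, encoding strings on the producer side and decoding them on the consumer side. For structured records, one option is to serialize to JSON before emitting. The sketch below uses only the standard library. The helper names `encode_message` and `decode_message` are hypothetical, and it assumes the stream nodes pass payloads through unchanged as bytes, as the examples above suggest.

```python
import json


def encode_message(record):
    """Serialize a JSON-compatible object to UTF-8 bytes for emitting."""
    # sort_keys makes the byte output deterministic for a given record
    return json.dumps(record, sort_keys=True).encode("utf-8")


def decode_message(payload):
    """Parse UTF-8 JSON bytes back into a Python object."""
    return json.loads(payload.decode("utf-8"))


payload = encode_message({"id": 1, "text": "hello-pulsar"})
record = decode_message(payload)
```

In the producer this would read `source.emit(encode_message(record))`, and in the consumer `s.map(decode_message)` would replace the plain `decode()` call.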