Pulsar plugin for Streamz
This is a plugin for Streamz that adds stream nodes for reading data from and writing data to Pulsar.
🛠 Installation
The latest stable version is available on PyPI:
pip install streamz_pulsar
The latest development version can be installed from the Git repository:
pip install git+https://github.com/MarekWadinger/streamz_pulsar
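After installing, it can help to confirm that the packages are visible to the interpreter before starting the Quickstart. The following is a minimal sketch using only the standard library. It assumes the import name matches the PyPI name (`streamz_pulsar`) and that the Pulsar client is importable as `pulsar`, as in the examples in this README; `is_installed` is a hypothetical helper, not part of the plugin.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located on the import path."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # Assumption: these are the import names of the relevant packages.
    for name in ("streamz", "pulsar", "streamz_pulsar"):
        print(name, "found" if is_installed(name) else "missing")
```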
⚡️ Quickstart
To start working with streamz_pulsar, follow these 3 steps:
1. Run a standalone Pulsar cluster locally
docker run -it -p 6650:6650 -p 8000:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:latest bin/pulsar standalone
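The standalone broker can take a little while to start accepting connections. As a rough sketch, the check below tries to open a TCP connection to the binary protocol port published by the `docker run` command above (6650). The helper name `broker_reachable` and its defaults are assumptions made for illustration; a successful connection only shows that the port is open, not that the broker is fully initialized.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 6650,
                     timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print("Pulsar port open:", broker_reachable())
```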
2. Create a consumer
The following example creates a consumer with the my-sub subscription name on the my-topic topic, receives incoming messages, prints the content of each message that arrives, and acknowledges each message to the Pulsar broker.
import pulsar
from streamz import Stream

# Source node that consumes messages from the topic
s = Stream.from_pulsar(
    ['my-topic'],
    subscription_name='my-sub',
    consumer_params={'service_url': 'pulsar://localhost:6650'}
)
# Decode each payload and collect the decoded strings in a list
L = s.map(lambda x: x.decode()).sink_to_list()

s.start()
while True:
    try:
        if L:
            print(L.pop(0))
    except pulsar.Interrupted:
        print("Stop receiving messages")
        break
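The `while True` loop above polls the list continuously and never returns on its own. For scripts and tests, a bounded wait may be more convenient. The sketch below is one possible approach using only the standard library: it pops items from any list-like buffer (such as the one returned by `sink_to_list()`) and gives up after a timeout. The helper `wait_for_items` and its parameters are hypothetical, and it assumes the buffer is filled from another thread.

```python
import time


def wait_for_items(buffer, count=1, timeout=5.0, poll_interval=0.05):
    """Pop up to `count` items from the front of `buffer`.

    Waits at most `timeout` seconds and returns whatever was collected
    by then, which may be fewer than `count` items.
    """
    deadline = time.monotonic() + timeout
    items = []
    while len(items) < count:
        if buffer:
            items.append(buffer.pop(0))
        elif time.monotonic() >= deadline:
            break
        else:
            # Sleep briefly instead of spinning on an empty buffer.
            time.sleep(poll_interval)
    return items
```

With the consumer example, this could be called as `wait_for_items(L, count=3, timeout=10.0)` after `s.start()`.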
3. Create a producer
The following example creates a Python producer for the my-topic topic and sends three messages on that topic:
from streamz import Stream

source = Stream()
# Sink node that publishes every emitted element to the topic
producer_ = source.to_pulsar(
    'my-topic',
    producer_config={'service_url': 'pulsar://localhost:6650'}
)

for i in range(3):
    source.emit(('hello-pulsar-%d' % i).encode('utf-8'))

# Flush pending messages before stopping the producer
producer_.flush()
producer_.stop()
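The producer example emits raw bytes, and the consumer example decodes them back to strings. For structured records, one common option is to serialize to JSON before emitting and parse on the receiving side. The following is a minimal sketch with the standard library; `encode_record` and `decode_record` are hypothetical helpers, not part of streamz_pulsar.

```python
import json


def encode_record(record: dict) -> bytes:
    """Serialize a dict to UTF-8 encoded JSON bytes with stable key order."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def decode_record(payload: bytes) -> dict:
    """Parse UTF-8 encoded JSON bytes back into a dict."""
    return json.loads(payload.decode("utf-8"))
```

Under these assumptions, the producer side would call `source.emit(encode_record({'id': 1}))`, and the consumer side would use `s.map(decode_record)` in place of the `decode()` lambda.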