Skip to main content

No project description provided

Project description

Pulsar plugin for Streamz

This a plugin for Streamz that adds stream nodes for writing and reading data from/to Pulsar.

🛠 Installation

Latest stable version is available on PyPI

pip install streamz_pulsar

Latest development version can be installed from git repo

pip install git+https://github.com/MarekWadinger/streamz_pulsar

⚡️ Quickstart

To start working with streamz_pulsar, follow these 3 steps:

1. Run a standalone Pulsar cluster locally

docker run -it -p 6650:6650 -p 8000:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:latest bin/pulsar standalone

2. Create a consumer

The following example creates a consumer with the my-sub subscription name on the my-topic topic, receives incoming messages, prints the content and ID of messages that arrive, and acknowledges each message to the Pulsar broker.

import pulsar
from streamz import Stream

s = Stream.from_pulsar(
    ['my-topic'],
    subscription_name='my-sub',
    consumer_params={'service_url': 'pulsar://localhost:6650'}
    )

s.map(lambda x: x.decode())
L = s.sink_to_list()

s.start()
while True:
    try:
        if L:
            print(L.pop(0))
    except pulsar.Interrupted:
        print("Stop receiving messages")
        break

3. Create a producer

The following example creates a Python producer for the my-topic topic and sends 10 messages on that topic:

from streamz import Stream

source = Stream()
producer_ = source.to_pulsar(
    'my-topic',
    producer_config={'service_url': 'pulsar://localhost:6650'}
    )

for i in range(3):
    source.emit(('hello-pulsar-%d' % i).encode('utf-8'))

producer_.stop()
producer_.flush()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamz_pulsar-0.1.0.post2.tar.gz (3.6 kB view hashes)

Uploaded Source

Built Distribution

streamz_pulsar-0.1.0.post2-py3-none-any.whl (5.3 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page