Skip to main content

ZeroMQ integration for streamz - high-performance streaming data processing

Project description

streamz-zmq

PyPI version GitHub release Python 3.10+

ZeroMQ integration for streamz - enabling high-performance streaming data processing with distributed messaging.

Features

  • ZMQ Source (from_zmq): Receive data streams from ZeroMQ publishers
  • ZMQ Sink (to_zmq): Send processed data to ZeroMQ sockets
  • Async Support: Built with asyncio for high-performance streaming
  • Multiple Patterns: Support for PUB/SUB, PUSH/PULL, and other ZMQ patterns
  • Seamless Integration: Extends streamz with familiar API patterns

Installation

pip install streamz-zmq

Or with uv:

uv add streamz-zmq

Quick Start

Receiving data from ZMQ (Source)

from streamz import Stream
import streamz_zmq  # Register the ZMQ extensions
import zmq

# Create a stream that receives from a ZMQ publisher (connect mode, default)
source = Stream.from_zmq("tcp://localhost:5555")
source.sink(print)  # Print received messages

# Start the stream
source.start()

# Or, act as a collector/server and accept connections from publishers:
collector = Stream.from_zmq("tcp://*:6000", sock_type=zmq.PULL, bind=True)
collector.sink(print)
collector.start()

Sending data to ZMQ (Sink)

from streamz import Stream
import streamz_zmq  # Register the ZMQ extensions

# Create a stream and send results to an existing ZMQ service (default: connect mode)
source = Stream.from_iterable([1, 2, 3, 4, 5])
source.map(lambda x: x * 2).to_zmq("tcp://localhost:5556")

# Or, act as a service and accept connections from ZMQ clients (bind mode)
source.map(...).to_zmq("tcp://*:5556", bind=True)

# Start the stream
source.start()

Complete Example: Pipeline with ZMQ

import asyncio
from streamz import Stream
import streamz_zmq

async def main():
    # Receive from one ZMQ socket, process, send to another
    source = Stream.from_zmq("tcp://localhost:5555")

    processed = (source
                .map(lambda x: x.decode('utf-8'))  # Decode bytes
                .map(str.upper)                    # Process data
                .map(str.encode))                  # Encode back to bytes

    processed.to_zmq("tcp://*:5556")

    # Start processing
    await source.start()

if __name__ == "__main__":
    asyncio.run(main())

Examples

Check out the examples/ directory for demonstrations:

  • simple_example.py: Basic example showing ZMQ publisher thread + streamz subscriber
  • comprehensive_example.py: Advanced demonstration showing multiple ZMQ patterns:
    • PUB/SUB: Publisher broadcasts weather updates to topic-specific subscribers
    • PUSH/PULL: Load balancing work distribution across multiple workers
    • Pipeline: Multi-stage data processing pipeline

Run the simple example:

uv run python examples/simple_example.py

Run the comprehensive example:

uv run python examples/comprehensive_example.py

API Reference

Stream.from_zmq(connect_str, sock_type=zmq.SUB, subscribe=b"", bind=False)

Creates a stream source that receives messages from a ZMQ socket.

Parameters:

  • connect_str (str): ZMQ connection string (e.g., "tcp://localhost:5555" for connect, or "tcp://*:5555" for bind)
  • sock_type (int, optional): ZMQ socket type. Defaults to zmq.SUB
  • subscribe (bytes, optional): Subscription topic for SUB sockets. Defaults to b"" (all messages)
  • bind (bool, optional): If True, bind the socket (act as a server/collector). If False (default), connect to the address.

stream.to_zmq(connect_str, sock_type=zmq.PUSH, bind=False)

Sends stream elements to a ZMQ socket.

Parameters:

  • connect_str (str): ZMQ connection string (e.g., "tcp://*:5556" for bind, or "tcp://localhost:5556" for connect)
  • sock_type (int, optional): ZMQ socket type. Defaults to zmq.PUSH
  • bind (bool, optional): If True, bind the socket (act as a service). If False (default), connect to the address.

ZMQ Patterns Supported

  • PUB/SUB: Publisher broadcasts to multiple subscribers
  • PUSH/PULL: Load balancing across workers
  • REQ/REP: Request-response (less common for streaming)

Requirements

  • Python 3.8+
  • streamz >= 0.6.4
  • pyzmq >= 27.0.0

Development

# Clone the repository
git clone https://github.com/izzet/streamz-zmq.git
cd streamz-zmq

# Install with uv (uses uv.lock for reproducible builds)
uv sync --dev

# Set up pre-commit hooks (recommended)
uv run pre-commit install

# Run tests
uv run pytest

# Format code
uv run ruff format .

# Check linting
uv run ruff check .

# Build package
uv build

Note: This project uses uv.lock for reproducible dependency management. The lock file is committed to ensure all developers and CI/CD use identical dependency versions.

Pre-commit hooks: The project includes pre-commit hooks that automatically format code, check linting, and run tests before each commit to maintain code quality.

License

MIT License. See LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamz_zmq-0.1.4.tar.gz (49.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

streamz_zmq-0.1.4-py3-none-any.whl (7.6 kB view details)

Uploaded Python 3

File details

Details for the file streamz_zmq-0.1.4.tar.gz.

File metadata

  • Download URL: streamz_zmq-0.1.4.tar.gz
  • Upload date:
  • Size: 49.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for streamz_zmq-0.1.4.tar.gz
Algorithm Hash digest
SHA256 915f239c21aa411f85a0f5faebb6af0c71c6c24e17d6fe063e5009f67a7439e5
MD5 b4c8383d02ba8d3a18bbf56f8e68c018
BLAKE2b-256 2838be99d5917ae7fe87afe41f82c8342ac60e8ae02c794cfcf386e35ad5bed9

See more details on using hashes here.

Provenance

The following attestation bundles were made for streamz_zmq-0.1.4.tar.gz:

Publisher: release.yml on izzet/streamz-zmq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file streamz_zmq-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: streamz_zmq-0.1.4-py3-none-any.whl
  • Upload date:
  • Size: 7.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for streamz_zmq-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 86d5bd89ea8f5c0fd42e810c95c9f944ac93414b79b5466d4b4ad1f3ad983748
MD5 3e90d8c2aff4a73f139138ae0dd87355
BLAKE2b-256 5ecebfb0a9ab303286ce9dcc106508ee52f964c8769f8375204313e89d75593c

See more details on using hashes here.

Provenance

The following attestation bundles were made for streamz_zmq-0.1.4-py3-none-any.whl:

Publisher: release.yml on izzet/streamz-zmq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page