ZeroMQ integration for streamz - high-performance streaming data processing
Project description
streamz-zmq
ZeroMQ integration for streamz - enabling high-performance streaming data processing with distributed messaging.
Features
- ZMQ Source (
from_zmq): Receive data streams from ZeroMQ publishers - ZMQ Sink (
to_zmq): Send processed data to ZeroMQ sockets - Async Support: Built with asyncio for high-performance streaming
- Multiple Patterns: Support for PUB/SUB, PUSH/PULL, and other ZMQ patterns
- Seamless Integration: Extends streamz with familiar API patterns
Installation
pip install streamz-zmq
Or with uv:
uv add streamz-zmq
Quick Start
Receiving data from ZMQ (Source)
from streamz import Stream
import streamz_zmq # Register the ZMQ extensions
# Create a stream that receives from a ZMQ publisher
source = Stream.from_zmq("tcp://localhost:5555")
source.sink(print) # Print received messages
# Start the stream
source.start()
Sending data to ZMQ (Sink)
from streamz import Stream
import streamz_zmq # Register the ZMQ extensions
# Create a stream and send results to ZMQ
source = Stream.from_iterable([1, 2, 3, 4, 5])
source.map(lambda x: x * 2).to_zmq("tcp://*:5556")
# Start the stream
source.start()
Complete Example: Pipeline with ZMQ
import asyncio
from streamz import Stream
import streamz_zmq
async def main():
# Receive from one ZMQ socket, process, send to another
source = Stream.from_zmq("tcp://localhost:5555")
processed = (source
.map(lambda x: x.decode('utf-8')) # Decode bytes
.map(str.upper) # Process data
.map(str.encode)) # Encode back to bytes
processed.to_zmq("tcp://*:5556")
# Start processing
await source.start()
if __name__ == "__main__":
asyncio.run(main())
Examples
Check out the examples/ directory for demonstrations:
simple_example.py: Basic example showing ZMQ publisher thread + streamz subscribercomprehensive_example.py: Advanced demonstration showing multiple ZMQ patterns:- PUB/SUB: Publisher broadcasts weather updates to topic-specific subscribers
- PUSH/PULL: Load balancing work distribution across multiple workers
- Pipeline: Multi-stage data processing pipeline
Run the simple example:
uv run python examples/simple_example.py
Run the comprehensive example:
uv run python examples/comprehensive_example.py
API Reference
Stream.from_zmq(connect_str, sock_type=zmq.SUB, subscribe=b"")
Creates a stream source that receives messages from a ZMQ socket.
Parameters:
connect_str(str): ZMQ connection string (e.g., "tcp://localhost:5555")sock_type(int, optional): ZMQ socket type. Defaults tozmq.SUBsubscribe(bytes, optional): Subscription topic for SUB sockets. Defaults tob""(all messages)
stream.to_zmq(connect_str, sock_type=zmq.PUSH)
Sends stream elements to a ZMQ socket.
Parameters:
connect_str(str): ZMQ connection string (e.g., "tcp://*:5556")sock_type(int, optional): ZMQ socket type. Defaults tozmq.PUSH
ZMQ Patterns Supported
- PUB/SUB: Publisher broadcasts to multiple subscribers
- PUSH/PULL: Load balancing across workers
- REQ/REP: Request-response (less common for streaming)
Requirements
- Python 3.10+
- streamz >= 0.6.4
- pyzmq >= 27.0.0
Development
# Clone the repository
git clone https://github.com/izzet/streamz-zmq.git
cd streamz-zmq
# Install with uv (uses uv.lock for reproducible builds)
uv sync --dev
# Set up pre-commit hooks (recommended)
uv run pre-commit install
# Run tests
uv run pytest
# Format code
uv run ruff format .
# Check linting
uv run ruff check .
# Build package
uv build
Note: This project uses uv.lock for reproducible dependency management. The lock file is committed to ensure all developers and CI/CD use identical dependency versions.
Pre-commit hooks: The project includes pre-commit hooks that automatically format code, check linting, and run tests before each commit to maintain code quality.
License
MIT License. See LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file streamz_zmq-0.1.2.tar.gz.
File metadata
- Download URL: streamz_zmq-0.1.2.tar.gz
- Upload date:
- Size: 38.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6b0572089bb6c6f7c9ae2fdfeb0b2144c935dd42c6fa404e384127d53d8d967f
|
|
| MD5 |
1a4a241fb5224e2a3e5a455545b48e93
|
|
| BLAKE2b-256 |
3e6236ad550e7ec9b2212c9c60cefdfabf69110db6f9c358c4b6139d2052563f
|
Provenance
The following attestation bundles were made for streamz_zmq-0.1.2.tar.gz:
Publisher:
release.yml on izzet/streamz-zmq
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
streamz_zmq-0.1.2.tar.gz -
Subject digest:
6b0572089bb6c6f7c9ae2fdfeb0b2144c935dd42c6fa404e384127d53d8d967f - Sigstore transparency entry: 336907306
- Sigstore integration time:
-
Permalink:
izzet/streamz-zmq@049b156431203706b6bac8d5d4432c773682a979 -
Branch / Tag:
refs/tags/v0.1.2 - Owner: https://github.com/izzet
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@049b156431203706b6bac8d5d4432c773682a979 -
Trigger Event:
release
-
Statement type:
File details
Details for the file streamz_zmq-0.1.2-py3-none-any.whl.
File metadata
- Download URL: streamz_zmq-0.1.2-py3-none-any.whl
- Upload date:
- Size: 7.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e0c813d6aa69990d0792f1c0155d22a1b574a45bf22fe1b98e3f4943bfd8647f
|
|
| MD5 |
75f0a7d28508f92fa8164527491351c4
|
|
| BLAKE2b-256 |
336014e7b72758d19d94ed13855ee584f41880ad5110a44771a921d5a1916b1a
|
Provenance
The following attestation bundles were made for streamz_zmq-0.1.2-py3-none-any.whl:
Publisher:
release.yml on izzet/streamz-zmq
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
streamz_zmq-0.1.2-py3-none-any.whl -
Subject digest:
e0c813d6aa69990d0792f1c0155d22a1b574a45bf22fe1b98e3f4943bfd8647f - Sigstore transparency entry: 336907325
- Sigstore integration time:
-
Permalink:
izzet/streamz-zmq@049b156431203706b6bac8d5d4432c773682a979 -
Branch / Tag:
refs/tags/v0.1.2 - Owner: https://github.com/izzet
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@049b156431203706b6bac8d5d4432c773682a979 -
Trigger Event:
release
-
Statement type: