High-performance zero-copy inter-process communication library for video streaming
Project description
ZeroBuffer Python Implementation
Native Python implementation of the ZeroBuffer protocol for high-performance zero-copy inter-process communication.
Features
- True Zero-Copy: Uses memoryview and buffer protocol to avoid data copies
- Cross-Platform: Supports Linux, Windows, and macOS
- Protocol Compatible: Binary compatible with C++ and C# implementations
- Pythonic API: Clean, idiomatic Python interface
- Type Safe: Full type hints for better IDE support
- Thread Safe: Built-in synchronization for multi-threaded applications
Requirements
- Python 3.8 or later
posix-ipc(Linux/macOS only):pip install posix-ipcpywin32(Windows only):pip install pywin32
Installation
pip install -e .
Quick Start
Reader Example
from zerobuffer import Reader, BufferConfig
# Create a buffer
config = BufferConfig(metadata_size=1024, payload_size=1024*1024)
with Reader("my-buffer", config) as reader:
# Wait for writer to connect
print("Waiting for writer...")
# Read frames
while True:
frame = reader.read_frame(timeout=5.0)
if frame:
# Option 1: Manual frame management
data = frame.data # This is a memoryview (zero-copy)
print(f"Received frame {frame.sequence}: {len(data)} bytes")
# Process the frame...
reader.release_frame(frame) # Must release to free buffer space
# Option 2: Using context manager (RAII pattern - automatic disposal)
with frame:
data = frame.data # Automatically released when exiting 'with' block
print(f"Received frame {frame.sequence}: {len(data)} bytes")
# Process the frame...
# Frame is automatically released here
Writer Example
from zerobuffer import Writer
# Connect to existing buffer
with Writer("my-buffer") as writer:
# Write some metadata
metadata = b"{'format': 'raw', 'version': 1}"
writer.set_metadata(metadata)
# Write frames
for i in range(100):
data = f"Frame {i}".encode()
writer.write_frame(data)
print(f"Sent frame {i}")
Zero-Copy Advanced Usage
# True zero-copy writing with memoryview
import numpy as np
# Create numpy array
arr = np.arange(1000, dtype=np.float32)
# Get memoryview of array (no copy)
view = memoryview(arr)
# Write with zero-copy (memoryview is handled efficiently)
writer.write_frame(view)
# Or use direct buffer access for maximum performance
buffer = writer.get_frame_buffer(size=4000)
# Write directly into shared memory
buffer[:] = arr.tobytes()
writer.commit_frame()
API Reference
Reader Class
Reader(name: str, config: Optional[BufferConfig] = None)
Creates a new buffer and prepares for reading.
Methods:
get_metadata() -> Optional[memoryview]: Get metadata as zero-copy memoryviewread_frame(timeout: Optional[float] = 5.0) -> Optional[Frame]: Read next framerelease_frame(frame: Frame) -> None: Release frame and free buffer spaceis_writer_connected() -> bool: Check if writer is connectedclose() -> None: Close reader and clean up resources
Writer Class
Writer(name: str)
Connects to an existing buffer for writing.
Methods:
set_metadata(data: Union[bytes, bytearray, memoryview]) -> None: Write metadata (once only)write_frame(data: Union[bytes, bytearray, memoryview]) -> None: Write a frame (zero-copy for memoryview)get_frame_buffer(size: int) -> memoryview: Get buffer for direct writingcommit_frame() -> None: Commit frame after direct writingis_reader_connected() -> bool: Check if reader is connectedclose() -> None: Close writer
Frame Class
Represents a zero-copy reference to frame data with RAII support.
Properties:
data -> memoryview: Zero-copy view of frame datasize -> int: Size of frame datasequence -> int: Sequence numberis_valid -> bool: Check if frame has valid data
RAII Support: The Frame class implements the context manager protocol for automatic resource management:
# Manual management
frame = reader.read_frame()
if frame:
process(frame.data)
reader.release_frame(frame) # Must release manually
# Automatic management with context manager (RAII)
frame = reader.read_frame()
if frame:
with frame: # Enters context, frame is valid
process(frame.data)
# Frame is automatically disposed/released here
Using the context manager ensures frames are always properly released, even if an exception occurs during processing.
BufferConfig Class
BufferConfig(metadata_size: int = 1024, payload_size: int = 1024*1024)
Configuration for creating a buffer.
Zero-Copy Guarantees
The Python implementation provides true zero-copy access through:
- memoryview objects: No data copying when accessing frame data
- Buffer protocol: Direct memory access for compatible objects
- Shared memory: Direct mapping of shared memory into process space
When Copies Occur
- Converting memoryview to bytes:
bytes(frame.data) - Using non-buffer protocol objects with
write_frame() - String encoding:
"text".encode()
Avoiding Copies
- Use
memoryviewobjects whenever possible - Pass memoryview directly to
write_frame()for zero-copy operation - Use numpy arrays or other buffer protocol objects
- Access frame data directly via
frame.datamemoryview - Use
get_frame_buffer()andcommit_frame()for direct memory access
Performance Considerations
- Pre-allocate buffers: Reuse buffers instead of creating new ones
- Batch operations: Process multiple frames before releasing
- Use appropriate buffer sizes: See capacity planning in main README
- Monitor buffer utilization: Avoid buffer full conditions
Logging Configuration
The ZeroBuffer library uses Python's standard logging module. By default, it uses a NullHandler (no output) following Python library best practices.
Enable Logging via Environment Variable
Set the ZEROBUFFER_LOG_LEVEL environment variable:
export ZEROBUFFER_LOG_LEVEL=DEBUG
python your_app.py
Or in Python:
import os
os.environ['ZEROBUFFER_LOG_LEVEL'] = 'DEBUG'
import zerobuffer # Import after setting env var
Configure Logging in Your Application
import logging
import zerobuffer
# Option 1: Basic configuration for debugging
logging.basicConfig(level=logging.DEBUG)
# Option 2: Configure only zerobuffer logging
logging.getLogger('zerobuffer').setLevel(logging.DEBUG)
# Add a handler to see the output
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logging.getLogger('zerobuffer').addHandler(handler)
# Option 3: Configure specific modules
logging.getLogger('zerobuffer.reader').setLevel(logging.DEBUG) # Only reader logs
logging.getLogger('zerobuffer.writer').setLevel(logging.DEBUG) # Only writer logs
logging.getLogger('zerobuffer.duplex.server').setLevel(logging.DEBUG) # Only server logs
Log Levels
- DEBUG: Detailed information for diagnosing issues
- Buffer creation/initialization
- Frame read/write operations with sizes and sequences
- Wrap-around detection and handling
- Semaphore operations
- OIEB state changes
- INFO: General operational information
- Buffer connection/disconnection
- Metadata operations
- Major state changes
- WARNING: Important warnings that don't prevent operation
- Missing metadata
- Retry operations
- ERROR: Error conditions
- Connection failures
- Processing errors
- Resource cleanup failures
Error Handling
All operations may raise exceptions from zerobuffer.exceptions:
WriterDeadException: Writer process died (accepts optional message)ReaderDeadException: Reader process died (accepts optional message)BufferFullException: Buffer is fullFrameTooLargeException: Frame exceeds buffer capacitySequenceError: Frame sequence validation failedMetadataAlreadyWrittenException: Metadata can only be written onceZeroBufferException: Base exception for all ZeroBuffer errors
Thread Safety
The Reader and Writer classes are thread-safe. Multiple threads can:
- Call methods on the same Reader/Writer instance
- Read frames concurrently (with proper frame release)
- Write frames concurrently
However, only one Reader and one Writer can connect to a buffer at a time.
Platform Notes
Linux
- Uses POSIX shared memory (
/dev/shm) - Requires
posix-ipcpackage - File locks in
/tmp/zerobuffer/
Windows
- Uses Windows named shared memory
- Requires
pywin32package - File locks in temp directory
macOS
- Similar to Linux with BSD-specific handling
- Requires
posix-ipcpackage
Testing Utilities
The Python implementation includes testing utilities for cross-platform compatibility with C# and C++ implementations:
BufferNamingService
Ensures unique buffer names across test runs to prevent conflicts:
from zerobuffer_serve.services import BufferNamingService
# Creates unique buffer names for test isolation
naming_service = BufferNamingService(logger)
actual_name = naming_service.get_buffer_name("test-buffer")
# Returns: "test-buffer_<pid>_<timestamp>" or uses Harmony environment variables
TestDataPatterns
Provides consistent test data generation across all language implementations:
from zerobuffer_serve.test_data_patterns import TestDataPatterns
# Generate deterministic frame data
data = TestDataPatterns.generate_frame_data(size=1024, sequence=1)
# Generate simple pattern data
simple_data = TestDataPatterns.generate_simple_frame_data(size=1024)
# Verify data matches pattern
is_valid = TestDataPatterns.verify_simple_frame_data(data)
# Generate test metadata
metadata = TestDataPatterns.generate_metadata(size=256)
These utilities ensure that Python, C#, and C++ implementations can exchange data correctly in cross-platform tests.
Duplex Channel Support
The Python implementation includes full support for duplex channels (bidirectional request-response communication):
from zerobuffer.duplex import DuplexChannelFactory, ImmutableDuplexServer, ProcessingMode
from zerobuffer import BufferConfig, Frame, Writer
# Server side
factory = DuplexChannelFactory()
config = BufferConfig(metadata_size=4096, payload_size=10*1024*1024)
server = factory.create_immutable_server("my-service", config)
def handle_request(frame: Frame, response_writer: Writer) -> None:
"""Process request and send response"""
with frame: # RAII - frame is automatically disposed
# Access request data (zero-copy)
request_data = bytes(frame.data)
# Process the request
response_data = process_data(request_data)
# Send response
response_writer.write_frame(response_data)
# Start server with single-thread processing (default)
server.start(handle_request)
# Or specify processing mode explicitly
# server.start(handle_request, mode=ProcessingMode.SINGLE_THREAD)
# Client side
client = factory.create_client("my-service")
# Send request
request_data = b"Hello, server!"
client.send_request(request_data)
# Receive response with timeout
response = client.receive_response(timeout=5.0)
if response:
with response: # RAII - automatically disposed
print(f"Response: {bytes(response.data)}")
print(f"Sequence: {response.sequence}")
Server Error Handling
from zerobuffer.error_event_args import ErrorEventArgs
def handle_error(args: ErrorEventArgs) -> None:
"""Handle server errors"""
print(f"Server error: {args.exception}")
# Add error handler
server.add_error_handler(handle_error)
# Start server with metadata initialization
def on_init(metadata: memoryview) -> None:
"""Process client metadata on connection"""
client_info = bytes(metadata).decode()
print(f"Client connected with metadata: {client_info}")
server.start(handle_request, on_init=on_init)
See DUPLEX_CHANNEL.md for detailed documentation.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file zerobuffer_ipc-1.2.7.tar.gz.
File metadata
- Download URL: zerobuffer_ipc-1.2.7.tar.gz
- Upload date:
- Size: 107.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6760c1ed7a926c5280bdbc127f4236f42797ce430fbff906757a6009d89003ac
|
|
| MD5 |
951cb3d16eec3c273ff0d47747e34600
|
|
| BLAKE2b-256 |
692e971dec9830a79fd4c37ed9b18222749f0034c3e622bd12b7a8fed9534da7
|
File details
Details for the file zerobuffer_ipc-1.2.7-py3-none-any.whl.
File metadata
- Download URL: zerobuffer_ipc-1.2.7-py3-none-any.whl
- Upload date:
- Size: 101.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
50f66bf52b57fe8d29898dc6f28ca40c4244d2be8374ddb41d3f00b6fa16b3f2
|
|
| MD5 |
609b18f79743323928ba197d88ed6b7d
|
|
| BLAKE2b-256 |
61093fe5f729b36bd9143e1e31426801651920f16cec021f0a2609197f1dd87c
|