Skip to main content

High-performance zero-copy inter-process communication library for video streaming

Project description

ZeroBuffer Python Implementation

Native Python implementation of the ZeroBuffer protocol for high-performance zero-copy inter-process communication.

Features

  • True Zero-Copy: Uses memoryview and buffer protocol to avoid data copies
  • Cross-Platform: Supports Linux, Windows, and macOS
  • Protocol Compatible: Binary compatible with C++ and C# implementations
  • Pythonic API: Clean, idiomatic Python interface
  • Type Safe: Full type hints for better IDE support
  • Thread Safe: Built-in synchronization for multi-threaded applications

Requirements

  • Python 3.8 or later
  • posix-ipc (Linux/macOS only): pip install posix-ipc
  • pywin32 (Windows only): pip install pywin32

Installation

pip install -e .

Quick Start

Reader Example

from zerobuffer import Reader, BufferConfig

# Create a buffer
config = BufferConfig(metadata_size=1024, payload_size=1024*1024)
with Reader("my-buffer", config) as reader:
    # Wait for writer to connect
    print("Waiting for writer...")
    
    # Read frames
    while True:
        frame = reader.read_frame(timeout=5.0)
        if frame:
            # Option 1: Manual frame management
            data = frame.data  # This is a memoryview (zero-copy)
            print(f"Received frame {frame.sequence}: {len(data)} bytes")
            # Process the frame...
            reader.release_frame(frame)  # Must release to free buffer space
            
            # Option 2: Using context manager (RAII pattern - automatic disposal)
            with frame:
                data = frame.data  # Automatically released when exiting 'with' block
                print(f"Received frame {frame.sequence}: {len(data)} bytes")
                # Process the frame...
                # Frame is automatically released here

Writer Example

from zerobuffer import Writer

# Connect to existing buffer
with Writer("my-buffer") as writer:
    # Write some metadata
    metadata = b"{'format': 'raw', 'version': 1}"
    writer.set_metadata(metadata)
    
    # Write frames
    for i in range(100):
        data = f"Frame {i}".encode()
        writer.write_frame(data)
        print(f"Sent frame {i}")

Zero-Copy Advanced Usage

# True zero-copy writing with memoryview
import numpy as np

# Create numpy array
arr = np.arange(1000, dtype=np.float32)

# Get memoryview of array (no copy)
view = memoryview(arr)

# Write with zero-copy (memoryview is handled efficiently)
writer.write_frame(view)

# Or use direct buffer access for maximum performance
buffer = writer.get_frame_buffer(size=4000)
# Write directly into shared memory
buffer[:] = arr.tobytes()
writer.commit_frame()

API Reference

Reader Class

Reader(name: str, config: Optional[BufferConfig] = None)

Creates a new buffer and prepares for reading.

Methods:

  • get_metadata() -> Optional[memoryview]: Get metadata as zero-copy memoryview
  • read_frame(timeout: Optional[float] = 5.0) -> Optional[Frame]: Read next frame
  • release_frame(frame: Frame) -> None: Release frame and free buffer space
  • is_writer_connected() -> bool: Check if writer is connected
  • close() -> None: Close reader and clean up resources

Writer Class

Writer(name: str)

Connects to an existing buffer for writing.

Methods:

  • set_metadata(data: Union[bytes, bytearray, memoryview]) -> None: Write metadata (once only)
  • write_frame(data: Union[bytes, bytearray, memoryview]) -> None: Write a frame (zero-copy for memoryview)
  • get_frame_buffer(size: int) -> memoryview: Get buffer for direct writing
  • commit_frame() -> None: Commit frame after direct writing
  • is_reader_connected() -> bool: Check if reader is connected
  • close() -> None: Close writer

Frame Class

Represents a zero-copy reference to frame data with RAII support.

Properties:

  • data -> memoryview: Zero-copy view of frame data
  • size -> int: Size of frame data
  • sequence -> int: Sequence number
  • is_valid -> bool: Check if frame has valid data

RAII Support: The Frame class implements the context manager protocol for automatic resource management:

# Manual management
frame = reader.read_frame()
if frame:
    process(frame.data)
    reader.release_frame(frame)  # Must release manually

# Automatic management with context manager (RAII)
frame = reader.read_frame()
if frame:
    with frame:  # Enters context, frame is valid
        process(frame.data)
    # Frame is automatically disposed/released here

Using the context manager ensures frames are always properly released, even if an exception occurs during processing.

BufferConfig Class

BufferConfig(metadata_size: int = 1024, payload_size: int = 1024*1024)

Configuration for creating a buffer.

Zero-Copy Guarantees

The Python implementation provides true zero-copy access through:

  1. memoryview objects: No data copying when accessing frame data
  2. Buffer protocol: Direct memory access for compatible objects
  3. Shared memory: Direct mapping of shared memory into process space

When Copies Occur

  • Converting memoryview to bytes: bytes(frame.data)
  • Using non-buffer protocol objects with write_frame()
  • String encoding: "text".encode()

Avoiding Copies

  • Use memoryview objects whenever possible
  • Pass memoryview directly to write_frame() for zero-copy operation
  • Use numpy arrays or other buffer protocol objects
  • Access frame data directly via frame.data memoryview
  • Use get_frame_buffer() and commit_frame() for direct memory access

Performance Considerations

  1. Pre-allocate buffers: Reuse buffers instead of creating new ones
  2. Batch operations: Process multiple frames before releasing
  3. Use appropriate buffer sizes: See capacity planning in main README
  4. Monitor buffer utilization: Avoid buffer full conditions

Logging Configuration

The ZeroBuffer library uses Python's standard logging module. By default, it uses a NullHandler (no output) following Python library best practices.

Enable Logging via Environment Variable

Set the ZEROBUFFER_LOG_LEVEL environment variable:

export ZEROBUFFER_LOG_LEVEL=DEBUG
python your_app.py

Or in Python:

import os
os.environ['ZEROBUFFER_LOG_LEVEL'] = 'DEBUG'
import zerobuffer  # Import after setting env var

Configure Logging in Your Application

import logging
import zerobuffer

# Option 1: Basic configuration for debugging
logging.basicConfig(level=logging.DEBUG)

# Option 2: Configure only zerobuffer logging
logging.getLogger('zerobuffer').setLevel(logging.DEBUG)

# Add a handler to see the output
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logging.getLogger('zerobuffer').addHandler(handler)

# Option 3: Configure specific modules
logging.getLogger('zerobuffer.reader').setLevel(logging.DEBUG)  # Only reader logs
logging.getLogger('zerobuffer.writer').setLevel(logging.DEBUG)  # Only writer logs
logging.getLogger('zerobuffer.duplex.server').setLevel(logging.DEBUG)  # Only server logs

Log Levels

  • DEBUG: Detailed information for diagnosing issues
    • Buffer creation/initialization
    • Frame read/write operations with sizes and sequences
    • Wrap-around detection and handling
    • Semaphore operations
    • OIEB state changes
  • INFO: General operational information
    • Buffer connection/disconnection
    • Metadata operations
    • Major state changes
  • WARNING: Important warnings that don't prevent operation
    • Missing metadata
    • Retry operations
  • ERROR: Error conditions
    • Connection failures
    • Processing errors
    • Resource cleanup failures

Error Handling

All operations may raise exceptions from zerobuffer.exceptions:

  • WriterDeadException: Writer process died (accepts optional message)
  • ReaderDeadException: Reader process died (accepts optional message)
  • BufferFullException: Buffer is full
  • FrameTooLargeException: Frame exceeds buffer capacity
  • SequenceError: Frame sequence validation failed
  • MetadataAlreadyWrittenException: Metadata can only be written once
  • ZeroBufferException: Base exception for all ZeroBuffer errors

Thread Safety

The Reader and Writer classes are thread-safe. Multiple threads can:

  • Call methods on the same Reader/Writer instance
  • Read frames concurrently (with proper frame release)
  • Write frames concurrently

However, only one Reader and one Writer can connect to a buffer at a time.

Platform Notes

Linux

  • Uses POSIX shared memory (/dev/shm)
  • Requires posix-ipc package
  • File locks in /tmp/zerobuffer/

Windows

  • Uses Windows named shared memory
  • Requires pywin32 package
  • File locks in temp directory

macOS

  • Similar to Linux with BSD-specific handling
  • Requires posix-ipc package

Testing Utilities

The Python implementation includes testing utilities for cross-platform compatibility with C# and C++ implementations:

BufferNamingService

Ensures unique buffer names across test runs to prevent conflicts:

from zerobuffer_serve.services import BufferNamingService

# Creates unique buffer names for test isolation
naming_service = BufferNamingService(logger)
actual_name = naming_service.get_buffer_name("test-buffer")
# Returns: "test-buffer_<pid>_<timestamp>" or uses Harmony environment variables

TestDataPatterns

Provides consistent test data generation across all language implementations:

from zerobuffer_serve.test_data_patterns import TestDataPatterns

# Generate deterministic frame data
data = TestDataPatterns.generate_frame_data(size=1024, sequence=1)

# Generate simple pattern data
simple_data = TestDataPatterns.generate_simple_frame_data(size=1024)

# Verify data matches pattern
is_valid = TestDataPatterns.verify_simple_frame_data(data)

# Generate test metadata
metadata = TestDataPatterns.generate_metadata(size=256)

These utilities ensure that Python, C#, and C++ implementations can exchange data correctly in cross-platform tests.

Duplex Channel Support

The Python implementation includes full support for duplex channels (bidirectional request-response communication):

from zerobuffer.duplex import DuplexChannelFactory, ImmutableDuplexServer, ProcessingMode
from zerobuffer import BufferConfig, Frame, Writer

# Server side
factory = DuplexChannelFactory()
config = BufferConfig(metadata_size=4096, payload_size=10*1024*1024)
server = factory.create_immutable_server("my-service", config)

def handle_request(frame: Frame, response_writer: Writer) -> None:
    """Process request and send response"""
    with frame:  # RAII - frame is automatically disposed
        # Access request data (zero-copy)
        request_data = bytes(frame.data)
        
        # Process the request
        response_data = process_data(request_data)
        
        # Send response
        response_writer.write_frame(response_data)

# Start server with single-thread processing (default)
server.start(handle_request)
# Or specify processing mode explicitly
# server.start(handle_request, mode=ProcessingMode.SINGLE_THREAD)

# Client side
client = factory.create_client("my-service")

# Send request
request_data = b"Hello, server!"
client.send_request(request_data)

# Receive response with timeout
response = client.receive_response(timeout=5.0)
if response:
    with response:  # RAII - automatically disposed
        print(f"Response: {bytes(response.data)}")
        print(f"Sequence: {response.sequence}")

Server Error Handling

from zerobuffer.error_event_args import ErrorEventArgs

def handle_error(args: ErrorEventArgs) -> None:
    """Handle server errors"""
    print(f"Server error: {args.exception}")

# Add error handler
server.add_error_handler(handle_error)

# Start server with metadata initialization
def on_init(metadata: memoryview) -> None:
    """Process client metadata on connection"""
    client_info = bytes(metadata).decode()
    print(f"Client connected with metadata: {client_info}")

server.start(handle_request, on_init=on_init)

See DUPLEX_CHANNEL.md for detailed documentation.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zerobuffer_ipc-1.2.4.tar.gz (107.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zerobuffer_ipc-1.2.4-py3-none-any.whl (101.7 kB view details)

Uploaded Python 3

File details

Details for the file zerobuffer_ipc-1.2.4.tar.gz.

File metadata

  • Download URL: zerobuffer_ipc-1.2.4.tar.gz
  • Upload date:
  • Size: 107.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for zerobuffer_ipc-1.2.4.tar.gz
Algorithm Hash digest
SHA256 f774f4411bf0292cafb2cc15ad242784fa1af88b8537c2ed59e89f4c53ebecfb
MD5 33ec51a2a53a60595e65469b6ef33657
BLAKE2b-256 b476afe56a50e3b1cc06ba9313d64186eea78b433037a780a442c3af477d5057

See more details on using hashes here.

File details

Details for the file zerobuffer_ipc-1.2.4-py3-none-any.whl.

File metadata

  • Download URL: zerobuffer_ipc-1.2.4-py3-none-any.whl
  • Upload date:
  • Size: 101.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.13

File hashes

Hashes for zerobuffer_ipc-1.2.4-py3-none-any.whl
Algorithm Hash digest
SHA256 cd655f9e6e4d3e7c873ee852c8010425aec60376ea820902113d1d72c3c5b174
MD5 9fea594e0563e437a363f42d1ddd7ad8
BLAKE2b-256 3c43aeb3b93dc877e6c4cd3150936c6121e4fecc0e53f15f95203bcd7760fa68

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page