High-performance PyO3 bindings for tensor_iroh
Tensor Protocol - Direct Streaming Implementation
This is a direct streaming tensor transfer protocol built on top of Iroh's QUIC networking stack. Instead of storing a tensor and having the receiver download it afterwards, it streams tensor data directly over QUIC connections and keeps those connections in a pool, so repeated sends to the same peer reuse them.
Installation
You can install the PyO3-based Python bindings directly from PyPI using pip:
```bash
pip install tensor_iroh
```
Why This Tensor Protocol is Faster
1. Direct QUIC Streaming vs Traditional Approaches
- Traditional: Request → Store → Download (3-step process with disk I/O)
- This Protocol: Direct stream transfer (1-step, memory-to-memory)
- Performance Gain: 10-100x faster for repeated sends to the same peer, because the connection is reused
2. Connection Pooling Architecture
- Smart Reuse: Keeps idle QUIC connections open for up to 5 minutes after their last use
- Zero Setup Overhead: Subsequent sends to same peer skip connection establishment
- Latency Reduction: Saves 100-500ms per send after initial connection
3. Zero-Copy Design
- Memory Efficiency: Tensors stream directly without intermediate buffers
- Reduced GC Pressure: Minimal Python object creation during transfer
- Low-Jitter Receive: Awaitable, non-polling receive path avoids busy-loop sleeps and reduces event-loop jitter
Key Features
- Direct QUIC Streaming: Tensors are sent directly over QUIC streams without intermediate blob storage
- Connection Pooling: Intelligent reuse of QUIC connections for improved performance
- Zero-Copy Design: Minimal data copying for efficient memory usage
- Dual Python Bindings: Both PyO3 (high-performance) and UniFFI (stable) options
- Async/Await Support: Full async support for non-blocking operations
- Async Receive without Polling: `await node.wait_for_tensor()` yields the next tensor as soon as it arrives
- Security: TLS 1.3 encryption by default
Architecture
Core Components
- TensorProtocolHandler: Implements Iroh's `ProtocolHandler` trait for custom protocol handling
- TensorNode: Main API for sending/receiving tensors with connection pooling
- ConnectionPool: Manages QUIC connection reuse for performance optimization
- Direct Streaming: Uses QUIC bidirectional streams for immediate data transfer
- Custom ALPN: Uses `"tensor-iroh/direct/0"` for protocol identification
Connection Pool Architecture
```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use iroh::endpoint::Connection;
use tokio::sync::Mutex as AsyncMutex;

pub struct ConnectionPool {
    connections: Arc<AsyncMutex<HashMap<String, PooledConnection>>>,
    max_idle_time: Duration, // 5 minutes default
    max_connections: usize,  // 10 connections default
}

pub struct PooledConnection {
    connection: Connection, // Iroh QUIC connection
    last_used: Instant,     // Last usage timestamp
    is_idle: bool,          // Connection state
}
```
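To make the pooling policy concrete, here is a minimal Python model of the behavior described for `ConnectionPool`: one pooled connection per peer, idle entries dropped after `max_idle_time` (300 s by default), and at most `max_connections` (10) entries. It is an illustrative sketch, not the crate's code: `ConnectionPoolSketch`, `acquire`, `release`, `cleanup_idle`, and the injectable `connect` and `clock` callables are hypothetical, and evicting the least recently used idle entry when the pool is full is an assumption, since the README does not say what happens at the limit.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class PooledConnection:
    connection: Any   # stand-in for an Iroh QUIC connection
    last_used: float  # monotonic timestamp of last use
    is_idle: bool = True


class ConnectionPoolSketch:
    """Hypothetical model: reuse one connection per peer, evict stale idle entries."""

    def __init__(self, connect: Callable[[str], Any],
                 max_idle_time: float = 300.0, max_connections: int = 10,
                 clock: Callable[[], float] = time.monotonic):
        self._connect = connect  # opens a brand-new connection to a peer
        self._clock = clock
        self.max_idle_time = max_idle_time
        self.max_connections = max_connections
        self.connections: Dict[str, PooledConnection] = {}

    def cleanup_idle(self) -> None:
        # Drop idle connections not used within max_idle_time.
        now = self._clock()
        stale = [peer for peer, pc in self.connections.items()
                 if pc.is_idle and now - pc.last_used > self.max_idle_time]
        for peer in stale:
            del self.connections[peer]

    def acquire(self, peer: str) -> Any:
        self.cleanup_idle()
        pooled = self.connections.get(peer)
        if pooled is None:
            if len(self.connections) >= self.max_connections:
                # ASSUMPTION: evict the least recently used idle entry to make room.
                idle = [(pc.last_used, p) for p, pc in self.connections.items() if pc.is_idle]
                if not idle:
                    raise RuntimeError("connection pool is full")
                del self.connections[min(idle)[1]]
            pooled = PooledConnection(self._connect(peer), self._clock())
            self.connections[peer] = pooled
        pooled.is_idle = False
        pooled.last_used = self._clock()
        return pooled.connection

    def release(self, peer: str) -> None:
        # Return the connection to the pool instead of closing it.
        pooled = self.connections.get(peer)
        if pooled is not None:
            pooled.is_idle = True
            pooled.last_used = self._clock()
```

Injecting the clock lets the five-minute idle timeout be exercised in a test without actually waiting.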
Protocol Flow
```text
Node A                      Node B
  |                           |
  |-- Connect (QUIC) -------->|
  |   (or reuse existing)     |-- Accept Connection
  |-- Send TensorMessage ---->|
  |   (with tensor data)      |-- Process & Store
  |                           |
  |-- Return to Pool -------->|
  |   (connection reuse)      |
```
Message Types
```rust
// TensorData (tensor metadata plus raw bytes) is defined elsewhere in the crate.
enum TensorMessage {
    Request { tensor_name: String },                    // Request specific tensor
    Response { tensor_name: String, data: TensorData }, // Send tensor data
    Error { message: String },                          // Error response
}
```
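`Request`, `Response`, and `Error` describe a simple pull exchange: a peer asks for a tensor by name and the holder answers with the data or with an error (the "Pull/Request Pattern" and "Non-existent Tensor" tests listed under Comprehensive Testing cover both outcomes). The Python sketch below mirrors the three variants as dataclasses and shows one way a holder could answer a request from an in-memory store. `handle_request`, the store dictionary, and the `TensorData` fields chosen here are assumptions for illustration; the crate's actual serialization and handler are not shown in this README.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass
class TensorData:
    data: bytes
    shape: List[int]
    dtype: str
    requires_grad: bool = False


@dataclass
class Request:
    tensor_name: str


@dataclass
class Response:
    tensor_name: str
    data: TensorData


@dataclass
class Error:
    message: str


TensorMessage = Union[Request, Response, Error]


def handle_request(store: Dict[str, TensorData], msg: TensorMessage) -> TensorMessage:
    """Hypothetical handler: answer a Request from a local tensor store."""
    if isinstance(msg, Request):
        tensor = store.get(msg.tensor_name)
        if tensor is None:
            # Unknown name: reply with an Error instead of dropping the request.
            return Error(f"tensor not found: {msg.tensor_name}")
        return Response(msg.tensor_name, tensor)
    # Only Requests are answered; anything else is a protocol misuse.
    return Error("expected a Request message")
```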
Performance Optimizations
Connection Pooling Benefits
- Reduced Latency: Eliminates connection setup overhead (~100-500ms per send)
- Better Throughput: Reused connections skip the handshake, so back-to-back sends spend their time transferring data rather than re-establishing connections
- Resource Efficiency: Fewer active connections to manage
- Scalability: Handles high-frequency tensor sends efficiently
Pool Management
- Automatic Cleanup: Idle connections are cleaned up after 5 minutes
- Thread Safety: Uses `tokio::sync::Mutex` (aliased as `AsyncMutex`) for async-aware locking
- Connection Limits: Maximum 10 concurrent connections per node
- Smart Reuse: Connections are marked idle and reused for subsequent sends
- Async Receiver: The internal receive queue is protected by an async-aware lock and supports true async waiting (no polling)
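The "no polling" receive path can be pictured with `asyncio.Queue`: `await queue.get()` suspends the waiting coroutine until an item is put, so there is no `while True` loop with `asyncio.sleep` in between checks. The sketch below is a Python analogue only, assuming the Rust side behaves similarly with its async-locked channel and `recv().await`; `ReceiverSketch` and `deliver` are hypothetical names, while `wait_for_tensor` mirrors the method used in the PyO3 example.

```python
import asyncio
from typing import Any, Tuple


class ReceiverSketch:
    """Hypothetical model of a non-polling receive path built on asyncio.Queue."""

    def __init__(self) -> None:
        # Must be constructed inside a running event loop (see demo below).
        self._queue: "asyncio.Queue[Tuple[str, Any]]" = asyncio.Queue()

    def deliver(self, name: str, tensor: Any) -> None:
        # Called by the network side once a complete tensor has arrived.
        self._queue.put_nowait((name, tensor))

    async def wait_for_tensor(self) -> Tuple[str, Any]:
        # Suspends until an item is available; no sleep-and-retry loop.
        return await self._queue.get()


async def demo() -> Tuple[str, Any]:
    rx = ReceiverSketch()
    waiter = asyncio.create_task(rx.wait_for_tensor())
    await asyncio.sleep(0)  # let the waiter start and block on the empty queue
    rx.deliver("my_tensor", b"\x00" * 4)
    return await waiter


result = asyncio.run(demo())
```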
Performance Comparison
| Feature | Traditional Approaches | This Tensor Protocol |
|---|---|---|
| Latency | High (3-step process) | Low (direct transfer + connection reuse) |
| Memory | Stores data on disk | Streams directly with pooling |
| Complexity | Request→Store→Download | Single stream transfer |
| Scalability | Limited by storage | Limited by network + connection pool |
| Performance | Network + storage overhead | Optimized for repeated sends |
| Connection Reuse | None | Intelligent pooling (5min idle) |
| Setup Overhead | Per-request | Once per peer |
Building and Testing
Prerequisites
- Rust 1.70+
- Python 3.8+
- WSL (for Windows users)
Build Options
Option 1: PyO3 Bindings (Recommended for Performance)
```bash
# Build PyO3 wheel with torch support
chmod +x build_pyo3_bindings.sh
./build_pyo3_bindings.sh
# Test PyO3 bindings
python python/test_tensor_protocol_pyo3.py
```
Option 2: UniFFI Bindings (Stable)
```bash
# Build UniFFI bindings
chmod +x build_uniffi_and_test.sh
./build_uniffi_and_test.sh
# Test UniFFI bindings
python python/test_tensor_protocol_uniffi.py
```
Manual Build
```bash
# Navigate to the project directory
cd tensor-iroh
# Build Rust library
cargo build --release
# For PyO3 bindings
maturin build --release -F "python,torch" --out ./target/wheels
# For UniFFI bindings
uniffi-bindgen generate src/tensor_protocol.udl --language python --out-dir .
mkdir -p tensor_protocol_py
cp tensor_protocol.py tensor_protocol_py/
# Copy library files as shown in build_uniffi_and_test.sh
```
Usage Example
PyO3 Bindings (Recommended)
```python
import asyncio

import tensor_iroh as tp


async def main():
    # Create nodes (with connection pooling enabled)
    sender = tp.PyTensorNode()
    receiver = tp.PyTensorNode()

    # Start nodes
    await sender.start()
    await receiver.start()

    # Get addresses
    receiver_addr = await receiver.get_node_addr()

    # Create tensor data: a 2x3 float32 tensor needs 2 * 3 * 4 = 24 bytes
    tensor_data = tp.PyTensorData(
        bytes(2 * 3 * 4),  # raw bytes (all zeros here)
        [2, 3],            # shape
        "float32",         # dtype
        False,             # requires_grad
    )

    # Send tensor directly (connection will be pooled)
    await sender.send_tensor(receiver_addr, "my_tensor", tensor_data)

    # Send again to same peer (connection will be reused - much faster!)
    await sender.send_tensor(receiver_addr, "my_tensor2", tensor_data)

    # Check pool size (should be 1 for single peer)
    pool_size = await sender.pool_size()
    print(f"Connection pool size: {pool_size}")

    # Receive tensor without polling: wait for next tensor
    name, received_td = await receiver.wait_for_tensor()
    print(f"Received tensor '{name}' with shape: {received_td.shape}")

    # Cleanup
    sender.shutdown()
    receiver.shutdown()


asyncio.run(main())
```
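`PyTensorData` takes raw bytes together with a shape and dtype, so the byte length has to agree with them: a `[2, 3]` `float32` tensor needs 2 × 3 × 4 = 24 bytes. Below is a stdlib-only sketch for producing matching arguments from Python floats, assuming the receiver interprets the bytes as contiguous row-major values in the machine's native byte order (the README does not specify the layout). `pack_float32` and `unpack_float32` are hypothetical helpers, not part of `tensor_iroh`.

```python
from array import array
from typing import List, Sequence, Tuple


def pack_float32(values: Sequence[float], shape: Sequence[int]) -> Tuple[bytes, List[int], str]:
    """Hypothetical helper: flat row-major floats -> (raw_bytes, shape, dtype)."""
    count = 1
    for dim in shape:
        count *= dim
    if count != len(values):
        raise ValueError(f"shape {list(shape)} needs {count} values, got {len(values)}")
    # array("f") stores C floats (4 bytes each) in native byte order.
    return array("f", values).tobytes(), list(shape), "float32"


def unpack_float32(raw: bytes) -> List[float]:
    """Hypothetical helper: decode raw float32 bytes into a flat list of floats."""
    buf = array("f")
    buf.frombytes(raw)
    return buf.tolist()
```

Following the argument order in the example above, `raw, shape, dtype = pack_float32([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], [2, 3])` would feed `tp.PyTensorData(raw, shape, dtype, False)`. With NumPy installed, `ndarray.tobytes()` on a C-contiguous `float32` array yields the same layout.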
UniFFI Bindings
```python
import asyncio

from tensor_protocol import create_node, TensorData, TensorMetadata


async def main():
    # Create nodes (with connection pooling enabled)
    sender = create_node(None)
    receiver = create_node(None)

    # Start nodes
    await sender.start()
    await receiver.start()

    # Get addresses
    receiver_addr = await receiver.get_node_addr()

    # Create tensor: a 2x3 float32 tensor needs 2 * 3 * 4 = 24 bytes
    tensor_data = TensorData(
        metadata=TensorMetadata(
            shape=[2, 3],
            dtype="float32",
            requires_grad=False,
        ),
        data=bytes(2 * 3 * 4),  # raw bytes (all zeros here)
    )

    # Send tensor directly (connection will be pooled)
    await sender.send_tensor_direct(receiver_addr, "my_tensor", tensor_data)

    # Send again to same peer (connection will be reused)
    await sender.send_tensor_direct(receiver_addr, "my_tensor2", tensor_data)

    # Check pool size (should be 1 for single peer)
    pool_size = await sender.get_pool_size()
    print(f"Connection pool size: {pool_size}")

    # Receive tensor
    received = await receiver.receive_tensor()
    print(f"Received tensor: {received}")

    # Cleanup
    sender.shutdown()
    receiver.shutdown()


asyncio.run(main())
```
Performance Characteristics
- Small tensors (< 1MB): ~1-5ms latency
- Large tensors (> 100MB): Limited by network bandwidth
- Connection reuse: ~100-500ms saved per subsequent send to same peer
- Throughput: Optimized by connection pooling
- Memory usage: Minimal buffering, streaming design with intelligent pooling
- Stable Latency: Awaitable receive avoids busy-polling jitter in Python event loops
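The large-tensor and connection-reuse figures above can be turned into back-of-the-envelope numbers. The sketch computes an idealized lower bound (payload size divided by link bandwidth, plus an optional setup cost) and the setup time saved when only the first of N sends opens a connection, using the README's 100-500 ms per-connection figure. It ignores QUIC framing, congestion control, and serialization, so real transfers will be slower; the function names are hypothetical.

```python
def estimate_send_seconds(size_bytes: int, bandwidth_bits_per_s: float,
                          setup_s: float = 0.0) -> float:
    """Idealized lower bound: optional connection setup plus time on the wire."""
    return setup_s + size_bytes * 8 / bandwidth_bits_per_s


def pooled_savings_seconds(num_sends: int, setup_s: float) -> float:
    """Setup cost avoided when only the first of num_sends opens a connection."""
    return max(num_sends - 1, 0) * setup_s
```

For example, a 100 MB tensor over a 1 Gbit/s link needs at least 100e6 × 8 / 1e9 = 0.8 s, and 100 sends to one peer avoid 99 connection setups, roughly 9.9 to 49.5 s at 100-500 ms each.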
Comprehensive Testing
The protocol includes 13 comprehensive stress tests:
- Basic Functionality: Core tensor send/receive
- Pull/Request Pattern: Control plane operations
- Concurrent Sends: Race condition testing
- Rapid Fire Sends: Timing stress testing
- Large Tensor Transfer: 1MB+ tensor handling
- Multiple Receivers: Broadcast scenarios
- Send Before Ready: Timing edge cases
- Immediate Shutdown: Resource cleanup
- Timeout Scenarios: Network timeout handling
- Non-existent Tensor: Error handling
- Bad Ticket Parsing: Invalid address handling
- Post-shutdown Behavior: Cleanup validation
- Connection Pool Reuse: Pool functionality validation
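For the timeout case, a caller can bound how long it waits for the next tensor with `asyncio.wait_for`. The sketch wraps `wait_for_tensor()` from the PyO3 example; it assumes that method returns a cancellable awaitable, which this README does not state, and `receive_with_timeout`, `SilentNode`, and `EchoNode` are hypothetical names so the snippet runs without a network.

```python
import asyncio
from typing import Any, Optional, Tuple


async def receive_with_timeout(node: Any, timeout_s: float) -> Optional[Tuple[str, Any]]:
    """Wait for the next tensor, or return None if nothing arrives in time."""
    try:
        return await asyncio.wait_for(node.wait_for_tensor(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the pending receive at this point.
        return None


class SilentNode:
    """Hypothetical stand-in whose receive never completes."""

    async def wait_for_tensor(self) -> Tuple[str, Any]:
        await asyncio.Event().wait()  # never set, so this blocks until cancelled
        raise AssertionError("unreachable")


class EchoNode:
    """Hypothetical stand-in with one tensor already available."""

    async def wait_for_tensor(self) -> Tuple[str, Any]:
        return ("my_tensor", b"\x00" * 4)
```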
Error Handling
The protocol includes comprehensive error handling:
- `TensorError::Io`: Network I/O errors
- `TensorError::Serialization`: Data serialization errors
- `TensorError::Connection`: QUIC connection errors
- `TensorError::Protocol`: Protocol-level errors
Thread Safety
- Async-aware locking: Uses `tokio::sync::Mutex` for the connection pool
- Async-aware receiver: The internal receiver channel is protected by an async lock and supports `recv().await`
- Non-blocking operations: All async operations are non-blocking
- Concurrent access: Multiple threads can safely access the connection pool
- Resource management: Automatic cleanup of idle connections
Future Enhancements
- Compression: Add tensor compression for network efficiency
- Streaming: Support for tensor streaming (partial sends)
- Authentication: Add peer authentication and authorization
- Monitoring: Add metrics and performance monitoring
- Batching: Support for batched tensor transfers
- Pool Metrics: Connection pool performance monitoring
- Adaptive Pooling: Dynamic pool size based on usage patterns
License
This implementation is designed to be compatible with Iroh's dual Apache-2.0/MIT license structure.
Download files
Built Distribution
File details
Details for the file tensor_iroh-0.1.1a2-cp38-abi3-manylinux_2_34_x86_64.whl.
File metadata
- Download URL: tensor_iroh-0.1.1a2-cp38-abi3-manylinux_2_34_x86_64.whl
- Size: 7.1 MB
- Tags: CPython 3.8+, manylinux: glibc 2.34+ x86-64
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7cca583a4f4c9c7ae31642fa763023efb21b133ad67c707fafda0c72dec6b18b |
| MD5 | 0bc21edab9fd7af4cc367dc8eda598b2 |
| BLAKE2b-256 | 4a5acbdec259739195a570f6dce70e152ecfe43f7d8bed2fce3d3c0edb2cb916 |