SWIM Protocol implementation for P2P membership and failure detection with ZeroMQ integration
SWIM P2P
A production-ready Python implementation of the SWIM protocol (Scalable Weakly-consistent Infection-style Process Group Membership) for peer-to-peer membership and failure detection in distributed systems.
Overview
SWIM is a gossip-based protocol for maintaining membership in distributed systems. This implementation provides:
- Scalable membership management with constant message load per node
- Weakly-consistent membership view across all nodes
- Failure detection with configurable timeouts and suspicion mechanisms
- Efficient dissemination of membership changes via piggybacked gossip
- ZeroMQ integration for reliable application-level messaging
- Comprehensive metrics and monitoring capabilities
- Production-ready with extensive testing and documentation
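The failure-detection behaviour listed above follows the general SWIM pattern: ping one random member, fall back to indirect probes through a few helpers, and only then mark the target as suspect. The following is a minimal, self-contained simulation of one such protocol period. It does not use this package's API; `probe_round`, `reachable`, and the state constants are hypothetical names chosen for illustration.

```python
import random
from typing import Callable, Dict, Optional

ALIVE, SUSPECT = "alive", "suspect"


def probe_round(
    me: str,
    members: Dict[str, str],
    reachable: Callable[[str, str], bool],
    k: int = 3,
    rng: Optional[random.Random] = None,
) -> Optional[str]:
    """Run one protocol period for `me` and return the probed target (or None).

    `members` maps node id -> state and is updated in place.
    `reachable(src, dst)` stands in for a ping/ack exchange over the network.
    """
    rng = rng or random.Random()
    candidates = [m for m in members if m != me and members[m] == ALIVE]
    if not candidates:
        return None
    target = rng.choice(candidates)

    # 1. Direct ping.
    if reachable(me, target):
        return target

    # 2. Indirect probe: ask up to k other members to ping the target for us.
    helpers = [m for m in candidates if m != target]
    for helper in rng.sample(helpers, min(k, len(helpers))):
        if reachable(me, helper) and reachable(helper, target):
            return target

    # 3. No ack at all: suspect the target rather than declaring it dead.
    members[target] = SUSPECT
    return target
```

Each node probes a single target per period regardless of cluster size, which is where the constant per-node message load comes from.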
Installation
Install from PyPI:
pip install swim_p2p
Or for development (editable install):
git clone <your-repo-url>
cd swim_p2p
pip install -e .
Quick Start
Using the CLI (swim-node)
After installation, you can run a standalone SWIM node:
swim-node --addr 127.0.0.1:5000 --seeds 127.0.0.1:5001
- --addr specifies the bind address (host:port).
- --seeds is a comma-separated list of existing node addresses to join.
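The CLI flags use the host:port form, while the library calls in the next section take (host, port) tuples. A small helper for converting between the two might look like the sketch below; `parse_addr` and `parse_seeds` are hypothetical names for illustration, not functions exported by the package.

```python
from typing import List, Tuple


def parse_addr(text: str) -> Tuple[str, int]:
    """Convert 'host:port' into a (host, port) tuple."""
    host, sep, port = text.strip().rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port, got {text!r}")
    port_num = int(port)  # raises ValueError for a non-numeric port
    if not 0 < port_num < 65536:
        raise ValueError(f"port out of range: {port_num}")
    return host, port_num


def parse_seeds(text: str) -> List[Tuple[str, int]]:
    """Convert a comma-separated seed list into (host, port) tuples."""
    return [parse_addr(part) for part in text.split(",") if part.strip()]
```

For example, `parse_seeds("127.0.0.1:5001,127.0.0.1:5002")` yields a list that has the same shape as the argument passed to `node.join_cluster(...)` in the library example.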
As a library in your Python code
import asyncio

from swim.protocol.node import Node
from swim.transport.hybrid import HybridTransport
from swim.protocol.member import MemberList
from swim.protocol.failure_detector import FailureDetector
from swim.protocol.disseminator import GossipService


async def main():
    # Create the transport and bind it to the local address
    transport = HybridTransport()
    await transport.bind(("127.0.0.1", 5000))

    # Create SWIM components
    members = MemberList()
    failure_detector = FailureDetector(transport, members)
    gossip = GossipService(transport, members)

    # Create and start the node
    node = Node(transport, members, failure_detector, gossip)
    await node.start()

    # Join the cluster through a seed node
    await node.join_cluster([("127.0.0.1", 5001)])

    # Get cluster state
    alive_members = members.get_alive_members()
    print(f"Cluster members: {alive_members}")


# `await` is only valid inside a coroutine, so run everything in an event loop
asyncio.run(main())
ZeroMQ Integration
For reliable messaging between nodes:
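from swim.integration.agent import ZMQAgentIntegration

# Run this inside a coroutine (for example the async main() of your application).
# event_dispatcher must be an existing dispatcher instance (see swim.events.dispatcher);
# this snippet does not create it.

# Create ZMQ agent
zmq_agent = ZMQAgentIntegration(
    node_id="127.0.0.1:5000",
    bind_address="127.0.0.1:6000",
    event_dispatcher=event_dispatcher,
    config={"NODE_NAME": "NodeA"},
)
await zmq_agent.start()

# Send a reliable message to a peer
success = await zmq_agent.send_automated_check_in("127.0.0.1:5001")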
Architecture
The library is organized into several key components:
Core Protocol (swim.protocol)
- Member Management: Track cluster membership and metadata
- Failure Detection: Detect node failures using the SWIM algorithm
- Gossip Dissemination: Spread membership changes efficiently
- Node Coordination: Orchestrate all protocol components
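Gossip dissemination in SWIM-style protocols usually piggybacks pending membership updates on outgoing protocol messages, preferring the updates that have been sent the fewest times and dropping each one after a number of transmissions that grows logarithmically with cluster size. The sketch below illustrates that idea only. `GossipBuffer`, `retransmit_mult`, and the base-2 logarithm are assumptions for illustration and are not taken from `swim.protocol.disseminator`.

```python
import math
from typing import List


class GossipBuffer:
    """Holds pending updates and hands out the least-gossiped ones first."""

    def __init__(self, retransmit_mult: int = 3) -> None:
        self.retransmit_mult = retransmit_mult
        self._entries: List[list] = []  # each entry is [update, times_sent]

    def add(self, update: str) -> None:
        self._entries.append([update, 0])

    def limit(self, cluster_size: int) -> int:
        # Transmission cap per update; grows with log(cluster size).
        return self.retransmit_mult * math.ceil(math.log2(cluster_size + 1))

    def take(self, max_items: int, cluster_size: int) -> List[str]:
        """Pick up to max_items updates to piggyback on one outgoing message."""
        limit = self.limit(cluster_size)
        self._entries.sort(key=lambda e: e[1])  # stable sort, least-sent first
        chosen = self._entries[:max_items]
        for entry in chosen:
            entry[1] += 1
        updates = [entry[0] for entry in chosen]
        # Retire updates that have reached the transmission cap.
        self._entries = [e for e in self._entries if e[1] < limit]
        return updates
```

Capping retransmissions keeps message size bounded while still giving each update enough rounds to reach the whole cluster with high probability.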
Transport Layer (swim.transport)
- UDP Transport: High-performance for local networks
- TCP Transport: Reliable for WAN deployments
- Hybrid Transport: Automatic selection based on conditions
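The conditions used by the hybrid transport are not spelled out here. One plausible rule, assuming the choice is driven by message size and the `udp_max_size` value from the Configuration section, is sketched below. `choose_transport` is a hypothetical helper, not part of `swim.transport`.

```python
def choose_transport(payload: bytes, udp_max_size: int = 1400) -> str:
    """Return 'udp' for payloads that fit in one datagram, else 'tcp'.

    Assumption: size is the only criterion. A real implementation may also
    consider packet loss, peer capabilities, or earlier send failures.
    """
    return "udp" if len(payload) <= udp_max_size else "tcp"
```

A limit of 1400 bytes stays below the common 1500-byte Ethernet MTU, so a datagram of that size normally avoids IP fragmentation.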
ZeroMQ Integration (swim.integration)
- Reliable Messaging: Guaranteed message delivery with acknowledgments
- Circuit Breakers: Fail-fast behavior for unhealthy nodes
- Flow Control: Prevent overwhelming slow nodes
- Connection Management: Automatic connection pooling
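The circuit-breaker pattern mentioned above can be sketched in a few lines. This is a generic illustration of the pattern under stated assumptions, not the code in `swim/integration/messaging/circuit_breaker.py`. The class name, the `reset_after` cool-down, and the injectable `clock` are hypothetical; `threshold` mirrors the `circuit_breaker_threshold` configuration key.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Stops sending to a peer after `threshold` consecutive failures."""

    def __init__(
        self,
        threshold: int = 5,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.threshold = threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Return True if a send attempt should be made now."""
        if self.opened_at is None:
            return True  # closed: traffic flows normally
        # Open: fail fast until the cool-down has passed, then allow a trial.
        return self.clock() - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()  # (re)open the circuit
```

A caller would check `allow()` before each send and report the outcome with `record_success()` or `record_failure()`; a failed trial after the cool-down reopens the circuit immediately.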
Metrics & Monitoring (swim.metrics)
- Protocol Metrics: Membership changes, failure detection times
- Network Metrics: Bandwidth usage, message rates, latency
- Performance Metrics: CPU usage, memory consumption
- Prometheus Integration: Compatible metrics endpoint
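A Prometheus-compatible endpoint serves metrics in the Prometheus text exposition format. The sketch below shows how a dictionary of gauge values could be rendered in that format. The function name, the `swim` prefix, and the metric names are hypothetical and are not taken from `swim.metrics`.

```python
from typing import Dict, Union

Number = Union[int, float]


def render_prometheus(metrics: Dict[str, Number], prefix: str = "swim") -> str:
    """Render gauge values as Prometheus text exposition format."""
    lines = []
    for name in sorted(metrics):
        full_name = f"{prefix}_{name}"
        lines.append(f"# TYPE {full_name} gauge")
        lines.append(f"{full_name} {metrics[name]}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_prometheus({"members_alive": 3, "members_suspect": 1}), end="")
```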
Lifeguard Enhancements (swim.lifeguard)
- Adaptive Probe Rates: Dynamic adjustment based on network conditions
- Suspicion Timeout Optimization: Intelligent timeout calculation
- Network Condition Awareness: Automatic adaptation to network quality
- False Positive Reduction: Algorithms that reduce the number of healthy nodes incorrectly declared failed
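As an illustration of suspicion timeout calculation, the sketch below follows the logarithmic decay used for Lifeguard in HashiCorp's memberlist: the timeout starts at a maximum and shrinks toward a minimum as independent confirmations of the suspicion arrive. Whether `swim/lifeguard/timing.py` uses the same formula is an assumption, and `suspicion_timeout` and its parameter names are hypothetical.

```python
import math


def suspicion_timeout(
    confirmations: int,
    expected: int,
    min_timeout: float,
    max_timeout: float,
) -> float:
    """Seconds to wait before declaring a suspected node failed.

    confirmations: independent suspicion reports received so far
    expected: number of confirmations at which the minimum is reached
    """
    if expected < 1:
        return min_timeout
    frac = math.log(confirmations + 1) / math.log(expected + 1)
    return max(min_timeout, max_timeout - frac * (max_timeout - min_timeout))
```

With `min_timeout=3.0`, `max_timeout=12.0`, and `expected=3`, a suspicion with no confirmations waits the full 12 seconds, while three confirmations reduce the wait to 3 seconds.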
Project Structure
swim_p2p/  # Repository root
├── pyproject.toml  # Build configuration and dependencies
├── README.md  # This file - project overview and quick start
├── CHANGELOG.md  # Version history and release notes
├── CONTRIBUTING.md  # Contribution guidelines
├── LICENSE  # MIT license
├── MANIFEST.in  # Package manifest for non-Python files
├── requirements.txt  # Development dependencies
│
├── src/  # Source code directory
│   └── swim/  # Main package
│       ├── __init__.py  # Package initialization and exports
│       ├── __version__.py  # Version information
│       ├── py.typed  # Type checking marker
│       ├── main.py  # CLI entry point (swim-node command)
│       ├── config.py  # Configuration management and validation
│       │
│       ├── protocol/  # Core SWIM protocol implementation
│       │   ├── __init__.py
│       │   ├── member.py  # Cluster membership management
│       │   ├── failure_detector.py  # SWIM failure detection algorithm
│       │   ├── disseminator.py  # Gossip-based message dissemination
│       │   ├── node.py  # Main node coordination and orchestration
│       │   ├── message.py  # Protocol message definitions and serialization
│       │   └── sync.py  # Cluster synchronization mechanisms
│       │
│       ├── transport/  # Network transport layer
│       │   ├── __init__.py
│       │   ├── base.py  # Abstract transport interface
│       │   ├── udp.py  # High-performance UDP transport
│       │   ├── tcp.py  # Reliable TCP transport
│       │   └── hybrid.py  # Intelligent transport selection
│       │
│       ├── integration/  # ZeroMQ integration layer
│       │   ├── __init__.py
│       │   ├── agent.py  # Main ZMQ agent integration orchestrator
│       │   ├── load_balancer.py  # Load balancing for ZMQ connections
│       │   ├── message_router.py  # Message routing and distribution
│       │   │
│       │   ├── messaging/  # Reliable messaging components
│       │   │   ├── __init__.py
│       │   │   ├── ack_system.py  # Message acknowledgment system
│       │   │   ├── buffer_monitor.py  # Buffer overflow monitoring
│       │   │   ├── circuit_breaker.py  # Circuit breaker pattern implementation
│       │   │   ├── congestion.py  # Congestion control mechanisms
│       │   │   ├── message_registry.py  # Message tracking and management
│       │   │   ├── reliability.py  # Reliable message delivery
│       │   │   ├── trace.py  # Distributed tracing support
│       │   │   └── workflow.py  # Message workflow management
│       │   │
│       │   └── zmq/  # ZeroMQ-specific components
│       │       ├── __init__.py
│       │       ├── capacity_tracker.py  # Connection capacity monitoring
│       │       ├── connection_manager.py  # ZMQ connection lifecycle
│       │       ├── dealer.py  # DEALER socket management
│       │       ├── flow_control.py  # Flow control and backpressure
│       │       ├── monitoring.py  # ZMQ socket monitoring
│       │       ├── ordering.py  # Message ordering guarantees
│       │       └── router.py  # ROUTER socket management
│       │
│       ├── lifeguard/  # Reliability enhancements
│       │   ├── __init__.py
│       │   ├── awareness.py  # Network condition awareness
│       │   ├── probe_rate.py  # Adaptive probe rate adjustment
│       │   └── timing.py  # Intelligent timeout calculation
│       │
│       ├── metrics/  # Metrics and monitoring
│       │   ├── __init__.py
│       │   ├── collector.py  # Metrics collection and aggregation
│       │   ├── latency.py  # Latency measurement and tracking
│       │   ├── bandwidth.py  # Bandwidth usage monitoring
│       │   ├── metrics_cli.py  # CLI for metrics access
│       │   │
│       │   └── api/  # Metrics API components
│       │       ├── __init__.py
│       │       ├── cli.py  # Command-line metrics interface
│       │       ├── client.py  # Metrics API client
│       │       ├── integration.py  # Integration with external systems
│       │       └── server.py  # Metrics API server
│       │
│       ├── events/  # Event-driven architecture
│       │   ├── __init__.py
│       │   ├── dispatcher.py  # Event dispatching and routing
│       │   ├── handlers.py  # Event handler implementations
│       │   └── types.py  # Event type definitions
│       │
│       ├── diagnostics/  # Diagnostic and debugging tools
│       │   ├── __init__.py
│       │   ├── health_checker.py  # Health check implementations
│       │   ├── message_tracer.py  # Message tracing and debugging
│       │   └── performance_analyzer.py  # Performance analysis tools
│       │
│       └── utils/  # Utility functions and helpers
│           ├── __init__.py
│           ├── logging.py  # Structured logging configuration
│           ├── network.py  # Network utility functions
│           ├── rate_limiter.py  # Rate limiting utilities
│           └── serialization.py  # Data serialization helpers
│
├── tests/  # Test suite
│   ├── __init__.py
│   ├── unit/  # Unit tests
│   │   ├── __init__.py
│   │   ├── events/  # Event system tests
│   │   │   ├── test_dispatcher.py
│   │   │   ├── test_handlers.py
│   │   │   └── test_types.py
│   │   ├── lifeguard/  # Reliability enhancement tests
│   │   │   ├── test_awareness.py
│   │   │   ├── test_network_stress.py
│   │   │   ├── test_probe_rate.py
│   │   │   └── test_timing.py
│   │   ├── metrics/  # Metrics system tests
│   │   │   ├── test_bandwidth_monitor.py
│   │   │   ├── test_latency_tracker.py
│   │   │   └── test_metrics_collector.py
│   │   ├── protocol/  # Core protocol tests
│   │   │   ├── test_disseminator.py
│   │   │   ├── test_failure_detector.py
│   │   │   ├── test_member.py
│   │   │   ├── test_message.py
│   │   │   ├── test_node.py
│   │   │   └── test_sync.py
│   │   ├── transport/  # Transport layer tests
│   │   │   ├── __init__.py
│   │   │   ├── test_base.py
│   │   │   ├── test_hybrid.py
│   │   │   ├── test_tcp.py
│   │   │   ├── test_transport_fallback.py
│   │   │   └── test_udp.py
│   │   └── utils/  # Utility function tests
│   │       ├── test_logging.py
│   │       ├── test_network.py
│   │       ├── test_rate_limiter.py
│   │       └── test_serialization.py
│   │
│   ├── integration/  # Integration tests
│   │   ├── network_stress.py  # Network stress testing
│   │   ├── send_big_message.py  # Large message handling tests
│   │   ├── test_message_size.py  # Message size limit tests
│   │   ├── test_swim_integration.py  # End-to-end SWIM tests
│   │   └── test_udp_fallback.py  # Transport fallback tests
│   │
│   └── swim_zmq/  # ZMQ integration tests
│       ├── test_both_one_n_two.py  # Multi-node ZMQ tests
│       ├── test_enhanced_phase2.py  # Enhanced ZMQ functionality tests
│       ├── test_phase_one.py  # Basic ZMQ integration tests
│       └── test_phase_two.py  # Advanced ZMQ feature tests
│
├── examples/  # Example applications
│   └── transport_example.py  # Transport layer usage example
│
└── docs/  # Additional documentation
    └── COMPREHENSIVE_DOCUMENTATION.md  # Complete API reference
Testing
Run the comprehensive test suite:
# Unit tests
pytest tests/unit/ -v
# Integration tests
pytest tests/integration/ -v
# All tests with coverage
pytest --cov=swim --cov-report=html
Performance
- Cluster Size: Successfully tested with 1000+ nodes
- Message Throughput: 10,000+ messages/second per node
- Failure Detection Time: < 5 seconds average (configurable)
- Memory Usage: ~50MB per node baseline
- CPU Usage: < 5% on modern hardware for typical workloads
Configuration
The library supports extensive configuration options:
config = {
    # Protocol timing
    "protocol_period": 1.0,
    "failure_timeout": 5.0,
    "suspect_timeout": 3.0,
    # Transport settings
    "transport_type": "hybrid",
    "udp_max_size": 1400,
    # ZMQ settings
    "zmq_port_offset": 1000,
    "circuit_breaker_threshold": 5,
    # Metrics
    "enable_metrics": True,
    "metrics_port": 8080,
}
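Before passing such a dictionary to the library, it can help to sanity-check it. The sketch below is one hypothetical way to do that. `validate_config` is not the package's own validation (which lives in `swim/config.py`), and the accepted `transport_type` values other than "hybrid" are assumed from the Transport Layer section.

```python
from typing import Any, Dict, List


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems: List[str] = []

    # Timing values must be positive numbers (bool is rejected explicitly
    # because it is a subclass of int).
    for key in ("protocol_period", "failure_timeout", "suspect_timeout"):
        value = config.get(key)
        if (
            not isinstance(value, (int, float))
            or isinstance(value, bool)
            or value <= 0
        ):
            problems.append(f"{key} must be a positive number")

    # Assumed set of transport names.
    if config.get("transport_type") not in {"udp", "tcp", "hybrid"}:
        problems.append("transport_type must be 'udp', 'tcp', or 'hybrid'")

    # The metrics port only matters when metrics are enabled.
    port = config.get("metrics_port")
    if config.get("enable_metrics") and not (
        isinstance(port, int) and 0 < port < 65536
    ):
        problems.append("metrics_port must be an integer in 1..65535")

    return problems
```

Returning a list of problems rather than raising on the first one lets a caller report every misconfiguration at once.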
Documentation
- Comprehensive Documentation: Complete API reference and usage examples
- Agent Integration Patterns: Theoretical guidance for building distributed agent systems
- Configuration Guide: Detailed configuration options
- Performance Tuning: Optimization guidelines
Contributing
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- SWIM protocol paper authors: Abhinandan Das, Indranil Gupta, Ashish Motivala
- HashiCorp Serf and Memberlist projects for Lifeguard enhancements
- ZeroMQ community for messaging patterns and best practices
Roadmap
- v1.1.0: Encryption and authentication support
- v1.2.0: IPv6 support and enhanced security features
- v1.3.0: Advanced clustering algorithms and optimizations
SWIM P2P v1.0.0 - Production-ready distributed membership protocol implementation
File details
Details for the file swim_p2p-1.2.6.tar.gz.
File metadata
- Download URL: swim_p2p-1.2.6.tar.gz
- Upload date:
- Size: 289.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0b776e93f79497ee77cd1d8b79567273a9419f6cc4c8db114d41b0af6750921e |
| MD5 | 89d4d8807d2949839341455ae83ce3a9 |
| BLAKE2b-256 | b50ca7d3aa6e4cd74e1c072a3a913c9965a998603c7c2114b8bafeb9ac73c277 |
File details
Details for the file swim_p2p-1.2.6-py3-none-any.whl.
File metadata
- Download URL: swim_p2p-1.2.6-py3-none-any.whl
- Upload date:
- Size: 243.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 49a0cf8af47719ae27a5a2273d9617c4fcd4fc4cab49d9a5cf60faca575fe30f |
| MD5 | 86318a520e2653d5e77bcd32b17490e9 |
| BLAKE2b-256 | 713fcf138f3fb4488a7770e53d57fc99d392118339a7a9882d3f4c0b7d1ebbd0 |