SWIM Protocol implementation for P2P membership and failure detection with ZeroMQ integration

SWIM P2P

A production-ready Python implementation of the SWIM protocol (Scalable Weakly-consistent Infection-style Process Group Membership) for peer-to-peer membership and failure detection in distributed systems.

Overview

SWIM is a gossip-based protocol for maintaining membership in distributed systems. This implementation provides:

  • Scalable membership management with constant message load per node
  • Weakly-consistent membership view across all nodes
  • Failure detection with configurable timeouts and suspicion mechanisms
  • Efficient dissemination of membership changes via piggybacked gossip
  • ZeroMQ integration for reliable application-level messaging
  • Comprehensive metrics and monitoring capabilities
  • Production-ready with extensive testing and documentation
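To make the failure-detection bullets concrete, here is a minimal sketch of one SWIM probe as the SWIM paper describes it: a direct ping, then indirect pings through up to k other members, then suspicion. The `probe_round` function and its `ping` callback are hypothetical names used for illustration only; they are not part of this library's API.

```python
import random


def probe_round(target, members, ping, k=3, rng=random):
    """Run one SWIM-style probe of `target` and return 'alive' or 'suspect'.

    `ping(via, target) -> bool` is a hypothetical callback: via=None means a
    direct ping, otherwise the ping is relayed through member `via`.
    """
    # Step 1: direct ping.
    if ping(None, target):
        return "alive"
    # Step 2: ask up to k other members to ping the target on our behalf.
    helpers = [m for m in members if m != target]
    for via in rng.sample(helpers, min(k, len(helpers))):
        if ping(via, target):
            return "alive"
    # Step 3: no ack on any path, so suspect the target instead of
    # declaring it dead immediately.
    return "suspect"


members = ["a", "b", "c", "d"]
# "d" does not answer directly but is reachable through "b".
print(probe_round("d", members, lambda via, target: via == "b"))  # prints: alive
```

Suspecting first, rather than declaring failure at once, is what gives a slow but healthy node the chance to refute the suspicion.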

Installation

Install from PyPI:

pip install swim_p2p

Or for development (editable install):

git clone <your-repo-url>
cd swim_p2p
pip install -e .

Quick Start

Using the CLI (swim-node)

After installation, you can run a standalone SWIM node:

swim-node --addr 127.0.0.1:5000 --seeds 127.0.0.1:5001
  • --addr specifies the bind address (host:port).
  • --seeds is a comma-separated list of existing node addresses to join.
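If you build your own launcher around the library, the same two argument formats can be turned into the `(host, port)` tuples used in the library example below. This is a sketch only; `parse_addr` and `parse_seeds` are hypothetical helpers, not functions exported by swim_p2p, and the CLI's own parsing may differ.

```python
def parse_addr(text):
    """Split 'host:port' into (host, port), validating the port range."""
    host, sep, port = text.strip().rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port, got {text!r}")
    port_num = int(port)  # raises ValueError for a non-numeric port
    if not 0 < port_num < 65536:
        raise ValueError(f"port out of range: {port_num}")
    return host, port_num


def parse_seeds(text):
    """Parse a comma-separated seed list, skipping empty entries."""
    return [parse_addr(part) for part in text.split(",") if part.strip()]


print(parse_seeds("127.0.0.1:5001,127.0.0.1:5002"))
```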

As a library in your Python code

import asyncio

from swim.protocol.node import Node
from swim.transport.hybrid import HybridTransport
from swim.protocol.member import MemberList
from swim.protocol.failure_detector import FailureDetector
from swim.protocol.disseminator import GossipService


async def main():
    # Create transport
    transport = HybridTransport()
    await transport.bind(("127.0.0.1", 5000))

    # Create SWIM components
    members = MemberList()
    failure_detector = FailureDetector(transport, members)
    gossip = GossipService(transport, members)

    # Create and start node
    node = Node(transport, members, failure_detector, gossip)
    await node.start()

    # Join cluster
    await node.join_cluster([("127.0.0.1", 5001)])

    # Get cluster state
    alive_members = members.get_alive_members()
    print(f"Cluster members: {alive_members}")


# await is only valid inside a coroutine, so run everything in an event loop
asyncio.run(main())

ZeroMQ Integration

For reliable messaging between nodes:

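from swim.integration.agent import ZMQAgentIntegration


async def run_agent(event_dispatcher):
    # event_dispatcher is your application's event dispatcher
    # (see swim/events/dispatcher.py); create it before calling this.

    # Create ZMQ agent
    zmq_agent = ZMQAgentIntegration(
        node_id="127.0.0.1:5000",
        bind_address="127.0.0.1:6000",
        event_dispatcher=event_dispatcher,
        config={"NODE_NAME": "NodeA"}
    )

    await zmq_agent.start()

    # Send reliable message
    success = await zmq_agent.send_automated_check_in("127.0.0.1:5001")
    return success

# Call from a running event loop, e.g. asyncio.run(run_agent(event_dispatcher))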

Architecture

The library is organized into several key components:

Core Protocol (swim.protocol)

  • Member Management: Track cluster membership and metadata
  • Failure Detection: Detect node failures using SWIM algorithm
  • Gossip Dissemination: Spread membership changes efficiently
  • Node Coordination: Orchestrate all protocol components
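Member management in SWIM depends on deciding which of two conflicting reports about a node wins. The sketch below follows the precedence rules from the SWIM paper (incarnation numbers, with a confirmed failure overriding everything). The `Update` type and `overrides` function are hypothetical names for illustration, and the library's `MemberList` may implement this differently.

```python
from dataclasses import dataclass

ALIVE, SUSPECT, DEAD = "alive", "suspect", "dead"


@dataclass(frozen=True)
class Update:
    state: str
    incarnation: int


def overrides(new, old):
    """Return True if gossip `new` should replace the locally held `old`."""
    if old.state == DEAD:
        return False  # a confirmed failure is final
    if new.state == DEAD:
        return True   # a confirm overrides both alive and suspect
    if new.state == SUSPECT and old.state == ALIVE:
        # A suspicion wins against an alive report of the same incarnation.
        return new.incarnation >= old.incarnation
    # alive vs alive, alive vs suspect, suspect vs suspect:
    # only a strictly newer incarnation wins.
    return new.incarnation > old.incarnation


# A suspected node refutes the suspicion by bumping its incarnation number.
print(overrides(Update(ALIVE, 2), Update(SUSPECT, 1)))  # prints: True
```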

Transport Layer (swim.transport)

  • UDP Transport: High-performance for local networks
  • TCP Transport: Reliable for WAN deployments
  • Hybrid Transport: Automatic selection based on conditions
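This README does not spell out which conditions the hybrid transport uses. One plausible criterion, assumed here purely for illustration, is payload size: datagrams that fit under the `udp_max_size` setting go over UDP, and anything larger falls back to TCP. `choose_transport` is a hypothetical helper, not the library's selection logic.

```python
UDP_MAX_SIZE = 1400  # mirrors "udp_max_size" in the Configuration section


def choose_transport(payload: bytes, udp_max_size: int = UDP_MAX_SIZE) -> str:
    """Pick 'udp' for payloads that fit in one datagram, else 'tcp'."""
    return "udp" if len(payload) <= udp_max_size else "tcp"


print(choose_transport(b"ping"))        # prints: udp
print(choose_transport(b"x" * 5000))    # prints: tcp
```

A limit around 1400 bytes keeps a datagram under the common 1500-byte Ethernet MTU, which avoids IP fragmentation.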

ZeroMQ Integration (swim.integration)

  • Reliable Messaging: Guaranteed message delivery with acknowledgments
  • Circuit Breakers: Fail-fast behavior for unhealthy nodes
  • Flow Control: Prevent overwhelming slow nodes
  • Connection Management: Automatic connection pooling
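The circuit-breaker idea can be sketched in a few lines: after a threshold of consecutive failures the breaker opens and rejects calls, then lets a trial call through once a cool-down has passed. The class below is a generic illustration of the pattern with hypothetical names and a made-up `reset_after` parameter; it is not the implementation in `circuit_breaker.py`.

```python
import time


class CircuitBreaker:
    def __init__(self, threshold=5, reset_after=30.0, clock=time.monotonic):
        self.threshold = threshold      # consecutive failures before opening
        self.reset_after = reset_after  # cool-down in seconds (assumed name)
        self.clock = clock              # injectable for testing
        self.failures = 0
        self.opened_at = None           # None means the circuit is closed

    def allow(self):
        """Return True if a request may be attempted right now."""
        if self.opened_at is None:
            return True
        # Half-open: after the cool-down, allow a trial request.
        return self.clock() - self.opened_at >= self.reset_after

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = self.clock()


breaker = CircuitBreaker(threshold=5)
for _ in range(5):
    breaker.record_failure()
print(breaker.allow())  # prints: False
```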

Metrics & Monitoring (swim.metrics)

  • Protocol Metrics: Membership changes, failure detection times
  • Network Metrics: Bandwidth usage, message rates, latency
  • Performance Metrics: CPU usage, memory consumption
  • Prometheus Integration: Compatible metrics endpoint
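For readers unfamiliar with what a Prometheus-compatible endpoint serves, the sketch below renders metrics in the Prometheus text exposition format. The function and the metric name `swim_members_alive` are hypothetical; the names this library actually exports are not listed in this README.

```python
def render_prometheus(metrics):
    """Render (name, type, help, value) tuples as Prometheus text format."""
    lines = []
    for name, kind, help_text, value in metrics:
        lines.append(f"# HELP {name} {help_text}")
        lines.append(f"# TYPE {name} {kind}")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"


print(render_prometheus([
    ("swim_members_alive", "gauge", "Members currently considered alive", 3),
]), end="")
```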

Lifeguard Enhancements (swim.lifeguard)

  • Adaptive Probe Rates: Dynamic adjustment based on network conditions
  • Suspicion Timeout Optimization: Intelligent timeout calculation
  • Network Condition Awareness: Automatic adaptation to network quality
  • False Positive Reduction: Advanced algorithms to minimize incorrect failures
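As an illustration of suspicion timeout optimization, the sketch below follows the Lifeguard approach used in HashiCorp memberlist: the timeout starts at a maximum and shrinks logarithmically toward a minimum as independent members confirm the suspicion. The function name and parameters are assumptions for this example, and `timing.py` in this library may compute its timeouts differently.

```python
import math


def suspicion_timeout(confirmations, expected, min_timeout, max_timeout):
    """Timeout in seconds after `confirmations` independent suspicions.

    `expected` is the number of confirmations at which the timeout
    reaches `min_timeout`.
    """
    if expected < 1:
        return min_timeout
    frac = math.log(confirmations + 1) / math.log(expected + 1)
    timeout = max_timeout - frac * (max_timeout - min_timeout)
    return max(min_timeout, timeout)


for n in range(4):
    print(n, round(suspicion_timeout(n, 3, 3.0, 18.0), 2))
```

With no confirmations the node waits the full maximum, which reduces false positives; once several peers agree, it converges quickly on the minimum.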

Project Structure

swim_p2p/                          # Repository root
├── pyproject.toml                 # Build configuration and dependencies
├── README.md                      # This file - project overview and quick start
├── CHANGELOG.md                   # Version history and release notes
├── CONTRIBUTING.md                # Contribution guidelines
├── LICENSE                        # MIT license
├── MANIFEST.in                    # Package manifest for non-Python files
├── requirements.txt               # Development dependencies
│
├── src/                          # Source code directory
│   └── swim/                     # Main package
│       ├── __init__.py           # Package initialization and exports
│       ├── __version__.py        # Version information
│       ├── py.typed              # Type checking marker
│       ├── main.py               # CLI entry point (swim-node command)
│       ├── config.py             # Configuration management and validation
│       │
│       ├── protocol/             # Core SWIM protocol implementation
│       │   ├── __init__.py
│       │   ├── member.py         # Cluster membership management
│       │   ├── failure_detector.py # SWIM failure detection algorithm
│       │   ├── disseminator.py   # Gossip-based message dissemination
│       │   ├── node.py           # Main node coordination and orchestration
│       │   ├── message.py        # Protocol message definitions and serialization
│       │   └── sync.py           # Cluster synchronization mechanisms
│       │
│       ├── transport/            # Network transport layer
│       │   ├── __init__.py
│       │   ├── base.py           # Abstract transport interface
│       │   ├── udp.py            # High-performance UDP transport
│       │   ├── tcp.py            # Reliable TCP transport
│       │   └── hybrid.py         # Intelligent transport selection
│       │
│       ├── integration/          # ZeroMQ integration layer
│       │   ├── __init__.py
│       │   ├── agent.py          # Main ZMQ agent integration orchestrator
│       │   ├── load_balancer.py  # Load balancing for ZMQ connections
│       │   ├── message_router.py # Message routing and distribution
│       │   │
│       │   ├── messaging/        # Reliable messaging components
│       │   │   ├── __init__.py
│       │   │   ├── ack_system.py # Message acknowledgment system
│       │   │   ├── buffer_monitor.py # Buffer overflow monitoring
│       │   │   ├── circuit_breaker.py # Circuit breaker pattern implementation
│       │   │   ├── congestion.py # Congestion control mechanisms
│       │   │   ├── message_registry.py # Message tracking and management
│       │   │   ├── reliability.py # Reliable message delivery
│       │   │   ├── trace.py      # Distributed tracing support
│       │   │   └── workflow.py   # Message workflow management
│       │   │
│       │   └── zmq/              # ZeroMQ-specific components
│       │       ├── __init__.py
│       │       ├── capacity_tracker.py # Connection capacity monitoring
│       │       ├── connection_manager.py # ZMQ connection lifecycle
│       │       ├── dealer.py      # DEALER socket management
│       │       ├── flow_control.py # Flow control and backpressure
│       │       ├── monitoring.py  # ZMQ socket monitoring
│       │       ├── ordering.py    # Message ordering guarantees
│       │       └── router.py      # ROUTER socket management
│       │
│       ├── lifeguard/            # Reliability enhancements
│       │   ├── __init__.py
│       │   ├── awareness.py      # Network condition awareness
│       │   ├── probe_rate.py     # Adaptive probe rate adjustment
│       │   └── timing.py         # Intelligent timeout calculation
│       │
│       ├── metrics/              # Metrics and monitoring
│       │   ├── __init__.py
│       │   ├── collector.py      # Metrics collection and aggregation
│       │   ├── latency.py        # Latency measurement and tracking
│       │   ├── bandwidth.py      # Bandwidth usage monitoring
│       │   ├── metrics_cli.py    # CLI for metrics access
│       │   │
│       │   └── api/              # Metrics API components
│       │       ├── __init__.py
│       │       ├── cli.py        # Command-line metrics interface
│       │       ├── client.py     # Metrics API client
│       │       ├── integration.py # Integration with external systems
│       │       └── server.py     # Metrics API server
│       │
│       ├── events/               # Event-driven architecture
│       │   ├── __init__.py
│       │   ├── dispatcher.py     # Event dispatching and routing
│       │   ├── handlers.py       # Event handler implementations
│       │   └── types.py          # Event type definitions
│       │
│       ├── diagnostics/          # Diagnostic and debugging tools
│       │   ├── __init__.py
│       │   ├── health_checker.py # Health check implementations
│       │   ├── message_tracer.py # Message tracing and debugging
│       │   └── performance_analyzer.py # Performance analysis tools
│       │
│       └── utils/                # Utility functions and helpers
│           ├── __init__.py
│           ├── logging.py        # Structured logging configuration
│           ├── network.py        # Network utility functions
│           ├── rate_limiter.py   # Rate limiting utilities
│           └── serialization.py  # Data serialization helpers
│
├── tests/                        # Test suite
│   ├── __init__.py
│   ├── unit/                     # Unit tests
│   │   ├── __init__.py
│   │   ├── events/               # Event system tests
│   │   │   ├── test_dispatcher.py
│   │   │   ├── test_handlers.py
│   │   │   └── test_types.py
│   │   ├── lifeguard/            # Reliability enhancement tests
│   │   │   ├── test_awareness.py
│   │   │   ├── test_network_stress.py
│   │   │   ├── test_probe_rate.py
│   │   │   └── test_timing.py
│   │   ├── metrics/              # Metrics system tests
│   │   │   ├── test_bandwidth_monitor.py
│   │   │   ├── test_latency_tracker.py
│   │   │   └── test_metrics_collector.py
│   │   ├── protocol/             # Core protocol tests
│   │   │   ├── test_disseminator.py
│   │   │   ├── test_failure_detector.py
│   │   │   ├── test_member.py
│   │   │   ├── test_message.py
│   │   │   ├── test_node.py
│   │   │   └── test_sync.py
│   │   ├── transport/            # Transport layer tests
│   │   │   ├── __init__.py
│   │   │   ├── test_base.py
│   │   │   ├── test_hybrid.py
│   │   │   ├── test_tcp.py
│   │   │   ├── test_transport_fallback.py
│   │   │   └── test_udp.py
│   │   └── utils/                # Utility function tests
│   │       ├── test_logging.py
│   │       ├── test_network.py
│   │       ├── test_rate_limiter.py
│   │       └── test_serialization.py
│   │
│   ├── integration/              # Integration tests
│   │   ├── network_stress.py     # Network stress testing
│   │   ├── send_big_message.py   # Large message handling tests
│   │   ├── test_message_size.py  # Message size limit tests
│   │   ├── test_swim_integration.py # End-to-end SWIM tests
│   │   └── test_udp_fallback.py # Transport fallback tests
│   │
│   └── swim_zmq/                 # ZMQ integration tests
│       ├── test_both_one_n_two.py # Multi-node ZMQ tests
│       ├── test_enhanced_phase2.py # Enhanced ZMQ functionality tests
│       ├── test_phase_one.py     # Basic ZMQ integration tests
│       └── test_phase_two.py     # Advanced ZMQ feature tests
│
├── examples/                     # Example applications
│   └── transport_example.py      # Transport layer usage example
│
└── docs/                         # Additional documentation
    └── COMPREHENSIVE_DOCUMENTATION.md # Complete API reference

Testing

Run the comprehensive test suite:

# Unit tests
pytest tests/unit/ -v

# Integration tests
pytest tests/integration/ -v

# All tests with coverage (requires the pytest-cov plugin)
pytest --cov=swim --cov-report=html

Performance

  • Cluster Size: Successfully tested with 1000+ nodes
  • Message Throughput: 10,000+ messages/second per node
  • Failure Detection Time: < 5 seconds average (configurable)
  • Memory Usage: ~50MB per node baseline
  • CPU Usage: < 5% on modern hardware for typical workloads

Configuration

The library supports extensive configuration options:

config = {
    # Protocol timing
    "protocol_period": 1.0,
    "failure_timeout": 5.0,
    "suspect_timeout": 3.0,
    
    # Transport settings
    "transport_type": "hybrid",
    "udp_max_size": 1400,
    
    # ZMQ settings
    "zmq_port_offset": 1000,
    "circuit_breaker_threshold": 5,
    
    # Metrics
    "enable_metrics": True,
    "metrics_port": 8080
}
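One way to work with such a dictionary is to merge user overrides onto defaults and validate the timing values up front. The sketch below also assumes that `zmq_port_offset` is added to a node's SWIM port to get its ZeroMQ port, which matches the Quick Start (SWIM on 5000, ZeroMQ on 6000) but is not stated explicitly here. `load_config` and `zmq_address` are hypothetical helpers, not the API of `swim/config.py`.

```python
DEFAULTS = {
    "protocol_period": 1.0,
    "failure_timeout": 5.0,
    "suspect_timeout": 3.0,
    "zmq_port_offset": 1000,
}


def load_config(overrides=None):
    """Merge overrides onto the defaults and validate timing values."""
    config = {**DEFAULTS, **(overrides or {})}
    for key in ("protocol_period", "failure_timeout", "suspect_timeout"):
        if config[key] <= 0:
            raise ValueError(f"{key} must be positive, got {config[key]}")
    return config


def zmq_address(swim_addr, config):
    """Derive the ZeroMQ (host, port) from a SWIM (host, port). Assumed rule."""
    host, port = swim_addr
    return host, port + config["zmq_port_offset"]


config = load_config({"protocol_period": 0.5})
print(zmq_address(("127.0.0.1", 5000), config))  # prints: ('127.0.0.1', 6000)
```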

Documentation

The complete API reference is in docs/COMPREHENSIVE_DOCUMENTATION.md.

Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • SWIM protocol paper authors: Abhinandan Das, Indranil Gupta, Ashish Motivala
  • HashiCorp Serf and Memberlist projects for Lifeguard enhancements
  • ZeroMQ community for messaging patterns and best practices

Roadmap

  • v1.1.0: Encryption and authentication support
  • v1.2.0: IPv6 support and enhanced security features
  • v1.3.0: Advanced clustering algorithms and optimizations

SWIM P2P v1.0.0 - Production-ready distributed membership protocol implementation
