
SWIM Protocol implementation for P2P membership and failure detection with ZeroMQ integration

Project description

SWIM P2P

A production-ready Python implementation of the SWIM protocol (Scalable Weakly-consistent Infection-style Process Group Membership) for peer-to-peer membership and failure detection in distributed systems.

Overview

SWIM is a gossip-based protocol for maintaining membership in distributed systems. This implementation provides:

  • Scalable membership management with constant message load per node
  • Weakly-consistent membership view across all nodes
  • Failure detection with configurable timeouts and suspicion mechanisms
  • Efficient dissemination of membership changes via piggybacked gossip
  • ZeroMQ integration for reliable application-level messaging
  • Comprehensive metrics and monitoring capabilities
  • Production-ready with extensive testing and documentation
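Weak consistency works because every membership update carries an incarnation number, and nodes apply a fixed precedence rule when two updates about the same member conflict. The sketch below encodes the precedence rules from the SWIM paper (with the suspicion mechanism). The State, Update and overrides names are hypothetical and are not taken from swim.protocol.member.

```python
from dataclasses import dataclass
from enum import Enum


class State(Enum):
    ALIVE = "alive"
    SUSPECT = "suspect"
    DEAD = "dead"  # corresponds to the paper's "Confirm" message


@dataclass(frozen=True)
class Update:
    state: State
    incarnation: int


def overrides(new: Update, current: Update) -> bool:
    """Return True if `new` should replace `current` for the same member."""
    if current.state is State.DEAD:
        return False  # simplification: dead is terminal until the member rejoins
    if new.state is State.DEAD:
        return True  # Confirm overrides Alive/Suspect at any incarnation
    if new.state is State.SUSPECT:
        if current.state is State.SUSPECT:
            return new.incarnation > current.incarnation
        return new.incarnation >= current.incarnation  # current is ALIVE
    # new is ALIVE: only a strictly newer incarnation refutes Alive or Suspect
    return new.incarnation > current.incarnation
```

A suspected node refutes the suspicion by incrementing its own incarnation and gossiping an Alive update, which then wins over the older Suspect. Treating Dead as terminal is a simplification; real implementations let a node rejoin with a fresh identity or incarnation.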

Installation

Install from PyPI:

pip install swim_p2p

Or for development (editable install):

git clone <your-repo-url>
cd swim_p2p
pip install -e .

Quick Start

Using the CLI (swim-node)

After installation, you can run a standalone SWIM node:

swim-node --addr 127.0.0.1:5000 --seeds 127.0.0.1:5001
  • --addr specifies the bind address (host:port).
  • --seeds is a comma-separated list of existing node addresses to join.
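Both flags take host:port strings. As an illustration of that format only, here is a hypothetical parser (parse_addr and parse_seeds are not part of swim-node) that turns a --seeds value into (host, port) tuples.

```python
def parse_addr(text: str) -> tuple[str, int]:
    """Split 'host:port' into (host, port), validating the port range."""
    host, sep, port = text.strip().rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port, got {text!r}")
    port_num = int(port)  # raises ValueError for non-numeric ports
    if not 0 < port_num < 65536:
        raise ValueError(f"port out of range: {port_num}")
    return host, port_num


def parse_seeds(value: str) -> list[tuple[str, int]]:
    """Parse a comma-separated seed list, ignoring empty entries."""
    return [parse_addr(item) for item in value.split(",") if item.strip()]
```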

As a library in your Python code

import asyncio

from swim.protocol.node import Node
from swim.transport.hybrid import HybridTransport
from swim.protocol.member import MemberList
from swim.protocol.failure_detector import FailureDetector
from swim.protocol.disseminator import GossipService


async def main():
    # Create the transport and bind it to this node's address
    transport = HybridTransport()
    await transport.bind(("127.0.0.1", 5000))

    # Create the SWIM components; they share the transport and member list
    members = MemberList()
    failure_detector = FailureDetector(transport, members)
    gossip = GossipService(transport, members)

    # Create and start the node
    node = Node(transport, members, failure_detector, gossip)
    await node.start()

    # Join the cluster through one or more seed nodes
    await node.join_cluster([("127.0.0.1", 5001)])

    # Inspect this node's (weakly consistent) view of the cluster
    alive_members = members.get_alive_members()
    print(f"Cluster members: {alive_members}")


# The API is async, so run it inside an event loop
asyncio.run(main())

ZeroMQ Integration

For reliable messaging between nodes:

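from swim.integration.agent import ZMQAgentIntegration


async def run_agent(event_dispatcher):
    # event_dispatcher must be created beforehand (see swim/events/dispatcher.py)
    zmq_agent = ZMQAgentIntegration(
        node_id="127.0.0.1:5000",       # this node's SWIM address
        bind_address="127.0.0.1:6000",  # address for this node's ZMQ socket
        event_dispatcher=event_dispatcher,
        config={"NODE_NAME": "NodeA"},
    )
    await zmq_agent.start()

    # Send a reliable message to the peer identified by "127.0.0.1:5001"
    success = await zmq_agent.send_automated_check_in("127.0.0.1:5001")
    print(f"Check-in delivered: {success}")
    return success

# Run inside an event loop, e.g. asyncio.run(run_agent(event_dispatcher))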

Architecture

The library is organized into several key components:

Core Protocol (swim.protocol)

  • Member Management: Track cluster membership and metadata
  • Failure Detection: Detect node failures using the SWIM algorithm
  • Gossip Dissemination: Spread membership changes efficiently
  • Node Coordination: Orchestrate all protocol components
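Failure detection in SWIM runs once per protocol period: ping a random member, and if no ack arrives, ask k other members to ping it indirectly (ping-req) before marking it suspect. A minimal synchronous sketch, assuming hypothetical ping and ping_req callables that return True on ack; it is not the API of swim.protocol.failure_detector.

```python
import random
from typing import Callable, Optional


def probe_round(
    self_id: str,
    members: list[str],
    ping: Callable[[str], bool],
    ping_req: Callable[[str, str], bool],
    k: int = 3,
    rng: Optional[random.Random] = None,
) -> Optional[tuple[str, str]]:
    """Run one simplified SWIM protocol period from the point of view of self_id.

    Returns (target, verdict): "alive" if the target answered a direct ping or
    any indirect ping-req, else "suspect". Returns None if nobody can be probed.
    """
    rng = rng or random.Random()
    others = [m for m in members if m != self_id]
    if not others:
        return None
    target = rng.choice(others)       # pick a random probe target
    if ping(target):                  # direct ping was acked in time
        return target, "alive"
    helpers = [m for m in others if m != target]
    for helper in rng.sample(helpers, min(k, len(helpers))):
        if ping_req(helper, target):  # a helper reached the target for us
            return target, "alive"
    return target, "suspect"          # no ack at all: suspect, not yet dead
```

Real SWIM sends the k ping-req messages concurrently and waits for any ack; the sequential loop here is a simplification. The paper's refined variant also picks targets round-robin from a shuffled member list instead of uniformly at random, which bounds worst-case detection time.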

Transport Layer (swim.transport)

  • UDP Transport: High-performance for local networks
  • TCP Transport: Reliable for WAN deployments
  • Hybrid Transport: Automatic selection between UDP and TCP based on runtime conditions
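One plausible selection rule for a hybrid transport, shown here as an assumption rather than the logic in swim/transport/hybrid.py, is to send small messages over UDP and fall back to TCP when a payload exceeds udp_max_size (1400 bytes in the example configuration) or the UDP path looks unhealthy. A 1400-byte limit keeps datagrams under a typical 1500-byte Ethernet MTU once IP and UDP headers are added, which avoids IP fragmentation.

```python
UDP_MAX_SIZE = 1400  # same value as "udp_max_size" in the example configuration


def choose_transport(payload: bytes, udp_healthy: bool = True,
                     udp_max_size: int = UDP_MAX_SIZE) -> str:
    """Return "udp" or "tcp" for a payload (hypothetical hybrid policy)."""
    if len(payload) > udp_max_size:
        return "tcp"  # too large for a single unfragmented datagram
    if not udp_healthy:
        return "tcp"  # e.g. repeated UDP timeouts to this peer
    return "udp"      # small message on a healthy path: cheapest option
```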

ZeroMQ Integration (swim.integration)

  • Reliable Messaging: Guaranteed message delivery with acknowledgments
  • Circuit Breakers: Fail-fast behavior for unhealthy nodes
  • Flow Control: Prevent overwhelming slow nodes
  • Connection Management: Automatic connection pooling
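A circuit breaker stops sending to a peer after repeated failures and only lets traffic through again after a cool-down. The class below is a generic sketch of that pattern (closed, open, half-open). Its names and the 30-second default are hypothetical, only the default threshold of 5 echoes circuit_breaker_threshold from the example configuration, and it is not the implementation in circuit_breaker.py.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Per-peer breaker: closed -> open after `threshold` consecutive failures,
    half-open once `reset_timeout` seconds have passed since it opened."""

    def __init__(self, threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"  # cool-down over: let traffic probe the peer
        return "open"           # fail fast without touching the network

    def allow_request(self) -> bool:
        return self.state != "open"

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None   # any success closes the breaker

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half-open" or self.failures >= self.threshold:
            self.opened_at = self.clock()  # (re)open and restart the cool-down
```

Injecting the clock keeps the breaker testable without sleeping. For brevity, the half-open state here admits every request rather than a single trial request.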

Metrics & Monitoring (swim.metrics)

  • Protocol Metrics: Membership changes, failure detection times
  • Network Metrics: Bandwidth usage, message rates, latency
  • Performance Metrics: CPU usage, memory consumption
  • Prometheus Integration: Prometheus-compatible metrics endpoint
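Prometheus scrapes plain-text metrics in its text exposition format. As a sketch of what such an endpoint serves, this stdlib-only function renders counters and gauges; the metric names and the swim_ prefix are hypothetical, not the library's actual metric names, and a real deployment would more likely use the official prometheus_client package.

```python
def render_prometheus(counters: dict[str, float], gauges: dict[str, float],
                      prefix: str = "swim_") -> str:
    """Render unlabeled counters and gauges in Prometheus text exposition format."""
    lines: list[str] = []
    for kind, metrics in (("counter", counters), ("gauge", gauges)):
        for name, value in sorted(metrics.items()):
            full_name = prefix + name
            lines.append(f"# TYPE {full_name} {kind}")  # type line for the scraper
            lines.append(f"{full_name} {value}")        # sample line: name value
    return "\n".join(lines) + "\n"                       # last line must end in \n
```

Serving the returned string over HTTP (for example on the metrics_port from the example configuration) with the content type text/plain; version=0.0.4 is enough for Prometheus to scrape it. By convention, counter names end in _total.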

Lifeguard Enhancements (swim.lifeguard)

  • Adaptive Probe Rates: Dynamic adjustment based on network conditions
  • Suspicion Timeout Optimization: Dynamic calculation of suspicion timeouts
  • Network Condition Awareness: Automatic adaptation to network quality
  • False Positive Reduction: Techniques that minimize healthy nodes being incorrectly declared failed
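Lifeguard, as implemented in HashiCorp memberlist, replaces SWIM's fixed suspicion timeout with one that starts high and shrinks as more members independently report the same suspect, and it scales a node's own probe timeouts by a local health score. A sketch of both formulas with hypothetical function names; swim.lifeguard may compute these differently.

```python
import math


def suspicion_timeout(confirmations: int, expected: int,
                      min_timeout: float, max_timeout: float) -> float:
    """Lifeguard-style dynamic suspicion timeout in seconds.

    Starts at max_timeout and shrinks logarithmically toward min_timeout
    as independent suspicions of the same member arrive.
    """
    if expected < 1:
        return min_timeout
    frac = math.log(confirmations + 1) / math.log(expected + 1)
    return max(min_timeout, max_timeout - frac * (max_timeout - min_timeout))


def scaled_probe_timeout(base: float, health_score: int) -> float:
    """Local Health Multiplier: a node that is itself struggling waits longer."""
    return base * (health_score + 1)
```

With expected=3, a 2 s minimum and a 12 s maximum, one confirmation gives 7 s and three bring the timeout down to the 2-second floor. Scaling by the local health score means a node that is slow itself (for example under CPU pressure) gives peers more time before suspecting them.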

Project Structure

swim_p2p/                          # Repository root
├── pyproject.toml                 # Build configuration and dependencies
├── README.md                      # This file - project overview and quick start
├── CHANGELOG.md                   # Version history and release notes
├── CONTRIBUTING.md                # Contribution guidelines
├── LICENSE                        # MIT license
├── MANIFEST.in                    # Package manifest for non-Python files
├── requirements.txt               # Development dependencies
│
├── src/                          # Source code directory
│   └── swim/                     # Main package
│       ├── __init__.py           # Package initialization and exports
│       ├── __version__.py        # Version information
│       ├── py.typed              # Type checking marker
│       ├── main.py               # CLI entry point (swim-node command)
│       ├── config.py             # Configuration management and validation
│       │
│       ├── protocol/             # Core SWIM protocol implementation
│       │   ├── __init__.py
│       │   ├── member.py         # Cluster membership management
│       │   ├── failure_detector.py # SWIM failure detection algorithm
│       │   ├── disseminator.py   # Gossip-based message dissemination
│       │   ├── node.py           # Main node coordination and orchestration
│       │   ├── message.py        # Protocol message definitions and serialization
│       │   └── sync.py           # Cluster synchronization mechanisms
│       │
│       ├── transport/            # Network transport layer
│       │   ├── __init__.py
│       │   ├── base.py           # Abstract transport interface
│       │   ├── udp.py            # High-performance UDP transport
│       │   ├── tcp.py            # Reliable TCP transport
│       │   └── hybrid.py         # Intelligent transport selection
│       │
│       ├── integration/          # ZeroMQ integration layer
│       │   ├── __init__.py
│       │   ├── agent.py          # Main ZMQ agent integration orchestrator
│       │   ├── load_balancer.py  # Load balancing for ZMQ connections
│       │   ├── message_router.py # Message routing and distribution
│       │   │
│       │   ├── messaging/        # Reliable messaging components
│       │   │   ├── __init__.py
│       │   │   ├── ack_system.py # Message acknowledgment system
│       │   │   ├── buffer_monitor.py # Buffer overflow monitoring
│       │   │   ├── circuit_breaker.py # Circuit breaker pattern implementation
│       │   │   ├── congestion.py # Congestion control mechanisms
│       │   │   ├── message_registry.py # Message tracking and management
│       │   │   ├── reliability.py # Reliable message delivery
│       │   │   ├── trace.py      # Distributed tracing support
│       │   │   └── workflow.py   # Message workflow management
│       │   │
│       │   └── zmq/              # ZeroMQ-specific components
│       │       ├── __init__.py
│       │       ├── capacity_tracker.py # Connection capacity monitoring
│       │       ├── connection_manager.py # ZMQ connection lifecycle
│       │       ├── dealer.py      # DEALER socket management
│       │       ├── flow_control.py # Flow control and backpressure
│       │       ├── monitoring.py  # ZMQ socket monitoring
│       │       ├── ordering.py    # Message ordering guarantees
│       │       └── router.py      # ROUTER socket management
│       │
│       ├── lifeguard/            # Reliability enhancements
│       │   ├── __init__.py
│       │   ├── awareness.py      # Network condition awareness
│       │   ├── probe_rate.py     # Adaptive probe rate adjustment
│       │   └── timing.py         # Intelligent timeout calculation
│       │
│       ├── metrics/              # Metrics and monitoring
│       │   ├── __init__.py
│       │   ├── collector.py      # Metrics collection and aggregation
│       │   ├── latency.py        # Latency measurement and tracking
│       │   ├── bandwidth.py      # Bandwidth usage monitoring
│       │   ├── metrics_cli.py    # CLI for metrics access
│       │   │
│       │   └── api/              # Metrics API components
│       │       ├── __init__.py
│       │       ├── cli.py        # Command-line metrics interface
│       │       ├── client.py     # Metrics API client
│       │       ├── integration.py # Integration with external systems
│       │       └── server.py     # Metrics API server
│       │
│       ├── events/               # Event-driven architecture
│       │   ├── __init__.py
│       │   ├── dispatcher.py     # Event dispatching and routing
│       │   ├── handlers.py       # Event handler implementations
│       │   └── types.py          # Event type definitions
│       │
│       ├── diagnostics/          # Diagnostic and debugging tools
│       │   ├── __init__.py
│       │   ├── health_checker.py # Health check implementations
│       │   ├── message_tracer.py # Message tracing and debugging
│       │   └── performance_analyzer.py # Performance analysis tools
│       │
│       └── utils/                # Utility functions and helpers
│           ├── __init__.py
│           ├── logging.py        # Structured logging configuration
│           ├── network.py        # Network utility functions
│           ├── rate_limiter.py   # Rate limiting utilities
│           └── serialization.py  # Data serialization helpers
│
├── tests/                        # Test suite
│   ├── __init__.py
│   ├── unit/                     # Unit tests
│   │   ├── __init__.py
│   │   ├── events/               # Event system tests
│   │   │   ├── test_dispatcher.py
│   │   │   ├── test_handlers.py
│   │   │   └── test_types.py
│   │   ├── lifeguard/            # Reliability enhancement tests
│   │   │   ├── test_awareness.py
│   │   │   ├── test_network_stress.py
│   │   │   ├── test_probe_rate.py
│   │   │   └── test_timing.py
│   │   ├── metrics/              # Metrics system tests
│   │   │   ├── test_bandwidth_monitor.py
│   │   │   ├── test_latency_tracker.py
│   │   │   └── test_metrics_collector.py
│   │   ├── protocol/             # Core protocol tests
│   │   │   ├── test_disseminator.py
│   │   │   ├── test_failure_detector.py
│   │   │   ├── test_member.py
│   │   │   ├── test_message.py
│   │   │   ├── test_node.py
│   │   │   └── test_sync.py
│   │   ├── transport/            # Transport layer tests
│   │   │   ├── __init__.py
│   │   │   ├── test_base.py
│   │   │   ├── test_hybrid.py
│   │   │   ├── test_tcp.py
│   │   │   ├── test_transport_fallback.py
│   │   │   └── test_udp.py
│   │   └── utils/                # Utility function tests
│   │       ├── test_logging.py
│   │       ├── test_network.py
│   │       ├── test_rate_limiter.py
│   │       └── test_serialization.py
│   │
│   ├── integration/              # Integration tests
│   │   ├── network_stress.py     # Network stress testing
│   │   ├── send_big_message.py   # Large message handling tests
│   │   ├── test_message_size.py  # Message size limit tests
│   │   ├── test_swim_integration.py # End-to-end SWIM tests
│   │   └── test_udp_fallback.py # Transport fallback tests
│   │
│   └── swim_zmq/                 # ZMQ integration tests
│       ├── test_both_one_n_two.py # Multi-node ZMQ tests
│       ├── test_enhanced_phase2.py # Enhanced ZMQ functionality tests
│       ├── test_phase_one.py     # Basic ZMQ integration tests
│       └── test_phase_two.py     # Advanced ZMQ feature tests
│
├── examples/                     # Example applications
│   └── transport_example.py      # Transport layer usage example
│
└── docs/                         # Additional documentation
    └── COMPREHENSIVE_DOCUMENTATION.md # Complete API reference

Testing

Run the comprehensive test suite:

# Unit tests
pytest tests/unit/ -v

# Integration tests
pytest tests/integration/ -v

# All tests with coverage (requires the pytest-cov plugin)
pytest --cov=swim --cov-report=html

Performance

  • Cluster Size: Successfully tested with 1000+ nodes
  • Message Throughput: 10,000+ messages/second per node
  • Failure Detection Time: < 5 seconds average (configurable)
  • Memory Usage: ~50MB per node baseline
  • CPU Usage: < 5% on modern hardware for typical workloads
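For intuition about detection times, the SWIM paper's analysis gives a back-of-envelope model: if every member probes one random peer per protocol period, a failed member is first probed after about e/(e-1) ≈ 1.58 periods on average in large clusters. The sketch below computes that figure and adds the suspicion timeout. It ignores probe timeouts and gossip propagation delay, so treat it as an estimate rather than a measurement of this library; the function names are hypothetical.

```python
import math


def expected_periods_to_first_probe(n: int) -> float:
    """Mean number of protocol periods before a failed member is first probed.

    Each of the other n - 1 members pings one peer chosen uniformly from its
    n - 1 peers per period, so the failed member is hit with probability
    p = 1 - (1 - 1/(n-1))**(n-1); the wait is geometric with mean 1/p,
    which approaches e/(e-1) as n grows.
    """
    if n < 2:
        raise ValueError("need at least two members")
    p = 1 - (1 - 1 / (n - 1)) ** (n - 1)
    return 1 / p


def estimated_detection_seconds(n: int, protocol_period: float,
                                suspect_timeout: float) -> float:
    # Rough model: wait until someone probes the failed member, then let
    # the suspicion timeout expire. Ignores probe timeouts and gossip delay.
    return expected_periods_to_first_probe(n) * protocol_period + suspect_timeout
```

With the example values from the Configuration section (protocol_period 1.0 s, suspect_timeout 3.0 s), a 1000-node cluster comes out at roughly 4.6 s in this model.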

Configuration

The library supports extensive configuration options:

config = {
    # Protocol timing
    "protocol_period": 1.0,
    "failure_timeout": 5.0,
    "suspect_timeout": 3.0,
    
    # Transport settings
    "transport_type": "hybrid",
    "udp_max_size": 1400,
    
    # ZMQ settings
    "zmq_port_offset": 1000,
    "circuit_breaker_threshold": 5,
    
    # Metrics
    "enable_metrics": True,
    "metrics_port": 8080
}
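In the ZeroMQ example, the node's SWIM address uses port 5000 and its ZMQ socket binds port 6000, which matches a zmq_port_offset of 1000. Assuming that convention (not confirmed by the library), the hypothetical helpers below derive the ZMQ address and sanity-check a few of the options above before a node is started.

```python
from typing import Any

VALID_TRANSPORTS = {"udp", "tcp", "hybrid"}
MAX_UDP_PAYLOAD = 65507  # IPv4 limit: 65535 - 20 (IP header) - 8 (UDP header)


def zmq_bind_address(swim_addr: str, zmq_port_offset: int) -> str:
    """Derive a ZMQ address from a SWIM host:port (assumed offset convention)."""
    host, _, port = swim_addr.rpartition(":")
    return f"{host}:{int(port) + zmq_port_offset}"


def validate_config(config: dict[str, Any]) -> list[str]:
    """Return human-readable problems found in a config dict (empty list = OK)."""
    problems: list[str] = []
    for key in ("protocol_period", "failure_timeout", "suspect_timeout"):
        if key in config and config[key] <= 0:
            problems.append(f"{key} must be positive")
    if config.get("transport_type", "hybrid") not in VALID_TRANSPORTS:
        problems.append(f"transport_type must be one of {sorted(VALID_TRANSPORTS)}")
    if not 1 <= config.get("udp_max_size", 1400) <= MAX_UDP_PAYLOAD:
        problems.append(f"udp_max_size must be between 1 and {MAX_UDP_PAYLOAD}")
    if config.get("circuit_breaker_threshold", 1) < 1:
        problems.append("circuit_breaker_threshold must be at least 1")
    return problems
```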

Documentation

See docs/COMPREHENSIVE_DOCUMENTATION.md for the complete API reference.

Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • SWIM protocol paper authors: Abhinandan Das, Indranil Gupta, Ashish Motivala
  • HashiCorp Serf and Memberlist projects for Lifeguard enhancements
  • ZeroMQ community for messaging patterns and best practices

Roadmap

  • v1.1.0: Encryption and authentication support
  • v1.2.0: IPv6 support and enhanced security features
  • v1.3.0: Advanced clustering algorithms and optimizations

SWIM P2P v1.0.0 - Production-ready distributed membership protocol implementation



Download files


Source Distribution

swim_p2p-1.2.0.tar.gz (287.7 kB)

Uploaded Source

Built Distribution


swim_p2p-1.2.0-py3-none-any.whl (243.7 kB)

Uploaded Python 3

File details

Details for the file swim_p2p-1.2.0.tar.gz.

File metadata

  • Download URL: swim_p2p-1.2.0.tar.gz
  • Size: 287.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for swim_p2p-1.2.0.tar.gz
Algorithm Hash digest
SHA256 c85e1fba418fbe65c32d602b6eade70b0a5b1eedd671f05d32d90b8c14d903ec
MD5 761e9bfdb9bc6e5dc0b2b6f1338460f3
BLAKE2b-256 119457d1d44e0b4ff9aed0ff5a9e2d486529141ab60844717e87ae8d2b2e2f56


File details

Details for the file swim_p2p-1.2.0-py3-none-any.whl.

File metadata

  • Download URL: swim_p2p-1.2.0-py3-none-any.whl
  • Size: 243.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for swim_p2p-1.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 eb4ef62af26bfc6e3a4b0842b89a3728fdd32da31f92abb17842a7cb5701843b
MD5 d1459d9b9693303ee12af8465917e0ca
BLAKE2b-256 2fb3fd1a284520c479feb80ce8ef124ddbff992b078b7d317e8e53e37363c736

