SWIM Protocol implementation for P2P membership and failure detection with ZeroMQ integration

Project description

SWIM P2P

A production-ready Python implementation of the SWIM protocol (Scalable Weakly-consistent Infection-style Process Group Membership) for peer-to-peer membership and failure detection in distributed systems.

Overview

SWIM is a gossip-based protocol for maintaining membership in distributed systems. This implementation provides:

  • Scalable membership management with a constant expected message load per node per protocol period, independent of cluster size
  • Weakly-consistent membership view across all nodes
  • Failure detection with configurable timeouts and suspicion mechanisms
  • Efficient dissemination of membership changes via piggybacked gossip
  • ZeroMQ integration for reliable application-level messaging
  • Comprehensive metrics and monitoring capabilities
  • Production-ready with extensive testing and documentation
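To make "piggybacked gossip" concrete, here is a minimal sketch of a dissemination buffer. It assumes updates are plain strings and uses the retransmission bound from HashiCorp memberlist (`retransmit_mult * ceil(log10(n + 1))`, a concrete form of the SWIM paper's λ·log n). `GossipBuffer`, `piggyback`, and the default values are hypothetical illustrations, not this library's `GossipService` API.

```python
import math


class GossipBuffer:
    """Hypothetical buffer of membership updates awaiting dissemination."""

    def __init__(self, retransmit_mult: int = 3) -> None:
        self.retransmit_mult = retransmit_mult  # the "lambda" multiplier
        self._sent: dict[str, int] = {}  # update -> times piggybacked so far

    def add(self, update: str) -> None:
        self._sent[update] = 0

    def limit(self, cluster_size: int) -> int:
        # Each update is sent O(log n) times, enough to reach every member
        # with high probability.
        return self.retransmit_mult * math.ceil(math.log10(cluster_size + 1))

    def piggyback(self, cluster_size: int, max_updates: int = 4) -> list[str]:
        # Attach the least-disseminated updates first so fresh news spreads fastest.
        chosen = sorted(self._sent, key=self._sent.get)[:max_updates]
        limit = self.limit(cluster_size)
        for update in chosen:
            self._sent[update] += 1
            if self._sent[update] >= limit:
                del self._sent[update]  # sent often enough; stop carrying it
        return chosen


buf = GossipBuffer(retransmit_mult=1)
buf.add("alive 127.0.0.1:5001 inc=1")
print(buf.piggyback(cluster_size=10))  # attached to the next ping or ack
```

Because updates ride on pings and acks that are sent anyway, dissemination adds no extra messages, which is what keeps the per-node load constant.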

Installation

Install from PyPI:

pip install swim_p2p

Or for development (editable install):

git clone <your-repo-url>
cd swim_p2p
pip install -e .

Quick Start

Using the CLI (swim-node)

After installation, you can run a standalone SWIM node:

swim-node --addr 127.0.0.1:5000 --seeds 127.0.0.1:5001
  • --addr specifies the bind address (host:port).
  • --seeds is a comma-separated list of existing node addresses to join.
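The `--seeds` value has to end up in the `(host, port)` tuple form that `join_cluster` takes in the library example. A minimal sketch of that conversion; `parse_seeds` is a hypothetical helper for illustration, not the parser used by `swim-node`.

```python
def parse_seeds(seeds: str) -> list[tuple[str, int]]:
    """Turn '127.0.0.1:5001,127.0.0.1:5002' into [(host, port), ...]."""
    result = []
    for item in seeds.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate empty entries such as a trailing comma
        host, sep, port = item.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {item!r}")
        result.append((host, int(port)))  # int() rejects non-numeric ports
    return result


print(parse_seeds("127.0.0.1:5001,127.0.0.1:5002"))
```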

As a library in your Python code

import asyncio

from swim.protocol.node import Node
from swim.transport.hybrid import HybridTransport
from swim.protocol.member import MemberList
from swim.protocol.failure_detector import FailureDetector
from swim.protocol.disseminator import GossipService


async def main():
    # The node API is async, so everything runs inside an event loop.
    # Create the transport and bind it to this node's address
    transport = HybridTransport()
    await transport.bind(("127.0.0.1", 5000))

    # Create SWIM components
    members = MemberList()
    failure_detector = FailureDetector(transport, members)
    gossip = GossipService(transport, members)

    # Create and start the node
    node = Node(transport, members, failure_detector, gossip)
    await node.start()

    # Join the cluster through a seed node
    await node.join_cluster([("127.0.0.1", 5001)])

    # Get cluster state
    alive_members = members.get_alive_members()
    print(f"Cluster members: {alive_members}")


asyncio.run(main())

ZeroMQ Integration

For reliable messaging between nodes:

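from swim.integration.agent import ZMQAgentIntegration


async def run_agent(event_dispatcher):
    # event_dispatcher: an event dispatcher instance (see swim.events.dispatcher),
    # created before the agent and passed in here.
    zmq_agent = ZMQAgentIntegration(
        node_id="127.0.0.1:5000",       # this node's ID (its SWIM address here)
        bind_address="127.0.0.1:6000",  # address the ZMQ socket binds to
        event_dispatcher=event_dispatcher,
        config={"NODE_NAME": "NodeA"},
    )

    await zmq_agent.start()

    # Send a reliable check-in message to the peer 127.0.0.1:5001
    success = await zmq_agent.send_automated_check_in("127.0.0.1:5001")
    return success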

Architecture

The library is organized into several key components:

Core Protocol (swim.protocol)

  • Member Management: Track cluster membership and metadata
  • Failure Detection: Detect node failures using SWIM algorithm
  • Gossip Dissemination: Spread membership changes efficiently
  • Node Coordination: Orchestrate all protocol components
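Member state changes are ordered by incarnation numbers: only the member itself increments its incarnation, which is how it refutes a suspicion. The sketch below encodes the override rules from the suspicion-mechanism section of the SWIM paper, with "dead" standing in for the paper's "confirm" message. `State` and `overrides` are hypothetical names, and `swim.protocol.member` may implement these rules differently.

```python
from enum import Enum


class State(Enum):
    ALIVE = "alive"
    SUSPECT = "suspect"
    DEAD = "dead"


def overrides(new_state: State, new_inc: int, cur_state: State, cur_inc: int) -> bool:
    """Return True if an incoming update should replace the current view."""
    if cur_state is State.DEAD:
        return False  # assumption: dead is final until the member rejoins
    if new_state is State.DEAD:
        return True  # a failure confirmation overrides any incarnation
    if new_state is State.ALIVE:
        return new_inc > cur_inc  # only a newer incarnation clears a suspicion
    # new_state is SUSPECT
    if cur_state is State.SUSPECT:
        return new_inc > cur_inc
    return new_inc >= cur_inc  # suspect beats alive at the same incarnation


print(overrides(State.SUSPECT, 1, State.ALIVE, 1))  # suspicion takes hold
print(overrides(State.ALIVE, 2, State.SUSPECT, 1))  # member refuted it
```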

Transport Layer (swim.transport)

  • UDP Transport: High-performance for local networks
  • TCP Transport: Reliable for WAN deployments
  • Hybrid Transport: Automatic selection between UDP and TCP based on runtime conditions
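One plausible selection rule for the hybrid transport, sketched under assumptions: it compares the payload size against the `udp_max_size` setting from the configuration example (1400 bytes, just under a typical 1500-byte Ethernet MTU) and falls back to TCP when UDP is considered unhealthy. `choose_transport` and `udp_healthy` are hypothetical; `swim.transport.hybrid` may weigh other signals.

```python
def choose_transport(payload: bytes, udp_max_size: int = 1400,
                     udp_healthy: bool = True) -> str:
    """Pick "udp" for small datagrams, falling back to "tcp" otherwise."""
    # Payloads above udp_max_size risk IP fragmentation, so they go over TCP,
    # as does everything while UDP delivery is considered unhealthy.
    if udp_healthy and len(payload) <= udp_max_size:
        return "udp"
    return "tcp"


print(choose_transport(b"ping"))          # small probe -> udp
print(choose_transport(b"x" * 64_000))    # large sync payload -> tcp
```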

ZeroMQ Integration (swim.integration)

  • Reliable Messaging: Guaranteed message delivery with acknowledgments
  • Circuit Breakers: Fail-fast behavior for unhealthy nodes
  • Flow Control: Prevent overwhelming slow nodes
  • Connection Management: Automatic connection pooling
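A circuit breaker stops sending to a peer after repeated failures and lets traffic through again only after a cool-down. A minimal sketch of the usual closed/open/half-open cycle; the default `threshold=5` mirrors `circuit_breaker_threshold` in the configuration example, while `reset_timeout`, the class, and its methods are hypothetical rather than the API of `swim.integration.messaging.circuit_breaker`.

```python
import time


class CircuitBreaker:
    """Closed -> open after `threshold` consecutive failures -> half-open after `reset_timeout`."""

    def __init__(self, threshold: int = 5, reset_timeout: float = 30.0,
                 clock=time.monotonic) -> None:
        self.threshold = threshold
        self.reset_timeout = reset_timeout
        self._clock = clock  # injectable so tests can control time
        self._failures = 0
        self._opened_at = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout:
            return "half_open"  # cool-down over: let trial requests through
        return "open"

    def allow_request(self) -> bool:
        return self.state != "open"  # fail fast while open

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None  # peer recovered: close the breaker

    def record_failure(self) -> None:
        self._failures += 1
        if self.state == "half_open" or self._failures >= self.threshold:
            self._opened_at = self._clock()  # (re)open and restart the cool-down
```

Keeping one breaker per peer means a single slow or dead node cannot tie up senders that are talking to healthy nodes.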

Metrics & Monitoring (swim.metrics)

  • Protocol Metrics: Membership changes, failure detection times
  • Network Metrics: Bandwidth usage, message rates, latency
  • Performance Metrics: CPU usage, memory consumption
  • Prometheus Integration: Compatible metrics endpoint
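Prometheus-compatible here means serving metrics in the Prometheus text exposition format over HTTP. A minimal sketch of rendering gauge and counter values in that format (no HELP lines, no labels); the metric names and `to_prometheus` are hypothetical, and `swim.metrics.api.server` may expose different names.

```python
def to_prometheus(metrics: dict[str, tuple[str, float]], prefix: str = "swim") -> str:
    """Render {name: (type, value)} in the Prometheus text exposition format."""
    lines = []
    for name, (kind, value) in sorted(metrics.items()):
        full_name = f"{prefix}_{name}"
        lines.append(f"# TYPE {full_name} {kind}")  # e.g. counter, gauge
        lines.append(f"{full_name} {value}")
    return "\n".join(lines) + "\n"  # the last line must end with a line feed


print(to_prometheus({
    "members_alive": ("gauge", 3),
    "messages_sent_total": ("counter", 1250),
}))
```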

Lifeguard Enhancements (swim.lifeguard)

  • Adaptive Probe Rates: Dynamic adjustment based on network conditions
  • Suspicion Timeout Optimization: Intelligent timeout calculation
  • Network Condition Awareness: Automatic adaptation to network quality
  • False Positive Reduction: Advanced algorithms to minimize incorrect failures
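Lifeguard's two main timing ideas fit in a few lines. The suspicion timeout below follows the formula used in HashiCorp memberlist (logarithmic decay from a maximum to a minimum as independent confirmations arrive), and the probe timeout is scaled by a local health score as in memberlist's awareness component. Function names and parameters are hypothetical; `swim.lifeguard.timing` and `swim.lifeguard.awareness` may compute these differently.

```python
import math


def suspicion_timeout(confirmations: int, expected_confirmations: int,
                      min_timeout: float, max_timeout: float) -> float:
    """Suspicion timeout that shrinks as independent peers confirm a suspicion.

    Starts at max_timeout with no confirmations and decays logarithmically
    to min_timeout once expected_confirmations peers agree.
    """
    if expected_confirmations <= 0:
        return min_timeout
    frac = math.log(confirmations + 1) / math.log(expected_confirmations + 1)
    timeout = max_timeout - frac * (max_timeout - min_timeout)
    return max(min_timeout, timeout)


def scaled_probe_timeout(base_timeout: float, local_health: int) -> float:
    """Local Health Multiplier: a node that doubts its own health probes more slowly."""
    return base_timeout * (local_health + 1)


print(suspicion_timeout(0, 3, 2.0, 10.0))  # lone suspicion: wait the full 10 s
print(suspicion_timeout(3, 3, 2.0, 10.0))  # widely confirmed: 2 s
```

A long initial timeout gives a slow but healthy member time to refute, which is where most of the false-positive reduction comes from.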

Project Structure

swim_p2p/                          # Repository root
├── pyproject.toml                 # Build configuration and dependencies
├── README.md                      # This file - project overview and quick start
├── CHANGELOG.md                   # Version history and release notes
├── CONTRIBUTING.md                # Contribution guidelines
├── LICENSE                        # MIT license
├── MANIFEST.in                    # Package manifest for non-Python files
├── requirements.txt               # Development dependencies
│
├── src/                          # Source code directory
│   └── swim/                     # Main package
│       ├── __init__.py           # Package initialization and exports
│       ├── __version__.py        # Version information
│       ├── py.typed              # Type checking marker
│       ├── main.py               # CLI entry point (swim-node command)
│       ├── config.py             # Configuration management and validation
│       │
│       ├── protocol/             # Core SWIM protocol implementation
│       │   ├── __init__.py
│       │   ├── member.py         # Cluster membership management
│       │   ├── failure_detector.py # SWIM failure detection algorithm
│       │   ├── disseminator.py   # Gossip-based message dissemination
│       │   ├── node.py           # Main node coordination and orchestration
│       │   ├── message.py        # Protocol message definitions and serialization
│       │   └── sync.py           # Cluster synchronization mechanisms
│       │
│       ├── transport/            # Network transport layer
│       │   ├── __init__.py
│       │   ├── base.py           # Abstract transport interface
│       │   ├── udp.py            # High-performance UDP transport
│       │   ├── tcp.py            # Reliable TCP transport
│       │   └── hybrid.py         # Intelligent transport selection
│       │
│       ├── integration/          # ZeroMQ integration layer
│       │   ├── __init__.py
│       │   ├── agent.py          # Main ZMQ agent integration orchestrator
│       │   ├── load_balancer.py  # Load balancing for ZMQ connections
│       │   ├── message_router.py # Message routing and distribution
│       │   │
│       │   ├── messaging/        # Reliable messaging components
│       │   │   ├── __init__.py
│       │   │   ├── ack_system.py # Message acknowledgment system
│       │   │   ├── buffer_monitor.py # Buffer overflow monitoring
│       │   │   ├── circuit_breaker.py # Circuit breaker pattern implementation
│       │   │   ├── congestion.py # Congestion control mechanisms
│       │   │   ├── message_registry.py # Message tracking and management
│       │   │   ├── reliability.py # Reliable message delivery
│       │   │   ├── trace.py      # Distributed tracing support
│       │   │   └── workflow.py   # Message workflow management
│       │   │
│       │   └── zmq/              # ZeroMQ-specific components
│       │       ├── __init__.py
│       │       ├── capacity_tracker.py # Connection capacity monitoring
│       │       ├── connection_manager.py # ZMQ connection lifecycle
│       │       ├── dealer.py      # DEALER socket management
│       │       ├── flow_control.py # Flow control and backpressure
│       │       ├── monitoring.py  # ZMQ socket monitoring
│       │       ├── ordering.py    # Message ordering guarantees
│       │       └── router.py      # ROUTER socket management
│       │
│       ├── lifeguard/            # Reliability enhancements
│       │   ├── __init__.py
│       │   ├── awareness.py      # Network condition awareness
│       │   ├── probe_rate.py     # Adaptive probe rate adjustment
│       │   └── timing.py         # Intelligent timeout calculation
│       │
│       ├── metrics/              # Metrics and monitoring
│       │   ├── __init__.py
│       │   ├── collector.py      # Metrics collection and aggregation
│       │   ├── latency.py        # Latency measurement and tracking
│       │   ├── bandwidth.py      # Bandwidth usage monitoring
│       │   ├── metrics_cli.py    # CLI for metrics access
│       │   │
│       │   └── api/              # Metrics API components
│       │       ├── __init__.py
│       │       ├── cli.py        # Command-line metrics interface
│       │       ├── client.py     # Metrics API client
│       │       ├── integration.py # Integration with external systems
│       │       └── server.py     # Metrics API server
│       │
│       ├── events/               # Event-driven architecture
│       │   ├── __init__.py
│       │   ├── dispatcher.py     # Event dispatching and routing
│       │   ├── handlers.py       # Event handler implementations
│       │   └── types.py          # Event type definitions
│       │
│       ├── diagnostics/          # Diagnostic and debugging tools
│       │   ├── __init__.py
│       │   ├── health_checker.py # Health check implementations
│       │   ├── message_tracer.py # Message tracing and debugging
│       │   └── performance_analyzer.py # Performance analysis tools
│       │
│       └── utils/                # Utility functions and helpers
│           ├── __init__.py
│           ├── logging.py        # Structured logging configuration
│           ├── network.py        # Network utility functions
│           ├── rate_limiter.py   # Rate limiting utilities
│           └── serialization.py  # Data serialization helpers
│
├── tests/                        # Test suite
│   ├── __init__.py
│   ├── unit/                     # Unit tests
│   │   ├── __init__.py
│   │   ├── events/               # Event system tests
│   │   │   ├── test_dispatcher.py
│   │   │   ├── test_handlers.py
│   │   │   └── test_types.py
│   │   ├── lifeguard/            # Reliability enhancement tests
│   │   │   ├── test_awareness.py
│   │   │   ├── test_network_stress.py
│   │   │   ├── test_probe_rate.py
│   │   │   └── test_timing.py
│   │   ├── metrics/              # Metrics system tests
│   │   │   ├── test_bandwidth_monitor.py
│   │   │   ├── test_latency_tracker.py
│   │   │   └── test_metrics_collector.py
│   │   ├── protocol/             # Core protocol tests
│   │   │   ├── test_disseminator.py
│   │   │   ├── test_failure_detector.py
│   │   │   ├── test_member.py
│   │   │   ├── test_message.py
│   │   │   ├── test_node.py
│   │   │   └── test_sync.py
│   │   ├── transport/            # Transport layer tests
│   │   │   ├── __init__.py
│   │   │   ├── test_base.py
│   │   │   ├── test_hybrid.py
│   │   │   ├── test_tcp.py
│   │   │   ├── test_transport_fallback.py
│   │   │   └── test_udp.py
│   │   └── utils/                # Utility function tests
│   │       ├── test_logging.py
│   │       ├── test_network.py
│   │       ├── test_rate_limiter.py
│   │       └── test_serialization.py
│   │
│   ├── integration/              # Integration tests
│   │   ├── network_stress.py     # Network stress testing
│   │   ├── send_big_message.py   # Large message handling tests
│   │   ├── test_message_size.py  # Message size limit tests
│   │   ├── test_swim_integration.py # End-to-end SWIM tests
│   │   └── test_udp_fallback.py # Transport fallback tests
│   │
│   └── swim_zmq/                 # ZMQ integration tests
│       ├── test_both_one_n_two.py # Multi-node ZMQ tests
│       ├── test_enhanced_phase2.py # Enhanced ZMQ functionality tests
│       ├── test_phase_one.py     # Basic ZMQ integration tests
│       └── test_phase_two.py     # Advanced ZMQ feature tests
│
├── examples/                     # Example applications
│   └── transport_example.py      # Transport layer usage example
│
└── docs/                         # Additional documentation
    └── COMPREHENSIVE_DOCUMENTATION.md # Complete API reference

Testing

Run the comprehensive test suite:

# Unit tests
pytest tests/unit/ -v

# Integration tests
pytest tests/integration/ -v

# All tests with coverage
pytest --cov=swim --cov-report=html

Performance

  • Cluster Size: Successfully tested with 1000+ nodes
  • Message Throughput: 10,000+ messages/second per node
  • Failure Detection Time: < 5 seconds average (configurable)
  • Memory Usage: ~50MB per node baseline
  • CPU Usage: < 5% on modern hardware for typical workloads
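For a sense of where a detection time of a few seconds comes from: the SWIM paper gives the expected time until some member first probes a failed member as T / (1 - e^(-q_f)), where T is the protocol period and q_f the fraction of non-faulty members; for q_f close to 1 that is about 1.58 T. The sketch below adds the suspicion period on top, assuming (not confirmed here) that a member is declared failed only after `suspect_timeout` expires, and uses the values from the configuration example in this README. It is back-of-the-envelope arithmetic, not how the figure above was measured.

```python
import math


def expected_detection_time(protocol_period: float, suspect_timeout: float,
                            alive_fraction: float = 1.0) -> float:
    # SWIM paper: expected delay until *some* member first probes a failed
    # member is T / (1 - e^(-q_f)), with q_f the fraction of non-faulty members.
    first_probe = protocol_period / (1.0 - math.exp(-alive_fraction))
    # Assumption: the member then stays SUSPECT for suspect_timeout before
    # it is declared failed (probe and ack round-trip time ignored).
    return first_probe + suspect_timeout


# protocol_period=1.0 and suspect_timeout=3.0 as in the configuration example
print(round(expected_detection_time(1.0, 3.0), 2))  # → 4.58
```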

Configuration

The library supports extensive configuration options:

config = {
    # Protocol timing
    "protocol_period": 1.0,
    "failure_timeout": 5.0,
    "suspect_timeout": 3.0,
    
    # Transport settings
    "transport_type": "hybrid",
    "udp_max_size": 1400,
    
    # ZMQ settings
    "zmq_port_offset": 1000,
    "circuit_breaker_threshold": 5,
    
    # Metrics
    "enable_metrics": True,
    "metrics_port": 8080
}
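The quick-start examples bind SWIM on port 5000 and ZMQ on port 6000, which matches a `zmq_port_offset` of 1000. A minimal sketch, assuming the ZMQ port is derived that way and that the timing values are in seconds; `zmq_bind_address` and `validate` are hypothetical helpers, not functions from `swim.config`.

```python
def zmq_bind_address(swim_addr: str, config: dict) -> str:
    """Derive the ZMQ address from the SWIM address plus zmq_port_offset."""
    host, _, port = swim_addr.rpartition(":")
    return f"{host}:{int(port) + config['zmq_port_offset']}"


def validate(config: dict) -> list[str]:
    """Return a list of problems; an empty list means the config looks sane."""
    problems = []
    for key in ("protocol_period", "failure_timeout", "suspect_timeout"):
        if config.get(key, 0) <= 0:
            problems.append(f"{key} must be a positive number of seconds")
    # 65507 bytes is the largest payload a single IPv4 UDP datagram can carry.
    if not 0 < config.get("udp_max_size", 0) <= 65507:
        problems.append("udp_max_size must be between 1 and 65507 bytes")
    return problems


config = {"protocol_period": 1.0, "failure_timeout": 5.0,
          "suspect_timeout": 3.0, "udp_max_size": 1400, "zmq_port_offset": 1000}
print(zmq_bind_address("127.0.0.1:5000", config))  # → 127.0.0.1:6000
print(validate(config))  # → []
```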

Documentation

The complete API reference is in docs/COMPREHENSIVE_DOCUMENTATION.md.

Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • SWIM protocol paper authors: Abhinandan Das, Indranil Gupta, Ashish Motivala
  • HashiCorp Serf and Memberlist projects for Lifeguard enhancements
  • ZeroMQ community for messaging patterns and best practices

Roadmap

  • v1.1.0: Encryption and authentication support
  • v1.2.0: IPv6 support and enhanced security features
  • v1.3.0: Advanced clustering algorithms and optimizations

SWIM P2P v1.0.0 - Production-ready distributed membership protocol implementation

Download files

Source Distribution

swim_p2p-1.2.3.tar.gz (287.9 kB)

Uploaded Source

Built Distribution

swim_p2p-1.2.3-py3-none-any.whl (243.7 kB)

Uploaded Python 3

File details

Details for the file swim_p2p-1.2.3.tar.gz.

File metadata

  • Download URL: swim_p2p-1.2.3.tar.gz
  • Size: 287.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for swim_p2p-1.2.3.tar.gz
Algorithm Hash digest
SHA256 46c58ce12a7e6f54d4badabd933f84e49d544046b17dd11277b2eba7f80124a6
MD5 82366c3e7f09ddaf225869190f7fe70f
BLAKE2b-256 1af1572ee68804254e88a5e1928ff7807a3c4001b9a33bc3f349e7721e73cdec

File details

Details for the file swim_p2p-1.2.3-py3-none-any.whl.

File metadata

  • Download URL: swim_p2p-1.2.3-py3-none-any.whl
  • Size: 243.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for swim_p2p-1.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 27ceb527d8c9bfd627b237290e586e9b60290e0d0bc0ffd8ed033cd6d465319e
MD5 5f1d55dfa03d1e41b903e0c7d6b95a0d
BLAKE2b-256 57254d40b101ceeb649c1691b7f72c8a6ec5cd6654ca959a7c4b715e4cc75f13
