A Python package for peer-to-peer communication for swarmchestrate entities
Project description
Swarmchestrate communication library
Overview
SwchP2Pcom is a Python package for creating a peer to peer network.
Features
- Lightweight and Flexible: Minimal configuration required to start a server or join a network.
- Poetry Integration: Dependency management and virtual environment handled seamlessly.
Prerequisites
- Python 3.12 or later
Installation
From PyPI (Recommended)
Install the package directly from PyPI using pip:
pip install swchp2pcom
From Source
For development or the latest features, you can install from source. See the Developer README for detailed development setup instructions.
SwchPeer API Documentation
The SwchPeer class provides a high-level interface for creating peer-to-peer networks with automatic discovery, message handling, and network resilience features.
Initialization
from swchp2pcom import SwchPeer
agent = SwchPeer(
peer_id="my-peer-id", # Optional: Auto-generated if None
listen_ip="127.0.0.1", # Optional: IP to listen on
listen_port=8080, # Optional: Port to listen on
public_ip="192.168.1.100", # Optional: Advertised IP (defaults to listen_ip)
public_port=8080, # Optional: Advertised port (defaults to listen_port)
metadata={"type": "worker"}, # Optional: Peer metadata
enable_rejoin=True # Optional: Enable automatic reconnection
)
Network Operations
Joining Networks
enter(ip: str, port: int) -> Deferred
Join an existing peer network by connecting to a known peer.
# Returns a Deferred for asynchronous handling
deferred = agent.enter("192.168.1.100", 8080)
deferred.addCallback(lambda _: print("Successfully joined network"))
deferred.addErrback(lambda failure: print(f"Failed to join: {failure.getErrorMessage()}"))
Leaving Networks
leave() -> Deferred
Gracefully leave the network.
deferred = agent.leave()
deferred.addCallback(lambda _: print("Successfully left network"))
deferred.addErrback(lambda failure: print(f"Leave failed: {failure.getErrorMessage()}"))
Adding links to peers
connect(peer_id: str) -> Deferred
Connect to a specific known peer by their ID.
deferred = agent.connect("peer-uuid-123")
deferred.addCallback(lambda protocol: print("Connected to peer"))
Removing links to peers
disconnect(peer_id: str) -> Deferred
Disconnect from a specific peer.
deferred = agent.disconnect("peer-uuid-123")
deferred.addCallback(lambda _: print("Successfully disconnected from peer"))
deferred.addErrback(lambda failure: print(f"Disconnection failed: {failure.getErrorMessage()}"))
Messaging
Sending Messages
send(peer_id: Union[str, List[str]], message_type: str, payload: Dict[str, Any]) -> None
Send a targeted message to one or more specific peers.
# Send to a single peer
agent.send("peer-123", "chat", {
"text": "Hello!",
"timestamp": time.time()
})
# Send to multiple peers
agent.send(["peer-123", "peer-456", "peer-789"], "notification", {
"message": "System update available",
"priority": "high"
})
broadcast(message_type: str, payload: Dict[str, Any]) -> None
Send a message to all connected peers.
agent.broadcast("announcement", {
"message": "Server maintenance in 10 minutes",
"timestamp": time.time()
})
Message Handlers
register_message_handler(message_type: str, func: Callable) -> None
Register a custom handler for specific message types.
def handle_chat(sender_id, message):
print(f"Chat from {sender_id}: {message['payload']['text']}")
agent.register_message_handler("chat", handle_chat)
Event System
on(event_name: str, listener: Callable) -> SwchPeer
Register event listeners with method chaining support.
Available Events
entered - Triggered when the agent successfully enters the network
- Parameters: None
- Handler signature:
listener()
def on_entered():
print("Successfully joined the network!")
agent.on("entered", on_entered)
left - Triggered when the agent successfully leaves the network
- Parameters: None
- Handler signature:
listener()
def on_left():
print("Successfully left the network!")
agent.on("left", on_left)
peer:connected - Triggered when a peer establishes a direct connection to this agent
- Parameters:
peer_id(str) - The ID of the connected peer - Handler signature:
listener(peer_id: str)
def on_peer_connected(peer_id):
print(f"Direct connection established with peer: {peer_id}")
# You can now send messages directly to this peer
agent.on("peer:connected", on_peer_connected)
peer:disconnected - Triggered when a peer disconnects from this agent
- Parameters:
peer_id(str) - The ID of the disconnected peer - Handler signature:
listener(peer_id: str)
def on_peer_disconnected(peer_id):
print(f"Lost direct connection with peer: {peer_id}")
# This peer may still be reachable through other peers
agent.on("peer:disconnected", on_peer_disconnected)
peer:all_disconnected - Triggered when all peers disconnect from this agent (triggers rejoin if enabled)
- Parameters: None
- Handler signature:
listener()
def on_all_disconnected():
print("Lost all connections - isolated from network")
# If rejoin is enabled, automatic reconnection will start
agent.on("peer:all_disconnected", on_all_disconnected)
peer:discovered - Triggered when a new peer is discovered in the network (may not be directly connected)
- Parameters:
peer_id(str) - The ID of the discovered peer - Handler signature:
listener(peer_id: str)
def on_peer_discovered(peer_id):
print(f"New peer discovered in network: {peer_id}")
metadata = agent.get_peer_metadata(peer_id)
if metadata:
print(f"Peer type: {metadata.get('type', 'unknown')}")
agent.on("peer:discovered", on_peer_discovered)
peer:undiscovered - Triggered when a peer leaves the network entirely
- Parameters:
peer_id(str) - The ID of the peer that left - Handler signature:
listener(peer_id: str)
def on_peer_undiscovered(peer_id):
print(f"Peer left the network: {peer_id}")
# This peer is no longer reachable in the network
agent.on("peer:undiscovered", on_peer_undiscovered)
message - Triggered when any message is received
- Parameters:
event_data(dict) - Contains message informationpeer_id(str) - ID of the sendermessage_type(str) - Type of the messagepayload(dict) - The message payload
- Handler signature:
listener(event_data: dict)
def on_message(event_data):
sender = event_data['peer_id']
msg_type = event_data['message_type']
payload = event_data['payload']
print(f"Received {msg_type} from {sender}: {payload}")
agent.on("message", on_message)
Event Handler Chaining Example
agent.on("entered", lambda: print("Joined network")) \
.on("peer:connected", lambda peer_id: print(f"Connected: {peer_id}")) \
.on("peer:discovered", lambda peer_id: print(f"Discovered: {peer_id}")) \
.on("message", lambda event: print(f"Message from {event['peer_id']}: {event['message_type']}"))
Peer Discovery
find_peers(metadata: Optional[Dict] = None) -> List[str]
Search for peers based on metadata criteria.
# Find all peers
all_peers = agent.find_peers()
# Find peers by type
workers = agent.find_peers({"type": "worker"})
# Find peers with multiple criteria
specific_peers = agent.find_peers({
"universe": "production",
"type": "coordinator"
})
get_connected_peers() -> List[str]
Get list of currently connected peer IDs.
connected = agent.get_connected_peers()
print(f"Connected to {len(connected)} peers")
get_peer_metadata(peer_id: str) -> Optional[Dict[str, Any]]
Retrieve metadata for a specific peer.
metadata = agent.get_peer_metadata("peer-123")
if metadata:
print(f"Peer type: {metadata.get('type', 'unknown')}")
Network Information
get_connection_count() -> int
Get the current number of active connections.
count = agent.get_connection_count()
print(f"Active connections: {count}")
Rejoin Mechanism
The agent includes automatic network rejoin capabilities for resilience.
enable_rejoin() -> None
Enable automatic reconnection when all peers disconnect.
agent.enable_rejoin()
disable_rejoin() -> None
Disable automatic reconnection.
agent.disable_rejoin()
is_rejoin_enabled() -> bool
Check if rejoin mechanism is active.
if agent.is_rejoin_enabled():
print("Auto-rejoin is enabled")
is_rejoin_in_progress() -> bool
Check if a rejoin attempt is currently happening.
if agent.is_rejoin_in_progress():
print("Currently attempting to rejoin network")
Reactor Control
start() -> None
Start the Twisted reactor (blocking call).
agent.start() # Blocks until reactor stops
stop() -> None
Stop the Twisted reactor.
agent.stop()
Complete Example
from swchp2pcom import SwchPeer
import time
# Create agent
agent = SwchPeer(
peer_id="worker-1",
listen_ip="127.0.0.1",
listen_port=8081,
metadata={"type": "worker", "version": "1.0"}
)
# Set up message handler
def handle_task(sender_id, message):
task = message['payload']['task']
print(f"Received task from {sender_id}: {task}")
# Send response back
agent.send(sender_id, "task_result", {
"task_id": task.get("id"),
"result": "completed",
"timestamp": time.time()
})
agent.register_message_handler("task_request", handle_task)
# Set up event handlers
agent.on("peer:connected", lambda peer_id: print(f"New peer connected: {peer_id}")) \
.on("peer:discovered", lambda peer_id: print(f"Discovered peer: {peer_id}"))
# Join existing network
try:
deferred = agent.enter("127.0.0.1", 8080)
deferred.addCallback(lambda _: print("Successfully joined network"))
deferred.addErrback(lambda f: print(f"Failed to join: {f.getErrorMessage()}"))
# Start the agent (this blocks)
agent.start()
except KeyboardInterrupt:
print("Shutting down...")
leave_deferred = agent.leave()
leave_deferred.addCallback(lambda _: agent.stop())
Contact
For any questions or feedback, feel free to reach out:
Thank you for using SwchP2Pcom! 🎉
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file swchp2pcom-0.4.0-py3-none-any.whl.
File metadata
- Download URL: swchp2pcom-0.4.0-py3-none-any.whl
- Upload date:
- Size: 23.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c6d768f7c255fb07921bc5f562b67124d44fb214708ad518ddfd456be185ead9
|
|
| MD5 |
6d5ff829fa02bf6e008c477ca28c3dc5
|
|
| BLAKE2b-256 |
3f38a290474cc6e83ee0637b278eb5bf4e18e31e367af5ecd7be9d5b4c4aff20
|
Provenance
The following attestation bundles were made for swchp2pcom-0.4.0-py3-none-any.whl:
Publisher:
publish-python-package.yml on Swarmchestrate/lib_comm
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
swchp2pcom-0.4.0-py3-none-any.whl -
Subject digest:
c6d768f7c255fb07921bc5f562b67124d44fb214708ad518ddfd456be185ead9 - Sigstore transparency entry: 505359720
- Sigstore integration time:
-
Permalink:
Swarmchestrate/lib_comm@ec579c8e5c320b0022a97afefe7ea61e42b8c40b -
Branch / Tag:
refs/tags/v0.4.0 - Owner: https://github.com/Swarmchestrate
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish-python-package.yml@ec579c8e5c320b0022a97afefe7ea61e42b8c40b -
Trigger Event:
push
-
Statement type: