A Python package for peer-to-peer communication between Swarmchestrate entities
Swarmchestrate communication library
Overview
Swch_comm is a Python package for creating a peer-to-peer network.
Features
- Lightweight and Flexible: Minimal configuration required to start a server or join a network.
- Poetry Integration: Dependency management and virtual environment handled seamlessly.
Prerequisites
- Python 3.8 or later
- Poetry 1.2 or later
Development
Installing Poetry
If Poetry is not already installed on your system, follow these steps:
1. Install Poetry
Run the official installation script provided by Poetry:
curl -sSL https://install.python-poetry.org | python3 -
2. Verify Installation
Ensure Poetry is installed correctly by checking its version:
poetry --version
You should see something like:
Poetry version 1.x.x
Installation
- Clone the repository:
git clone https://github.com/Swarmchestrate/lib_comm
cd lib_comm
- Install dependencies using Poetry:
poetry install
- (Optional) Activate the virtual environment:
poetry shell
Note: Activating the virtual environment is optional. You can still use the package directly by prefacing your commands with poetry run, which runs the script within Poetry's managed virtual environment.
- (Optional) Building the package:
poetry build
- Add new dependencies:
poetry add <package-name>
SwchPeer API Documentation
The SwchPeer class provides a high-level interface for creating peer-to-peer networks with automatic discovery, message handling, and network resilience features.
Initialization
from swchp2pcom import SwchPeer
agent = SwchPeer(
    peer_id="my-peer-id",         # Optional: Auto-generated if None
    listen_ip="127.0.0.1",        # Optional: IP to listen on
    listen_port=8080,             # Optional: Port to listen on
    public_ip="192.168.1.100",    # Optional: Advertised IP (defaults to listen_ip)
    public_port=8080,             # Optional: Advertised port (defaults to listen_port)
    metadata={"type": "worker"},  # Optional: Peer metadata
    enable_rejoin=True            # Optional: Enable automatic reconnection
)
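The fallback rules in the comments above (an auto-generated ID, and an advertised address that defaults to the listening address) can be sketched with a small stand-in class. `PeerConfig` is hypothetical and is not part of swchp2pcom; the UUID4 ID format and the default values for `listen_ip`, `listen_port`, and `enable_rejoin` are assumptions made only for illustration.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class PeerConfig:
    """Hypothetical stand-in for the SwchPeer constructor arguments."""

    peer_id: Optional[str] = None
    listen_ip: str = "127.0.0.1"       # assumed default
    listen_port: int = 8080            # assumed default
    public_ip: Optional[str] = None
    public_port: Optional[int] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    enable_rejoin: bool = True         # assumed default

    def __post_init__(self) -> None:
        # Generate an ID when none is given (UUID4 is an assumption).
        if self.peer_id is None:
            self.peer_id = str(uuid.uuid4())
        # The advertised address falls back to the listening address.
        if self.public_ip is None:
            self.public_ip = self.listen_ip
        if self.public_port is None:
            self.public_port = self.listen_port


config = PeerConfig(listen_ip="10.0.0.5", listen_port=9000)
print(config.public_ip, config.public_port)
```

Setting `public_ip` and `public_port` explicitly is mainly useful when the peer listens on one address but is reachable by other peers under a different one.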
Network Operations
Joining Networks
enter(ip: str, port: int) -> Deferred
Join an existing peer network by connecting to a known peer.
# Returns a Deferred for asynchronous handling
deferred = agent.enter("192.168.1.100", 8080)
deferred.addCallback(lambda _: print("Successfully joined network"))
deferred.addErrback(lambda failure: print(f"Failed to join: {failure.getErrorMessage()}"))
connect(peer_id: str) -> Deferred
Connect to a specific known peer by their ID.
deferred = agent.connect("peer-uuid-123")
deferred.addCallback(lambda protocol: print("Connected to peer"))
Leaving Networks
disconnect(peer_id: str) -> Deferred
Disconnect from a specific peer.
deferred = agent.disconnect("peer-uuid-123")
deferred.addCallback(lambda _: print("Successfully disconnected from peer"))
deferred.addErrback(lambda failure: print(f"Disconnection failed: {failure.getErrorMessage()}"))
leave() -> Deferred
Gracefully leave the network.
deferred = agent.leave()
deferred.addCallback(lambda _: print("Successfully left network"))
deferred.addErrback(lambda failure: print(f"Leave failed: {failure.getErrorMessage()}"))
Messaging
Sending Messages
send(peer_id: str, message_type: str, payload: Dict[str, Any]) -> None
Send a targeted message to a specific peer.
import time

agent.send("peer-123", "chat", {
    "text": "Hello!",
    "timestamp": time.time()
})
broadcast(message_type: str, payload: Dict[str, Any]) -> None
Send a message to all connected peers.
agent.broadcast("announcement", {
    "message": "Server maintenance in 10 minutes",
    "timestamp": time.time()
})
Message Handlers
register_message_handler(message_type: str, func: Callable) -> None
Register a custom handler for specific message types.
def handle_chat(sender_id, message):
    print(f"Chat from {sender_id}: {message['payload']['text']}")

agent.register_message_handler("chat", handle_chat)
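To illustrate how type-based handler dispatch of this kind generally works, here is a minimal, library-independent sketch. `HandlerRegistry` and its `dispatch` method are hypothetical, and the `message_type` key in the message dictionary is an assumption; only the `(sender_id, message)` handler signature and the `payload` key come from the example above.

```python
from typing import Any, Callable, Dict, List, Tuple

Handler = Callable[[str, Dict[str, Any]], None]


class HandlerRegistry:
    """Hypothetical dispatcher; not the library's internal implementation."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register_message_handler(self, message_type: str, func: Handler) -> None:
        # A later registration for the same type replaces the earlier one (assumption).
        self._handlers[message_type] = func

    def dispatch(self, sender_id: str, message: Dict[str, Any]) -> bool:
        # Look up the handler by message type and report whether one ran.
        handler = self._handlers.get(message.get("message_type", ""))
        if handler is None:
            return False
        handler(sender_id, message)
        return True


received: List[Tuple[str, str]] = []


def handle_chat(sender_id: str, message: Dict[str, Any]) -> None:
    received.append((sender_id, message["payload"]["text"]))


registry = HandlerRegistry()
registry.register_message_handler("chat", handle_chat)
handled = registry.dispatch("peer-123", {"message_type": "chat", "payload": {"text": "Hello!"}})
unhandled = registry.dispatch("peer-123", {"message_type": "unknown", "payload": {}})
```

In this sketch, messages with no registered handler are simply reported as unhandled; how the real library treats them is not documented here.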
Event System
on(event_name: str, listener: Callable) -> SwchPeer
Register event listeners with method chaining support.
agent.on("entered", lambda: print("Successfully entered the network")) \
     .on("left", lambda: print("Successfully left the network")) \
     .on("peer:connected", lambda peer_id: print(f"New peer connected: {peer_id}")) \
     .on("peer:discovered", lambda peer_id: print(f"Discovered peer: {peer_id}"))
Available Events
- entered - When the agent successfully enters the network
- left - When the agent successfully leaves the network
- peer:connected - When a peer establishes a direct connection
- peer:disconnected - When a peer disconnects
- peer:all_disconnected - When all peers disconnect (triggers rejoin if enabled)
- peer:discovered - When a new peer is discovered in the network
- peer:undiscovered - When a peer leaves the network
- message - When any message is received
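The chaining behaviour of `on` follows from it returning the object it was called on. The sketch below shows that pattern in isolation; `EventEmitter` and `emit` are hypothetical names used for illustration and say nothing about how SwchPeer fires events internally.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class EventEmitter:
    """Hypothetical emitter illustrating chainable listener registration."""

    def __init__(self) -> None:
        self._listeners: DefaultDict[str, List[Callable[..., Any]]] = defaultdict(list)

    def on(self, event_name: str, listener: Callable[..., Any]) -> "EventEmitter":
        self._listeners[event_name].append(listener)
        return self  # returning self is what makes .on(...).on(...) possible

    def emit(self, event_name: str, *args: Any) -> int:
        # Call every listener registered for the event; return how many ran.
        listeners = list(self._listeners.get(event_name, []))
        for listener in listeners:
            listener(*args)
        return len(listeners)


log: List[str] = []
emitter = EventEmitter()
emitter.on("entered", lambda: log.append("entered")) \
       .on("peer:connected", lambda peer_id: log.append(f"connected:{peer_id}"))

emitter.emit("entered")
emitter.emit("peer:connected", "peer-123")
```

As in the examples above, listeners for `entered` take no arguments, while listeners for the `peer:*` events shown receive a peer ID.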
Peer Discovery
find_peers(metadata: Optional[Dict] = None) -> List[str]
Search for peers based on metadata criteria.
# Find all peers
all_peers = agent.find_peers()
# Find peers by type
workers = agent.find_peers({"type": "worker"})
# Find peers with multiple criteria
specific_peers = agent.find_peers({
    "universe": "production",
    "type": "coordinator"
})
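One plausible reading of the metadata criteria is that a peer matches only when every key in the query is present in its metadata with an equal value. The sketch below implements that reading over a plain dictionary; `filter_peers` and the `known` table are hypothetical, and the exact matching semantics of `find_peers` are an assumption.

```python
from typing import Any, Dict, List, Optional


def filter_peers(
    known: Dict[str, Dict[str, Any]],
    metadata: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """Return IDs of peers whose metadata contains every queried key/value pair."""
    if not metadata:
        # No criteria: every known peer matches.
        return list(known)
    return [
        peer_id
        for peer_id, meta in known.items()
        if all(key in meta and meta[key] == value for key, value in metadata.items())
    ]


# Hypothetical peer table, keyed by peer ID.
known = {
    "a": {"type": "worker", "universe": "production"},
    "b": {"type": "coordinator", "universe": "production"},
    "c": {"type": "worker"},
}

all_peers = filter_peers(known)
workers = filter_peers(known, {"type": "worker"})
specific = filter_peers(known, {"universe": "production", "type": "coordinator"})
```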
get_connected_peers() -> List[str]
Get list of currently connected peer IDs.
connected = agent.get_connected_peers()
print(f"Connected to {len(connected)} peers")
get_peer_metadata(peer_id: str) -> Optional[Dict[str, Any]]
Retrieve metadata for a specific peer.
metadata = agent.get_peer_metadata("peer-123")
if metadata:
    print(f"Peer type: {metadata.get('type', 'unknown')}")
Network Information
get_connection_count() -> int
Get the current number of active connections.
count = agent.get_connection_count()
print(f"Active connections: {count}")
Rejoin Mechanism
The agent includes automatic network rejoin capabilities for resilience.
enable_rejoin() -> None
Enable automatic reconnection when all peers disconnect.
agent.enable_rejoin()
disable_rejoin() -> None
Disable automatic reconnection.
agent.disable_rejoin()
is_rejoin_enabled() -> bool
Check if rejoin mechanism is active.
if agent.is_rejoin_enabled():
    print("Auto-rejoin is enabled")
is_rejoin_in_progress() -> bool
Check if a rejoin attempt is currently happening.
if agent.is_rejoin_in_progress():
    print("Currently attempting to rejoin network")
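The trigger condition for rejoin (all peers disconnected while rejoin is enabled) can be modelled without any networking. `RejoinTracker` below is a hypothetical sketch of that rule only; it does not reproduce how the library performs the reconnection itself, and the `peer_connected` / `peer_disconnected` method names are assumptions.

```python
from typing import Callable, List, Set


class RejoinTracker:
    """Hypothetical model of the 'rejoin when all peers disconnect' rule."""

    def __init__(self, rejoin: Callable[[], None], enabled: bool = True) -> None:
        self._rejoin = rejoin
        self._enabled = enabled
        self._connected: Set[str] = set()

    def enable_rejoin(self) -> None:
        self._enabled = True

    def disable_rejoin(self) -> None:
        self._enabled = False

    def is_rejoin_enabled(self) -> bool:
        return self._enabled

    def peer_connected(self, peer_id: str) -> None:
        self._connected.add(peer_id)

    def peer_disconnected(self, peer_id: str) -> None:
        if peer_id not in self._connected:
            return  # ignore peers we were never connected to
        self._connected.remove(peer_id)
        # Rejoin only when the last connection is gone and rejoin is enabled.
        if not self._connected and self._enabled:
            self._rejoin()


attempts: List[str] = []
tracker = RejoinTracker(lambda: attempts.append("rejoin"))
tracker.peer_connected("a")
tracker.peer_connected("b")
tracker.peer_disconnected("a")  # one peer remains: no rejoin
tracker.peer_disconnected("b")  # last peer gone: rejoin triggered
```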
Reactor Control
start() -> None
Start the Twisted reactor (blocking call).
agent.start() # Blocks until reactor stops
stop() -> None
Stop the Twisted reactor.
agent.stop()
Complete Example
from swchp2pcom import SwchPeer
import time

# Create agent
agent = SwchPeer(
    peer_id="worker-1",
    listen_ip="127.0.0.1",
    listen_port=8081,
    metadata={"type": "worker", "version": "1.0"}
)

# Set up message handler
def handle_task(sender_id, message):
    task = message['payload']['task']
    print(f"Received task from {sender_id}: {task}")
    # Send response back
    agent.send(sender_id, "task_result", {
        "task_id": task.get("id"),
        "result": "completed",
        "timestamp": time.time()
    })

agent.register_message_handler("task_request", handle_task)

# Set up event handlers
agent.on("peer:connected", lambda peer_id: print(f"New peer connected: {peer_id}")) \
     .on("peer:discovered", lambda peer_id: print(f"Discovered peer: {peer_id}"))

# Join existing network
try:
    deferred = agent.enter("127.0.0.1", 8080)
    deferred.addCallback(lambda _: print("Successfully joined network"))
    deferred.addErrback(lambda f: print(f"Failed to join: {f.getErrorMessage()}"))
    # Start the agent (this blocks)
    agent.start()
except KeyboardInterrupt:
    print("Shutting down...")
    leave_deferred = agent.leave()
    leave_deferred.addCallback(lambda _: agent.stop())
Contact
For any questions or feedback, feel free to reach out:
Thank you for using swch_comm! 🎉