A lightweight IPC library with bridge and streaming support
Project description
CommIPC: Asynchronous IPC for Linux
Installation
pip install comm-ipc
Why CommIPC?
CommIPC bridges the gap between simple Unix sockets and complex message brokers like RabbitMQ. It gives you Type-Safe, Asynchronous communication with the ease of a local function call.
Quick Start
This script demonstrates a basic server and client interaction in a single file.
import asyncio
from comm_ipc.server import CommIPCServer
from comm_ipc.client import CommIPC
async def main():
# Start server
server = CommIPCServer(socket_path="/tmp/quickstart.sock")
server_task = asyncio.create_task(server.run())
await asyncio.sleep(0.1) # Wait for server to start
# Start client
client = CommIPC(socket_path="/tmp/quickstart.sock")
await client.connect()
channel = await client.open("demo")
# Register an event
async def hello_handler(cd):
return {"msg": f"Hello, {cd.data['name']}"}
await channel.add_event("hello", hello_handler)
# Call the event
res = await channel.event("hello", {"name": "World"})
print(res.data["msg"])
# Shutdown
await client.close()
await server.stop()
server_task.cancel()
if __name__ == "__main__":
asyncio.run(main())
Connectivity and Transport
CommIPC supports several transport layers:
- Local: Unix Domain Sockets.
- Remote: TCP connections.
- Secure: SSL/TLS encryption for TCP.
Initialization
from comm_ipc.server import CommIPCServer
from comm_ipc.client import CommIPC
# Server setup
server = CommIPCServer(socket_path="/tmp/comm_ipc.sock")
await server.run()
# Client setup
client = CommIPC(socket_path="/tmp/comm_ipc.sock")
await client.connect()
System Reference: CommIPCServer
CommIPCServer manages routing and security handshakes.
Constructor: CommIPCServer
server_id(str): Unique server ID.socket_path(str): Path to the Unix socket. Default:/tmp/comm_ipc.sock.error_policy(str): Behavior on exceptions ("ignore","raise","broadcast").connection_secret(str): Secret for HMAC-SHA256 handshakes.system_passwords(Dict[str, str]): Pre-set channel passwords.channel_policy(str): Behavior on owner disconnect."terminate": Close the channel (default)."promote": Promote the next earliest member to owner.
lb_policy(str): Load balancing policy for group events."least-active": Send to provider with fewest pending calls (default)."round-robin": Cycle through providers sequentially.
idle_timeout(float): Header read timeout. Default:60.0.data_timeout(float): Body read timeout. Default:60.0.verbose(bool): log output.
Methods
await run(socket_path, host, port, ssl_context): Start the server on a socket or TCP address.await stop(): Close all links and stop the server.
System Reference: CommIPC (Client)
CommIPC is the main client interface.
Constructor: CommIPC
client_id(str): Unique client ID.socket_path(str): Path to the Unix socket.on_error(Callable): Callback for errors.ssl_context: SSL context for TCP.connection_secret(str): Handshake secret.auto_reconnect(bool): Automatic reconnection. Default:True.reconnect_max_tries(int): Retry limit.0means unlimited.idle_timeout(float): Header read timeout.data_timeout(float): Body read timeout.heartbeat_interval(float): Ping frequency. Default:30.0.return_type(str): Data format ("dict"or"model"). Default:"dict".verbose(bool): Log output.
Methods
await connect(host, port, ssl_context, connection_secret): Establish a link.await open(chan, password): Open a channel and return aCommIPCChannel.await set_password(chan, password): Set a channel password (owner only).await call(chan, ev, data): Perform a request-response call.stream(chan, ev, data): Async iterator for data streams.await add_subscription(chan, sub_name, parameters): Register a subscription endpoint.await remove_subscription(chan, sub_name): Remove a subscription.await subscribe(chan, sub_name, callback): Receive data from a stream.await unsubscribe(chan, sub_name): Stop receiving data.await publish(chan, sub_name, data): Send data to subscribers. Accepts dict orBaseModel.await wait_till_end(): Wait until the connection terminates.await close(): Disconnect from the server.on_msg: Callback for all incoming messages.
System Reference: CommIPCChannel
CommIPCChannel objects handle channel-specific interactions.
Methods
await add_event(name, call, parameters, returns, is_group): Register an RPC handler. Ifis_groupis True, it registers as a load-balanced provider.await add_stream(name, call, parameters): Register a stream handler (automatically detects async generators).await event(event_name, data): Call a remote RPC event.stream(event_name, data): Collect a remote stream.await broadcast(event_name, data): Send to all channel members.await send(event_name, data): Send to a provider without a response.await add_subscription(sub_name, model): Register a channel subscription schema.await remove_subscription(sub_name): Remove a channel subscription.await subscribe(sub_name, callback): Channel-level subscription.await unsubscribe(sub_name): Channel-level unsubscription.await publish(sub_name, data): Channel-level publishing. Accepts dict orBaseModel.on_receive(call, event_name): Attach a listener for specific or all messages.explore(): List discovered events and subscriptions.get_schema(name): Get the Pydantic schema for an endpoint.
System Reference: CommIPCApp
CommIPCApp provides a declarative, decorator-based interface for CommIPCChannel.
Constructor: CommIPCApp
channel(CommIPCChannel): The open channel instance to wrap.
Decorators & Methods
@app.provide(name, parameters, returns): Registers an asynchronous handler as an event provider. Automatically detects if the handler is an async generator for streaming support.@app.on(event_name): Attaches a listener to the channel. If anevent_nameis provided, it also automatically handles the server-sidesubscribe()call.@app.group.provide(name, parameters, returns): Registers a load-balanced group event provider.@app.subscription(name, model): Declares a subscription schema. This is a non-blocking way to ensure the server is aware of the subscription metadata, which is required before callingpublish().app.group(name: str = None): Returns aCommIPCAppGrouphelper. If anameis provided, registrations will be made under that specific load-balanced group.
CommIPCAppGroup Reference
@group.provide(name, parameters, returns): Registers an event provider within the group. Calls to this event will be load-balanced across all providers in the group.
System Reference: CommIPCGroup
CommIPCGroup provides an interface for load-balanced event groups. It is accessed via channel.group.
Methods
__call__(name: str): Returns aCommIPCGroupinstance scoped to the specified group name.await provide(event, handler, parameters, returns): Registers a provider for an event within this group.await get(event, data): Calls an event within the group (load balanced).stream(event, data): Returns an async iterator for a grouped stream (load balanced).
Example Usage
from comm_ipc.app import CommIPCApp
from comm_ipc.comm_data import CommData
from pydantic import BaseModel
# 1. Open a channel
chan = await client.open("math")
app = CommIPCApp(chan)
# 2. Define a data model
class MathParams(BaseModel):
a: int
b: int
# 3. Register a provider
@app.provide("add", parameters=MathParams)
async def add(cd: CommData):
return cd.data.a + cd.data.b
# 4. Register a streaming provider (detected automatically)
@app.provide("counter")
async def count_up(cd: CommData):
for i in range(5):
yield {"count": i}
# 5. Register a group provider
@app.group.provide("mult")
async def mult(cd: CommData):
return cd.data["a"] * cd.data["b"]
# 6. Declare and listen to subscriptions
@app.subscription("updates")
class UpdateModel(BaseModel):
status: str
@app.on("updates")
async def handle_update(cd: CommData):
print(f"System Status: {cd.data.status}")
Load Balancing (Event Groups)
When multiple providers register for the same event, it creates a conflict unless Groups are used. By grouping providers, the server automatically load-balances calls (using a least-active or round-robin policy).
Standard API
# Provider A
await channel.group("workers").add_event("process", handle_a)
# Provider B
await channel.group("workers").add_event("process", handle_b)
# Consumer
res = await channel.group("workers").event("process", data)
Decorator API
# Create a named group decorator
workers = app.group("workers")
@workers.provide("process")
async def handle(cd):
...
Code Examples
Request-Response (RPC)
from pydantic import BaseModel
from comm_ipc.client import CommIPC
class MathParams(BaseModel):
a: int
b: int
# Provider
async def add_handler(cd):
assert isinstance(cd.data, MathParams)
return {"result": cd.data.a + cd.data.b}
# Consumer
client = CommIPC(return_type="model") # Enables automated model reception
channel = await client.open("math")
await channel.add_event("add", add_handler, parameters=MathParams)
res = await channel.event("add", MathParams(a=10, b=20))
print(f"Result: {res.data['result']}") # cd.data is automatically a model instance
Publisher-Subscriber
from pydantic import BaseModel
class User(BaseModel):
id: int
name: str
# Publisher
await channel.add_subscription("news", model=User)
await channel.publish("news", User(id=1, name="Alice"))
# Subscriber
async def on_data(cd):
# cd.data is automatically an instance of User (local or dynamic)
print(f"Got user: {cd.data.name}")
await channel.subscribe("news", on_data)
Streaming
# Provider
async def count_up(cd):
for i in range(cd.data["limit"]):
yield i
await channel.add_stream("counter", count_up)
# Consumer
async for chunk in channel.stream("counter", {"limit": 5}):
print(chunk.data)
Messaging and Listeners
# Send: Directed message to a provider (no response)
await channel.send("log", {"level": "info", "msg": "event started"})
# Broadcast: Message to every client in the channel
await channel.broadcast("system_update", {"status": "maintenance"})
# Listen: specific event
async def on_update(cd):
print(f"Update: {cd.data}")
channel.on_receive(on_update, "system_update")
# Listen: generic channel observer
async def on_any(cd):
print(f"Channel {cd.channel} got {cd.event}")
channel.on_receive(on_any)
Security and Passwords
# Set password (owner)
channel = await client.open("secure")
await client.set_password("secure", "password123")
# Open with password (client)
channel = await client.open("secure", password="password123")
System Reference: CommData (Message Object)
CommData models all messages and metadata.
Fields
sender_id(str): Sender identifier.server_id(str): Routing server identifier.channel(str): Channel name.event(str): Event name or subscription ID.data(Any): Message content.timestamp(int): Creation timestamp.metadata(Dict): Additional data.request_id(str): Correlation ID for calls.target_id(str): Recipient identifier.path(List[str]): Chain of routing servers.is_stream(bool): Stream flag.is_final(bool): End-of-stream flag.signature(str): Message integrity signature.origin_server_id(str): First server in the chain.sub_name(str): Subscription identifier.
System Reference: CommIPCBridge
CommIPCBridge synchronizes two separate hubs.
Constructor: CommIPCBridge
bridge_id(str): Bridge identifier.socket_path1,socket_path2(str): Local socket paths.ssl_context1,ssl_context2: SSL contexts.allowed_channels(List[str]): List of channels to sync. Default: all.
Methods
await connect(target1_params, target2_params): Connect two targets.await stop(): Stop the bridge.
from comm_ipc.bridge import CommIPCBridge
bridge = CommIPCBridge(bridge_id="bridge-1")
await bridge.connect(
target1_params={"socket_path": "/tmp/s1.sock"},
target2_params={"socket_path": "/tmp/s2.sock"}
)
System Reference: System Channels
The server provides read-only channels for monitoring:
__comm_ipc_logs: Server log broadcast.__comm_ipc_errors: Global error broadcast.__comm_ipc_system: System event broadcast (e.g.,new_registration).
Resilience
Clients recover state automatically if auto_reconnect is True. On reconnection, the client:
- Re-identifies with the same ID.
- Re-opens and authenticates active channels.
- Re-registers all handlers and endpoints.
License
Licensed under the LGPLv3 License.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file comm_ipc-0.1.5.tar.gz.
File metadata
- Download URL: comm_ipc-0.1.5.tar.gz
- Upload date:
- Size: 44.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
01b37f088dacfbf152393438275b0aee81b32fa40ac99ff4251683b352c404e3
|
|
| MD5 |
1d23525842bc37fcd572c1e1cd341799
|
|
| BLAKE2b-256 |
a0bb5c510cf24cdb2a0d59a9e2a5ae75a5fdfa444508ab3e152f615e181c3599
|
Provenance
The following attestation bundles were made for comm_ipc-0.1.5.tar.gz:
Publisher:
publish.yml on shashstormer/comm_ipc
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
comm_ipc-0.1.5.tar.gz -
Subject digest:
01b37f088dacfbf152393438275b0aee81b32fa40ac99ff4251683b352c404e3 - Sigstore transparency entry: 1274294673
- Sigstore integration time:
-
Permalink:
shashstormer/comm_ipc@22e0796922bebde988676981601212b94b0a7c86 -
Branch / Tag:
refs/heads/master - Owner: https://github.com/shashstormer
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@22e0796922bebde988676981601212b94b0a7c86 -
Trigger Event:
push
-
Statement type:
File details
Details for the file comm_ipc-0.1.5-py3-none-any.whl.
File metadata
- Download URL: comm_ipc-0.1.5-py3-none-any.whl
- Upload date:
- Size: 31.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8ce2e17cdb8f3b7c73c3d1f6fd1a4f9474032f9b37dd030ea5f469402e35b122
|
|
| MD5 |
75fd25681df5d5a08ff48a29d5fa930f
|
|
| BLAKE2b-256 |
fa30a6ce821ba7f1fd1d18490226ef04f1e1690a8c189c213454a2c0aa85c175
|
Provenance
The following attestation bundles were made for comm_ipc-0.1.5-py3-none-any.whl:
Publisher:
publish.yml on shashstormer/comm_ipc
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
comm_ipc-0.1.5-py3-none-any.whl -
Subject digest:
8ce2e17cdb8f3b7c73c3d1f6fd1a4f9474032f9b37dd030ea5f469402e35b122 - Sigstore transparency entry: 1274294759
- Sigstore integration time:
-
Permalink:
shashstormer/comm_ipc@22e0796922bebde988676981601212b94b0a7c86 -
Branch / Tag:
refs/heads/master - Owner: https://github.com/shashstormer
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@22e0796922bebde988676981601212b94b0a7c86 -
Trigger Event:
push
-
Statement type: