Skip to main content

Light-weight peer-to-peer messaging

Project description

VL-MESSAGING - Very Light Messaging

SUMMARY

VLMessaging is intended as an easy to use lightweight peer-to-peer messaging system that can route messages between agents living in the same thread, and between threads, processes and machines. It provides a failure robust responsibility oriented directory service for resource discovery. Security, i.e. authentication, encryption, entitlement, etc. is left to be implemented at the application layer, so VLMessaging is not intended for use in hostile environments.

In contrast, DCOM, CORBA and other RPC style systems provide models that abstract failure from the programmer providing a "simple" interface. However, except possibly in non-scalable toy examples, failure, which is an inherent possibility whenever we communicate even with another thread, needs explicit handling.

VLMessaging makes the resource discovery step explicit and only provides asynchronous and timeout based message sending. The goal is to provide a simple to use mental model for building distributed applications, incorporating the idea of resource discovery and failure to perform, as fundamental first class concepts rather than abstracting them and subsequently forcing users to implement workarounds as abstraction leaks are encountered.

To provide robustness, directory entries are responsibility based (so multiple agents can fulfil the same responsibility) rather than address based and are replicated to other directories. If an agent goes down then other agents providing the same responsibility can be used instead. If a directory goes down then other directories can be used for discovery, and any responsibility providers will re-register with other directories when heartbeats fail.

Request / response is built in but other messaging patterns, such as publish / subscribe, guarenteed delivery, intentional load-balancing, queuing etc, are left to be implemented at the application level.

CONCEPTS

Msg
The unit of communication. Contains a subject, body, from address, to address, and other metadata.

Reply
A msg sent in response to another msg.

Router
Routes msgs between local connections to other routers for non-local addresses. Handles connection management and directory discovery.

Address
A triplet of machineId, routerId and connectionId that uniquely identifies a connection.

Connection

  • msgArrived - call back coroutine that contains a message
  • send
    • async, e.g. conn.send(msg)
    • semi-synchronous - await reply with timeout, e.g. reply = await conn.send(msg, 1000) wait for a reply upto 1 second

Directory

  • Entry
    • addr - the address of the connection providing the responsibility
    • service & params - defines the responsibility being provided, e.g. "vol-surface", {"ccy": "USD", "asset-class": "equity", ...}
    • vnets - the virtual-networks the entry is for
    • perms - who can see / use this entry
  • VNET - Virtual Network - e.g. "fixed-income-trading-game", "local"
  • Hubs
    • Local Hub - the central directory on a machine
    • Network Hub - a well-known directory on the network

AuthService

  • Domain
  • Perm

Agent / Daemon

  • has at least one connection to a router

Routing

  • Special addresses
    • LocalHubDirectory - the logical / physical address of the local hub directory on the machine
    • LocalInterMachineRouter - allows multi-hop routing between machines

USAGE PATTERNS

Act as responsibility provider

  1. ask router for the address of a directory
  2. send a msg to the address registering self as being available to fulfill specified responsibilities
  3. respond to msgs and send periodic heartbeats to the directory to maintain registration

Act as requester

  1. ask router for the address of a directory
  2. send msg to the address requesting addresses that can fulfill a responsibility
  3. send msgs to those addresses and either await replies upto a given timeout or process them asynchronously as they arrive

EXAMPLES

Add One Via IPC

from vlmessaging import Msg, Router, VLM
from vlmessaging.utils import co


class AddOneAgent:
    def __init__(self, router):
        self.conn = router.newConnection(self.msgArrived)

    async def msgArrived(self, msg):
        if msg.subject == 'ADD_ONE':
            await self.conn.send(msg.reply(msg.contents + 1))
        else:
            raise ValueError(f'Unhandled subject: {msg.subject}')

async def run_example():
    router1 = Router(mode=VLM.MACHINE_MODE)
    router2 = Router(mode=VLM.MACHINE_MODE)
    fred = AddOneAgent(router1)
    conn = router2.newConnection()
    reply = await conn.send(Msg(fred.conn.addr, 'ADD_ONE', 41), 1_000)

    assert reply.contents == 42

    router1.shutdown()
    router2.shutdown()
    await co.until((router1.hasShutdown, router2.hasShutdown))

co.startEventLoopWith(run_example)

Add One Via IPC and Resource Discovery

from vlmessaging import Msg, Router, VLM, Entry, Directory
from vlmessaging.utils import co, Missing, wip


class AddOneAgent:
  
    def __init__(self, router):
        self.conn = router.newConnection(self.msgArrived)

    async def start(self, vnet=[]):
        msg = Msg(
            self.conn.directoryAddr,
            VLM.REGISTER_ENTRY,
            Entry(
                self.conn.addr,
                'AddOneAgent',
                params=None,
                vnets=[vnet] if not isinstance(vnet, (list, tuple)) else None,
                perms=None
            )
        )
        await self.conn.send(msg, 500)
        return self

    async def msgArrived(self, msg):
        if msg.subject == 'ADD_ONE':
            await self.conn.send(msg.reply(msg.contents + 1))
        else:
            return [VLM.IGNORE_UNHANDLED_REPLIES, VLM.HANDLE_PING, VLM.HANDLE_DOES_NOT_UNDERSTAND]
        
        
async def run_example():
    r1 = Router(mode=VLM.MACHINE_MODE)
    d1 = Directory(r1,
        hubListen='ipc:///tmp/hub_1',
    )

    r2 = Router(mode=VLM.MACHINE_MODE)
    d2 = Directory(r2,
        hubs=['ipc:///tmp/hub_1'],
    )

    agent = await AddOneAgent(r1).start(VLM.LOCAL_VNET)

    conn = r2.newConnection()
    # loop until timeout or the relevant entry appears in d2 (propagated from d1)
    agentAddr = await wip._waitForSingleEntryAddrOfTypeOrReplyAndExit(conn, 'AddOneAgent', 2_000, 200, errMsg=Missing)
    reply = await conn.send(Msg(agentAddr, 'ADD_ONE', 41), 2_000)
    assert reply.contents == 42

    r1.shutdown()
    r2.shutdown()
    await co.until([r1.hasShutdown, r2.hasShutdown])

co.startEventLoopWith(run_example)

Add One Via TCP and Resource Discovery

async def run_example():
    r1 = Router(mode=VLM.NETWORK_MODE)
    d1 = Directory(r1,
        vnets=['test'],
        netListen='tcp://127.0.0.1:30001',
    )

    r2 = Router(mode=VLM.NETWORK_MODE)
    d2 = Directory(r2,
        vnets=['test'],
        netHubs=['tcp://127.0.0.1:30001'],
    )

    agent = await AddOneAgent(r1).start('test')

    conn = r2.newConnection()
    # loop until timeout or the relevant entry appears in d2 (propagated from d1)
    agentAddr = await wip._waitForSingleEntryAddrOfTypeOrReplyAndExit(conn, 'AddOneAgent', 2_000, 200, errMsg=Missing)
    reply = await conn.send(Msg(agentAddr, 'ADD_ONE', 41), 2_000)
    assert reply.contents == 42

    r1.shutdown()
    r2.shutdown()
    await co.until([r1.hasShutdown, r2.hasShutdown])

co.startEventLoopWith(run_example)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

vl_messaging-2026.1.11.tar.gz (26.5 kB view details)

Uploaded Source

File details

Details for the file vl_messaging-2026.1.11.tar.gz.

File metadata

  • Download URL: vl_messaging-2026.1.11.tar.gz
  • Upload date:
  • Size: 26.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for vl_messaging-2026.1.11.tar.gz
Algorithm Hash digest
SHA256 12b0c90ba2ac209cb29b16a63e74c185a4c443177fbabf322d86af1aef3c112d
MD5 57d40e40f26e283c3f52db7faa948fe0
BLAKE2b-256 9ca798842b03fba106b9c9a1ebbd8c402557996e668ea3a223990dfc8a5c123d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page