Skip to main content

Asynchronous remote procedure calls for Python over MQTT

Project description

vrpc-py

Variadic Remote Procedure Calls for Python

vrpc-py is an asynchronous, event-driven Remote Procedure Call (RPC) framework for Python. It allows you to seamlessly expose standard Python classes and functions over an MQTT message broker, making them instantly callable from anywhere in the world.

By leveraging MQTT and Python's asyncio, vrpc-py bypasses NATs, firewalls, and complex networking setups, allowing for bi-directional communication, dynamic object instantiation, and remote continuous event streaming.

It is 100% protocol-compatible with the Node.js / JS vrpc implementation.

✨ Features

  • Zero Boilerplate: Register existing Python classes without modifying their code.
  • Fully Asynchronous: Built from the ground up on modern asyncio and aiomqtt.
  • Stateful Instances: Create, manage, and delete remote object instances dynamically. Shared instances can be interacted with by multiple clients simultaneously.
  • Remote Event Listeners: Pass Python callables as arguments to remote functions. VRPC automatically binds them to MQTT streams (perfect for real-time sensor data or status updates).
  • Batch Executions: Broadcast method calls to all instances of a class across multiple distributed agents in a single line of code (call_all).

📦 Installation

Since vrpc-py is modern Python package (PEP 621), you can install it directly via pip:

pip install vrpc-py

Requirements: Python 3.8+

🚀 Quick Start

To use VRPC, you need two components: an Agent (which hosts your code) and a Client (which remotely calls it). Both connect to a central MQTT broker.

1. The Agent (Server)

First, write a standard Python class. Let's create a Counter that maintains state and can emit continuous events via a callback.

# agent.py
import asyncio
from vrpc import VrpcAdapter, VrpcAgent

class Counter:
    def __init__(self, initial_value=0):
        self._count = initial_value
        self._callback = None

    def on_change(self, callback):
        """Registers a callback for continuous state updates."""
        self._callback = callback

    def increment(self, step=1):
        """Increments the counter and triggers the callback."""
        self._count += step
        if self._callback:
            self._callback(self._count)
        return self._count

# 1. Register the class so VRPC knows about it
VrpcAdapter.register(Counter)

async def main():
    # 2. Configure the Agent to connect to a broker
    agent = VrpcAgent(
        domain="my.custom.domain",
        agent="python-agent-1",
        broker="mqtt://broker.hivemq.com:1883" # Public test broker
    )

    print("Agent is starting...")
    await agent.serve()

if __name__ == "__main__":
    asyncio.run(main())

2. The Client

Now, from a completely different machine, process, or network, we can connect a client, create an instance of that Counter, and interact with it.

# client.py
import asyncio
from vrpc import VrpcClient

async def main():
    # 1. Connect to the same broker and domain
    client = VrpcClient(
        domain="my.custom.domain",
        broker="mqtt://broker.hivemq.com:1883"
    )
    await client.connect()

    # 2. Create a remote instance of the Counter
    print("Creating remote Counter instance...")
    counter = await client.create(
        agent="python-agent-1",
        class_name="Counter",
        instance="my-shared-counter",
        args=[10]  # Passes '10' to initial_value
    )

    # 3. Define a local function to handle remote events
    def handle_update(new_val):
        print(f" -> Continuous Event Received! Counter is now: {new_val}")

    # VRPC magically wires this Python function over MQTT
    await counter.on_change(handle_update)

    # 4. Call remote methods
    print("Calling increment(5)...")
    result = await counter.increment(5)
    print(f"RPC Returned: {result}")

    # Cleanup
    await client.end()

if __name__ == "__main__":
    asyncio.run(main())

🏗 Architecture Concepts

VRPC organizes remote execution using a specific hierarchy:

  • Domain: The highest level of isolation. Agents and Clients must share the same domain to see each other.
  • Agent: A physical process hosting VRPC code. A single domain can have hundreds of distributed agents.
  • Class: A registered Python class available for instantiation.
  • Instance: A specific, stateful object created from a Class. Instances can be Shared (visible to all clients) or Isolated (visible only to the client that created it).

🤝 Contributing

Contributions, issues, and feature requests are welcome!

  1. Fork the project.
  2. Install with test dependencies: pip install -e .[test]
  3. Run local adapter tests: pytest tests/adapter/test_adapter.py
  4. Run integration tests (requires Docker): cd tests/agent && ./test.sh

📝 License

This project is licensed under the MIT License - see the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

vrpc-3.0.0-py3-none-any.whl (19.8 kB view details)

Uploaded Python 3

File details

Details for the file vrpc-3.0.0-py3-none-any.whl.

File metadata

  • Download URL: vrpc-3.0.0-py3-none-any.whl
  • Upload date:
  • Size: 19.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for vrpc-3.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6ea20b362ad9890f5b14c4e56ad08987b825ac1fab7d6e0112a9dc43b0dac57e
MD5 5c7642224ece24334c8d2bc0dd25658f
BLAKE2b-256 85b5ae2285d27d9d5a9f337f63d2079831f917ee3a3aef643cbeb10baf823e25

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page