Skip to main content

No project description provided

Project description

roq

Pythonic RPC over MQTT for aiomqtt

  • 💡 Clean interface, RPC with just a decorator
  • 💪 Robust multi-client implementation
  • 🤝 Seamlessly integrates with aiomqtt

Quick start

Server Side: Expose a function over MQTT:

import roq

@roq.procedure("/your/topic/name")
def sum(a, b):
    return a + b

Client Side: Call that function over MQTT:

import roq

async with client:
    ret = await client["/your/topic/name"](1, 2)

installation

pip install roq

Usage

To create a roq-enabled aiomqtt.Client you can:

  1. Replace aiomqtt.Client with roq.Client in your code (full documentation here)
import roq

client = roq.Client(
    ...  # Your usual MQTT client setup
)

async with client:
    # Call RPCs, subscribe, handle messages, the usual stuff
    await client["/your/topic/name"](some, argument, some=keyword)
    await client.subscribe("/some/other/topic")

    async for message in client.messages:
        print("Received message on topic", message.topic) 
  1. Wrap an existing aiomqtt.Client with roq.Client (queue_type must be an instance of roq.ROQRouterQueue)
import roq
import aiomqtt

client = aiomqtt.Client(
    ...  # Your usual MQTT client setup
    queue_type=roq.ROQRouterQueue
)

async with roq.Client(client) as client:
    # Same as above, subscribe, call, handle

Use the roq.procedure("/topic/name") decorator to bind a function to be called on /topic/name.

Use client["/topic/name"](your, arguments, oreven=keywoardargs) to call a function bound to /topic/name

Examples

server.py:

import roq
import asyncio

@roq.procedure("/rpc/sum")
def sum(a, b):
    return a + b

async def main():
    client = roq.Client("test.mosquitto.org")
    
    async with client:
        await client.subscribe("/some/other/topic")
        async for message in client.messages:
            print("Received message on topic", message.topic)

if __name__ == "__main__":
    asyncio.run(main())

client.py:

import roq
import asyncio

async def main():
    client = roq.Client("test.mosquitto.org")

    async with client:
        ret = await client["/rpc/sum"](1, 2)
        print(f"Result from RPC: {ret}")
        await client.publish("/some/other/topic", payload=b"Hello, World!")

if __name__ == "__main__":
    asyncio.run(main())

Use a custom queue

You can use a custom queue extending roq.ROQRouterQueue

import roq

class YourQueue(roq.ROQRouterQueue)
    def _put(self, item):
        if self.handle_roq_message(item): return   # handle roq messages

        #your item logic here
        super()._put(item)

    def _get(self):
        return super()._get()

License

This project is licensed under the Apache License 2.0. For more details, see the LICENSE file in the root directory of this project or check out the Apache License 2.0 here

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

roq-0.0.2.tar.gz (8.0 kB view details)

Uploaded Source

Built Distribution

roq-0.0.2-py3-none-any.whl (9.0 kB view details)

Uploaded Python 3

File details

Details for the file roq-0.0.2.tar.gz.

File metadata

  • Download URL: roq-0.0.2.tar.gz
  • Upload date:
  • Size: 8.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.12.2

File hashes

Hashes for roq-0.0.2.tar.gz
Algorithm Hash digest
SHA256 193a5bfb07e13e1e47a02a815fdfcb7be56442e47d51a1837511b9bc6d7ccdce
MD5 84df3df6cd51c2318cf438840333919e
BLAKE2b-256 3046560a8a38230a537e1d3ba2cc08f11f1500662afa8e461fba2a7238cf3932

See more details on using hashes here.

File details

Details for the file roq-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: roq-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 9.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.0.0 CPython/3.12.2

File hashes

Hashes for roq-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 aa4c4b45882ede6864b786df5d569e5b61b6c2b8061e1e6a53b2ff7c08487e67
MD5 6439b663e15cb40a6c123280f0a9e7d3
BLAKE2b-256 57ca10a31891ddb434a185f60bb6d50072026628098729587d50be70615a378a

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page