Remote Procedure Call (RPC) library for Python using RabbitMQ
Project description
Remote Producer and Consumer
This package provides asynchronous RPC (Remote Procedure Call) functionality over RabbitMQ using aio_pika. It includes
tools for both producing and consuming remote tasks, supporting concurrency, rate limiting, and callback patterns.
producer.py (Remote Producer)
- Channel Pooling: Efficiently manages a pool of RabbitMQ channels for concurrent message publishing.
- remote_call: Sends an RPC request to a remote queue, optionally waiting for a result with timeout and priority support.
- remote_callback: Sends an RPC request and specifies a callback function to handle the result asynchronously.
- remote_task: Decorator to mark a function as a remote task, enabling it to be called via RabbitMQ.
- remote_task_callback: Decorator for remote tasks that require a callback upon completion.
consumer.py (Remote Consumer)
- RemoteConsumer: Base class for consuming and processing messages from a RabbitMQ queue.
- Handles message deserialization, function lookup, and invocation.
- Supports concurrency and rate limiting (messages per minute).
- Processes both request and result messages, including error handling and callback invocation.
These abstractions enable robust, asynchronous RPC workflows between distributed Python services using RabbitMQ.
Usage
import asyncio
import aio_pika
import remote
import random
@remote.remote_task(
queue="rpc_lane",
return_result=True
)
async def divide(x, y):
print(f"DIV-thinking about {x} / {y}")
await asyncio.sleep(random.random())
print(f"DIV-returning {x} / {y}")
return x / y
async def main() -> None:
connection = await aio_pika.connect_robust()
# Set up the consumer
cons = remote.RemoteConsumer(connection, queue="rpc_lane", concurrency=1, limit_per_minute=0)
await cons.consume()
# Set up the producer
await remote.init_channel_pool(connection)
# Call the remote task - this interpreter is both producer and consumer
print(await divide(6, 3)) # Should return 2.0
await cons.cancel()
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rmq_remote-0.1.0.tar.gz.
File metadata
- Download URL: rmq_remote-0.1.0.tar.gz
- Upload date:
- Size: 12.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.3 CPython/3.12.3 Linux/6.11.0-25-generic
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
925b639ca8b32b228f2af423159e7e2bfc4e254a52f67d41efb535661feebc9e
|
|
| MD5 |
bcb0e9074460335efd1727e59bb3b6f9
|
|
| BLAKE2b-256 |
4363c412e9260a29d05c007698c6cea096a74f660bcdc335685dc00719863835
|
File details
Details for the file rmq_remote-0.1.0-py3-none-any.whl.
File metadata
- Download URL: rmq_remote-0.1.0-py3-none-any.whl
- Upload date:
- Size: 14.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.1.3 CPython/3.12.3 Linux/6.11.0-25-generic
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
769f21d6f8314b6aaae6344752596edaf1cd989ba4423f22c0f057378fb6db0b
|
|
| MD5 |
0367b647716e49e0deae472b7071b5be
|
|
| BLAKE2b-256 |
33fceb61e16f662f3c34a3c75607dd095c0430067086d17fec358c9bc015273a
|