Skip to main content

Remote Procedure Call (RPC) library for Python using RabbitMQ

Project description

Remote Producer and Consumer

This package provides asynchronous RPC (Remote Procedure Call) functionality over RabbitMQ using aio_pika. It includes tools for both producing and consuming remote tasks, supporting concurrency, rate limiting, and callback patterns.

producer.py (Remote Producer)

  • Channel Pooling: Efficiently manages a pool of RabbitMQ channels for concurrent message publishing.
  • remote_call: Sends an RPC request to a remote queue, optionally waiting for a result with timeout and priority support.
  • remote_callback: Sends an RPC request and specifies a callback function to handle the result asynchronously.
  • remote_task: Decorator to mark a function as a remote task, enabling it to be called via RabbitMQ.
  • remote_task_callback: Decorator for remote tasks that require a callback upon completion.

consumer.py (Remote Consumer)

  • RemoteConsumer: Base class for consuming and processing messages from a RabbitMQ queue.
    • Handles message deserialization, function lookup, and invocation.
    • Supports concurrency and rate limiting (messages per minute).
    • Processes both request and result messages, including error handling and callback invocation.

These abstractions enable robust, asynchronous RPC workflows between distributed Python services using RabbitMQ.

Usage

import asyncio

import aio_pika
import remote
import random


@remote.remote_task(
    queue="rpc_lane",
    return_result=True
)
async def divide(x, y):
    print(f"DIV-thinking about {x} / {y}")
    await asyncio.sleep(random.random())
    print(f"DIV-returning {x} / {y}")
    return x / y


async def main() -> None:
    connection = await aio_pika.connect_robust()

    # Set up the consumer
    cons = remote.RemoteConsumer(connection, queue="rpc_lane", concurrency=1, limit_per_minute=0)
    await cons.consume()

    # Set up the producer
    await remote.init_channel_pool(connection)

    # Call the remote task - this interpreter is both producer and consumer
    print(await divide(6, 3))  # Should return 2.0

    await cons.cancel()
    await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rmq_remote-0.1.0.tar.gz (12.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rmq_remote-0.1.0-py3-none-any.whl (14.1 kB view details)

Uploaded Python 3

File details

Details for the file rmq_remote-0.1.0.tar.gz.

File metadata

  • Download URL: rmq_remote-0.1.0.tar.gz
  • Upload date:
  • Size: 12.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.3 Linux/6.11.0-25-generic

File hashes

Hashes for rmq_remote-0.1.0.tar.gz
Algorithm Hash digest
SHA256 925b639ca8b32b228f2af423159e7e2bfc4e254a52f67d41efb535661feebc9e
MD5 bcb0e9074460335efd1727e59bb3b6f9
BLAKE2b-256 4363c412e9260a29d05c007698c6cea096a74f660bcdc335685dc00719863835

See more details on using hashes here.

File details

Details for the file rmq_remote-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rmq_remote-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 14.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.3 CPython/3.12.3 Linux/6.11.0-25-generic

File hashes

Hashes for rmq_remote-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 769f21d6f8314b6aaae6344752596edaf1cd989ba4423f22c0f057378fb6db0b
MD5 0367b647716e49e0deae472b7071b5be
BLAKE2b-256 33fceb61e16f662f3c34a3c75607dd095c0430067086d17fec358c9bc015273a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page