Skip to main content

Async Python client for RPC transport interoperability with NestJS microservices.

Project description

nest-rpc-client

PyPI Python

Async Python client for RPC transport interoperability with NestJS microservices. Supports RabbitMQ, Redis, NATS, TCP (and planned support for Kafka, MQTT).

Features

  • Fully async transports
  • Pluggable transport architecture
  • Compatible with NestJS microservices patterns (send, emit)
  • Extras-based installation

Installation

Install with a specific transport:

# TCP uses asyncio
pip install "nest-rpc-client[rabbitmq]"
pip install "nest-rpc-client[redis]"
pip install "nest-rpc-client[nats]"
pip install "nest-rpc-client[all]"

Usage

from nest_rpc_client.client import Client
from nest_rpc_client.transports.rabbitmq import RabbitMQTransport
from nest_rpc_client.config.rabbitmq import RabbitMQConfig

transport = RabbitMQTransport(RabbitMQConfig(url="amqp://guest:guest@localhost/", queue="rpc_queue"))

async with Client(transport) as client:
    result = await client.send("get_user", {"id": 123})
    await client.emit("user_created", {"id": 123})

You can also use only transport:

from nest_rpc_client.transports.rabbitmq import RabbitMQTransport
from nest_rpc_client.config.rabbitmq import RabbitMQConfig

transport = RabbitMQTransport(RabbitMQConfig(url="amqp://guest:guest@localhost/", queue="rpc_queue"))

await transport.connect()

result = await transport.send("get_user", {"id": 123})
await transport.emit("user_created", {"id": 123})

await transport.close()

Pattern usage with dictionaries

If your pattern is a dictionary (object in js, for example: {cmd: "sum"}), you need to serialize it to a string before using it. Example:

pattern = json.dumps({"cmd": "sum"}) # '{"cmd": "sum"}'
await client.send(pattern, data)

Custom Transport

nest-rpc-client allows you to use your own custom transports. To do this, inherit from the Transport base class and implement its abstract methods:

from nest_rpc_client.transport import Transport

class MyCustomTransport(Transport):
    async def connect(self) -> None:
        # Initialize any connections or resources
        ...

    async def close(self) -> None:
        # Clean up resources or close connections
        ...

    async def send(self, pattern: str, data: dict) -> dict:
        # Implement the RPC request (expects a response)
        ...

    async def emit(self, pattern: str, data: dict) -> None:
        # Implement the fire-and-forget event
        ...

You can then use your custom transport exactly like the built-in ones:

transport = MyCustomTransport()
async with Client(transport) as client:
    result = await client.send("my_pattern", {"foo": "bar"})

Exception handling

nest-rpc-client raises RpcException whenever the RPC server returns an error. Example:

from nest_rpc_client.client import Client
from nest_rpc_client.transports.rabbitmq import RabbitMQTransport
from nest_rpc_client.config.rabbitmq import RabbitMQConfig
from nest_rpc_client.exceptions import RpcException

transport = RabbitMQTransport(RabbitMQConfig(url="amqp://guest:guest@localhost/", queue="rpc_queue"))

async with Client(transport) as client:
    try:
        result = await client.send("get_user", {"id": 123})
    except RpcException as e:
        print(f"RPC Error: {e.err}")

When the RPC response includes an err field (following the NestJS microservice pattern), the client automatically raises RpcException with the contents of err as its attribute:

{
    "id": "correlation-id",
    "err": {
        "message": "User not found",
        "code": 404
    }
}

This allows consistent error handling across all transports.

Tests:

This project includes both unit tests and full integration tests.

  • Unit tests are included in this repository and can be run with: pytest
  • Full integration tests are available in a separate repository: nest-rpc-client-tests

TODO:

  • Implement kafka transport
  • Implement mqtt transport

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

nest_rpc_client-1.3.2.tar.gz (6.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

nest_rpc_client-1.3.2-py3-none-any.whl (9.1 kB view details)

Uploaded Python 3

File details

Details for the file nest_rpc_client-1.3.2.tar.gz.

File metadata

  • Download URL: nest_rpc_client-1.3.2.tar.gz
  • Upload date:
  • Size: 6.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.11.2 Linux/6.1.0-31-amd64

File hashes

Hashes for nest_rpc_client-1.3.2.tar.gz
Algorithm Hash digest
SHA256 49ea443440dbdd73f9e2c43019604470da27a1aec54ee2ceef507514224133e1
MD5 4a745caa16195086a985e8e51b089c20
BLAKE2b-256 08ec2f4518d7a57d14f431c8b41a13bf790090d9362e2355c63739a7f7a19418

See more details on using hashes here.

File details

Details for the file nest_rpc_client-1.3.2-py3-none-any.whl.

File metadata

  • Download URL: nest_rpc_client-1.3.2-py3-none-any.whl
  • Upload date:
  • Size: 9.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.4 CPython/3.11.2 Linux/6.1.0-31-amd64

File hashes

Hashes for nest_rpc_client-1.3.2-py3-none-any.whl
Algorithm Hash digest
SHA256 80dc5e6466d12a5e67c57e2a9469f088c48455c1eb78d784ecc9a51bea7e1f05
MD5 d1460b98e3964417e584de22479ae054
BLAKE2b-256 7beca45d480a33f9c481bfedd60340ba111eb0de4e9bbf5b36890f1e29e1b192

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page