A Python library for building gRPC/ConnectRPC services with Pydantic models.

🚀 PydanticRPC

PydanticRPC is a Python library that lets you quickly expose Pydantic models through gRPC and Connect RPC services without writing any protobuf files. Instead, it generates the protobuf files on the fly from the method signatures of your Python objects and the type annotations of your Pydantic models.

Below is an example of a simple gRPC service that exposes a PydanticAI agent:

import asyncio

from pydantic_ai import Agent
from pydantic_rpc import AsyncIOServer, Message


# `Message` is just an alias for Pydantic's `BaseModel` class.
class CityLocation(Message):
    city: str
    country: str


class Olympics(Message):
    year: int

    def prompt(self):
        return f"Where were the Olympics held in {self.year}?"


class OlympicsLocationAgent:
    def __init__(self):
        self._agent = Agent("ollama:llama3.2", result_type=CityLocation)

    async def ask(self, req: Olympics) -> CityLocation:
        result = await self._agent.run(req.prompt())
        return result.data


if __name__ == "__main__":
    s = AsyncIOServer()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s.run(OlympicsLocationAgent()))

And here is an example of a simple Connect RPC service that exposes the same agent as an ASGI application:

import asyncio

from pydantic_ai import Agent
from pydantic_rpc import ConnecpyASGIApp, Message


class CityLocation(Message):
    city: str
    country: str


class Olympics(Message):
    year: int

    def prompt(self):
        return f"Where were the Olympics held in {self.year}?"


class OlympicsLocationAgent:
    def __init__(self):
        self._agent = Agent("ollama:llama3.2", result_type=CityLocation)

    async def ask(self, req: Olympics) -> CityLocation:
        result = await self._agent.run(req.prompt())
        return result.data

app = ConnecpyASGIApp()
app.mount(OlympicsLocationAgent())

💡 Key Features

  • 🔄 Automatic Protobuf Generation: Automatically creates protobuf files matching the method signatures of your Python objects.
  • ⚙️ Dynamic Code Generation: Generates server and client stubs using grpcio-tools.
  • Pydantic Integration: Uses pydantic for robust type validation and serialization.
  • For gRPC:
    • 💚 Health Checking: Built-in support for gRPC health checks using grpc_health.v1.
    • 🔎 Server Reflection: Built-in support for gRPC server reflection.
    • Asynchronous Support: Easily create asynchronous gRPC services with AsyncIOServer.
  • For gRPC-Web:
    • 🌐 WSGI/ASGI Support: Create gRPC-Web services that can run as WSGI or ASGI applications powered by Sonora.
  • For Connect-RPC:
    • 🌐 Connecpy Support: Partially supports Connect-RPC via Connecpy.
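To make the "Automatic Protobuf Generation" idea more concrete, here is a minimal, standard-library-only sketch of how a method signature could be turned into a protobuf `service` definition. It is an illustration, not PydanticRPC's actual implementation: `sketch_service_proto`, `to_pascal_case`, the PascalCase naming rule, and the stand-in request/response classes are all assumptions made for this example.

```python
import inspect
from typing import get_type_hints


class HelloRequest:
    """Stand-in for a request model (hypothetical, not a pydantic_rpc class)."""


class HelloReply:
    """Stand-in for a response model (hypothetical, not a pydantic_rpc class)."""


class Greeter:
    def say_hello(self, request: HelloRequest) -> HelloReply:
        return HelloReply()

    def _helper(self) -> None:
        """Private methods are skipped by the sketch below."""


def to_pascal_case(name: str) -> str:
    # Assumed naming rule for this sketch: say_hello -> SayHello.
    return "".join(part.capitalize() for part in name.split("_"))


def sketch_service_proto(obj: object) -> str:
    """Render one `rpc` line per public method, based on its type hints."""
    lines = [f"service {type(obj).__name__} {{"]
    for name, method in inspect.getmembers(obj, inspect.ismethod):
        if name.startswith("_"):
            continue
        hints = get_type_hints(method)
        response = hints.pop("return")
        # The first remaining annotated parameter is treated as the request.
        request = next(iter(hints.values()))
        lines.append(
            f"  rpc {to_pascal_case(name)} ({request.__name__}) "
            f"returns ({response.__name__});"
        )
    lines.append("}")
    return "\n".join(lines)


print(sketch_service_proto(Greeter()))
```

The point of the sketch is that the annotations on `say_hello` already carry everything a `.proto` service entry needs, which is why no hand-written protobuf file is required.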

📦 Installation

Install PydanticRPC via pip:

pip install pydantic-rpc

🚀 Getting Started

🔧 Synchronous Service Example

from pydantic_rpc import Server, Message

class HelloRequest(Message):
    name: str

class HelloReply(Message):
    message: str

class Greeter:
    # Define methods that accept a request and return a response.
    def say_hello(self, request: HelloRequest) -> HelloReply:
        return HelloReply(message=f"Hello, {request.name}!")

if __name__ == "__main__":
    server = Server()
    server.run(Greeter())

⚙️ Asynchronous Service Example

import asyncio

from pydantic_rpc import AsyncIOServer, Message


class HelloRequest(Message):
    name: str


class HelloReply(Message):
    message: str


class Greeter:
    async def say_hello(self, request: HelloRequest) -> HelloReply:
        return HelloReply(message=f"Hello, {request.name}!")


if __name__ == "__main__":
    server = AsyncIOServer()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server.run(Greeter()))

🌐 ASGI Application Example

from pydantic_rpc import ASGIApp, Message

class HelloRequest(Message):
    name: str

class HelloReply(Message):
    message: str

class Greeter:
    def say_hello(self, request: HelloRequest) -> HelloReply:
        return HelloReply(message=f"Hello, {request.name}!")


async def app(scope, receive, send):
    """ASGI application.

    Args:
        scope (dict): The ASGI scope.
        receive (callable): The receive function.
        send (callable): The send function.
    """
    pass

# Please note that `app` is any ASGI application, such as FastAPI or Starlette.

app = ASGIApp(app)
app.mount(Greeter())

🌐 WSGI Application Example

from pydantic_rpc import WSGIApp, Message

class HelloRequest(Message):
    name: str

class HelloReply(Message):
    message: str

class Greeter:
    def say_hello(self, request: HelloRequest) -> HelloReply:
        return HelloReply(message=f"Hello, {request.name}!")

def app(environ, start_response):
    """WSGI application.

    Args:
        environ (dict): The WSGI environment.
        start_response (callable): The start_response function.
    """
    pass

# Please note that `app` is any WSGI application, such as Flask or Django.

app = WSGIApp(app)
app.mount(Greeter())

🏆 Connecpy (Connect-RPC) Example

PydanticRPC also partially supports Connect-RPC via connecpy. Check out “greeting_connecpy.py” for an example:

rye run python greeting_connecpy.py

This will launch a Connecpy-based ASGI application that uses the same Pydantic models to serve Connect-RPC requests.

Note: Please install protoc-gen-connecpy to run the Connecpy example.

  1. Install Go.
  2. Install protoc-gen-connecpy:
    go install github.com/connecpy/protoc-gen-connecpy@latest
    

♻️ Skipping Protobuf Generation

By default, PydanticRPC generates .proto files and code at runtime. If you wish to skip the code-generation step (for example, in a production environment), set the environment variable below:

export PYDANTIC_RPC_SKIP_GENERATION=true

When this variable is set to "true", PydanticRPC will load existing pre-generated modules rather than generating them on the fly.
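If you would rather set the flag from Python (for example, at the top of an entry-point script, before the service modules are imported), something like the following sketch may help. The helper `skip_generation_requested` is hypothetical and only mirrors the rule stated above; treating the comparison as an exact, case-sensitive match on "true" is an assumption, not documented library behavior.

```python
import os
from typing import Mapping

# Variable name taken from the text above.
ENV_VAR = "PYDANTIC_RPC_SKIP_GENERATION"


def skip_generation_requested(environ: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical helper: True only when the variable is exactly "true"."""
    return environ.get(ENV_VAR, "") == "true"


# Set the flag for this process, unless the caller already chose a value.
os.environ.setdefault(ENV_VAR, "true")
print(skip_generation_requested())
```

Using `setdefault` keeps any value exported in the shell, so an operator can still override the script's default.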

💎 Advanced Features

🌊 Response Streaming

PydanticRPC supports response streaming only in asynchronous gRPC and gRPC-Web services.

Please see the sample code below:

import asyncio
from typing import AsyncIterator

from pydantic import field_validator
from pydantic_ai import Agent
from pydantic_rpc import AsyncIOServer, Message


# `Message` is just a pydantic BaseModel alias
class CityLocation(Message):
    city: str
    country: str


class OlympicsQuery(Message):
    year: int

    def prompt(self):
        return f"Where were the Olympics held in {self.year}?"

    @field_validator("year")
    @classmethod
    def validate_year(cls, value):
        if value < 1896:
            raise ValueError("The first modern Olympics was held in 1896.")

        return value


class OlympicsDurationQuery(Message):
    start: int
    end: int

    def prompt(self):
        return f"From {self.start} to {self.end}, how many Olympics were held? Please provide the list of countries and cities."

    @field_validator("start")
    @classmethod
    def validate_start(cls, value):
        if value < 1896:
            raise ValueError("The first modern Olympics was held in 1896.")

        return value

    @field_validator("end")
    @classmethod
    def validate_end(cls, value):
        if value < 1896:
            raise ValueError("The first modern Olympics was held in 1896.")

        return value


class StreamingResult(Message):
    answer: str


class OlympicsAgent:
    def __init__(self):
        self._agent = Agent("ollama:llama3.2")

    async def ask(self, req: OlympicsQuery) -> CityLocation:
        result = await self._agent.run(req.prompt(), result_type=CityLocation)
        return result.data

    async def ask_stream(
        self, req: OlympicsDurationQuery
    ) -> AsyncIterator[StreamingResult]:
        async with self._agent.run_stream(req.prompt(), result_type=str) as result:
            async for data in result.stream_text(delta=True):
                yield StreamingResult(answer=data)


if __name__ == "__main__":
    s = AsyncIOServer()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s.run(OlympicsAgent()))
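A streaming method such as `ask_stream` is an ordinary async generator, so its logic can be exercised directly, without starting a server. The sketch below uses only the standard library: `EchoAgent` and the dataclass `StreamingResult` are hypothetical stand-ins for the agent and the `Message` subclass above, and no gRPC transport is involved.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class StreamingResult:
    """Stand-in for the `Message` subclass of the same name."""

    answer: str


class EchoAgent:
    """Hypothetical service that streams back one word at a time."""

    async def ask_stream(self, text: str) -> AsyncIterator[StreamingResult]:
        for word in text.split():
            await asyncio.sleep(0)  # yield control, as a real backend call would
            yield StreamingResult(answer=word)


async def collect(stream: AsyncIterator[StreamingResult]) -> list[str]:
    """Drain the stream the way a consumer would, one chunk at a time."""
    return [chunk.answer async for chunk in stream]


chunks = asyncio.run(collect(EchoAgent().ask_stream("Athens Paris London")))
print(chunks)
```

Each `yield` corresponds to one message sent to the client in a streaming response, which is why the return annotation is `AsyncIterator[StreamingResult]` rather than a single message type.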

🔗 Multiple Services with Custom Interceptors

PydanticRPC supports defining and running multiple services in a single server, optionally with custom gRPC interceptors:

from datetime import datetime
import grpc
from grpc import ServicerContext

from pydantic_rpc import Server, Message


class FooRequest(Message):
    name: str
    age: int
    d: dict[str, str]


class FooResponse(Message):
    name: str
    age: int
    d: dict[str, str]


class BarRequest(Message):
    names: list[str]


class BarResponse(Message):
    names: list[str]


class FooService:
    def foo(self, request: FooRequest) -> FooResponse:
        return FooResponse(name=request.name, age=request.age, d=request.d)


class MyMessage(Message):
    name: str
    age: int
    o: int | datetime


class Request(Message):
    name: str
    age: int
    d: dict[str, str]
    m: MyMessage


class Response(Message):
    name: str
    age: int
    d: dict[str, str]
    m: MyMessage | str


class BarService:
    def bar(self, req: BarRequest, ctx: ServicerContext) -> BarResponse:
        return BarResponse(names=req.names)


class CustomInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        # do something
        print(handler_call_details.method)
        return continuation(handler_call_details)


async def app(scope, receive, send):
    pass


if __name__ == "__main__":
    s = Server(10, CustomInterceptor())
    s.run(
        FooService(),
        BarService(),
    )
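The `continuation` argument in `intercept_service` can be puzzling at first. The following standard-library sketch imitates how a chain of interceptors hands control to the next one and finally to the handler lookup. `CallDetails`, `RecordingInterceptor`, `build_chain`, and the method path are hypothetical names for this illustration; the real chaining is done inside gRPC, not by code like this.

```python
from typing import Callable, NamedTuple


class CallDetails(NamedTuple):
    """Stand-in for grpc.HandlerCallDetails; only `method` is modeled."""

    method: str


class RecordingInterceptor:
    """Records each call it sees, then delegates to the next step."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def intercept_service(self, continuation, handler_call_details):
        self.log.append(f"{self.name}:{handler_call_details.method}")
        return continuation(handler_call_details)


def build_chain(interceptors, find_handler: Callable[[CallDetails], str]):
    """Wrap `find_handler` so that interceptors run in the order given."""
    continuation = find_handler
    for interceptor in reversed(interceptors):
        # Bind the current values as defaults to avoid late-binding bugs.
        def step(details, _next=continuation, _interceptor=interceptor):
            return _interceptor.intercept_service(_next, details)

        continuation = step
    return continuation


log: list[str] = []
chain = build_chain(
    [RecordingInterceptor("first", log), RecordingInterceptor("second", log)],
    find_handler=lambda details: f"handler for {details.method}",
)
handler = chain(CallDetails(method="/FooService/Foo"))
print(log, handler)
```

An interceptor that does not call `continuation` short-circuits the chain, which is how authentication or rate-limiting interceptors typically reject a call before it reaches the service.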

🩺 [TODO] Custom Health Check

TODO

🗄️ Protobuf file generation

You can generate protobuf files for a given module and a specified class using core.py:

python core.py a_module.py aClass

Using the generated proto file and tools such as protoc, buf, and BSR, you can generate code for languages other than Python.

📖 Data Type Mapping

Python Type          Protobuf Type
str                  string
bool                 bool
int                  int32, int64
float                float, double
list[T], tuple[T]    repeated T
dict[K, V]           map<K, V>
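The mapping above can be read as a small recursive rule over type annotations. Below is a minimal sketch of that rule using only the standard library. The function `proto_type` is hypothetical, and picking `int32` for `int` and `double` for `float` is an assumption made for the example, since the table lists two candidates for each; it does not show which one PydanticRPC actually selects.

```python
from typing import get_args, get_origin

# Scalar rows of the table; the int and float choices are assumptions.
SCALARS = {str: "string", bool: "bool", int: "int32", float: "double"}


def proto_type(py_type) -> str:
    """Return a protobuf type expression for a Python annotation."""
    if py_type in SCALARS:
        return SCALARS[py_type]

    origin, args = get_origin(py_type), get_args(py_type)
    if origin in (list, tuple) and args:
        # list[T] and tuple[T] both become `repeated T`.
        return f"repeated {proto_type(args[0])}"
    if origin is dict and len(args) == 2:
        key, value = args
        return f"map<{proto_type(key)}, {proto_type(value)}>"

    # Only the rows of the table are covered by this sketch.
    raise TypeError(f"no mapping in this sketch for {py_type!r}")


print(proto_type(list[str]))
print(proto_type(dict[str, float]))
```

Types outside the table (nested models, unions, datetimes) are deliberately left out and raise `TypeError` here.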

TODO

  • Streaming Support
  • Betterproto Support
  • Sonora-connect Support
  • Custom Health Check Support
  • Add more examples
  • Add tests

📜 License

This project is licensed under the MIT License. See the LICENSE file for details.
