Code generator, server library and client library for the Connect Protocol

Connecpy

Python implementation of Connect Protocol.

This repo contains a protoc plugin that generates server and client code, and a PyPI package with common implementation details.

Installation

You can install the protoc plugin to generate files by running the command:

go install github.com/i2y/connecpy/protoc-gen-connecpy@latest

Additionally, please add the connecpy package to your project using your preferred package manager. For instance, with uv, use the command:

uv add connecpy

or

pip install connecpy

To run the server, you'll need one of the following: Uvicorn, Daphne, or Hypercorn. If your goal is to support both HTTP/1.1 and HTTP/2, you should opt for either Daphne or Hypercorn. Additionally, to test the server, you might need a client command, such as buf.

Generate and run

Use the protoc plugin to generate connecpy server and client code.

protoc --python_out=./ --pyi_out=./ --connecpy_out=./ ./haberdasher.proto

Server code (ASGI)

# service.py
import random

from connecpy.exceptions import InvalidArgument
from connecpy.context import ServiceContext

from haberdasher_pb2 import Hat, Size


class HaberdasherService(object):
    async def MakeHat(self, req: Size, ctx: ServiceContext) -> Hat:
        print("remaining_time: ", ctx.time_remaining())
        if req.inches <= 0:
            raise InvalidArgument(
                argument="inches", error="I can't make a hat that small!"
            )
        response = Hat(
            size=req.inches,
            color=random.choice(["white", "black", "brown", "red", "blue"]),
        )
        if random.random() > 0.5:
            response.name = random.choice(
                ["bowler", "baseball cap", "top hat", "derby"]
            )

        return response

# server.py
from connecpy import context
from connecpy.asgi import ConnecpyASGIApp

import haberdasher_connecpy
from service import HaberdasherService

service = haberdasher_connecpy.HaberdasherServer(
    service=HaberdasherService()
)
app = ConnecpyASGIApp()
app.add_service(service)

Run the server with

uvicorn --port=3000 server:app

or

daphne --port=3000 server:app

or

hypercorn --bind :3000 server:app

Client code (Asynchronous)

# async_client.py
import asyncio

import httpx

from connecpy.context import ClientContext
from connecpy.exceptions import ConnecpyServerException

import haberdasher_connecpy, haberdasher_pb2


server_url = "http://localhost:3000"
timeout_s = 5


async def main():
    session = httpx.AsyncClient(
        base_url=server_url,
        timeout=timeout_s,
    )
    client = haberdasher_connecpy.AsyncHaberdasherClient(server_url, session=session)

    try:
        response = await client.MakeHat(
            ctx=ClientContext(),
            request=haberdasher_pb2.Size(inches=12),
            # Optionally provide a session per request
            # session=session,
        )
        if not response.HasField("name"):
            print("We didn't get a name!")
        print(response)
    except ConnecpyServerException as e:
        print(e.code, e.message, e.to_dict())
    finally:
        # Close the session (could also use a context manager)
        await session.aclose()


if __name__ == "__main__":
    asyncio.run(main())

Example output:

size: 12
color: "black"
name: "bowler"

Client code (Synchronous)

# client.py
from connecpy.context import ClientContext
from connecpy.exceptions import ConnecpyServerException

import haberdasher_connecpy, haberdasher_pb2


server_url = "http://localhost:3000"
timeout_s = 5


def main():
    client = haberdasher_connecpy.HaberdasherClient(server_url, timeout=timeout_s)

    try:
        response = client.MakeHat(
            ctx=ClientContext(),
            request=haberdasher_pb2.Size(inches=12),
        )
        if not response.HasField("name"):
            print("We didn't get a name!")
        print(response)
    except ConnecpyServerException as e:
        print(e.code, e.message, e.to_dict())


if __name__ == "__main__":
    main()

Other clients

Of course, you can use any HTTP client to make requests to a Connecpy server. For example, commands like curl or buf curl can be used, as well as HTTP client libraries such as requests, httpx, aiohttp, and others. The examples below use curl and buf curl.

Content-Type: application/proto, HTTP/1.1

buf curl --data '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --schema ./haberdasher.proto

On Windows, Content-Type: application/proto, HTTP/1.1

buf curl --data '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --schema .\haberdasher.proto

Content-Type: application/proto, HTTP/2

buf curl --data '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --http2-prior-knowledge --schema ./haberdasher.proto

On Windows, Content-Type: application/proto, HTTP/2

buf curl --data '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --http2-prior-knowledge --schema .\haberdasher.proto

Content-Type: application/json, HTTP/1.1

curl -X POST -H "Content-Type: application/json" -d '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

On Windows, Content-Type: application/json, HTTP/1.1

curl -X POST -H "Content-Type: application/json" -d '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

Content-Type: application/json, HTTP/2

curl --http2-prior-knowledge -X POST -H "Content-Type: application/json" -d '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

On Windows, Content-Type: application/json, HTTP/2

curl --http2-prior-knowledge -X POST -H "Content-Type: application/json" -d '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat
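
The JSON curl calls above can also be reproduced from Python without any third-party client. The following is a minimal sketch using only the standard library; the helper name build_connect_request is hypothetical, and the URL layout (base URL, then the full service name, then the method) is taken from the curl examples.

```python
import json
import urllib.request


def build_connect_request(
    base_url: str, service: str, method: str, message: dict
) -> urllib.request.Request:
    """Build a unary call as a plain HTTP POST with a JSON body."""
    url = f"{base_url.rstrip('/')}/{service}/{method}"
    body = json.dumps(message).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_connect_request(
    "http://localhost:3000",
    "i2y.connecpy.example.Haberdasher",
    "MakeHat",
    {"inches": 12},
)
print(req.get_method(), req.full_url)

# Sending the request needs the server from server.py to be running:
# with urllib.request.urlopen(req, timeout=5) as resp:
#     print(json.loads(resp.read()))
```

The request is only built here, not sent, so the snippet runs without a server; uncomment the last lines to make the actual call.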

WSGI Support

Connecpy now provides WSGI support via the ConnecpyWSGIApp. This synchronous application adapts our service endpoints to the WSGI specification. It reads requests from the WSGI environ, processes POST requests, and returns responses using start_response. This enables integration with legacy WSGI servers and middleware.

Please see the example in the example directory.
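
For readers unfamiliar with WSGI, the request flow described above (read from environ, handle POST, answer through start_response) can be sketched with a tiny standalone callable. This is an illustration of the WSGI contract only, not ConnecpyWSGIApp's implementation; tiny_wsgi_app and call are hypothetical names.

```python
import io
import json


def tiny_wsgi_app(environ, start_response):
    """Illustrative WSGI callable: accept POST and echo the JSON body back."""
    if environ["REQUEST_METHOD"] != "POST":
        start_response("405 Method Not Allowed", [("Allow", "POST")])
        return [b""]

    # The request body is exposed as a file-like object in the environ.
    length = int(environ.get("CONTENT_LENGTH") or 0)
    body = environ["wsgi.input"].read(length)
    payload = json.dumps({"received": json.loads(body)}).encode("utf-8")

    start_response(
        "200 OK",
        [
            ("Content-Type", "application/json"),
            ("Content-Length", str(len(payload))),
        ],
    )
    return [payload]


def call(app, method, body=b""):
    """Drive a WSGI app once with a hand-built environ (test helper)."""
    captured = {}

    def start_response(status, headers):
        captured["status"] = status
        captured["headers"] = headers

    environ = {
        "REQUEST_METHOD": method,
        "CONTENT_LENGTH": str(len(body)),
        "wsgi.input": io.BytesIO(body),
    }
    chunks = app(environ, start_response)
    return captured["status"], b"".join(chunks)


status, data = call(tiny_wsgi_app, "POST", b'{"inches": 12}')
print(status, data)
```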

Compression Support

Connecpy supports various compression methods for both GET and POST requests:

  • gzip
  • brotli (br)
  • zstandard (zstd)
  • identity (no compression)

For GET requests, specify the compression method using the compression query parameter:

curl "http://localhost:3000/service/method?compression=gzip&message=..."
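
The elided message value in the URL above has to carry the encoded request. The sketch below builds such a URL with the standard library. The connect, encoding and base64 query parameters follow the Connect Protocol's description of unary GET requests; whether Connecpy requires all of them is an assumption, as is the premise that MakeHat is marked as having no side effects. build_get_url is a hypothetical helper.

```python
import base64
import gzip
import json
from urllib.parse import parse_qs, urlencode, urlsplit


def build_get_url(base_url, service, method, message, compress=True):
    """Encode a JSON message into the query string of a GET request."""
    raw = json.dumps(message, separators=(",", ":")).encode("utf-8")
    params = {"connect": "v1", "encoding": "json", "base64": "1"}
    if compress:
        # Compress first, then base64-encode the compressed bytes.
        raw = gzip.compress(raw)
        params["compression"] = "gzip"
    params["message"] = base64.urlsafe_b64encode(raw).decode("ascii")
    return f"{base_url.rstrip('/')}/{service}/{method}?{urlencode(params)}"


url = build_get_url(
    "http://localhost:3000",
    "i2y.connecpy.example.Haberdasher",
    "MakeHat",
    {"inches": 12},
)

# Round-trip check: decode the query string back into the original message.
query = parse_qs(urlsplit(url).query)
decoded = gzip.decompress(base64.urlsafe_b64decode(query["message"][0]))
print(json.loads(decoded))
```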

For POST requests, use the Content-Encoding header:

curl -H "Content-Encoding: br" -d '{"data": "..."}' http://localhost:3000/service/method
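
Note that the Content-Encoding header only declares the encoding; the body itself must actually be compressed with that method. A minimal sketch with the standard library follows, using gzip because brotli and zstandard need third-party packages. build_compressed_post is a hypothetical helper, and the URL reuses the MakeHat endpoint from the earlier examples.

```python
import gzip
import json
import urllib.request


def build_compressed_post(url: str, message: dict) -> urllib.request.Request:
    """Build a POST whose JSON body is gzip-compressed to match its header."""
    body = gzip.compress(json.dumps(message).encode("utf-8"))
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",  # the body above is gzip-compressed
        },
        method="POST",
    )


compressed_req = build_compressed_post(
    "http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat",
    {"inches": 12},
)
print(len(compressed_req.data), "compressed bytes")
```

If you also send Accept-Encoding with urllib, remember that urllib does not decompress responses automatically; clients such as httpx handle that for you.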

The compression is handled directly in the request handlers, ensuring consistent behavior across HTTP methods and frameworks (ASGI/WSGI).

With Connecpy's compression features, you can automatically handle compressed requests and responses. Here are some examples:

Server-side

The compression handling is built into both ASGI and WSGI applications. You don't need any additional middleware configuration - it works out of the box!

Client-side

For synchronous clients:

from connecpy.context import ClientContext

client = HaberdasherClient(server_url)
response = client.MakeHat(
    ctx=ClientContext(
        headers={
            "Content-Encoding": "br",  # Use Brotli compression for request
            "Accept-Encoding": "gzip",  # Accept gzip compressed response
        }
    ),
    request=request_obj,
)

For async clients:

async with httpx.AsyncClient() as session:
    client = AsyncHaberdasherClient(server_url, session=session)
    response = await client.MakeHat(
        ctx=ClientContext(),
        request=request_obj,
        headers={
            "Content-Encoding": "zstd",  # Use Zstandard compression for request
            "Accept-Encoding": "br",     # Accept Brotli compressed response
        },
    )

Using GET requests with compression:

response = client.MakeHat(
    ctx=ClientContext(),
    request=request_obj,
    use_get=True,  # Enable GET request (for methods marked with no_side_effects)
    params={
        "compression": "gzip",  # Use gzip compression for the message
    }
)

CORS Support

Connecpy provides built-in CORS support via the CORSMiddleware. By default, it allows all origins and includes necessary Connect Protocol headers:

from connecpy.cors import CORSMiddleware

app = ConnecpyASGIApp()
app.add_service(service)
app = CORSMiddleware(app)  # Use default configuration

You can customize the CORS behavior using CORSConfig:

from connecpy.cors import CORSMiddleware, CORSConfig

config = CORSConfig(
    allow_origin="https://your-domain.com",           # Restrict allowed origins
    allow_methods=("POST", "GET", "OPTIONS"),         # Customize allowed methods
    allow_headers=(                                   # Customize allowed headers
        "Content-Type",
        "Connect-Protocol-Version",
        "X-Custom-Header",
    ),
    access_control_max_age=3600,                     # Set preflight cache duration
)

app = CORSMiddleware(app, config=config)

The middleware handles both preflight requests (OPTIONS) and adds appropriate CORS headers to responses.
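
To make the preflight step concrete, the sketch below computes the response headers a CORS middleware would typically return for an OPTIONS request. It is an illustration only, not Connecpy's implementation: SketchCORSConfig is a hypothetical stand-in that mirrors the field names used with CORSConfig above, and its default values are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchCORSConfig:
    """Hypothetical stand-in for CORSConfig; defaults are assumptions."""

    allow_origin: str = "*"
    allow_methods: tuple = ("POST", "GET")
    allow_headers: tuple = ("Content-Type", "Connect-Protocol-Version")
    access_control_max_age: int = 86400


def preflight_headers(config: SketchCORSConfig) -> dict:
    """Map the configuration onto the standard CORS response headers."""
    return {
        "Access-Control-Allow-Origin": config.allow_origin,
        "Access-Control-Allow-Methods": ", ".join(config.allow_methods),
        "Access-Control-Allow-Headers": ", ".join(config.allow_headers),
        "Access-Control-Max-Age": str(config.access_control_max_age),
    }


cors_headers = preflight_headers(
    SketchCORSConfig(
        allow_origin="https://your-domain.com",
        allow_methods=("POST", "GET", "OPTIONS"),
        access_control_max_age=3600,
    )
)
for name, value in cors_headers.items():
    print(f"{name}: {value}")
```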

Connect Protocol

The Connecpy protoc plugin generates code based on the Connect Protocol from .proto files. Currently, Connecpy supports only unary RPCs. Requests use the POST HTTP method; the Compression Support section above also describes GET requests for methods marked with no_side_effects. Connecpy will support other types of RPCs in the near future.

Misc

Server Path Prefix

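You can set a server path prefix by passing server_path_prefix to the constructor of the generated server class (HaberdasherServer in the example below). Clients pass the same prefix when making a call.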

This example sets server path prefix to /foo/bar.

# server.py
service = haberdasher_connecpy.HaberdasherServer(
    service=HaberdasherService(),
    server_path_prefix="/foo/bar",
)

# async_client.py
response = await client.MakeHat(
    ctx=ClientContext(),
    request=haberdasher_pb2.Size(inches=12),
    server_path_prefix="/foo/bar",
)
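
With a prefix set, raw HTTP clients such as curl must include it in the URL. The sketch below shows the presumed path layout, assuming the prefix is simply prepended to the /service/method path used in the curl examples; rpc_path is a hypothetical helper, not part of Connecpy.

```python
def rpc_path(service_full_name: str, method: str, server_path_prefix: str = "") -> str:
    """Compose the request path, assuming the prefix is prepended as-is."""
    prefix = server_path_prefix.rstrip("/")
    return f"{prefix}/{service_full_name}/{method}"


print(rpc_path("i2y.connecpy.example.Haberdasher", "MakeHat", "/foo/bar"))
```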

Interceptor (Server Side)

ConnecpyASGIApp supports interceptors. You can add them by passing interceptors to the ConnecpyASGIApp constructor. Each interceptor is a subclass of AsyncConnecpyServerInterceptor, as in the example below.

# server.py
from typing import Any, Callable

from connecpy import context
from connecpy.asgi import ConnecpyASGIApp
from connecpy.interceptor import AsyncConnecpyServerInterceptor

import haberdasher_connecpy
from service import HaberdasherService


class MyInterceptor(AsyncConnecpyServerInterceptor):
    def __init__(self, msg):
        self._msg = msg

    async def intercept(
        self,
        method: Callable,
        request: Any,
        ctx: context.ServiceContext,
        method_name: str,
    ) -> Any:
        print("intercepting " + method_name + " with " + self._msg)
        return await method(request, ctx)


my_interceptor_a = MyInterceptor("A")
my_interceptor_b = MyInterceptor("B")

service = haberdasher_connecpy.HaberdasherServer(service=HaberdasherService())
app = ConnecpyASGIApp(
    interceptors=(my_interceptor_a, my_interceptor_b),
)
app.add_service(service)

By the way, the intercept method of AsyncConnecpyServerInterceptor has a signature compatible with the intercept method of grpc_interceptor.server.AsyncServerInterceptor, so you may be able to convert Connecpy interceptors to gRPC interceptors by changing only the import statement and the parent class.
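
To show how several interceptors compose, here is a self-contained sketch of an interceptor chain that uses the same intercept signature. It does not use Connecpy: LoggingInterceptor and chain are hypothetical, and the ordering shown (the first interceptor is outermost) is an assumption about how such chains are commonly built, not a statement about Connecpy's internals.

```python
import asyncio


class LoggingInterceptor:
    """Records when it runs, then delegates to the next handler."""

    def __init__(self, name, log):
        self._name = name
        self._log = log

    async def intercept(self, method, request, ctx, method_name):
        self._log.append(f"{self._name}:before:{method_name}")
        response = await method(request, ctx)
        self._log.append(f"{self._name}:after:{method_name}")
        return response


def chain(handler, interceptors, method_name):
    """Wrap handler so that interceptors[0] runs first (outermost)."""
    wrapped = handler
    for interceptor in reversed(interceptors):

        def make(next_handler, current):
            async def call(request, ctx):
                return await current.intercept(next_handler, request, ctx, method_name)

            return call

        wrapped = make(wrapped, interceptor)
    return wrapped


async def make_hat(request, ctx):
    # Stand-in for the real service method.
    return {"size": request["inches"]}


log = []
handler = chain(
    make_hat,
    [LoggingInterceptor("A", log), LoggingInterceptor("B", log)],
    "MakeHat",
)
response = asyncio.run(handler({"inches": 12}, None))
print(log)
```

Each interceptor can act before and after the call, so A sees the request first and the response last.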

ASGI Middleware

You can also use any ASGI middleware with ConnecpyASGIApp. The example below uses brotli-asgi.

# server.py
from brotli_asgi import BrotliMiddleware
from connecpy import context
from connecpy.asgi import ConnecpyASGIApp

import haberdasher_connecpy
from service import HaberdasherService

service = haberdasher_connecpy.HaberdasherServer(
    service=HaberdasherService()
)
app = ConnecpyASGIApp()
app.add_service(service)

app = BrotliMiddleware(app, minimum_size=1000)
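
Writing your own middleware follows the same wrapping pattern. The sketch below is a minimal pure-ASGI middleware that appends a response header; it is exercised against a dummy app so it runs without a server. ServerHeaderMiddleware, dummy_app, run_once and the x-served-by header are all hypothetical names chosen for illustration.

```python
import asyncio


class ServerHeaderMiddleware:
    """Wraps any ASGI app and appends one header to HTTP responses."""

    def __init__(self, app, value="connecpy-example"):
        self.app = app
        self.value = value.encode("latin-1")

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass lifespan and other scopes through untouched.
            await self.app(scope, receive, send)
            return

        async def send_with_header(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append((b"x-served-by", self.value))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_header)


async def dummy_app(scope, receive, send):
    # Stand-in for ConnecpyASGIApp in this sketch.
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


async def run_once(app):
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "POST", "path": "/"}, receive, send)
    return sent


messages = asyncio.run(run_once(ServerHeaderMiddleware(dummy_app)))
print(messages[0]["headers"])
```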

With Connecpy’s compression features, you can automatically compress data via gzip, brotli, zstd, and more. By combining them with CompressionMiddleware, you can return compressed responses with no additional configuration:

from connecpy.asgi import ConnecpyASGIApp
from connecpy.compression import CompressionMiddleware

app = ConnecpyASGIApp()
# ...your services and interceptors here...
app = CompressionMiddleware(app)

Additionally, if you want to incorporate your own custom encodings, you can do so by using GenericEncodingMiddleware:

from connecpy.compression import GenericEncodingMiddleware

middleware_map = {
    "gzip": SomeGzipLikeMiddleware,
    "custom-enc": MyCustomEncoder,
}
app = GenericEncodingMiddleware(app, middleware_map)

If you want to configure CORS more easily, you can introduce CORSMiddleware and control origins/headers via CORSConfig. For example, here’s how you might allow all origins:

from connecpy.cors import CORSMiddleware

app = CORSMiddleware(app)

gRPC Compatibility

Unlike connect-go, Connecpy cannot serve both gRPC and Connect RPC on the same server and port, and Connecpy itself does not support gRPC. However, you can implement a gRPC server with the same service code used for the Connecpy server, as shown below. This works because the type signature of a Connecpy service class is compatible with the signature the gRPC framework requires. The example below uses grpc.aio; the corresponding files are in the example directory.

# grpc_server.py
import asyncio

from grpc.aio import server

import haberdasher_pb2_grpc

# same service.py as the one used in previous server.py
from service import HaberdasherService

host = "localhost:50051"


async def main():
    s = server()
    haberdasher_pb2_grpc.add_HaberdasherServicer_to_server(HaberdasherService(), s)
    bound_port = s.add_insecure_port(host)
    print(f"localhost:{bound_port}")
    await s.start()
    await s.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(main())

# grpc_client.py
import asyncio

from grpc.aio import insecure_channel

import haberdasher_pb2
import haberdasher_pb2_grpc


target = "localhost:50051"


async def main():
    # Use the channel as an async context manager so it is closed on exit
    async with insecure_channel(target) as channel:
        stub = haberdasher_pb2_grpc.HaberdasherStub(channel)
        request = haberdasher_pb2.Size(inches=12)
        response = await stub.MakeHat(request)
        print(response)


if __name__ == "__main__":
    asyncio.run(main())

Message Body Length

Currently, the message body length limit is set to 100 KB. You can override this by passing max_receive_message_length to the ConnecpyASGIApp constructor.

# this sets max message length to be 10 bytes
app = ConnecpyASGIApp(max_receive_message_length=10)
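
As a rough illustration of what such a limit means on the server side, the sketch below reads an ASGI request body chunk by chunk and stops as soon as the running total exceeds the limit. It is not Connecpy's implementation; read_body, BodyTooLarge and make_receive are hypothetical names, and treating the limit as inclusive is an assumption.

```python
import asyncio


class BodyTooLarge(Exception):
    """Raised when the request body exceeds the configured limit."""


async def read_body(receive, max_length: int) -> bytes:
    """Collect ASGI body chunks, failing once max_length is exceeded."""
    chunks = []
    total = 0
    while True:
        message = await receive()
        chunk = message.get("body", b"")
        total += len(chunk)
        if total > max_length:
            raise BodyTooLarge(f"body exceeds {max_length} bytes")
        chunks.append(chunk)
        if not message.get("more_body", False):
            return b"".join(chunks)


def make_receive(parts):
    """Build a fake ASGI receive callable that yields the given chunks."""
    queue = list(parts)

    async def receive():
        chunk = queue.pop(0)
        return {"type": "http.request", "body": chunk, "more_body": bool(queue)}

    return receive


body = asyncio.run(read_body(make_receive([b"hello", b"world"]), max_length=10))
print(body)
```

Checking the size while streaming avoids buffering an oversized body before rejecting it.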

Standing on the shoulders of giants

The initial version (1.0.0) of this software was created by modifying https://github.com/verloop/twirpy on January 4, 2024, so that it supports the Connect Protocol. Therefore, this software is licensed under the Unlicense, the same as twirpy.
