
Server library and client library for the Connect Protocol


Connecpy


Python implementation of Connect Protocol.

This repo contains a protoc plugin that generates server and client code, and a PyPI package with the shared runtime implementation.

Installation

Requirements

  • Python 3.10 or later

Install the protoc plugin

You can install the protoc plugin using one of these methods:

Option 1: Quick Install (Linux/macOS) - Recommended

Install with a single command:

curl -sSL https://raw.githubusercontent.com/i2y/connecpy/main/install.sh | bash

You can customize the installation with environment variables:

# Install to a custom directory
curl -sSL https://raw.githubusercontent.com/i2y/connecpy/main/install.sh | PROTOC_GEN_CONNECPY_INSTALL=$HOME/.local/bin bash

# Install a specific version
curl -sSL https://raw.githubusercontent.com/i2y/connecpy/main/install.sh | VERSION=v2.2.0 bash

# Combine multiple options
curl -sSL https://raw.githubusercontent.com/i2y/connecpy/main/install.sh | PROTOC_GEN_CONNECPY_INSTALL=$HOME/.local/bin VERSION=v2.2.0 bash

Option 2: Manual Download

  1. Download the appropriate archive for your platform from GitHub Releases:

    • Linux AMD64: protoc-gen-connecpy_VERSION_linux_amd64.tar.gz
    • Linux ARM64: protoc-gen-connecpy_VERSION_linux_arm64.tar.gz
    • macOS Intel: protoc-gen-connecpy_VERSION_darwin_amd64.tar.gz
    • macOS Apple Silicon: protoc-gen-connecpy_VERSION_darwin_arm64.tar.gz
    • Windows AMD64: protoc-gen-connecpy_VERSION_windows_amd64.zip
    • Windows ARM64: protoc-gen-connecpy_VERSION_windows_arm64.zip
  2. Extract and install the binary:

    Linux/macOS:

    # Extract the archive
    tar -xzf protoc-gen-connecpy_*.tar.gz
    
    # Make executable (if needed)
    chmod +x protoc-gen-connecpy
    
    # Move to a directory in PATH
    sudo mv protoc-gen-connecpy /usr/local/bin/
    # Or for user-only installation:
    # mkdir -p ~/.local/bin
    # mv protoc-gen-connecpy ~/.local/bin/
    # Make sure ~/.local/bin is in your PATH
    

    Windows (PowerShell):

    # Extract the archive
    Expand-Archive protoc-gen-connecpy_*.zip -DestinationPath .
    
    # Move to a directory in PATH, for example:
    Move-Item protoc-gen-connecpy.exe C:\Tools\
    # Then add C:\Tools to your PATH environment variable:
    # [System.Environment]::SetEnvironmentVariable("PATH", $env:PATH + ";C:\Tools", "User")
    
  3. Verify installation:

    # Check if the plugin is accessible
    which protoc-gen-connecpy  # Linux/macOS
    where protoc-gen-connecpy  # Windows
    
    # Test with protoc (requires a .proto file)
    protoc --connecpy_out=. --connecpy_opt=paths=source_relative test.proto
    

Option 3: Install with Go

If you have Go installed, you can install the plugin directly:

go install github.com/i2y/connecpy/v2/protoc-gen-connecpy@latest

This will install the binary to $GOPATH/bin (or $HOME/go/bin if GOPATH is not set). Make sure this directory is in your PATH.

Install the Python package

Additionally, please add the connecpy package to your project using your preferred package manager. For instance, with uv, use the command:

uv add connecpy

or

pip install connecpy

Server runtime dependencies

To run the server, you'll need one of the following: Uvicorn, Daphne, or Hypercorn. If your goal is to support both HTTP/1.1 and HTTP/2, you should opt for either Daphne or Hypercorn. Additionally, to test the server, you might need a client command, such as buf.

Generate and run

Use the protoc plugin to generate connecpy server and client code.

Using Buf (v2) - Recommended

Create a buf.gen.yaml file:

version: v2
plugins:
  - remote: buf.build/protocolbuffers/python
    out: gen
  - remote: buf.build/protocolbuffers/pyi
    out: gen
  - local: protoc-gen-connecpy
    out: gen

Then run:

buf generate

Using protoc directly

protoc --python_out=./ --pyi_out=./ --connecpy_out=./ ./haberdasher.proto

By default, naming follows PEP8 conventions. To use Google conventions, matching the output of grpc-python, add --connecpy_opt=naming=google. By default, imports are generated as absolute imports based on the proto package name. To use relative imports, add --connecpy_opt=imports=relative.

Server code (ASGI)

# service.py
import random

from connecpy.code import Code
from connecpy.exceptions import ConnecpyException
from connecpy.request import RequestContext

from haberdasher_pb2 import Hat, Size


class HaberdasherService:
    async def make_hat(self, req: Size, ctx: RequestContext) -> Hat:
        print("remaining_time: ", ctx.timeout_ms())
        if req.inches <= 0:
            raise ConnecpyException(
                Code.INVALID_ARGUMENT, "inches: I can't make a hat that small!"
            )
        response = Hat(
            size=req.inches,
            color=random.choice(["white", "black", "brown", "red", "blue"]),
        )
        if random.random() > 0.5:
            response.name = random.choice(
                ["bowler", "baseball cap", "top hat", "derby"]
            )

        return response
# server.py
from haberdasher_connecpy import HaberdasherASGIApplication
from service import HaberdasherService

app = HaberdasherASGIApplication(
    HaberdasherService()
)

Run the server with

uvicorn --port=3000 server:app

or

daphne --port=3000 server:app

or

hypercorn --bind :3000 server:app

Client code (Asynchronous)

# async_client.py
import asyncio

import httpx

from connecpy.exceptions import ConnecpyException

from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size, Hat


server_url = "http://localhost:3000"
timeout_s = 5


async def main():
    async with httpx.AsyncClient(
        base_url=server_url,
        timeout=timeout_s,
    ) as session:
        async with HaberdasherClient(server_url, session=session) as client:
            try:
                response = await client.make_hat(
                    Size(inches=12),
                )
                if not response.HasField("name"):
                    print("We didn't get a name!")
                print(response)
            except ConnecpyException as e:
                print(e.code, e.message)


if __name__ == "__main__":
    asyncio.run(main())

Example output:

size: 12
color: "black"
name: "bowler"

Client code (Synchronous)

# client.py
from connecpy.exceptions import ConnecpyException

from haberdasher_connecpy import HaberdasherClientSync
from haberdasher_pb2 import Size, Hat


server_url = "http://localhost:3000"
timeout_s = 5


def main():
    with HaberdasherClientSync(server_url, timeout_ms=timeout_s * 1000) as client:
        try:
            response = client.make_hat(
                Size(inches=12),
            )
            if not response.HasField("name"):
                print("We didn't get a name!")
            print(response)
        except ConnecpyException as e:
            print(e.code, e.message)


if __name__ == "__main__":
    main()

Streaming RPCs

Connecpy supports streaming RPCs in addition to unary RPCs. Here are examples of each type:

Server Streaming RPC

In server streaming RPCs, the client sends a single request and receives multiple responses from the server.

Server implementation

# service.py
from typing import AsyncIterator
from connecpy.request import RequestContext
from haberdasher_pb2 import Hat, Size

class HaberdasherService:
    async def make_similar_hats(self, req: Size, ctx: RequestContext) -> AsyncIterator[Hat]:
        """Server Streaming: Returns multiple hats of similar size"""
        for i in range(3):
            yield Hat(
                size=req.inches + i,
                color=["red", "green", "blue"][i],
                name=f"hat #{i+1}"
            )

Client implementation (Async)

# async_client.py
import asyncio
import httpx
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size, Hat

async def main():
    async with httpx.AsyncClient(base_url="http://localhost:3000") as session:
        async with HaberdasherClient(
            "http://localhost:3000",
            session=session
        ) as client:
            # Server streaming: receive multiple responses
            hats = []
            stream = client.make_similar_hats(
                Size(inches=12, description="summer hat")
            )
            async for hat in stream:
                print(f"Received: {hat.color} {hat.name} (size {hat.size})")
                hats.append(hat)
            print(f"Total hats received: {len(hats)}")

if __name__ == "__main__":
    asyncio.run(main())

Client implementation (Sync)

# sync_client.py
import httpx
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClientSync
from haberdasher_pb2 import Size, Hat

def main():
    with httpx.Client(base_url="http://localhost:3000") as session:
        with HaberdasherClientSync(
            "http://localhost:3000",
            session=session
        ) as client:
            # Server streaming: receive multiple responses
            hats = []
            stream = client.make_similar_hats(
                Size(inches=12, description="winter hat")
            )
            for hat in stream:
                print(f"Received: {hat.color} {hat.name} (size {hat.size})")
                hats.append(hat)
            print(f"Total hats received: {len(hats)}")

if __name__ == "__main__":
    main()

Client Streaming RPC

In client streaming RPCs, the client sends multiple requests and receives a single response from the server.

Proto definition example

service ExampleService {
  rpc CollectSizes(stream Size) returns (Summary);
}

message Summary {
  int32 total_count = 1;
  float average_size = 2;
}

Server implementation

# service.py
from typing import AsyncIterator
from connecpy.request import RequestContext
from example_pb2 import Size, Summary

class ExampleService:
    async def collect_sizes(
        self,
        req: AsyncIterator[Size],
        ctx: RequestContext
    ) -> Summary:
        """Client Streaming: Collect multiple sizes and return summary"""
        sizes = []
        async for size_msg in req:
            sizes.append(size_msg.inches)

        if not sizes:
            return Summary(total_count=0, average_size=0)

        return Summary(
            total_count=len(sizes),
            average_size=sum(sizes) / len(sizes)
        )

Client implementation (Async)

# async_client.py
import asyncio
from typing import AsyncIterator

from example_connecpy import ExampleClient
from example_pb2 import Size

async def send_sizes() -> AsyncIterator[Size]:
    """Generator that streams multiple sizes to the server"""
    for inches in [10, 12, 14, 16, 18]:
        yield Size(inches=inches)
        await asyncio.sleep(0.1)  # Simulate some delay

async def main():
    async with ExampleClient("http://localhost:3000") as client:
        # Client streaming: send multiple requests, receive one response
        summary = await client.collect_sizes(send_sizes())
        print(f"Summary: {summary.total_count} sizes, average: {summary.average_size}")

if __name__ == "__main__":
    asyncio.run(main())

Client implementation (Sync)

# sync_client.py
from typing import Iterator

from example_connecpy import ExampleClientSync
from example_pb2 import Size

def send_sizes() -> Iterator[Size]:
    """Generator that streams multiple sizes to the server"""
    for inches in [10, 12, 14, 16, 18]:
        yield Size(inches=inches)

def main():
    with ExampleClientSync("http://localhost:3000") as client:
        # Client streaming: send multiple requests, receive one response
        summary = client.collect_sizes(send_sizes())
        print(f"Summary: {summary.total_count} sizes, average: {summary.average_size}")

if __name__ == "__main__":
    main()

Bidirectional Streaming RPC

Bidirectional streaming RPCs allow both client and server to send multiple messages to each other. Connecpy supports both:

  • Full-duplex bidirectional streaming: Client and server can send and receive messages simultaneously
  • Half-duplex bidirectional streaming: Client finishes sending all requests before the server starts sending responses
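
By analogy with the client streaming and server streaming handlers above, a bidirectional handler can be sketched as an async generator that consumes an iterator of requests while yielding responses. This is an illustration only: the `make_various_hats` method name is hypothetical, the dataclasses stand in for generated protobuf messages, and the real handler also receives a RequestContext argument.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


# Stand-ins for the generated protobuf messages (illustration only).
@dataclass
class Size:
    inches: int


@dataclass
class Hat:
    size: int
    color: str
    name: str


async def make_various_hats(req: AsyncIterator[Size]) -> AsyncIterator[Hat]:
    """Bidirectional handler sketch: consume requests, yield responses.

    In full-duplex mode each response can be sent as soon as its request
    arrives; in half-duplex mode the handler sees the same iterator, but
    every request is buffered before the first response goes out.
    """
    async for size in req:
        yield Hat(size=size.inches, color="custom", name=f"size-{size.inches}")


async def demo() -> list:
    async def requests() -> AsyncIterator[Size]:
        for inches in (10, 12):
            yield Size(inches=inches)

    return [hat async for hat in make_various_hats(requests())]


hats = asyncio.run(demo())
print(hats)
```

The same pattern appears synchronously in the WSGI streaming example later in this document.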

Important Notes on Streaming

  1. Server Implementation:

    • HTTP/2 ASGI servers (hypercorn, daphne): Support all streaming types including full-duplex bidirectional streaming
    • HTTP/1 ASGI/WSGI servers: Support unary, server streaming, client streaming, and half-duplex bidirectional streaming
    • Note: Full-duplex bidirectional streaming is not supported by WSGI servers due to protocol limitations (WSGI servers read the entire request before processing)
  2. Client Support:

    • Both ConnecpyClient (async) and ConnecpyClientSync support receiving streaming responses
    • Both support sending streaming requests (client streaming)
  3. Resource Management: When using streaming responses, resources are cleaned up when the returned iterator completes or is garbage collected.

  4. Error Handling: Streaming RPCs can raise exceptions during iteration:

    # Async version
    try:
        async for message in stream:
            process(message)
    except ConnecpyException as e:
        print(f"Stream error: {e.code} - {e.message}")
    
    # Sync version
    try:
        for message in stream:
            process(message)
    except ConnecpyException as e:
        print(f"Stream error: {e.code} - {e.message}")
    
  5. Bidirectional Streaming:

    • Both full-duplex and half-duplex modes are supported
    • In full-duplex mode, client and server can send/receive messages simultaneously
    • In half-duplex mode, client must finish sending before server starts responding

Other clients

Of course, you can use any HTTP client to make requests to a Connecpy server. For example, commands like curl or buf curl can be used, as well as HTTP client libraries such as requests, httpx, aiohttp, and others. The examples below use curl and buf curl.

Content-Type: application/proto, HTTP/1.1

buf curl --data '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --schema ./haberdasher.proto

On Windows, Content-Type: application/proto, HTTP/1.1

buf curl --data '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --schema .\haberdasher.proto

Content-Type: application/proto, HTTP/2

buf curl --data '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --http2-prior-knowledge --schema ./haberdasher.proto

On Windows, Content-Type: application/proto, HTTP/2

buf curl --data '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat --http2-prior-knowledge --schema .\haberdasher.proto

Content-Type: application/json, HTTP/1.1

curl -X POST -H "Content-Type: application/json" -d '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

On Windows, Content-Type: application/json, HTTP/1.1

curl -X POST -H "Content-Type: application/json" -d '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

Content-Type: application/json, HTTP/2

curl --http2-prior-knowledge -X POST -H "Content-Type: application/json" -d '{"inches": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

On Windows, Content-Type: application/json, HTTP/2

curl --http2-prior-knowledge -X POST -H "Content-Type: application/json" -d '{\"inches\": 12}' -v http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat

WSGI Support

Connecpy provides full WSGI support via the ConnecpyWSGIApplication. This synchronous application adapts our service endpoints to the WSGI specification. It reads requests from the WSGI environ, processes requests, and returns responses using start_response. This enables integration with WSGI servers and middleware.

WSGI Streaming Support

Starting from version 2.1.0, WSGI applications now support streaming RPCs! This includes server streaming, client streaming, and half-duplex bidirectional streaming. Here's how to implement streaming with WSGI:

# service_sync.py
from typing import Iterator
from connecpy.request import RequestContext
from haberdasher_pb2 import Hat, Size

class HaberdasherServiceSync:
    def make_hat(self, req: Size, ctx: RequestContext) -> Hat:
        """Unary RPC"""
        return Hat(size=req.inches, color="red", name="fedora")

    def make_similar_hats(self, req: Size, ctx: RequestContext) -> Iterator[Hat]:
        """Server Streaming RPC - returns multiple hats"""
        for i in range(3):
            yield Hat(
                size=req.inches + i,
                color=["red", "green", "blue"][i],
                name=f"hat #{i+1}"
            )

    def collect_sizes(self, req: Iterator[Size], ctx: RequestContext) -> Hat:
        """Client Streaming RPC - receives multiple sizes, returns one hat"""
        sizes = []
        for size_msg in req:
            sizes.append(size_msg.inches)

        avg_size = sum(sizes) / len(sizes) if sizes else 0
        return Hat(size=int(avg_size), color="average", name="custom")

    def make_various_hats(self, req: Iterator[Size], ctx: RequestContext) -> Iterator[Hat]:
        """Bidirectional Streaming RPC (half-duplex only for WSGI)"""
        # Note: In WSGI, all requests are received before sending responses
        for size_msg in req:
            yield Hat(size=size_msg.inches, color="custom", name=f"size-{size_msg.inches}")
# wsgi_server.py
from haberdasher_connecpy import HaberdasherWSGIApplication
from service_sync import HaberdasherServiceSync

app = HaberdasherWSGIApplication(
    HaberdasherServiceSync()
)

if __name__ == "__main__":
    from werkzeug.serving import run_simple
    run_simple("localhost", 3000, app)

Please see the complete example in the example directory.

Compression Support

Connecpy supports various compression methods for both GET and POST requests/responses:

  • gzip
  • brotli (br)
  • zstandard (zstd)
  • identity (no compression)

For GET requests, specify the compression method using the compression query parameter:

curl "http://localhost:3000/service/method?compression=gzip&message=..."

For POST requests, use the Content-Encoding header:

curl -H "Content-Encoding: br" -d '{"data": "..."}' http://localhost:3000/service/method

The compression is handled directly in the request handlers, ensuring consistent behavior across HTTP methods and frameworks (ASGI/WSGI).

With Connecpy's compression features, you can automatically handle compressed requests and responses. Here are some examples:

Server-side

The compression handling is built into both ASGI and WSGI applications. You don't need any additional middleware configuration - it works out of the box!

Client-side

For async clients:

from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size

async with HaberdasherClient(
    server_url,
    send_compression="br",
    accept_compression=["gzip"]
) as client:
    response = await client.make_hat(
        Size(inches=12)
    )

For synchronous clients:

from haberdasher_connecpy import HaberdasherClientSync
from haberdasher_pb2 import Size

with HaberdasherClientSync(
    server_url,
    send_compression="zstd",  # Use Zstandard compression for request
    accept_compression=["br"]  # Accept Brotli compressed response
) as client:
    response = client.make_hat(
        Size(inches=12)
    )

Connect GET Support

Connecpy automatically enables GET request support for methods marked with idempotency_level = NO_SIDE_EFFECTS in your proto files. This follows the Connect Protocol's GET specification.

Proto Definition

Mark methods as side-effect-free in your .proto file:

service Haberdasher {
  // This method will support both GET and POST requests
  rpc MakeHat(Size) returns (Hat) {
    option idempotency_level = NO_SIDE_EFFECTS;
  }

  // This method only supports POST requests (default)
  rpc UpdateHat(Hat) returns (Hat);
}

Using GET Requests from Clients

When a method is marked with NO_SIDE_EFFECTS, the generated client code includes a use_get parameter:

from haberdasher_connecpy import HaberdasherClient, HaberdasherClientSync
from haberdasher_pb2 import Size

# Async client using GET request
async with HaberdasherClient(server_url, session=session) as client:
    response = await client.make_hat(
        Size(inches=12),
        use_get=True  # Use GET instead of POST
    )

# Sync client using GET request
with HaberdasherClientSync(server_url) as client:
    response = client.make_hat(
        Size(inches=12),
        use_get=True  # Use GET instead of POST
    )

Server-Side Implementation

The generated server code automatically configures GET support based on the proto definition. Methods with NO_SIDE_EFFECTS will have allowed_methods=("GET", "POST") while others will have allowed_methods=("POST",) only.

Manual GET Requests

You can also make GET requests directly using curl or other HTTP clients:

# GET request with query parameters (base64-encoded message)
curl "http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat?encoding=proto&message=CgwI..."

# GET request with compression
curl "http://localhost:3000/i2y.connecpy.example.Haberdasher/MakeHat?encoding=proto&compression=gzip&message=..."

Note: GET support is particularly useful for:

  • Cacheable operations
  • Browser-friendly APIs
  • Read-only operations that don't modify server state
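
The query parameters in the manual GET examples above can be built programmatically: the serialized request message is base64url-encoded into the message parameter alongside encoding. The sketch below mirrors only the parameters shown in those curl examples (see the Connect protocol specification for the full set, e.g. base64 and compression); raw hand-encoded bytes stand in for a real `Size(inches=12).SerializeToString()` so the snippet runs without protobuf installed.

```python
import base64
from urllib.parse import urlencode


def connect_get_url(base_url: str, method_path: str, payload: bytes) -> str:
    """Build a Connect GET URL for a serialized request message."""
    # base64url-encode the message bytes; padding is stripped to keep the URL short
    message = base64.urlsafe_b64encode(payload).rstrip(b"=").decode()
    query = urlencode({"encoding": "proto", "message": message})
    return f"{base_url}{method_path}?{query}"


url = connect_get_url(
    "http://localhost:3000",
    "/i2y.connecpy.example.Haberdasher/MakeHat",
    b"\x08\x0c",  # protobuf wire format for field 1 (inches) = 12, hand-encoded
)
print(url)
```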

CORS Support

ConnecpyASGIApp is a standard ASGI application, meaning any CORS ASGI middleware will work well with it, for example starlette.middleware.cors.CORSMiddleware. Refer to the Connect Docs for the standard headers commonly used by Connect clients for CORS negotiation and for a full example using Starlette.

Connect Protocol

The Connecpy protoc plugin generates code from your .proto files based on the Connect Protocol.

Supported RPC Types

Connecpy supports the following RPC types:

  • Unary RPCs - Single request/response
  • Server Streaming RPCs - Single request, multiple responses
  • Client Streaming RPCs - Multiple requests, single response
  • Bidirectional Streaming RPCs - Multiple requests and responses (both full-duplex and half-duplex)

Proto Editions Support

Starting from version 2.0.0, protoc-gen-connecpy supports Proto Editions 2023. You can use the new editions syntax in your .proto files:

edition = "2023";

package example;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

The code generation works the same way as with proto2/proto3 syntax. Note that you'll need protoc version 26.0 or later to use editions syntax.

Misc

Routing

Connecpy applications are standard WSGI or ASGI applications. For complex routing requirements, you can use a routing framework such as werkzeug or starlette.

The generated application classes now expose a path property that returns the service's URL path, making it easier to mount multiple services:

from haberdasher_connecpy import HaberdasherWSGIApplication

haberdasher_app = HaberdasherWSGIApplication(service)
print(haberdasher_app.path)  # "/package.ServiceName"

# Use with routing frameworks
app.wsgi_app = DispatcherMiddleware(
    app.wsgi_app,
    {
        haberdasher_app.path: haberdasher_app,
    },
)

Server-Side Interceptors

Connecpy supports server-side interceptors for both ASGI and WSGI applications. Interceptors allow you to add cross-cutting concerns like logging, authentication, and metrics to your services.

Type-Safe Interceptors

Connecpy provides type-safe interceptors for each RPC type:

# server.py
from typing import Awaitable, Callable, AsyncIterator
from connecpy.request import RequestContext
from haberdasher_pb2 import Size, Hat
from haberdasher_connecpy import HaberdasherASGIApplication
from service import HaberdasherService

# Single interceptor class handling multiple RPC types
class LoggingInterceptor:
    async def intercept_unary(
        self,
        next: Callable[[Size, RequestContext], Awaitable[Hat]],
        request: Size,
        ctx: RequestContext,
    ) -> Hat:
        print(f"Unary RPC: {ctx.method().name}, size={request.inches}")
        response = await next(request, ctx)
        print(f"Response sent: {response.color} hat")
        return response

    async def intercept_server_stream(
        self,
        next: Callable[[Size, RequestContext], AsyncIterator[Hat]],
        request: Size,
        ctx: RequestContext,
    ) -> AsyncIterator[Hat]:
        print(f"Server streaming RPC: {ctx.method().name}, size={request.inches}")
        async for response in next(request, ctx):
            print(f"Streaming: {response.color} hat (size {response.size})")
            yield response

# ASGI application with interceptors
app = HaberdasherASGIApplication(
    HaberdasherService(),
    interceptors=[LoggingInterceptor()]  # Single interceptor handles both unary and streaming
)

Metadata Interceptors

For simple cross-cutting concerns that don't need access to request/response bodies, use MetadataInterceptor:

import time
from connecpy.interceptor import MetadataInterceptor

# Simple timing interceptor
class TimingInterceptor(MetadataInterceptor[float]):
    async def on_start(self, ctx: RequestContext) -> float:
        print(f"Starting {ctx.method().name}")
        return time.time()

    async def on_end(self, start_time: float, ctx: RequestContext) -> None:
        elapsed = time.time() - start_time
        print(f"{ctx.method().name} took {elapsed:.3f}s")

# Works with all RPC types!
app = HaberdasherASGIApplication(
    HaberdasherService(),
    interceptors=[TimingInterceptor()]
)

Synchronous Interceptors (WSGI)

WSGI applications support synchronous interceptors:

import time
from typing import Callable
from connecpy.interceptor import MetadataInterceptorSync
from connecpy.request import RequestContext
from haberdasher_pb2 import Size, Hat
from haberdasher_connecpy import HaberdasherWSGIApplication

class LoggingInterceptorSync:
    def intercept_unary_sync(
        self,
        next: Callable[[Size, RequestContext], Hat],
        request: Size,
        ctx: RequestContext,
    ) -> Hat:
        print(f"Sync RPC: {ctx.method().name}, size={request.inches}")
        return next(request, ctx)

class TimingInterceptorSync(MetadataInterceptorSync[float]):
    def on_start_sync(self, ctx: RequestContext) -> float:
        return time.time()

    def on_end_sync(self, start_time: float, ctx: RequestContext) -> None:
        elapsed = time.time() - start_time
        print(f"{ctx.method().name} took {elapsed:.3f}s")

# WSGI application with interceptors
wsgi_app = HaberdasherWSGIApplication(
    HaberdasherServiceSync(),
    interceptors=[LoggingInterceptorSync(), TimingInterceptorSync()]
)

Available Interceptor Types

The async variants apply to ASGI applications and the Sync variants to WSGI applications:

  • UnaryInterceptor / UnaryInterceptorSync: Unary RPCs
  • ClientStreamInterceptor / ClientStreamInterceptorSync: Client streaming RPCs
  • ServerStreamInterceptor / ServerStreamInterceptorSync: Server streaming RPCs
  • BidiStreamInterceptor / BidiStreamInterceptorSync: Bidirectional streaming RPCs
  • MetadataInterceptor / MetadataInterceptorSync: All RPC types (metadata only)

Interceptor Execution Order

Interceptors are executed in the order they are provided. For example, if you provide [A, B, C], the execution order will be:

  • A.on_start → B.on_start → C.on_start → handler → C.on_end → B.on_end → A.on_end
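
This nesting can be illustrated without connecpy at all: each interceptor wraps the next handler, so applying [A, B, C] leaves A outermost. Plain functions stand in for interceptors and the RPC handler here; none of these names are connecpy APIs.

```python
from typing import Callable, List

calls: List[str] = []


def make_interceptor(name: str):
    """Return a wrapper that records on_start before and on_end after the call."""
    def wrap(next_handler: Callable[[str], str]) -> Callable[[str], str]:
        def handler(request: str) -> str:
            calls.append(f"{name}.on_start")
            try:
                return next_handler(request)
            finally:
                calls.append(f"{name}.on_end")
        return handler
    return wrap


def endpoint(request: str) -> str:
    calls.append("handler")
    return request.upper()


# Wrap in reverse so the first interceptor in the list ends up outermost,
# matching the execution order documented above for [A, B, C].
chain = endpoint
for interceptor in reversed([make_interceptor("A"), make_interceptor("B"), make_interceptor("C")]):
    chain = interceptor(chain)

print(chain("hat"))  # HAT
print(calls)
```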

Client-Side Interceptors

Connecpy supports client-side interceptors, allowing you to add cross-cutting concerns to your client requests:

# async_client_with_interceptor.py
import asyncio
from connecpy.exceptions import ConnecpyException
from haberdasher_connecpy import HaberdasherClient
from haberdasher_pb2 import Size, Hat

class LoggingInterceptor:
    """Interceptor that logs all requests and responses"""

    async def intercept_unary(self, next, request, ctx):
        print(f"[LOG] Calling {ctx.method().name} with request: {request}")
        try:
            response = await next(request, ctx)
            print(f"[LOG] Received response: {response}")
            return response
        except Exception as e:
            print(f"[LOG] Error: {e}")
            raise

async def main():
    # Create client with interceptors
    client = HaberdasherClient(
        "http://localhost:3000",
        interceptors=[LoggingInterceptor()]
    )

    try:
        response = await client.make_hat(
            Size(inches=12)
        )
        print(response)
    finally:
        await client.close()

if __name__ == "__main__":
    asyncio.run(main())

Client interceptors support all RPC types (unary, client streaming, server streaming, and bidirectional streaming) and work with both async and sync clients.

Message Size Limits

Connecpy allows you to limit incoming message sizes to protect against resource exhaustion. By default, there is no limit, but you can set one by passing read_max_bytes to the application constructor:

from haberdasher_connecpy import HaberdasherASGIApplication, HaberdasherWSGIApplication

# Set maximum message size to 1MB for ASGI applications
app = HaberdasherASGIApplication(
    service,
    read_max_bytes=1024 * 1024  # 1MB
)

# Set maximum message size for WSGI applications
wsgi_app = HaberdasherWSGIApplication(
    service_sync,
    read_max_bytes=1024 * 1024  # 1MB
)

# Disable message size limit (not recommended for production)
app = HaberdasherASGIApplication(
    service,
    read_max_bytes=None
)

When a message exceeds the configured limit, the server will return a RESOURCE_EXHAUSTED error to the client.

Standing on the shoulders of giants

The initial version (1.0.0) of this software was created on January 4, 2024, by modifying https://github.com/verloop/twirpy to support the Connect Protocol.
