
A framework for communication between machine apps and other services.


Vention Communication

A thin, FastAPI-powered RPC layer for machine-apps that exposes Connect-compatible request-response and server-streaming endpoints — plus .proto generation from Python decorators, allowing typed SDKs to be generated separately via Buf.

✨ Features

  • Zero boilerplate RPCs: Expose any async Python function as a network API with a single decorator.
  • Strong typing: Request and response models derived directly from Python annotations.
  • Built-in schema emission: Generates a .proto file at runtime, ready for SDK code generation.
  • Single service surface: All methods exposed under /rpc/<package.Service>/<Method>.
  • Connect-compatible transport: Works seamlessly with @connectrpc/connect-web and connectrpc-python.

🧠 Concepts & Overview

The Problem

Machine-app developers write Python daily but shouldn’t need to know REST, gRPC, or frontend networking. They just need a way to say “this function should be callable from anywhere.”

The Solution

vention-communication bridges that gap by turning annotated Python functions into typed RPC endpoints automatically.

@action() → defines a one-request / one-response method.

@stream() → defines a live telemetry or event stream that other services can subscribe to.

VentionApp.finalize() → scans for decorators, builds a Connect router, and emits a .proto schema.

Once the .proto exists, SDKs for TypeScript, Python, or Go can be generated using Buf. The result: your frontend gets auto-completed methods for every RPC, no HTTP or JSON code required.

Core Concepts

  • Actions (Request-Response) — send a request, get a response back. Input and output types are inferred from function annotations. If either is missing, google.protobuf.Empty is used.
  • Streams (Server streaming) — continuous updates broadcast to all subscribers. Each stream can optionally replay the last value when someone subscribes. Queues default to size-1 to always show the latest value.
  • Service Surface — all actions and streams belong to one service, e.g. vention.app.v1.<YourAppName>Service, with routes mounted under /rpc.
  • Proto Generation: VentionApp.finalize() writes a .proto to disk, capturing all decorated RPCs, inferred models, and scalar wrappers. SDK generation (via Buf) is handled externally.
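
As a rough illustration of how request and response types can be read off a function's annotations, here is a minimal sketch using only the standard library. The helper name infer_rpc_types is hypothetical, and None stands in for google.protobuf.Empty; this is not the library's own implementation.

```python
import inspect
from typing import Any, Callable, Optional, Tuple, get_type_hints


def infer_rpc_types(func: Callable[..., Any]) -> Tuple[Optional[type], Optional[type]]:
    """Return (request_type, response_type); None stands in for Empty."""
    hints = get_type_hints(func)
    response_type = hints.get("return")
    # Assumption: the first parameter, if present, carries the request.
    params = list(inspect.signature(func).parameters.values())
    request_type = hints.get(params[0].name) if params else None
    return request_type, response_type


class PingRequest:
    message: str


class PingResponse:
    message: str


async def ping(req: PingRequest) -> PingResponse:
    return PingResponse()


async def reset():
    # No annotations at all: both sides fall back to the Empty placeholder.
    return None


print(infer_rpc_types(ping))
print(infer_rpc_types(reset))
```

A function with no parameter and no return annotation yields (None, None) here, which mirrors the fallback to google.protobuf.Empty described above.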

⚙️ Installation & Setup

Requirements:

  • Python 3.10+
  • FastAPI
  • Uvicorn (for serving)

Install:

pip install vention-communication

Optional client libraries:

  • TypeScript: @connectrpc/connect-web
  • Python: connectrpc with httpx.AsyncClient

🚀 Quickstart Tutorial

A complete "hello world" in three steps.

1. Define your RPCs

from pydantic import BaseModel
from communication.app import VentionApp
from communication.decorators import action, stream
import asyncio  # needed for create_task() and sleep() below
import time
import random

class PingRequest(BaseModel):
    message: str

class PingResponse(BaseModel):
    message: str

class Heartbeat(BaseModel):
    value: str
    timestamp: int

app = VentionApp(name="DemoApp", emit_proto=True)

@action()
async def ping(req: PingRequest) -> PingResponse:
    return PingResponse(message=f"Pong: {req.message}")

@stream(name="heartbeat", payload=Heartbeat, replay=True)
async def heartbeat():
    """Broadcast a live heartbeat value to all subscribers."""
    return Heartbeat(value=f"{random.uniform(0,100):.2f}", timestamp=int(time.time()))

app.finalize()

# Emit heartbeat every second
@app.on_event("startup")
async def startup():
    asyncio.create_task(loop())

async def loop():
    while True:
        asyncio.create_task(heartbeat())
        await asyncio.sleep(1)

Run:

uvicorn demo.main:app --reload

Endpoints are automatically registered under /rpc/vention.app.v1.DemoAppService.
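
Because the transport is Connect-compatible, a unary call is an HTTP POST with a JSON body sent to /rpc/<package.Service>/<Method>. The sketch below builds such a request with the standard library without sending it. The method name "Ping" is an assumption; check the emitted .proto for the exact name.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000/rpc"  # matches the quickstart server
SERVICE = "vention.app.v1.DemoAppService"
METHOD = "Ping"  # assumption: the real name comes from the emitted .proto


def build_unary_request(method: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a Connect unary call with a JSON body."""
    return urllib.request.Request(
        url=f"{BASE_URL}/{SERVICE}/{method}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_unary_request(METHOD, {"message": "Hello"})
print(request.get_method(), request.full_url)

# To send it against a running server:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

This is handy for a quick smoke test from a script; for real clients, a generated SDK is the intended path.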

2. Generated .proto

After startup, proto/app.proto is emitted automatically.
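
To give a feel for what deriving a message definition from annotated fields involves, here is a small sketch. The scalar mapping and the render_message helper are assumptions for illustration only; the file the library actually emits may be laid out differently.

```python
from typing import get_type_hints

# Assumed Python-to-proto scalar mapping; the library's real mapping may differ.
SCALARS = {str: "string", int: "int64", float: "double", bool: "bool"}


def render_message(model: type) -> str:
    """Render a proto3 message from a class's annotated fields."""
    lines = [f"message {model.__name__} {{"]
    fields = get_type_hints(model).items()
    for number, (name, annotation) in enumerate(fields, start=1):
        # Field numbers are assigned in declaration order, starting at 1.
        lines.append(f"  {SCALARS[annotation]} {name} = {number};")
    lines.append("}")
    return "\n".join(lines)


class Heartbeat:
    value: str
    timestamp: int


print(render_message(Heartbeat))
```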

You can now use Buf or protoc to generate client SDKs for whichever client stack you need.

The next section walks through an example for TypeScript applications. For any other environment, refer to the official Buf documentation for installation and a code-generation quickstart:

https://buf.build/docs/cli/installation/

3. Example TypeScript Client

Make sure you have Node 24+ installed. Use NVM (https://github.com/nvm-sh/nvm) to easily install and manage different Node versions.

  1. Create a folder called client and cd into it.

Protobuf JavaScript/TypeScript libraries

  1. Install the runtime library, code generator, and the Buf CLI:

npm install @bufbuild/protobuf
npm install --save-dev @bufbuild/protoc-gen-es @bufbuild/buf

  2. Create a buf.gen.yaml file that looks like this:

version: v2
inputs:
  - directory: proto
plugins:
  - local: protoc-gen-es
    opt: target=ts
    out: src/gen

  3. Generate the client code, pointing the path to the newly generated proto folder:

npx buf generate ../proto

Client Application

  1. Install the client RPC libraries:

npm i @connectrpc/connect @connectrpc/connect-web

  2. Then create an index.ts file in the client/src folder, and paste the following code in it:

import { createClient } from "@connectrpc/connect";
import { createConnectTransport } from "@connectrpc/connect-web";
import { DemoAppService } from "./gen/connect/app_pb.ts";

const transport = createConnectTransport({
  baseUrl: "http://localhost:8000/rpc",
  useBinaryFormat: false,
});

const client = createClient(DemoAppService, transport);

const res = await client.ping({ message: "Hello" });
console.log(res.message);

for await (const hb of client.heartbeat({})) {
  console.log("Heartbeat", hb.value, hb.timestamp);
}

🛠 How-to Guides

Add a new request-response endpoint

class StatusResponse(BaseModel):
    ok: bool

@action()
async def get_status() -> StatusResponse:
    return StatusResponse(ok=True)

Add a new stream

@stream(name="Status", payload=StatusResponse)
async def publish_status() -> StatusResponse:
    return StatusResponse(ok=True)

Emit proto to a custom path

app = VentionApp(name="MyService", emit_proto=True, proto_path="out/myservice.proto")
app.finalize()

📖 API Reference

VentionApp

VentionApp(
  name: str = "VentionApp",
  *,
  emit_proto: bool = False,
  proto_path: str = "proto/app.proto",
  **fastapi_kwargs
)

Methods:

  • .register_rpc_plugin(bundle: RpcBundle) — merges external action/stream definitions (e.g., from state-machine or storage).
  • .finalize() — registers routes, emits .proto, and makes publishers available.

Attributes:

  • connect_router: internal FastAPI router for Connect RPCs.
  • proto_path: location of the emitted .proto.

Decorators

@action(name: Optional[str] = None)
# → Registers a request-response handler

@stream(
    name: str,
    payload: type,
    replay: bool = True,
    queue_maxsize: int = 1,
    policy: Literal["latest", "fifo"] = "latest"
)
# → Registers a server-streaming RPC and publisher

Stream Parameters:

  • name: Unique name for the stream
  • payload: Type of data to stream (Pydantic model or JSON-serializable type)
  • replay: Whether new subscribers receive the last value (default: True)
  • queue_maxsize: Maximum items per subscriber queue (default: 1)
  • policy: Delivery policy when queue is full - "latest" drops old items, "fifo" waits for space (default: "latest")

Stream Configuration Options

When creating a stream with @stream(), you can configure how updates are delivered to subscribers:

replay (default: True)

Controls whether new subscribers receive the last published value immediately when they subscribe.

  • replay=True: New subscribers instantly receive the most recent value (if one exists). Useful for state streams where clients need the current state immediately upon connection.
  • replay=False: New subscribers only receive values published after they subscribe. Useful for event streams where you only want to see new events.
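
The difference between the two settings can be sketched with a toy broadcaster built on asyncio.Queue. ReplayBroadcaster is a hypothetical stand-in written for this illustration, not the library's stream implementation.

```python
import asyncio
from typing import Any, List, Optional


class ReplayBroadcaster:
    """Toy stream that optionally replays the last value to new subscribers."""

    def __init__(self, replay: bool) -> None:
        self.replay = replay
        self.last_value: Optional[Any] = None
        self.has_value = False
        self.subscribers: List[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        if self.replay and self.has_value:
            queue.put_nowait(self.last_value)  # hand over the current state
        self.subscribers.append(queue)
        return queue

    def publish(self, value: Any) -> None:
        self.last_value, self.has_value = value, True
        for queue in self.subscribers:
            queue.put_nowait(value)


async def pending_after_late_subscribe(replay: bool) -> list:
    stream = ReplayBroadcaster(replay=replay)
    stream.publish("idle")      # published before anyone subscribes
    late = stream.subscribe()   # subscriber joins afterwards
    stream.publish("running")
    items = []
    while not late.empty():
        items.append(late.get_nowait())
    return items


print(asyncio.run(pending_after_late_subscribe(replay=True)))
print(asyncio.run(pending_after_late_subscribe(replay=False)))
```

With replay enabled, the late subscriber sees the value published before it joined as well as the new one; with replay disabled, it sees only the new one.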

queue_maxsize (default: 1)

The maximum number of items that can be queued for each subscriber before the delivery policy kicks in.

  • queue_maxsize=1: Only the latest value is kept. Perfect for state streams where you only care about the current state.
  • queue_maxsize=N (N > 1): Allows buffering up to N items. Useful when subscribers might process items slower than they're published, but you still want to limit memory usage.

# Only keep latest temperature reading
@stream(name="Temperature", payload=Temperature, queue_maxsize=1)

# Buffer up to 10 sensor readings
@stream(name="SensorData", payload=SensorReading, queue_maxsize=10)

policy (default: "latest")

Defines what happens when a subscriber’s queue is full and a new value is published.

Each subscriber maintains its own in-memory queue of pending messages. When you publish faster than a client can consume, the queue eventually fills — the policy determines what happens next.

policy="latest" — “drop oldest, never block”

  • The publisher never waits.
  • If a subscriber’s queue is full, the oldest item is dropped and the new one is inserted immediately.
  • Fast subscribers receive every message; slow subscribers skip intermediate values but always see the most recent state.

✅ Pros

  • Zero backpressure — publisher performance unaffected by slow clients.
  • Keeps UI dashboards and telemetry feeds current (“latest value always wins”).
  • Ideal for high-frequency data (positions, sensor readings, machine state).

⚠️ Cons

  • Drops messages for slow clients (they may miss intermediate updates).
  • Subscribers can diverge — one may receive more updates than another.

Example:

@stream(name="Temperature", payload=TempReading,
        policy="latest", queue_maxsize=1)
# → publisher never blocks; subscribers always see the most recent temperature
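
The "drop oldest, never block" behaviour can be approximated with a bounded asyncio.Queue. The helper publish_latest below is a hypothetical sketch of the idea, not the library's internal code.

```python
import asyncio


def publish_latest(queue: asyncio.Queue, item) -> None:
    """Insert without blocking; evict the oldest entry when the queue is full."""
    if queue.full():
        queue.get_nowait()  # drop the oldest pending item
    queue.put_nowait(item)


async def demo_latest(maxsize: int, values: list) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    for value in values:
        publish_latest(queue, value)  # the subscriber never reads in between
    remaining = []
    while not queue.empty():
        remaining.append(queue.get_nowait())
    return remaining


print(asyncio.run(demo_latest(1, [20.1, 20.5, 21.0])))
```

With maxsize=1 only the most recent reading survives, which is the behaviour a slow dashboard client would observe.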

policy="fifo" — “deliver all, may block”

  • The publisher awaits until there’s space in every subscriber’s queue.
  • Guarantees that all messages are delivered in order to every subscriber.
  • A slow subscriber can stall the entire stream, because the distributor waits for that subscriber’s queue to make room before continuing.

✅ Pros

  • Preserves every event and strict ordering.
  • Reliable for command sequences, audit logs, and event-driven logic.

⚠️ Cons

  • One slow or paused subscriber can block all others.
  • Publishing rate is limited by the slowest client.
  • In extreme cases, a throttled browser or dropped connection can cause the distributor to stall until the queue frees or the subscriber is removed.

Example:

@stream(name="Events", payload=MachineEvent,
        policy="fifo", queue_maxsize=100)
# → guarantees ordered delivery but can back-pressure the publisher
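
The stall described above can be reproduced with two bounded queues. This sketch assumes a distributor that fills subscriber queues one after another; publish_fifo is a hypothetical helper and the library's actual distributor may be structured differently.

```python
import asyncio


async def publish_fifo(subscribers: list, item) -> None:
    """Wait for room in every subscriber queue, one after another."""
    for queue in subscribers:
        await queue.put(item)  # blocks while this queue is full


async def demo_fifo() -> dict:
    fast: asyncio.Queue = asyncio.Queue(maxsize=1)
    slow: asyncio.Queue = asyncio.Queue(maxsize=1)
    await publish_fifo([slow, fast], "event-1")  # both queues are now full
    fast.get_nowait()                            # only the fast subscriber reads

    stalled = False
    try:
        # The slow queue is still full, so this publish cannot complete.
        await asyncio.wait_for(publish_fifo([slow, fast], "event-2"), timeout=0.05)
    except asyncio.TimeoutError:
        stalled = True
    return {"stalled": stalled, "fast_received_event_2": not fast.empty()}


print(asyncio.run(demo_fifo()))
```

Here the fast subscriber has room but still never receives the second event, because the publisher is waiting on the slow one. A larger queue_maxsize delays this point but does not remove it.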

Common Combinations:

  • State monitoring (default): replay=True, queue_maxsize=1, policy="latest" — subscribers get current state immediately and always see the latest value.
  • Event streaming: replay=False, queue_maxsize=100, policy="fifo" — subscribers only see new events and process them in order.

🔍 Troubleshooting & FAQ

Q: Can I disable proto generation at runtime?

Yes — set emit_proto=False in VentionApp(...).

Q: Publishing raises KeyError: Unknown stream.

Ensure app.finalize() has been called before publishing or subscribing.

Q: How do I integrate this with other libraries (state machine, storage, etc.)?

Use app.register_rpc_plugin() to merge additional RPC definitions before calling .finalize().
