NATS-based RPC framework with code generation and OpenTelemetry tracing

amirpc

A Python RPC framework for microservices, built on NATS, with automatic code generation.

Features

  • Code Generation - Generate clients, servers, and models from ASL (AMI Service Language) specs
  • Type Safety - Full Pydantic validation for requests and responses
  • Runtime Wrapper - Simple Runtime class that handles connection management
  • Load Balancing - Automatic queue groups for distributing requests across server instances
  • Event System - Publish/subscribe events with typed payloads
  • Health Checks - Built-in health endpoints (NATS + optional HTTP)
  • OpenTelemetry - Metrics and tracing out of the box
  • Gateway - Auto-generate FastAPI routes from metadata (optional)
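
To make the Load Balancing bullet concrete: in a NATS queue group, each message on a subject is delivered to only one subscriber in the group rather than to all of them. The sketch below imitates that behavior in plain Python, with no NATS connection. `QueueGroup` and its methods are hypothetical names used for illustration only; they are not part of amirpc or nats-py, and in a real deployment the NATS server picks the member, not a client-side `random.choice`.

```python
import random
from collections import Counter
from typing import Callable, List


class QueueGroup:
    """Toy model of a NATS queue group: one member handles each message."""

    def __init__(self, seed: int = 0) -> None:
        self._members: List[Callable[[str], None]] = []
        self._rng = random.Random(seed)

    def join(self, handler: Callable[[str], None]) -> None:
        self._members.append(handler)

    def deliver(self, message: str) -> None:
        # Exactly one member receives the message.
        self._rng.choice(self._members)(message)


handled = Counter()
group = QueueGroup()
for instance in ("server-1", "server-2", "server-3"):
    # Bind the instance name so each handler counts its own work.
    group.join(lambda msg, name=instance: handled.update([name]))

for i in range(300):
    group.deliver(f"request-{i}")

print(sum(handled.values()))  # 300: every request handled exactly once
```

Running several copies of the same server therefore spreads requests across them instead of duplicating work.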

Installation

pip install amirpc

# With compiler (for code generation)
pip install amirpc[compiler]

# With gateway support
pip install amirpc[gateway]

Quick Start

1. Define your service (ASL)

Create an infrastructure file that references your services:

/// myapp.asl (main entry point)
module "myapp";

infrastructure MyApp {
    services {
        Math from "./math.asl";
    }
}

/// math.asl
module "myapp.math";

service Math {
    /// Basic arithmetic operations.
    namespace Calc {
        /// Multiply two numbers.
        rpc multiply(a: int, b: int) -> { result: int };

        /// Add two numbers.
        rpc add(a: int, b: int) -> { result: int };
    }
}

2. Generate code

amic generate myapp.asl -o generated/
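
The server code in step 3 imports `CalcMultiplyPayload`, `CalcMultiplyResult`, and similar classes from `generated/`. As a rough mental model only, the request and response shapes declared by `rpc multiply(a: int, b: int) -> { result: int }` could look like the classes below. This sketch uses standard-library dataclasses so that it runs without dependencies; per the feature list the real generated models are Pydantic classes, and their exact contents are not shown in this README.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CalcMultiplyPayload:
    """Stand-in for the request shape of Calc.multiply (a: int, b: int)."""
    a: int
    b: int


@dataclass(frozen=True)
class CalcMultiplyResult:
    """Stand-in for the response shape of Calc.multiply ({ result: int })."""
    result: int


payload = CalcMultiplyPayload(a=6, b=7)
reply = CalcMultiplyResult(result=payload.a * payload.b)
print(reply.result)  # 42
```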

3. Implement the server

from amirpc import Runtime
from generated.math.server import MathServer, MathCalcServer
from generated.math.models import CalcMultiplyPayload, CalcMultiplyResult, CalcAddPayload, CalcAddResult


class CalcImpl(MathCalcServer):
    async def multiply(self, payload: CalcMultiplyPayload) -> CalcMultiplyResult:
        return CalcMultiplyResult(result=payload.a * payload.b)

    async def add(self, payload: CalcAddPayload) -> CalcAddResult:
        return CalcAddResult(result=payload.a + payload.b)


class MyMathServer(MathServer):
    calc = CalcImpl


if __name__ == "__main__":
    runtime = Runtime.from_env()
    runtime.serve([MyMathServer])

4. Use the client

import asyncio
from amirpc import Runtime
from generated.math.client import MathClient


async def main():
    async with Runtime.from_env().connect() as rt:
        client = rt.client(MathClient)

        result = await client.calc.multiply(a=6, b=7)
        print(f"6 * 7 = {result.result}")  # 42

        result = await client.calc.add(a=10, b=20)
        print(f"10 + 20 = {result.result}")  # 30


asyncio.run(main())

Configuration

Environment Variables

Variable             Description             Default
NATS_URL             NATS server URL         nats://localhost:4222
NATS_CREDS_FILE      Path to .creds file     None
AMI_SERVICE_VERSION  Service version string  None
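
As an illustration of how these variables and their defaults combine, the following sketch resolves them from a mapping. `Settings` and `settings_from_env` are hypothetical helpers written for this example; they are not amirpc's implementation of `Runtime.from_env()`, which may read additional variables or validate values differently.

```python
import os
from typing import Mapping, NamedTuple, Optional


class Settings(NamedTuple):
    nats_url: str
    creds_file: Optional[str]
    service_version: Optional[str]


def settings_from_env(env: Mapping[str, str] = os.environ) -> Settings:
    """Resolve the three documented variables, applying the documented defaults."""
    return Settings(
        nats_url=env.get("NATS_URL", "nats://localhost:4222"),
        creds_file=env.get("NATS_CREDS_FILE"),
        service_version=env.get("AMI_SERVICE_VERSION"),
    )


# An empty environment yields only the defaults.
print(settings_from_env({}))
# A set variable overrides its default.
print(settings_from_env({"NATS_URL": "nats://nats.internal:4222"}).nats_url)
```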

Runtime Options

runtime = Runtime(
    nats_url="nats://localhost:4222",
    creds_file="/path/to/creds",
    service_version="1.0.0",
    connect_timeout=2.0,
    reconnect_time_wait=2.0,
    max_reconnect_attempts=60,
    expand_cnames=True,      # Resolve CNAME records for HA
    drain_timeout=25.0,      # Graceful shutdown timeout
)
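
The reconnect options interact, so it can help to estimate how long a client keeps retrying before giving up. The calculation below is a rough sketch that assumes these options mean the same as the identically named nats-py connection options (a wait between attempts and a cap on attempts), a single NATS server, and that a failed attempt may block for up to `connect_timeout`. `reconnect_window_seconds` is a hypothetical helper, not an amirpc function.

```python
from typing import Tuple


def reconnect_window_seconds(
    reconnect_time_wait: float,
    max_reconnect_attempts: int,
    connect_timeout: float,
) -> Tuple[float, float]:
    """Return rough (lower, upper) bounds on time spent retrying one server."""
    # Lower bound: only the waits between attempts are counted.
    lower = reconnect_time_wait * max_reconnect_attempts
    # Upper bound: every attempt also runs into the connect timeout.
    upper = (reconnect_time_wait + connect_timeout) * max_reconnect_attempts
    return lower, upper


print(reconnect_window_seconds(2.0, 60, 2.0))  # (120.0, 240.0)
```

With the values shown above, a client would tolerate an outage of roughly two to four minutes under these assumptions.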

Server Features

Lifecycle Hooks

async def init_db():
    # Initialize database connections
    pass

async def close_db():
    # Close database connections
    pass

runtime.serve(
    [MyServer],
    on_startup=init_db,
    on_shutdown=close_db,
)
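
The README does not spell out the exact ordering guarantees of the hooks. The sketch below shows the pattern such hooks usually follow: startup first, then request handling, then shutdown in a `finally` block so cleanup also runs after an error. `run_with_hooks` and `handle_requests` are hypothetical stand-ins for illustration and do not reproduce `runtime.serve`.

```python
import asyncio
from typing import Awaitable, Callable, List

Hook = Callable[[], Awaitable[None]]
calls: List[str] = []


async def init_db() -> None:
    calls.append("startup")


async def close_db() -> None:
    calls.append("shutdown")


async def handle_requests() -> None:
    calls.append("serving")


async def run_with_hooks(body: Hook, on_startup: Hook, on_shutdown: Hook) -> None:
    """Run startup, then the body, and always run shutdown afterwards."""
    await on_startup()
    try:
        await body()
    finally:
        await on_shutdown()


asyncio.run(run_with_hooks(handle_requests, init_db, close_db))
print(calls)  # ['startup', 'serving', 'shutdown']
```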

Server-to-Server Calls

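# GeneratedServer and OtherServiceClient stand in for classes produced by `amic generate`.
class MyServer(GeneratedServer):
    async def handle_request(self, payload):
        # Call another service
        other_client = self.client(OtherServiceClient)
        # `namespace.method` and `arg=value` are placeholders for a real RPC and its arguments
        return await other_client.namespace.method(arg=value)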

HTTP Health Endpoints

server = MyServer(
    nc=nc,
    enable_http_health=True,
    health_port=8080,
)
# GET /alive - liveness probe
# GET /ready - readiness probe
# GET /health - combined status
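
A quick way to exercise these endpoints from a script is a small probe function. This is a minimal sketch using only the standard library; the README does not document which status codes the endpoints return, so treating any 2xx response as healthy is an assumption, as are the `probe` helper and the `localhost:8080` address.

```python
import urllib.error
import urllib.request


def probe(base_url: str, path: str, timeout: float = 2.0) -> bool:
    """Return True if GET base_url + path answers with a 2xx status."""
    try:
        with urllib.request.urlopen(base_url + path, timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (urllib.error.URLError, OSError):
        # Covers connection refused, timeouts, and HTTP error statuses such as 503.
        return False


if __name__ == "__main__":
    for path in ("/alive", "/ready", "/health"):
        print(path, probe("http://localhost:8080", path))
```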

Events

Emit Events

import asyncio

from amirpc import Runtime
from generated.math.events import MathEmitter
from generated.math.models import CalculationDoneEmitEvent


async def main():
    async with Runtime.from_env().connect() as rt:
        emitter = rt.emitter(MathEmitter)
        await emitter.emit_calculation_done(
            CalculationDoneEmitEvent(
                operation="multiply",
                operands=[6, 7],
                result=42,
            )
        )


asyncio.run(main())

Subscribe to Events

import asyncio

from amirpc import Runtime
from generated.math.client import MathClient
from generated.math.models import CalculationDoneEmitEvent


async def main():
    async with Runtime.from_env().connect() as rt:
        client = rt.client(MathClient)

        @client.on_calculation_done
        async def handle_event(payload: CalculationDoneEmitEvent):
            print(f"Calculation: {payload.operation} = {payload.result}")

        await client.start()
        # Keep running...


asyncio.run(main())
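
The `@client.on_calculation_done` decorator registers a coroutine as an event handler. The sketch below shows that registration pattern in isolation, in-process and without NATS. `EventHub`, `CalculationDone`, and `publish` are hypothetical names for this illustration; the generated client's internals are not documented here and may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


@dataclass
class CalculationDone:
    """Stand-in for a typed event payload."""
    operation: str
    result: int


Handler = Callable[[CalculationDone], Awaitable[None]]


class EventHub:
    """Toy in-process hub: register handlers with a decorator, then publish."""

    def __init__(self) -> None:
        self._handlers: List[Handler] = []

    def on_calculation_done(self, handler: Handler) -> Handler:
        self._handlers.append(handler)
        return handler  # returned unchanged so the function stays callable

    async def publish(self, event: CalculationDone) -> None:
        for handler in self._handlers:
            await handler(event)


hub = EventHub()
seen: List[str] = []


@hub.on_calculation_done
async def record(event: CalculationDone) -> None:
    seen.append(f"{event.operation} = {event.result}")


asyncio.run(hub.publish(CalculationDone(operation="multiply", result=42)))
print(seen)  # ['multiply = 42']
```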

Health Check CLI

# Check service health
amirpc-health --service myservice --nats-url nats://localhost:4222 --prefix ami

# Using environment variables
AMI_SERVICE_NAME=myservice AMI_NATS_URL=nats://localhost:4222 amirpc-health

# Options
amirpc-health --help

Gateway (FastAPI Integration)

from fastapi import FastAPI
from amirpc.gateway import AutoGateway

app = FastAPI()

# get_nats_client and require_permission are supplied by your application (not shown here).
gateway = AutoGateway(
    metadata_path="gateway_routes.json",
    get_nc=get_nats_client,
    require_permission=require_permission,
)

app.include_router(gateway.router, prefix="/api/v1")

ASL Language

See examples/asl/ for ASL spec examples.

Infrastructure (entry point)

/// myapp.asl
module "myapp";

infrastructure MyApp {
    services {
        Users from "./users.asl";
    }
}

Service definition

/// users.asl
module "myapp.users";

from "@/well-known" import model UUID, model Datetime;

/// User not found error.
error UserNotFound [http_status: 404] {
    user_id: UUID;
};

/// Invalid email error.
error InvalidEmail [http_status: 400] {
    email: string;
};

/// User model.
domain model User {
    id: UUID;
    email: string;
    name: string [default: "John"];
    created_at: Datetime;
}

service Users {
    /// User management operations.
    namespace Management {
        /// Create a new user.
        @http(method: "POST", path: "/users")
        @permission("users.create")
        @throws(InvalidEmail)
        rpc create(email: string, name: string) -> User;

        /// Get user by ID.
        @http(method: "GET", path: "/users/{user_id}")
        @throws(UserNotFound)
        rpc get(user_id: UUID) -> User;

        /// Delete a user.
        @http(method: "DELETE", path: "/users/{user_id}")
        @permission("users.delete")
        @throws(UserNotFound)
        rpc delete(user_id: UUID) -> {};
    }

    /// Emitted when a user is created.
    emit UserCreated(user: User);

    /// Listen for user deletion requests.
    listen UserDeleted(user_id: UUID);
}
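
The `[http_status: ...]` attribute on an `error` declaration suggests that each error carries the status code a gateway should answer with. One way to picture that mapping in Python is sketched below. `ServiceError` and `to_http` are hypothetical and written for this example; the classes that `amic generate` actually emits, and the response body the gateway produces, are not shown in this README.

```python
from typing import Any, Dict, Tuple


class ServiceError(Exception):
    """Hypothetical base class carrying an HTTP status and structured fields."""
    http_status = 500

    def __init__(self, **fields: Any) -> None:
        super().__init__(type(self).__name__)
        self.fields = fields


class UserNotFound(ServiceError):
    http_status = 404  # mirrors: error UserNotFound [http_status: 404]


class InvalidEmail(ServiceError):
    http_status = 400  # mirrors: error InvalidEmail [http_status: 400]


def to_http(exc: Exception) -> Tuple[int, Dict[str, Any]]:
    """Translate an exception into (status code, JSON-ready body)."""
    if isinstance(exc, ServiceError):
        return exc.http_status, {"error": type(exc).__name__, **exc.fields}
    # Anything undeclared is reported as a generic server error.
    return 500, {"error": "InternalError"}


print(to_http(UserNotFound(user_id="123")))
# (404, {'error': 'UserNotFound', 'user_id': '123'})
```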

Migration from 0.x

See MIGRATION_GUIDE.md for detailed migration instructions.

License

MIT
