Skip to main content

NATS-based RPC framework with code generation and OpenTelemetry tracing

Project description

amirpc

A Python RPC framework built on NATS for building microservices with automatic code generation.

Features

  • Code Generation - Generate clients, servers, and models from ASL (AMI Service Language) specs
  • Type Safety - Full Pydantic validation for requests and responses
  • Runtime Wrapper - Simple Runtime class that handles connection management
  • Load Balancing - Automatic queue groups for distributing requests across server instances
  • Event System - Publish/subscribe events with typed payloads
  • Health Checks - Built-in health endpoints (NATS + optional HTTP)
  • OpenTelemetry - Metrics and tracing out of the box
  • Gateway - Auto-generate FastAPI routes from metadata (optional)

Installation

pip install amirpc

# With compiler (for code generation)
pip install amirpc[compiler]

# With gateway support
pip install amirpc[gateway]

Quick Start

1. Define your service (ASL)

Create an infrastructure file that references your services:

/// myapp.asl (main entry point)
module "myapp";

infrastructure MyApp {
    services {
        Math from "./math.asl";
    }
}
/// math.asl
module "myapp.math";

service Math {
    /// Basic arithmetic operations.
    namespace Calc {
        /// Multiply two numbers.
        rpc multiply(a: int, b: int) -> { result: int };

        /// Add two numbers.
        rpc add(a: int, b: int) -> { result: int };
    }
}

2. Generate code

amic generate myapp.asl -o generated/

3. Implement the server

from amirpc import Runtime
from generated.math.server import MathServer, MathCalcServer
from generated.math.models import CalcMultiplyPayload, CalcMultiplyResult, CalcAddPayload, CalcAddResult


class CalcImpl(MathCalcServer):
    async def multiply(self, payload: CalcMultiplyPayload) -> CalcMultiplyResult:
        return CalcMultiplyResult(result=payload.a * payload.b)

    async def add(self, payload: CalcAddPayload) -> CalcAddResult:
        return CalcAddResult(result=payload.a + payload.b)


class MyMathServer(MathServer):
    calc = CalcImpl


if __name__ == "__main__":
    runtime = Runtime.from_env()
    runtime.serve([MyMathServer])

4. Use the client

import asyncio
from amirpc import Runtime
from generated.math.client import MathClient


async def main():
    async with Runtime.from_env().connect() as rt:
        client = rt.client(MathClient)

        result = await client.calc.multiply(a=6, b=7)
        print(f"6 * 7 = {result.result}")  # 42

        result = await client.calc.add(a=10, b=20)
        print(f"10 + 20 = {result.result}")  # 30


asyncio.run(main())

Configuration

Environment Variables

Variable Description Default
NATS_URL NATS server URL nats://localhost:4222
NATS_CREDS_FILE Path to .creds file None
AMI_SERVICE_VERSION Service version string None

Runtime Options

runtime = Runtime(
    nats_url="nats://localhost:4222",
    creds_file="/path/to/creds",
    service_version="1.0.0",
    connect_timeout=2.0,
    reconnect_time_wait=2.0,
    max_reconnect_attempts=60,
    expand_srv=True,          # Resolve DNS SRV records for HA
    drain_timeout=25.0,      # Graceful shutdown timeout
)

Server Features

Lifecycle Hooks

async def init_db():
    # Initialize database connections
    pass

async def close_db():
    # Close database connections
    pass

runtime.serve(
    [MyServer],
    on_startup=init_db,
    on_shutdown=close_db,
)

Server-to-Server Calls

class MyServer(GeneratedServer):
    async def handle_request(self, payload):
        # Call another service
        other_client = self.client(OtherServiceClient)
        return await other_client.namespace.method(arg=value)

HTTP Health Endpoints

server = MyServer(
    nc=nc,
    enable_http_health=True,
    health_port=8080,
)
# GET /alive - liveness probe
# GET /ready - readiness probe
# GET /health - combined status

Events

Emit Events

from generated.math.events import MathEmitter
from generated.math.models import CalculationDoneEmitEvent

async with Runtime.from_env().connect() as rt:
    emitter = rt.emitter(MathEmitter)
    await emitter.emit_calculation_done(
        CalculationDoneEmitEvent(
            operation="multiply",
            operands=[6, 7],
            result=42,
        )
    )

Subscribe to Events

from generated.math.client import MathClient
from generated.math.models import CalculationDoneEmitEvent

async with Runtime.from_env().connect() as rt:
    client = rt.client(MathClient)

    @client.on_calculation_done
    async def handle_event(payload: CalculationDoneEmitEvent):
        print(f"Calculation: {payload.operation} = {payload.result}")

    await client.start()
    # Keep running...

Health Check CLI

# Check service health
amirpc-health --service myservice --nats-url nats://localhost:4222 --prefix ami

# Using environment variables
AMI_SERVICE_NAME=myservice AMI_NATS_URL=nats://localhost:4222 amirpc-health

# Options
amirpc-health --help

Gateway (FastAPI Integration)

from fastapi import FastAPI
from amirpc.gateway import AutoGateway

app = FastAPI()

gateway = AutoGateway(
    metadata_path="gateway_routes.json",
    get_nc=get_nats_client,
    require_permission=require_permission,
)

app.include_router(gateway.router, prefix="/api/v1")

ASL Language

See examples/asl/ for ASL spec examples.

Infrastructure (entry point)

/// myapp.asl
module "myapp";

infrastructure MyApp {
    services {
        Users from "./users.asl";
    }
}

Service definition

/// users.asl
module "myapp.users";

from "@/well-known" import model UUID, model Datetime;

/// User not found error.
error UserNotFound [http_status: 404] {
    user_id: UUID;
};

/// Invalid email error.
error InvalidEmail [http_status: 400] {
    email: string;
};

/// User model.
domain model User {
    id: UUID;
    email: string;
    name: string [default: "John"];
    created_at: Datetime;
}

service Users {
    /// User management operations.
    namespace Management {
        /// Create a new user.
        @http(method: "POST", path: "/users")
        @permission("users.create")
        @throws(InvalidEmail)
        rpc create(email: string, name: string) -> User;

        /// Get user by ID.
        @http(method: "GET", path: "/users/{user_id}")
        @throws(UserNotFound)
        rpc get(user_id: UUID) -> User;

        /// Delete a user.
        @http(method: "DELETE", path: "/users/{user_id}")
        @permission("users.delete")
        @throws(UserNotFound)
        rpc delete(user_id: UUID) -> {};
    }

    /// Emitted when a user is created.
    emit UserCreated(user: User);

    /// Listen for user deletion requests.
    listen UserDeleted(user_id: UUID);
}

Migration from 0.x

See MIGRATION_GUIDE.md for detailed migration instructions.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amirpc-1.3.0.tar.gz (79.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

amirpc-1.3.0-py3-none-any.whl (99.2 kB view details)

Uploaded Python 3

File details

Details for the file amirpc-1.3.0.tar.gz.

File metadata

  • Download URL: amirpc-1.3.0.tar.gz
  • Upload date:
  • Size: 79.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amirpc-1.3.0.tar.gz
Algorithm Hash digest
SHA256 93ec570a626792b7d5292ced86239a5b425581ffd1df87250f8c329db361c57a
MD5 b149a11b558829292c7f0ebdb406e08c
BLAKE2b-256 fb38544ffa258128766cd50dac62a7afc867fc9d1688ccf4345d3e4c5570ff22

See more details on using hashes here.

Provenance

The following attestation bundles were made for amirpc-1.3.0.tar.gz:

Publisher: publish.yml on ih3xcode/amirpc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file amirpc-1.3.0-py3-none-any.whl.

File metadata

  • Download URL: amirpc-1.3.0-py3-none-any.whl
  • Upload date:
  • Size: 99.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amirpc-1.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2796fbd5bfd57492b8d7295d5882eda7e033df86831be461ae6d80b37ee0d519
MD5 747cbe44c305342188814498fc571aa4
BLAKE2b-256 72ae549d5f66fdbc1c906fad334e7250b1aebb802bab33d4788fe77db3dab814

See more details on using hashes here.

Provenance

The following attestation bundles were made for amirpc-1.3.0-py3-none-any.whl:

Publisher: publish.yml on ih3xcode/amirpc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page