Skip to main content

NATS-based RPC framework with code generation and OpenTelemetry tracing

Project description

amirpc

A Python RPC framework built on NATS for building microservices with automatic code generation.

Features

  • Code Generation - Generate clients, servers, and models from ASL (AMI Service Language) specs
  • Type Safety - Full Pydantic validation for requests and responses
  • Runtime Wrapper - Simple Runtime class that handles connection management
  • Load Balancing - Automatic queue groups for distributing requests across server instances
  • Event System - Publish/subscribe events with typed payloads
  • Health Checks - Built-in health endpoints (NATS + optional HTTP)
  • OpenTelemetry - Metrics and tracing out of the box
  • Gateway - Auto-generate FastAPI routes from metadata (optional)

Installation

pip install amirpc

# With compiler (for code generation)
pip install amirpc[compiler]

# With gateway support
pip install amirpc[gateway]

Quick Start

1. Define your service (ASL)

Create an infrastructure file that references your services:

/// myapp.asl (main entry point)
module "myapp";

infrastructure MyApp {
    services {
        Math from "./math.asl";
    }
}
/// math.asl
module "myapp.math";

service Math {
    /// Basic arithmetic operations.
    namespace Calc {
        /// Multiply two numbers.
        rpc multiply(a: int, b: int) -> { result: int };

        /// Add two numbers.
        rpc add(a: int, b: int) -> { result: int };
    }
}

2. Generate code

amic generate myapp.asl -o generated/

3. Implement the server

from amirpc import Runtime
from generated.math.server import MathServer, MathCalcServer
from generated.math.models import CalcMultiplyPayload, CalcMultiplyResult, CalcAddPayload, CalcAddResult


class CalcImpl(MathCalcServer):
    async def multiply(self, payload: CalcMultiplyPayload) -> CalcMultiplyResult:
        return CalcMultiplyResult(result=payload.a * payload.b)

    async def add(self, payload: CalcAddPayload) -> CalcAddResult:
        return CalcAddResult(result=payload.a + payload.b)


class MyMathServer(MathServer):
    calc = CalcImpl


if __name__ == "__main__":
    runtime = Runtime.from_env()
    runtime.serve([MyMathServer])

4. Use the client

import asyncio
from amirpc import Runtime
from generated.math.client import MathClient


async def main():
    async with Runtime.from_env().connect() as rt:
        client = rt.client(MathClient)

        result = await client.calc.multiply(a=6, b=7)
        print(f"6 * 7 = {result.result}")  # 42

        result = await client.calc.add(a=10, b=20)
        print(f"10 + 20 = {result.result}")  # 30


asyncio.run(main())

Configuration

Environment Variables

Variable Description Default
NATS_URL NATS server URL nats://localhost:4222
NATS_CREDS_FILE Path to .creds file None
AMI_SERVICE_VERSION Service version string None

Runtime Options

runtime = Runtime(
    nats_url="nats://localhost:4222",
    creds_file="/path/to/creds",
    service_version="1.0.0",
    connect_timeout=2.0,
    reconnect_time_wait=2.0,
    max_reconnect_attempts=60,
    expand_cnames=True,      # Resolve CNAME records for HA
    drain_timeout=25.0,      # Graceful shutdown timeout
)

Server Features

Lifecycle Hooks

async def init_db():
    # Initialize database connections
    pass

async def close_db():
    # Close database connections
    pass

runtime.serve(
    [MyServer],
    on_startup=init_db,
    on_shutdown=close_db,
)

Server-to-Server Calls

class MyServer(GeneratedServer):
    async def handle_request(self, payload):
        # Call another service
        other_client = self.client(OtherServiceClient)
        return await other_client.namespace.method(arg=value)

HTTP Health Endpoints

server = MyServer(
    nc=nc,
    enable_http_health=True,
    health_port=8080,
)
# GET /alive - liveness probe
# GET /ready - readiness probe
# GET /health - combined status

Events

Emit Events

from generated.math.events import MathEmitter
from generated.math.models import CalculationDoneEmitEvent

async with Runtime.from_env().connect() as rt:
    emitter = rt.emitter(MathEmitter)
    await emitter.emit_calculation_done(
        CalculationDoneEmitEvent(
            operation="multiply",
            operands=[6, 7],
            result=42,
        )
    )

Subscribe to Events

from generated.math.client import MathClient
from generated.math.models import CalculationDoneEmitEvent

async with Runtime.from_env().connect() as rt:
    client = rt.client(MathClient)

    @client.on_calculation_done
    async def handle_event(payload: CalculationDoneEmitEvent):
        print(f"Calculation: {payload.operation} = {payload.result}")

    await client.start()
    # Keep running...

Health Check CLI

# Check service health
amirpc-health --service myservice --nats-url nats://localhost:4222 --prefix ami

# Using environment variables
AMI_SERVICE_NAME=myservice AMI_NATS_URL=nats://localhost:4222 amirpc-health

# Options
amirpc-health --help

Gateway (FastAPI Integration)

from fastapi import FastAPI
from amirpc.gateway import AutoGateway

app = FastAPI()

gateway = AutoGateway(
    metadata_path="gateway_routes.json",
    get_nc=get_nats_client,
    require_permission=require_permission,
)

app.include_router(gateway.router, prefix="/api/v1")

ASL Language

See examples/asl/ for ASL spec examples.

Infrastructure (entry point)

/// myapp.asl
module "myapp";

infrastructure MyApp {
    services {
        Users from "./users.asl";
    }
}

Service definition

/// users.asl
module "myapp.users";

from "@/well-known" import model UUID, model Datetime;

/// User not found error.
error UserNotFound [http_status: 404] {
    user_id: UUID;
};

/// Invalid email error.
error InvalidEmail [http_status: 400] {
    email: string;
};

/// User model.
domain model User {
    id: UUID;
    email: string;
    name: string [default: "John"];
    created_at: Datetime;
}

service Users {
    /// User management operations.
    namespace Management {
        /// Create a new user.
        @http(method: "POST", path: "/users")
        @permission("users.create")
        @throws(InvalidEmail)
        rpc create(email: string, name: string) -> User;

        /// Get user by ID.
        @http(method: "GET", path: "/users/{user_id}")
        @throws(UserNotFound)
        rpc get(user_id: UUID) -> User;

        /// Delete a user.
        @http(method: "DELETE", path: "/users/{user_id}")
        @permission("users.delete")
        @throws(UserNotFound)
        rpc delete(user_id: UUID) -> {};
    }

    /// Emitted when a user is created.
    emit UserCreated(user: User);

    /// Listen for user deletion requests.
    listen UserDeleted(user_id: UUID);
}

Migration from 0.x

See MIGRATION_GUIDE.md for detailed migration instructions.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amirpc-1.1.1.tar.gz (78.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

amirpc-1.1.1-py3-none-any.whl (99.0 kB view details)

Uploaded Python 3

File details

Details for the file amirpc-1.1.1.tar.gz.

File metadata

  • Download URL: amirpc-1.1.1.tar.gz
  • Upload date:
  • Size: 78.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amirpc-1.1.1.tar.gz
Algorithm Hash digest
SHA256 2737a3a1ce040b28742c9cdf0f3ae10ae012194e7f523644f9e7b01d646e94db
MD5 dd906198a1b6fadea0d80ef9d73c98f6
BLAKE2b-256 e8a8e24778e8f9a0a81577c756a487e3eaa08a001f9e0ef3def38c59a2a63df1

See more details on using hashes here.

Provenance

The following attestation bundles were made for amirpc-1.1.1.tar.gz:

Publisher: publish.yml on ih3xcode/amirpc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file amirpc-1.1.1-py3-none-any.whl.

File metadata

  • Download URL: amirpc-1.1.1-py3-none-any.whl
  • Upload date:
  • Size: 99.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amirpc-1.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 58904d19be83fbd2b60ba9cc34058fa999fa0ffdd99cb7079cdd6406b1a48a41
MD5 39a9680e938bdb20912f49edf5a11c68
BLAKE2b-256 a2255ce9fe5eeb58141349baca57c31961ef85f8b91aa20ca44d9ffc8730854b

See more details on using hashes here.

Provenance

The following attestation bundles were made for amirpc-1.1.1-py3-none-any.whl:

Publisher: publish.yml on ih3xcode/amirpc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page