Skip to main content

NATS-based RPC framework with code generation and OpenTelemetry tracing

Project description

amirpc

A Python RPC framework built on NATS for building microservices with automatic code generation.

Features

  • Code Generation - Generate clients, servers, and models from ASL (AMI Service Language) specs
  • Type Safety - Full Pydantic validation for requests and responses
  • Runtime Wrapper - Simple Runtime class that handles connection management
  • Load Balancing - Automatic queue groups for distributing requests across server instances
  • Event System - Publish/subscribe events with typed payloads
  • Health Checks - Built-in health endpoints (NATS + optional HTTP)
  • OpenTelemetry - Metrics and tracing out of the box
  • Gateway - Auto-generate FastAPI routes from metadata (optional)

Installation

pip install amirpc

# With compiler (for code generation)
pip install amirpc[compiler]

# With gateway support
pip install amirpc[gateway]

Quick Start

1. Define your service (ASL)

Create an infrastructure file that references your services:

/// myapp.asl (main entry point)
module "myapp";

infrastructure MyApp {
    services {
        Math from "./math.asl";
    }
}
/// math.asl
module "myapp.math";

service Math {
    /// Basic arithmetic operations.
    namespace Calc {
        /// Multiply two numbers.
        rpc multiply(a: int, b: int) -> { result: int };

        /// Add two numbers.
        rpc add(a: int, b: int) -> { result: int };
    }
}

2. Generate code

amic generate myapp.asl -o generated/

3. Implement the server

from amirpc import Runtime
from generated.math.server import MathServer, MathCalcServer
from generated.math.models import CalcMultiplyPayload, CalcMultiplyResult, CalcAddPayload, CalcAddResult


class CalcImpl(MathCalcServer):
    async def multiply(self, payload: CalcMultiplyPayload) -> CalcMultiplyResult:
        return CalcMultiplyResult(result=payload.a * payload.b)

    async def add(self, payload: CalcAddPayload) -> CalcAddResult:
        return CalcAddResult(result=payload.a + payload.b)


class MyMathServer(MathServer):
    calc = CalcImpl


if __name__ == "__main__":
    runtime = Runtime.from_env()
    runtime.serve([MyMathServer])

4. Use the client

import asyncio
from amirpc import Runtime
from generated.math.client import MathClient


async def main():
    async with Runtime.from_env().connect() as rt:
        client = rt.client(MathClient)

        result = await client.calc.multiply(a=6, b=7)
        print(f"6 * 7 = {result.result}")  # 42

        result = await client.calc.add(a=10, b=20)
        print(f"10 + 20 = {result.result}")  # 30


asyncio.run(main())

Configuration

Environment Variables

Variable Description Default
NATS_URL NATS server URL nats://localhost:4222
NATS_CREDS_FILE Path to .creds file None
AMI_SERVICE_VERSION Service version string None

Runtime Options

runtime = Runtime(
    nats_url="nats://localhost:4222",
    creds_file="/path/to/creds",
    service_version="1.0.0",
    connect_timeout=2.0,
    reconnect_time_wait=2.0,
    max_reconnect_attempts=60,
    expand_cnames=True,      # Resolve CNAME records for HA
    drain_timeout=25.0,      # Graceful shutdown timeout
)

Server Features

Lifecycle Hooks

async def init_db():
    # Initialize database connections
    pass

async def close_db():
    # Close database connections
    pass

runtime.serve(
    [MyServer],
    on_startup=init_db,
    on_shutdown=close_db,
)

Server-to-Server Calls

class MyServer(GeneratedServer):
    async def handle_request(self, payload):
        # Call another service
        other_client = self.client(OtherServiceClient)
        return await other_client.namespace.method(arg=value)

HTTP Health Endpoints

server = MyServer(
    nc=nc,
    enable_http_health=True,
    health_port=8080,
)
# GET /alive - liveness probe
# GET /ready - readiness probe
# GET /health - combined status

Events

Emit Events

from generated.math.events import MathEmitter
from generated.math.models import CalculationDoneEmitEvent

async with Runtime.from_env().connect() as rt:
    emitter = rt.emitter(MathEmitter)
    await emitter.emit_calculation_done(
        CalculationDoneEmitEvent(
            operation="multiply",
            operands=[6, 7],
            result=42,
        )
    )

Subscribe to Events

from generated.math.client import MathClient
from generated.math.models import CalculationDoneEmitEvent

async with Runtime.from_env().connect() as rt:
    client = rt.client(MathClient)

    @client.on_calculation_done
    async def handle_event(payload: CalculationDoneEmitEvent):
        print(f"Calculation: {payload.operation} = {payload.result}")

    await client.start()
    # Keep running...

Health Check CLI

# Check service health
amirpc-health --service myservice --nats-url nats://localhost:4222 --prefix ami

# Using environment variables
AMI_SERVICE_NAME=myservice AMI_NATS_URL=nats://localhost:4222 amirpc-health

# Options
amirpc-health --help

Gateway (FastAPI Integration)

from fastapi import FastAPI
from amirpc.gateway import AutoGateway

app = FastAPI()

gateway = AutoGateway(
    metadata_path="gateway_routes.json",
    get_nc=get_nats_client,
    require_permission=require_permission,
)

app.include_router(gateway.router, prefix="/api/v1")

ASL Language

See examples/asl/ for ASL spec examples.

Infrastructure (entry point)

/// myapp.asl
module "myapp";

infrastructure MyApp {
    services {
        Users from "./users.asl";
    }
}

Service definition

/// users.asl
module "myapp.users";

from "@/well-known" import model UUID, model Datetime;

/// User not found error.
error UserNotFound [http_status: 404] {
    user_id: UUID;
};

/// Invalid email error.
error InvalidEmail [http_status: 400] {
    email: string;
};

/// User model.
domain model User {
    id: UUID;
    email: string;
    name: string [default: "John"];
    created_at: Datetime;
}

service Users {
    /// User management operations.
    namespace Management {
        /// Create a new user.
        @http(method: "POST", path: "/users")
        @permission("users.create")
        @throws(InvalidEmail)
        rpc create(email: string, name: string) -> User;

        /// Get user by ID.
        @http(method: "GET", path: "/users/{user_id}")
        @throws(UserNotFound)
        rpc get(user_id: UUID) -> User;

        /// Delete a user.
        @http(method: "DELETE", path: "/users/{user_id}")
        @permission("users.delete")
        @throws(UserNotFound)
        rpc delete(user_id: UUID) -> {};
    }

    /// Emitted when a user is created.
    emit UserCreated(user: User);

    /// Listen for user deletion requests.
    listen UserDeleted(user_id: UUID);
}

Migration from 0.x

See MIGRATION_GUIDE.md for detailed migration instructions.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amirpc-1.1.0.tar.gz (78.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

amirpc-1.1.0-py3-none-any.whl (99.0 kB view details)

Uploaded Python 3

File details

Details for the file amirpc-1.1.0.tar.gz.

File metadata

  • Download URL: amirpc-1.1.0.tar.gz
  • Upload date:
  • Size: 78.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amirpc-1.1.0.tar.gz
Algorithm Hash digest
SHA256 0a10d2f94ef805d92445d3db0c58c3f533580b9aef0760c0275027721c5f205f
MD5 ec26b136c560d91d2889198b84204333
BLAKE2b-256 3b6f2318de40db0f2586ddfb793826d4e6f27703070e6cea9a6c90ba710dc1a6

See more details on using hashes here.

Provenance

The following attestation bundles were made for amirpc-1.1.0.tar.gz:

Publisher: publish.yml on ih3xcode/amirpc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file amirpc-1.1.0-py3-none-any.whl.

File metadata

  • Download URL: amirpc-1.1.0-py3-none-any.whl
  • Upload date:
  • Size: 99.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for amirpc-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 16e7c9ff5068f40f4017278f041d250806373e50d620d6d9fa94e083764641fc
MD5 7586233c0d786ee512e809c70cb33c46
BLAKE2b-256 10c4f7e0e068bceb277b717a6added590827e9a014d897e614abd17d320d2364

See more details on using hashes here.

Provenance

The following attestation bundles were made for amirpc-1.1.0-py3-none-any.whl:

Publisher: publish.yml on ih3xcode/amirpc

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page