NATS-based RPC framework with code generation and OpenTelemetry tracing
Project description
amirpc
A Python RPC framework built on NATS for building microservices with automatic code generation.
Features
- Code Generation - Generate clients, servers, and models from ASL (AMI Service Language) specs
- Type Safety - Full Pydantic validation for requests and responses
- Runtime Wrapper - Simple `Runtime` class that handles connection management
- Load Balancing - Automatic queue groups for distributing requests across server instances
- Event System - Publish/subscribe events with typed payloads
- Health Checks - Built-in health endpoints (NATS + optional HTTP)
- OpenTelemetry - Metrics and tracing out of the box
- Gateway - Auto-generate FastAPI routes from metadata (optional)
Installation
pip install amirpc
# With compiler (for code generation)
pip install amirpc[compiler]
# With gateway support
pip install amirpc[gateway]
Quick Start
1. Define your service (ASL)
Create an infrastructure file that references your services:
/// myapp.asl (main entry point)
module "myapp";
infrastructure MyApp {
services {
Math from "./math.asl";
}
}
/// math.asl
module "myapp.math";
service Math {
/// Basic arithmetic operations.
namespace Calc {
/// Multiply two numbers.
rpc multiply(a: int, b: int) -> { result: int };
/// Add two numbers.
rpc add(a: int, b: int) -> { result: int };
}
}
2. Generate code
amic generate myapp.asl -o generated/
3. Implement the server
from amirpc import Runtime
from generated.math.server import MathServer, MathCalcServer
from generated.math.models import CalcMultiplyPayload, CalcMultiplyResult, CalcAddPayload, CalcAddResult
class CalcImpl(MathCalcServer):
async def multiply(self, payload: CalcMultiplyPayload) -> CalcMultiplyResult:
return CalcMultiplyResult(result=payload.a * payload.b)
async def add(self, payload: CalcAddPayload) -> CalcAddResult:
return CalcAddResult(result=payload.a + payload.b)
class MyMathServer(MathServer):
calc = CalcImpl
if __name__ == "__main__":
runtime = Runtime.from_env()
runtime.serve([MyMathServer])
4. Use the client
import asyncio
from amirpc import Runtime
from generated.math.client import MathClient
async def main():
async with Runtime.from_env().connect() as rt:
client = rt.client(MathClient)
result = await client.calc.multiply(a=6, b=7)
print(f"6 * 7 = {result.result}") # 42
result = await client.calc.add(a=10, b=20)
print(f"10 + 20 = {result.result}") # 30
asyncio.run(main())
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
| `NATS_URL` | NATS server URL | `nats://localhost:4222` |
| `NATS_CREDS_FILE` | Path to .creds file | None |
| `AMI_SERVICE_VERSION` | Service version string | None |
Runtime Options
runtime = Runtime(
nats_url="nats://localhost:4222",
creds_file="/path/to/creds",
service_version="1.0.0",
connect_timeout=2.0,
reconnect_time_wait=2.0,
max_reconnect_attempts=60,
expand_cnames=True, # Resolve CNAME records for HA
drain_timeout=25.0, # Graceful shutdown timeout
)
Server Features
Lifecycle Hooks
async def init_db():
# Initialize database connections
pass
async def close_db():
# Close database connections
pass
runtime.serve(
[MyServer],
on_startup=init_db,
on_shutdown=close_db,
)
Server-to-Server Calls
class MyServer(GeneratedServer):
async def handle_request(self, payload):
# Call another service
other_client = self.client(OtherServiceClient)
return await other_client.namespace.method(arg=value)
HTTP Health Endpoints
server = MyServer(
nc=nc,
enable_http_health=True,
health_port=8080,
)
# GET /alive - liveness probe
# GET /ready - readiness probe
# GET /health - combined status
Events
Emit Events
from generated.math.events import MathEmitter
from generated.math.models import CalculationDoneEmitEvent
async with Runtime.from_env().connect() as rt:
emitter = rt.emitter(MathEmitter)
await emitter.emit_calculation_done(
CalculationDoneEmitEvent(
operation="multiply",
operands=[6, 7],
result=42,
)
)
Subscribe to Events
from generated.math.client import MathClient
from generated.math.models import CalculationDoneEmitEvent
async with Runtime.from_env().connect() as rt:
client = rt.client(MathClient)
@client.on_calculation_done
async def handle_event(payload: CalculationDoneEmitEvent):
print(f"Calculation: {payload.operation} = {payload.result}")
await client.start()
# Keep running...
Health Check CLI
# Check service health
amirpc-health --service myservice --nats-url nats://localhost:4222 --prefix ami
# Using environment variables
AMI_SERVICE_NAME=myservice AMI_NATS_URL=nats://localhost:4222 amirpc-health
# Options
amirpc-health --help
Gateway (FastAPI Integration)
from fastapi import FastAPI
from amirpc.gateway import AutoGateway
app = FastAPI()
gateway = AutoGateway(
metadata_path="gateway_routes.json",
get_nc=get_nats_client,
require_permission=require_permission,
)
app.include_router(gateway.router, prefix="/api/v1")
ASL Language
See examples/asl/ for ASL spec examples.
Infrastructure (entry point)
/// myapp.asl
module "myapp";
infrastructure MyApp {
services {
Users from "./users.asl";
}
}
Service definition
/// users.asl
module "myapp.users";
from "@/well-known" import model UUID, model Datetime;
/// User not found error.
error UserNotFound [http_status: 404] {
user_id: UUID;
};
/// Invalid email error.
error InvalidEmail [http_status: 400] {
email: string;
};
/// User model.
domain model User {
id: UUID;
email: string;
name: string [default: "John"];
created_at: Datetime;
}
service Users {
/// User management operations.
namespace Management {
/// Create a new user.
@http(method: "POST", path: "/users")
@permission("users.create")
@throws(InvalidEmail)
rpc create(email: string, name: string) -> User;
/// Get user by ID.
@http(method: "GET", path: "/users/{user_id}")
@throws(UserNotFound)
rpc get(user_id: UUID) -> User;
/// Delete a user.
@http(method: "DELETE", path: "/users/{user_id}")
@permission("users.delete")
@throws(UserNotFound)
rpc delete(user_id: UUID) -> {};
}
/// Emitted when a user is created.
emit UserCreated(user: User);
/// Listen for user deletion requests.
listen UserDeleted(user_id: UUID);
}
Migration from 0.x
See MIGRATION_GUIDE.md for detailed migration instructions.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file amirpc-1.1.1.tar.gz.
File metadata
- Download URL: amirpc-1.1.1.tar.gz
- Upload date:
- Size: 78.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | `2737a3a1ce040b28742c9cdf0f3ae10ae012194e7f523644f9e7b01d646e94db` | |
| MD5 | `dd906198a1b6fadea0d80ef9d73c98f6` | |
| BLAKE2b-256 | `e8a8e24778e8f9a0a81577c756a487e3eaa08a001f9e0ef3def38c59a2a63df1` | |
Provenance
The following attestation bundles were made for amirpc-1.1.1.tar.gz:
Publisher:
publish.yml on ih3xcode/amirpc
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
amirpc-1.1.1.tar.gz -
Subject digest:
2737a3a1ce040b28742c9cdf0f3ae10ae012194e7f523644f9e7b01d646e94db - Sigstore transparency entry: 855200182
- Sigstore integration time:
-
Permalink:
ih3xcode/amirpc@f58cc569ce960123c59f2ab18ea40b452e1db3f4 -
Branch / Tag:
refs/tags/v1.1.1 - Owner: https://github.com/ih3xcode
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@f58cc569ce960123c59f2ab18ea40b452e1db3f4 -
Trigger Event:
release
-
Statement type:
File details
Details for the file amirpc-1.1.1-py3-none-any.whl.
File metadata
- Download URL: amirpc-1.1.1-py3-none-any.whl
- Upload date:
- Size: 99.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | `58904d19be83fbd2b60ba9cc34058fa999fa0ffdd99cb7079cdd6406b1a48a41` | |
| MD5 | `39a9680e938bdb20912f49edf5a11c68` | |
| BLAKE2b-256 | `a2255ce9fe5eeb58141349baca57c31961ef85f8b91aa20ca44d9ffc8730854b` | |
Provenance
The following attestation bundles were made for amirpc-1.1.1-py3-none-any.whl:
Publisher:
publish.yml on ih3xcode/amirpc
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
amirpc-1.1.1-py3-none-any.whl -
Subject digest:
58904d19be83fbd2b60ba9cc34058fa999fa0ffdd99cb7079cdd6406b1a48a41 - Sigstore transparency entry: 855200186
- Sigstore integration time:
-
Permalink:
ih3xcode/amirpc@f58cc569ce960123c59f2ab18ea40b452e1db3f4 -
Branch / Tag:
refs/tags/v1.1.1 - Owner: https://github.com/ih3xcode
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@f58cc569ce960123c59f2ab18ea40b452e1db3f4 -
Trigger Event:
release
-
Statement type: