Aio Service Caller
A lightweight and high-performance asynchronous service invocation framework for Python, providing service registration, discovery, load balancing, and declarative HTTP client capabilities.
1. Features
- 🔍 Service Discovery: Built-in Nacos integration for automatic service registration and discovery
- ⚖️ Load Balancing: Multiple strategies included (round robin, random, weighted random, weighted round robin)
- 🔌 Interceptor Mechanism: Spring-style interceptors supporting pre-request, post-response, and exception handling
- 🚀 High Performance Async I/O: Powered by aiohttp with connection pooling for efficient HTTP requests
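
To make the load-balancing strategies listed above concrete, here is a minimal sketch of two of them, round robin and weighted random. It is not the framework's implementation: the `Instance`, `RoundRobinPicker`, and `WeightedRandomPicker` names are hypothetical and exist only for illustration.

```python
import itertools
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class Instance:
    """Hypothetical stand-in for a discovered service instance."""
    host: str
    port: int
    weight: float = 1.0


class RoundRobinPicker:
    """Cycle through the instances in a fixed order."""

    def __init__(self, instances):
        self._cycle = itertools.cycle(list(instances))

    def pick(self) -> Instance:
        return next(self._cycle)


class WeightedRandomPicker:
    """Pick an instance with probability proportional to its weight."""

    def __init__(self, instances, rng=None):
        self._instances = list(instances)
        self._weights = [inst.weight for inst in self._instances]
        self._rng = rng or random.Random()

    def pick(self) -> Instance:
        return self._rng.choices(self._instances, weights=self._weights, k=1)[0]


instances = [
    Instance("10.0.0.1", 8080, weight=1.0),
    Instance("10.0.0.2", 8080, weight=3.0),
]

rr = RoundRobinPicker(instances)
rr_hosts = [rr.pick().host for _ in range(4)]  # alternates between the two hosts

wr = WeightedRandomPicker(instances, rng=random.Random(0))
wr_hosts = [wr.pick().host for _ in range(1000)]  # 10.0.0.2 is drawn more often
```

Round robin spreads calls evenly regardless of capacity, while the weighted variant sends more traffic to instances registered with a larger weight.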
2. Installation
    pip install aio-service-caller[config]
The optional config extra automatically installs yamlpyconfig. We recommend using yamlpyconfig to load service registry, discovery, and caller configuration from local configuration files combined with environment variables.
3. Quick Start
3.1. Add configuration
For details about yamlpyconfig, refer to its documentation.
Create a config file such as application.yaml or application-{profile}.yaml inside your config directory (e.g., /config):
    # Service caller settings
    service-caller:
      # Supported load balancer types: round_robin, random, weight_round_robin, weight_random
      lb-type: round_robin # Load balancer type
      connection-timeout: 6 # Connection timeout in seconds
      read-timeout: 6 # Read timeout in seconds
      connection-pool-size: 100 # Connection pool size
    # Nacos configuration
    app-registry:
      nacos:
        server-addr: "192.168.30.36:9090" # Nacos server address
        namespace: "dev" # Optional namespace
        cluster: "DEFAULT" # Optional cluster
        group: "DEFAULT_GROUP" # Default group name
        ip: "127.0.0.1" # Local service IP
        port: 9999 # Local service port
        app-name: "my-app" # Application name
        username: "nacos" # Optional username
        password: "Y789uioJKL" # Optional password
        weight: 1.0 # Service instance weight
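
As a rough illustration of how an application could sanity-check the service-caller section before using it, the sketch below validates a plain dict that stands in for the parsed YAML. `CallerSettings` and `parse_caller_settings` are hypothetical helpers, not part of aio-service-caller or yamlpyconfig, and the library may validate these values differently.

```python
from dataclasses import dataclass

# Load balancer types listed in the configuration comment above.
SUPPORTED_LB_TYPES = {"round_robin", "random", "weight_round_robin", "weight_random"}


@dataclass(frozen=True)
class CallerSettings:
    """Hypothetical typed view of the `service-caller` section."""
    lb_type: str
    connection_timeout: float
    read_timeout: float
    connection_pool_size: int


def parse_caller_settings(config: dict) -> CallerSettings:
    """Validate the `service-caller` mapping and convert it to CallerSettings."""
    section = config["service-caller"]
    lb_type = section["lb-type"]
    if lb_type not in SUPPORTED_LB_TYPES:
        raise ValueError(f"unsupported lb-type: {lb_type!r}")
    pool_size = int(section["connection-pool-size"])
    if pool_size <= 0:
        raise ValueError("connection-pool-size must be positive")
    return CallerSettings(
        lb_type=lb_type,
        connection_timeout=float(section["connection-timeout"]),
        read_timeout=float(section["read-timeout"]),
        connection_pool_size=pool_size,
    )


# Dict equivalent of the `service-caller` YAML section shown above.
parsed = {
    "service-caller": {
        "lb-type": "round_robin",
        "connection-timeout": 6,
        "read-timeout": 6,
        "connection-pool-size": 100,
    }
}
settings = parse_caller_settings(parsed)
```

Failing fast on an unknown lb-type surfaces a typo at startup rather than on the first outgoing call.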
3.2. Create ServiceManager and call other services in the cluster
Creating a ServiceManager serves two purposes:
- Automatically registers the current service to Nacos using the provided configuration
- Allows you to call other service instances registered under the same namespace (real HTTP requests are handled internally via aiohttp)
Example:
    # Test method from a pytest class; requires `import pytest` and the pytest-asyncio plugin.
    # ConfigManager, ServiceManager, and the interceptors must also be imported
    # (their module paths are not shown here).
    @pytest.mark.asyncio
    async def test_get_service_instances_with_nacos(self):
        # Load configuration
        async with ConfigManager("./") as config_manager:
            # Creating the ServiceManager automatically registers the current service into Nacos
            async with ServiceManager(
                config_manager=config_manager,
                interceptors=[LoggingInterceptor(), AuthInterceptor(token="123456"), MetricsInterceptor()]
            ) as manager:
                # Option 1: Directly get the parsed result
                result = await manager.get("other-app", "/hello")
                assert result == {"status": "OK"}

                # Option 2: Get the raw aiohttp response object
                async with manager.raw_get("other-app", "/hello") as response:
                    assert response.status == 200
                    result = await response.json()
                    assert result == {"status": "OK"}
You may pass any argument supported by aiohttp (except method and url) to the request, such as headers, params, data, json, or timeout.
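
The call shape can be tried without a running Nacos cluster by using a stand-in object. The sketch below defines a hypothetical `StubManager` whose `get` method only records the keyword arguments it receives; it is not part of the library. Rejecting `method` and `url` mirrors the restriction stated above, but how the real ServiceManager reacts to those arguments is an assumption.

```python
import asyncio


class StubManager:
    """Hypothetical stand-in for ServiceManager that records request kwargs."""

    # Arguments the manager sets itself, per the note above.
    RESERVED = {"method", "url"}

    def __init__(self):
        self.calls = []

    async def get(self, service_name: str, path: str, **kwargs):
        clash = self.RESERVED.intersection(kwargs)
        if clash:
            raise TypeError(f"reserved arguments not allowed: {sorted(clash)}")
        # A real manager would resolve an instance and forward kwargs to aiohttp here.
        self.calls.append(("GET", service_name, path, kwargs))
        return {"status": "OK"}


async def main():
    manager = StubManager()
    result = await manager.get(
        "other-app",
        "/hello",
        headers={"X-Request-Id": "abc123"},
        params={"lang": "en"},
    )
    return manager, result


manager, result = asyncio.run(main())
```

With a real ServiceManager, the same headers and params keyword arguments would be passed in the same position.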
3.3. Custom Interceptors
When you use ServiceManager, interceptors run at three points in the call lifecycle: before the request is sent, after the response is received, and when an exception occurs.
To add custom behavior, implement your own interceptors and pass them via the interceptors parameter.
Example: a logging interceptor
    import logging

    # IServiceInterceptor and RequestContext must be imported from the library
    # (their module paths are not shown here).
    logger = logging.getLogger(__name__)


    class LoggingInterceptor(IServiceInterceptor):
        """Logging interceptor"""

        def __init__(self, log_request: bool = True, log_response: bool = True):
            """
            Args:
                log_request: Whether to log request details
                log_response: Whether to log response info
            """
            self.log_request = log_request
            self.log_response = log_response

        @property
        def name(self) -> str:
            return "LoggingInterceptor"

        async def before_request(self, context: RequestContext) -> None:
            """Log before sending the request"""
            if self.log_request:
                logger.info(
                    f"→ {context.method} {context.service_name}{context.path} | "
                    f"Headers: {context.kwargs.get('headers', {})} | "
                    f"Params: {context.kwargs.get('params', {})}"
                )

        async def after_response(self, context: RequestContext) -> None:
            """Log after receiving the response"""
            if self.log_response and context.response:
                # len(str(...)) counts characters of the parsed result, not bytes on the wire
                logger.info(
                    f"← {context.method} {context.service_name}{context.path} | "
                    f"Resolved URL: {context.resolved_url} | "
                    f"Status: {context.response.status} | "
                    f"Duration: {context.duration:.3f}s | "
                    f"Size: {len(str(context.result)) if context.result else 0} chars"
                )

        async def handle_exception(self, context: RequestContext) -> None:
            """Log when an exception occurs"""
            if context.exception:
                logger.error(
                    f"✗ {context.method} {context.service_name}{context.path} | "
                    f"Exception: {context.exception} | "
                    f"Duration: {context.duration:.3f}s"
                )

        @property
        def order(self) -> int:
            # Large value, so this interceptor runs after those with smaller order values
            return 99999
Another example: adding authentication headers before the request is sent
    import logging

    # IServiceInterceptor and RequestContext must be imported from the library
    # (their module paths are not shown here).
    logger = logging.getLogger(__name__)


    class AuthInterceptor(IServiceInterceptor):
        """Authentication interceptor"""

        def __init__(self, token: str, header_name: str = "Authorization", prefix: str = "Bearer "):
            """
            Args:
                token: Authentication token
                header_name: HTTP header key
                prefix: Token prefix
            """
            self.token = token
            self.header_name = header_name
            self.prefix = prefix

        @property
        def name(self) -> str:
            return "AuthInterceptor"

        async def before_request(self, context: RequestContext) -> None:
            """Add auth headers before request"""
            if "headers" not in context.kwargs:
                context.kwargs["headers"] = {}
            context.kwargs["headers"][self.header_name] = f"{self.prefix}{self.token}"

        async def after_response(self, context: RequestContext) -> None:
            """Handle authentication-related responses"""
            if context.response and context.response.status == 401:
                logger.warning(f"Authentication failed for {context.service_name}{context.path}")

        async def handle_exception(self, context: RequestContext) -> None:
            """Authentication exception handling"""
            pass  # Let upper layers handle the error
Interceptor execution order is determined by the order property.
The default is 0, and interceptors with smaller values execute earlier.
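
The ordering rule can be sketched with a few stand-in objects. `StubInterceptor` and `run_before_request` below are hypothetical and only mimic the `name`, `order`, and `before_request` members; the context is a plain dict rather than the library's RequestContext. How the library orders interceptors with equal order values is not stated, so the sketch simply keeps their list position.

```python
import asyncio


class StubInterceptor:
    """Hypothetical interceptor exposing only `name`, `order`, and `before_request`."""

    def __init__(self, name: str, order: int = 0):
        self.name = name
        self.order = order

    async def before_request(self, context: dict) -> None:
        # Record the call so the execution order can be inspected afterwards.
        context.setdefault("trace", []).append(self.name)


async def run_before_request(interceptors, context: dict) -> None:
    """Run the `before_request` hooks in ascending `order`."""
    # sorted() is stable, so interceptors with equal order keep their list position.
    for interceptor in sorted(interceptors, key=lambda i: i.order):
        await interceptor.before_request(context)


context = {}
asyncio.run(
    run_before_request(
        [
            StubInterceptor("logging", order=99999),
            StubInterceptor("auth"),  # default order 0
            StubInterceptor("metrics", order=10),
        ],
        context,
    )
)
print(context["trace"])  # ['auth', 'metrics', 'logging']
```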
Notes on Interceptors:
- Interceptors with the same name will only be added once. Any subsequent interceptor with the same name will be ignored.
- You can manually add or remove interceptors through the following three methods of service_manager:
  - add_interceptor(interceptor: IServiceInterceptor): Add an interceptor.
  - remove_interceptor(name: str): Remove an interceptor by its name.
  - clear_interceptors(): Remove all interceptors.
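
The notes above can be modeled with a small registry keyed by interceptor name. This is a sketch under stated assumptions, not the ServiceManager implementation: `InterceptorRegistry` and `NamedInterceptor` are hypothetical, and the boolean returned by `add_interceptor` is added here only to make the "ignored" case visible.

```python
class NamedInterceptor:
    """Hypothetical interceptor that carries only a name."""

    def __init__(self, name: str):
        self.name = name


class InterceptorRegistry:
    """Hypothetical registry following the add/remove/clear rules described above."""

    def __init__(self):
        self._by_name = {}

    def add_interceptor(self, interceptor) -> bool:
        """Add an interceptor; ignore it if its name is already registered."""
        if interceptor.name in self._by_name:
            return False
        self._by_name[interceptor.name] = interceptor
        return True

    def remove_interceptor(self, name: str) -> None:
        """Remove an interceptor by name; unknown names are ignored."""
        self._by_name.pop(name, None)

    def clear_interceptors(self) -> None:
        """Remove all interceptors."""
        self._by_name.clear()

    def names(self):
        return list(self._by_name)


registry = InterceptorRegistry()
first = NamedInterceptor("AuthInterceptor")
duplicate = NamedInterceptor("AuthInterceptor")

added_first = registry.add_interceptor(first)          # registered
added_duplicate = registry.add_interceptor(duplicate)  # same name, ignored
registry.add_interceptor(NamedInterceptor("LoggingInterceptor"))
names_after_add = registry.names()

registry.remove_interceptor("AuthInterceptor")
names_after_remove = registry.names()

registry.clear_interceptors()
names_after_clear = registry.names()
```

Keying by name means that replacing an interceptor requires removing the old one first, since a second add with the same name has no effect.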