An async service invocation framework with registry, discovery, load balancing, and declarative HTTP client features.
Aio Service Caller
A lightweight, high-performance Python asynchronous service invocation framework that provides service registration, service discovery, load balancing, and declarative HTTP client capabilities.
1. Features
- 🔍 Service Discovery: Integrated with Nacos service governance, supporting automatic service registration and discovery.
- ⚖️ Load Balancing: Built-in strategies including round-robin, random, weighted random, and weighted round-robin.
- 🔌 Interceptor Mechanism: Similar to Spring’s interceptor design, with pre-request, post-response, and exception-handling phases.
- 🚀 Asynchronous High Performance: Based on aiohttp, with connection pooling for efficient HTTP calls.
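To make the four load balancing strategies concrete, here is a minimal, self-contained sketch. It is not the framework's implementation: Instance and the selector functions are hypothetical names, and the weighted round-robin variant shown is the "smooth" algorithm, chosen here only as an assumption.

```python
import itertools
import random
from dataclasses import dataclass
from typing import Dict, Iterator, List


@dataclass(frozen=True)
class Instance:
    """Hypothetical stand-in for a discovered service instance."""
    host: str
    port: int
    weight: float = 1.0


def round_robin(instances: List[Instance]) -> Iterator[Instance]:
    """Cycle through the instances in order, ignoring weights."""
    return itertools.cycle(instances)


def pick_random(instances: List[Instance], rng: random.Random) -> Instance:
    """Pick any instance with equal probability."""
    return rng.choice(instances)


def pick_weighted_random(instances: List[Instance], rng: random.Random) -> Instance:
    """Pick an instance with probability proportional to its weight."""
    weights = [inst.weight for inst in instances]
    return rng.choices(instances, weights=weights, k=1)[0]


def weighted_round_robin(instances: List[Instance]) -> Iterator[Instance]:
    """Smooth weighted round-robin: deterministic, spreads heavy instances out."""
    current: Dict[Instance, float] = {inst: 0.0 for inst in instances}
    total = sum(inst.weight for inst in instances)
    while True:
        # Every instance earns its weight each round.
        for inst in instances:
            current[inst] += inst.weight
        # The instance with the highest running score is chosen and penalized.
        chosen = max(instances, key=lambda inst: current[inst])
        current[chosen] -= total
        yield chosen
```

With weights 2 and 1, the weighted round-robin generator selects the heavier instance twice as often while avoiding long runs of the same instance.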
2. Installation
pip install aio-service-caller[config]
The [config] option automatically installs yamlpyconfig. It is recommended to use yamlpyconfig to load configuration from local files (with environment variable expansion) and to manage service registry, discovery, and invocation settings.
3. Quick Start
3.1. Add Configuration
Create or modify configuration files such as application.yaml or application-{profile}.yaml in a configuration directory (e.g., /config). For details about yamlpyconfig, see its documentation. An example configuration:
# Service caller configuration
service-caller:
  lb-type: round_robin          # Load balancing strategy: round_robin, random, weight_round_robin, weight_random
  connection-timeout: 6         # Connection timeout (seconds)
  read-timeout: 6               # Read timeout (seconds)
  connection-pool-size: 100     # aiohttp connection pool size
# Nacos configuration
app-registry:
  nacos:
    server-addr: "192.168.30.36:9090"
    namespace: "dev"
    cluster: "DEFAULT"
    group: "DEFAULT_GROUP"
    ip: "127.0.0.1"
    port: 9999
    app-name: "my-app"
    username: "nacos"
    password: "Y789uioJKL"
    weight: 1.0
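It can help to validate the service-caller section early, before any request is made. The following is a minimal sketch that operates on an already-loaded dictionary rather than on yamlpyconfig itself. ServiceCallerSettings and parse_service_caller are hypothetical names, and the default values simply mirror the example above; they are assumptions, not documented defaults.

```python
from dataclasses import dataclass
from typing import Any, Mapping

# Strategy names taken from the configuration comment above.
ALLOWED_LB_TYPES = {"round_robin", "random", "weight_round_robin", "weight_random"}


@dataclass(frozen=True)
class ServiceCallerSettings:
    """Hypothetical typed view of the `service-caller` section."""
    lb_type: str
    connection_timeout: float
    read_timeout: float
    connection_pool_size: int


def parse_service_caller(config: Mapping[str, Any]) -> ServiceCallerSettings:
    """Validate the section and fall back to assumed defaults for missing keys."""
    section = config.get("service-caller") or {}
    settings = ServiceCallerSettings(
        lb_type=str(section.get("lb-type", "round_robin")),
        connection_timeout=float(section.get("connection-timeout", 6)),
        read_timeout=float(section.get("read-timeout", 6)),
        connection_pool_size=int(section.get("connection-pool-size", 100)),
    )
    if settings.lb_type not in ALLOWED_LB_TYPES:
        raise ValueError(f"unknown lb-type: {settings.lb_type!r}")
    if settings.connection_timeout <= 0 or settings.read_timeout <= 0:
        raise ValueError("timeouts must be positive")
    if settings.connection_pool_size <= 0:
        raise ValueError("connection-pool-size must be positive")
    return settings
```

Failing fast on a misspelled lb-type is usually easier to debug than discovering it on the first outbound call.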
3.2. Create ServiceManager and Invoke Other Services
ServiceManager serves two purposes:
- Register the current service into Nacos according to the configuration.
- Call other services registered in the same Nacos namespace, using aiohttp for the underlying HTTP requests.
@pytest.mark.asyncio
async def test_get_service_instances_with_nacos(self):
    async with ConfigManager("./") as config_manager:
        # Create ServiceManager
        async with ServiceManager(
            config_manager=config_manager,
            interceptors=[LoggingInterceptor(), AuthInterceptor(token="123456"), MetricsInterceptor()]
        ) as manager:
            # Option 1: get the parsed business result
            result = await manager.get("other-app", "/hello")
            assert result == {"status": "OK"}
            # Option 2: get the raw aiohttp response
            async with manager.raw_get("other-app", "/hello") as response:
                assert response.status == 200
                result = await response.json()
                assert result == {"status": "OK"}
Each supported HTTP method provides two calling styles:
- manager.<method>(service_name, path, **kwargs) → returns the processed business result
- manager.raw_<method>(service_name, path, **kwargs) → returns the raw aiohttp response object
kwargs is passed directly to aiohttp, allowing you to set headers, params, data, json, timeout, etc.
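As an illustration of how keyword arguments travel with a call, the sketch below passes params, json, and headers. To stay runnable without a Nacos server it uses FakeManager, a hypothetical stand-in that only records calls; the /users path, the X-Request-Source header, and the availability of manager.post are assumptions based on the calling convention described above.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class FakeManager:
    """Hypothetical stand-in for ServiceManager that records calls instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, str, Dict[str, Any]]] = []

    async def get(self, service_name: str, path: str, **kwargs: Any) -> Any:
        self.calls.append(("GET", service_name, path, kwargs))
        return {"status": "OK"}

    async def post(self, service_name: str, path: str, **kwargs: Any) -> Any:
        self.calls.append(("POST", service_name, path, kwargs))
        return {"status": "OK"}


async def create_user(manager: Any, name: str) -> Any:
    """Send a JSON body plus extra headers and query parameters."""
    return await manager.post(
        "other-app",
        "/users",  # hypothetical endpoint
        json={"name": name},
        headers={"X-Request-Source": "docs-example"},
        params={"notify": "true"},
    )


async def main() -> FakeManager:
    manager = FakeManager()
    await manager.get("other-app", "/hello", params={"lang": "en"})
    await create_user(manager, "alice")
    return manager


if __name__ == "__main__":
    for call in asyncio.run(main()).calls:
        print(call)
```

With a real ServiceManager, the same create_user function should work unchanged, since it relies only on the manager.<method>(service_name, path, **kwargs) shape.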
3.3. Custom Interceptors
When you invoke other services via ServiceManager, the interceptors registered in the manager are called automatically at the appropriate stages.
To write a custom interceptor, implement the IServiceInterceptor interface:
from abc import ABC, abstractmethod

# RequestContext is provided by aio-service-caller.


class IServiceInterceptor(ABC):
    """Service invocation interceptor interface"""

    @abstractmethod
    async def before_request(self, context: RequestContext) -> None:
        """Pre-processing before the request is sent"""
        pass

    @abstractmethod
    async def after_response(self, context: RequestContext) -> None:
        """Post-processing after the response is received"""
        pass

    @abstractmethod
    async def handle_exception(self, context: RequestContext) -> None:
        """Exception handling when the request fails"""
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Interceptor name"""
        pass

    @property
    def order(self) -> int:
        """Execution order, smaller values indicate higher priority"""
        return 0
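The three stages of the interface could be driven roughly as in the sketch below. This is an illustration under stated assumptions, not the framework's actual dispatch logic: invoke, send, and RecordingInterceptor are hypothetical names, and running every stage in ascending order is an assumption.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, List, Tuple


class RecordingInterceptor:
    """Hypothetical interceptor that records each stage into a shared log."""

    def __init__(self, name: str, order: int, log: List[Tuple[str, str]]) -> None:
        self.name = name
        self.order = order
        self.log = log

    async def before_request(self, context: Any) -> None:
        self.log.append((self.name, "before_request"))

    async def after_response(self, context: Any) -> None:
        self.log.append((self.name, "after_response"))

    async def handle_exception(self, context: Any) -> None:
        self.log.append((self.name, "handle_exception"))


async def invoke(
    interceptors: List[Any],
    context: Any,
    send: Callable[[Any], Awaitable[Any]],
) -> Any:
    """Run before_request, the request itself, then after_response or handle_exception."""
    ordered = sorted(interceptors, key=lambda i: i.order)  # smaller order runs first
    try:
        for interceptor in ordered:
            await interceptor.before_request(context)
        context.result = await send(context)
        for interceptor in ordered:
            await interceptor.after_response(context)
        return context.result
    except Exception as exc:
        # Expose the failure to interceptors, then re-raise to the caller.
        context.exception = exc
        for interceptor in ordered:
            await interceptor.handle_exception(context)
        raise
```

In this sketch the exception is re-raised after the interceptors have seen it, so handle_exception observes failures without swallowing them.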
Notes on interceptor behavior:
- Duplicate interceptors with the same name are ignored; only the first instance is kept.
- Interceptors can be managed dynamically through:
  - add_interceptor(interceptor)
  - remove_interceptor(name)
  - clear_interceptors()
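The name-based deduplication and the three management methods can be pictured with a small registry. This is a sketch, not the ServiceManager internals: InterceptorRegistry, NamedInterceptor, the boolean return values, and the ordered() helper are all hypothetical.

```python
from typing import Dict, List


class NamedInterceptor:
    """Hypothetical minimal interceptor carrying only a name and an order."""

    def __init__(self, name: str, order: int = 0) -> None:
        self.name = name
        self.order = order


class InterceptorRegistry:
    """Keeps at most one interceptor per name; the first registration wins."""

    def __init__(self) -> None:
        self._by_name: Dict[str, NamedInterceptor] = {}

    def add_interceptor(self, interceptor: NamedInterceptor) -> bool:
        """Register the interceptor; return False if the name is already taken."""
        if interceptor.name in self._by_name:
            return False
        self._by_name[interceptor.name] = interceptor
        return True

    def remove_interceptor(self, name: str) -> bool:
        """Remove by name; return False if nothing was registered under it."""
        return self._by_name.pop(name, None) is not None

    def clear_interceptors(self) -> None:
        """Drop every registered interceptor."""
        self._by_name.clear()

    def ordered(self) -> List[NamedInterceptor]:
        """Interceptors sorted by order; ties keep registration order."""
        return sorted(self._by_name.values(), key=lambda i: i.order)
```

Keying the registry by name is what makes remove_interceptor(name) possible without holding a reference to the original instance.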
3.3.1. Interceptor context Parameter
The context parameter passed into interceptors is a RequestContext object containing all information about the current request.
Attributes available in all stages (before_request, after_response, handle_exception):
- method (str): HTTP method (GET, POST, PUT, DELETE, etc.)
- service_name (str): Name of the target service
- path (str): Request path (e.g., /api/user/info)
- protocol (str): Request protocol (http or https)
- kwargs (dict): Request parameters (headers, params, data, json, ...)
- attributes (Dict[str, Any]): Custom attribute storage
Attributes available only in after_response and handle_exception:
- resolved_url (Optional[str]): Fully resolved URL after load balancing
- selected_instance (Optional[Any]): Chosen service instance
- response (Optional[ClientResponse]): aiohttp response object
- exception (Optional[Exception]): Exception raised during execution
- result (Any): Final processed result
- start_time (Optional[float]): Start timestamp
- response_time (Optional[float]): Time when response headers were received
- end_time (Optional[float]): End timestamp
- duration (Optional[float]): Total request duration
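One use of the attributes dictionary is to carry data from before_request to a later stage. The sketch below generates a request ID, sends it as a header, and reads it back after the response. RequestIdInterceptor, the X-Request-ID header, and make_context are hypothetical; make_context builds a SimpleNamespace that only imitates the documented fields, and a real interceptor would subclass IServiceInterceptor.

```python
import asyncio
import uuid
from types import SimpleNamespace
from typing import Any


def make_context(method: str, service_name: str, path: str, **kwargs: Any) -> SimpleNamespace:
    """Hypothetical stand-in for RequestContext with the documented field names."""
    return SimpleNamespace(
        method=method, service_name=service_name, path=path, protocol="http",
        kwargs=kwargs, attributes={}, response=None, exception=None,
        result=None, duration=None,
    )


class RequestIdInterceptor:
    """Tags each request with an ID and reuses it in later stages."""

    name = "RequestIdInterceptor"
    order = 0

    async def before_request(self, context: Any) -> None:
        request_id = uuid.uuid4().hex
        # Store the ID for later stages and send it to the target service.
        context.attributes["request_id"] = request_id
        context.kwargs.setdefault("headers", {})["X-Request-ID"] = request_id

    async def after_response(self, context: Any) -> None:
        request_id = context.attributes.get("request_id", "-")
        # duration is Optional, so guard against None before formatting.
        duration = context.duration if context.duration is not None else 0.0
        context.attributes["summary"] = (
            f"{request_id} {context.method} "
            f"{context.service_name}{context.path} {duration:.3f}s"
        )

    async def handle_exception(self, context: Any) -> None:
        context.attributes["failed_request_id"] = context.attributes.get("request_id")
```

Because attributes belongs to a single request's context, interceptors can share per-request state this way without keeping it on the interceptor instance.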
3.3.2. Interceptor Examples
Logging interceptor:
import logging

logger = logging.getLogger(__name__)


class LoggingInterceptor(IServiceInterceptor):
    """Logging interceptor"""

    def __init__(self, log_request: bool = True, log_response: bool = True):
        self.log_request = log_request
        self.log_response = log_response

    @property
    def name(self) -> str:
        return "LoggingInterceptor"

    async def before_request(self, context: RequestContext) -> None:
        if self.log_request:
            logger.info(
                f"→ {context.method} {context.service_name}{context.path} | "
                f"Headers: {context.kwargs.get('headers', {})} | "
                f"Params: {context.kwargs.get('params', {})}"
            )

    async def after_response(self, context: RequestContext) -> None:
        if self.log_response and context.response:
            logger.info(
                f"← {context.method} {context.service_name}{context.path} | "
                f"Resolved URL: {context.resolved_url} | "
                f"Status: {context.response.status} | "
                f"Duration: {context.duration:.3f}s | "
                f"Size: {len(str(context.result)) if context.result else 0} bytes"
            )

    async def handle_exception(self, context: RequestContext) -> None:
        if context.exception:
            logger.error(
                f"✗ {context.method} {context.service_name}{context.path} | "
                f"Exception: {context.exception} | "
                f"Duration: {context.duration:.3f}s"
            )

    @property
    def order(self) -> int:
        # A large value makes this interceptor run after the others.
        return 99999
Authentication interceptor:
class AuthInterceptor(IServiceInterceptor):
    """Authentication interceptor"""

    def __init__(self, token: str, header_name: str = "Authorization", prefix: str = "Bearer "):
        self.token = token
        self.header_name = header_name
        self.prefix = prefix

    @property
    def name(self) -> str:
        return "AuthInterceptor"

    async def before_request(self, context: RequestContext) -> None:
        if "headers" not in context.kwargs:
            context.kwargs["headers"] = {}
        context.kwargs["headers"][self.header_name] = f"{self.prefix}{self.token}"

    async def after_response(self, context: RequestContext) -> None:
        if context.response and context.response.status == 401:
            logger.warning(f"Authentication failed for {context.service_name}{context.path}")

    async def handle_exception(self, context: RequestContext) -> None:
        pass
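Metrics interceptor (sketch):
The Quick Start passes a MetricsInterceptor to ServiceManager, but its source is not shown here. The following is a hypothetical version that may differ from the one used there. It collects per-service durations and error counts in memory, is written without the IServiceInterceptor base class so that it runs on its own, and uses an arbitrary order value of 100.

```python
import asyncio
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict, List


class MetricsInterceptor:
    """Hypothetical in-memory metrics collector keyed by service name."""

    def __init__(self) -> None:
        self.durations: Dict[str, List[float]] = defaultdict(list)
        self.errors: Dict[str, int] = defaultdict(int)

    @property
    def name(self) -> str:
        return "MetricsInterceptor"

    @property
    def order(self) -> int:
        return 100  # arbitrary choice for this sketch

    async def before_request(self, context: Any) -> None:
        pass

    async def after_response(self, context: Any) -> None:
        # duration is Optional, so skip samples that were never timed.
        if context.duration is not None:
            self.durations[context.service_name].append(context.duration)

    async def handle_exception(self, context: Any) -> None:
        self.errors[context.service_name] += 1

    def average(self, service_name: str) -> float:
        """Mean duration in seconds, or 0.0 when no samples exist."""
        samples = self.durations.get(service_name, [])
        return sum(samples) / len(samples) if samples else 0.0
```

In production the in-memory dictionaries would typically be replaced by a metrics client; the stage hooks would stay the same.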