Skip to main content

An async service invocation framework with registry, discovery, load balancing, and declarative HTTP client features.

Project description

Aio Service Caller

A lightweight, high-performance Python asynchronous service invocation framework that provides service registration, service discovery, load balancing, and declarative HTTP client capabilities.


1. Features

  • 🔍 Service Discovery: Integrated with Nacos service governance, supporting automatic service registration and discovery.
  • ⚖️ Load Balancing: Built-in strategies including round-robin, random, weighted random, and weighted round-robin.
  • 🔌 Interceptor Mechanism: Similar to Spring’s interceptor design, supporting request-before, response-after, and exception handling phases.
  • 🚀 Asynchronous High Performance: Based on aiohttp, supporting connection pooling for efficient HTTP calls.

2. Installation

pip install aio-service-caller[config]

The [config] option automatically installs yamlpyconfig. It is recommended to use yamlpyconfig to load configuration from local files (with environment variable expansion), and manage service registry, discovery, and invocation settings.


3. Quick Start

3.1. Add Configuration

Create or modify configuration files under a directory (e.g., /config) such as application.yaml or application-{profile}.yaml:

For details about yamlpyconfig, see the documentation.

# Service caller configuration
service-caller:
  lb-type: round_robin      # Load balancing strategy: round_robin, random, weight_round_robin, weight_random
  connection-timeout: 6     # Connection timeout (seconds)
  read-timeout: 6           # Read timeout (seconds)
  connection-pool-size: 100 # aiohttp connection pool size

# Nacos configuration
app-registry:
  nacos:
    server-addr: "192.168.30.36:9090"
    namespace: "dev"
    cluster: "DEFAULT"
    group: "DEFAULT_GROUP"
    ip: "127.0.0.1"
    port: 9999
    app-name: "my-app"
    username: "nacos"
    password: "Y789uioJKL"
    weight: 1.0

3.2. Create ServiceManager and Invoke Other Services

ServiceManager serves two purposes:

  1. Register the current service into Nacos according to the configuration.
  2. Call other services registered in the same Nacos namespace, using aiohttp for the underlying HTTP requests.
@pytest.mark.asyncio
async def test_get_service_instances_with_nacos(self):
    async with ConfigManager("./") as config_manager:
        # Create ServiceManager
        async with ServiceManager(
            config_manager=config_manager,
            interceptors=[LoggingInterceptor(), AuthInterceptor(token="123456"), MetricsInterceptor()]
        ) as manager:

            # Option 1: get the parsed business result
            result = await manager.get("other-app", "/hello")
            assert result == {"status": "OK"}

            # Option 2: get the raw aiohttp response
            async with manager.raw_get("other-app", "/hello") as response:
                assert response.status == 200
                result = await response.json()
                assert result == {"status": "OK"}

Each supported HTTP method provides two calling styles:

  1. manager.<method>(service_name, path, **kwargs) → returns the processed business result
  2. manager.raw_<method>(service_name, path, **kwargs) → returns the raw aiohttp response object

kwargs is passed directly to aiohttp, allowing you to set headers, params, data, json, timeout, etc.


3.3. Custom Interceptors

When invoking other services via ServiceManager, interceptors registered in the manager will be invoked automatically at appropriate stages. You can implement custom interceptors by implementing the IServiceInterceptor interface.

class IServiceInterceptor(ABC):
    """Service invocation interceptor interface"""

    @abstractmethod
    async def before_request(self, context: RequestContext) -> None:
        """Pre-processing before the request is sent"""
        pass

    @abstractmethod
    async def after_response(self, context: RequestContext) -> None:
        """Post-processing after the response is received"""
        pass

    @abstractmethod
    async def handle_exception(self, context: RequestContext) -> None:
        """Exception handling when the request fails"""
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Interceptor name"""
        pass

    @property
    def order(self) -> int:
        """Execution order, smaller values indicate higher priority"""
        return 0

Notes on interceptor behavior:

  1. Duplicate interceptors with the same name are ignored — only the first instance is kept.

  2. Interceptors can be dynamically managed through:

    • add_interceptor(interceptor)
    • remove_interceptor(name)
    • clear_interceptors()

3.3.1. Interceptor context Parameter

The context parameter passed into interceptors is a RequestContext object containing all information about the current request.

Attributes available in all stages (before_request, after_response, handle_exception):

  1. method: str — HTTP method (GET, POST, PUT, DELETE, etc.)
  2. service_name: str — Name of the target service
  3. path: str — Request path (e.g., /api/user/info)
  4. protocol: str — Request protocol (http or https)
  5. kwargs: dict — Request parameters (headers, params, data, json, ...)
  6. attributes: Dict[str, Any] — Custom attribute storage

Attributes available only in after_response and handle_exception:

  1. resolved_url: Optional[str] — Fully resolved URL after load balancing

  2. selected_instance: Optional[Any] — Chosen service instance

  3. response: Optional[ClientResponse] — aiohttp response object

  4. exception: Optional[Exception] — Exception raised during execution

  5. result: Any — Final processed result

  6. start_time: Optional[float] — Start timestamp

  7. response_time: Optional[float] — Time when response headers were received

  8. end_time: Optional[float] — End timestamp

  9. duration: Optional[float] — Total request duration


3.3.2. Interceptor Examples

Logging interceptor:

class LoggingInterceptor(IServiceInterceptor):
    """Logging interceptor"""

    def __init__(self, log_request: bool = True, log_response: bool = True):
        self.log_request = log_request
        self.log_response = log_response

    @property
    def name(self) -> str:
        return "LoggingInterceptor"

    async def before_request(self, context: RequestContext) -> None:
        if self.log_request:
            logger.info(
                f"→ {context.method} {context.service_name}{context.path} | "
                f"Headers: {context.kwargs.get('headers', {})} | "
                f"Params: {context.kwargs.get('params', {})}"
            )

    async def after_response(self, context: RequestContext) -> None:
        if self.log_response and context.response:
            logger.info(
                f"← {context.method} {context.service_name}{context.path} | "
                f"Resolved URL: {context.resolved_url} | "
                f"Status: {context.response.status} | "
                f"Duration: {context.duration:.3f}s | "
                f"Size: {len(str(context.result)) if context.result else 0} bytes"
            )

    async def handle_exception(self, context: RequestContext) -> None:
        if context.exception:
            logger.error(
                f"✗ {context.method} {context.service_name}{context.path} | "
                f"Exception: {context.exception} | "
                f"Duration: {context.duration:.3f}s"
            )

    @property
    def order(self) -> int:
        return 99999

Authentication interceptor:

class AuthInterceptor(IServiceInterceptor):
    """Authentication interceptor"""

    def __init__(self, token: str, header_name: str = "Authorization", prefix: str = "Bearer "):
        self.token = token
        self.header_name = header_name
        self.prefix = prefix

    @property
    def name(self) -> str:
        return "AuthInterceptor"

    async def before_request(self, context: RequestContext) -> None:
        if "headers" not in context.kwargs:
            context.kwargs["headers"] = {}

        context.kwargs["headers"][self.header_name] = f"{self.prefix}{self.token}"

    async def after_response(self, context: RequestContext) -> None:
        if context.response and context.response.status == 401:
            logger.warning(f"Authentication failed for {context.service_name}{context.path}")

    async def handle_exception(self, context: RequestContext) -> None:
        pass

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aio_service_caller-0.1.15.tar.gz (18.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aio_service_caller-0.1.15-py3-none-any.whl (27.7 kB view details)

Uploaded Python 3

File details

Details for the file aio_service_caller-0.1.15.tar.gz.

File metadata

  • Download URL: aio_service_caller-0.1.15.tar.gz
  • Upload date:
  • Size: 18.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.8

File hashes

Hashes for aio_service_caller-0.1.15.tar.gz
Algorithm Hash digest
SHA256 75cecee14e19c1ba7e84f5b27bd2f554fe1c81fbc75f9879dc65c0fe940eab8a
MD5 21918a272b60ddf7dac038ce80420647
BLAKE2b-256 6cd24d608021106e0f4aab8a4b4b3e889a7b129c699250552deead8a4ed64e4d

See more details on using hashes here.

File details

Details for the file aio_service_caller-0.1.15-py3-none-any.whl.

File metadata

File hashes

Hashes for aio_service_caller-0.1.15-py3-none-any.whl
Algorithm Hash digest
SHA256 0c097210b9ce18ff6c8d899c83bd434509a5228abd0f1361a8b702e940444d8e
MD5 d68c71e70c2f17833e28cb2b60783794
BLAKE2b-256 6d6a3bc8dca1b82f029851341c351f0229f40fe8ea999e8732b56cabf4264685

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page