Typed Python interfaces for API Platform gateway policies

APIP SDK Core

Python interfaces for writing gateway policies on the WSO2 API Platform.

A policy is a plain Python class that the gateway executor loads at runtime. Each hook receives a fully typed execution context and returns a structured action that the gateway applies to the live request or response. Policies need no configuration files, no generated stubs, and no third-party runtime dependencies.


Requirements

  • Python 3.10 or newer
  • No third-party runtime dependencies

Installation

pip install apip-sdk-core

Quick start

1 · Add a request header

from typing import Any
from apip_sdk_core import (
    ExecutionContext,
    HeaderProcessingMode,
    ProcessingMode,
    RequestHeaderAction,
    RequestHeaderContext,
    RequestHeaderPolicy,
    UpstreamRequestHeaderModifications,
)


class AddCorrelationId(RequestHeaderPolicy):
    """Injects an X-Correlation-ID header into every upstream request."""

    def mode(self) -> ProcessingMode:
        return ProcessingMode(
            request_header_mode=HeaderProcessingMode.PROCESS,
        )

    def on_request_headers(
        self,
        execution_ctx: ExecutionContext,
        ctx: RequestHeaderContext,
        params: dict[str, Any],
    ) -> RequestHeaderAction:
        return UpstreamRequestHeaderModifications(
            headers_to_set={"x-correlation-id": execution_ctx.request_id},
        )

2 · Validate and rewrite a buffered request body

import json
from typing import Any
from apip_sdk_core import (
    BodyProcessingMode,
    ExecutionContext,
    ImmediateResponse,
    ProcessingMode,
    RequestAction,
    RequestContext,
    RequestPolicy,
    UpstreamRequestModifications,
)


class EnforceJsonSchema(RequestPolicy):
    """Rejects requests that are not valid JSON objects."""

    def mode(self) -> ProcessingMode:
        return ProcessingMode(request_body_mode=BodyProcessingMode.BUFFER)

    def on_request_body(
        self,
        execution_ctx: ExecutionContext,
        ctx: RequestContext,
        params: dict[str, Any],
    ) -> RequestAction:
        raw = ctx.body.content if ctx.body else None
        try:
            data = json.loads(raw or b"{}")
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Malformed JSON, or bytes that cannot be decoded as text
            data = None
        if not isinstance(data, dict):
            return ImmediateResponse(
                status_code=400,
                body=b'{"error":"request body must be a valid JSON object"}',
                headers={"content-type": "application/json"},
            )
        # Re-serialize the parsed body before forwarding it upstream
        return UpstreamRequestModifications(body=json.dumps(data).encode())

3 · Stream and log response chunks

import logging
from typing import Any
from apip_sdk_core import (
    BodyProcessingMode,
    ExecutionContext,
    ForwardResponseChunk,
    ProcessingMode,
    ResponseAction,
    ResponseContext,
    ResponseStreamContext,
    StreamBody,
    StreamingResponseAction,
    StreamingResponsePolicy,
)

logger = logging.getLogger(__name__)


class StreamingLogger(StreamingResponsePolicy):
    """Logs every streamed response chunk and passes it through unchanged."""

    def mode(self) -> ProcessingMode:
        return ProcessingMode(response_body_mode=BodyProcessingMode.STREAM)

    def needs_more_response_data(self, accumulated: bytes) -> bool:
        return False  # dispatch every chunk immediately

    def on_response_body(
        self,
        execution_ctx: ExecutionContext,
        ctx: ResponseContext,
        params: dict[str, Any],
    ) -> ResponseAction:
        return None  # fallback – never called in STREAM mode

    def on_response_body_chunk(
        self,
        execution_ctx: ExecutionContext,
        ctx: ResponseStreamContext,
        chunk: StreamBody,
        params: dict[str, Any],
    ) -> StreamingResponseAction:
        logger.debug(
            "chunk #%d  eos=%s  bytes=%d",
            chunk.index,
            chunk.end_of_stream,
            len(chunk.chunk),
        )
        return ForwardResponseChunk(body=chunk.chunk)

Policy types

All policy interfaces live in apip_sdk_core (re-exported from apip_sdk_core.policy.v1alpha2).

| Interface | Hook called | Typical use |
|---|---|---|
| RequestHeaderPolicy | on_request_headers | Auth, routing, header injection |
| RequestPolicy | on_request_body | Body validation, transformation |
| ResponseHeaderPolicy | on_response_headers | CORS, response header rewriting |
| ResponsePolicy | on_response_body | Response body transformation |
| StreamingRequestPolicy | on_request_body_chunk | Streaming body inspection |
| StreamingResponsePolicy | on_response_body_chunk | Streaming response inspection / SSE |

Every interface extends the base Policy class:

class Policy(ABC):
    @abstractmethod
    def mode(self) -> ProcessingMode: ...

    def close(self) -> None: ...   # optional – release resources

RequestHeaderPolicy

Called before the request body is read.

class RequestHeaderPolicy(Policy, ABC):
    @abstractmethod
    def on_request_headers(
        self,
        execution_ctx: ExecutionContext,
        ctx: RequestHeaderContext,
        params: dict[str, Any],
    ) -> RequestHeaderAction: ...

Returns UpstreamRequestHeaderModifications | ImmediateResponse | None

RequestPolicy

Called once with the complete buffered request body.

class RequestPolicy(Policy, ABC):
    @abstractmethod
    def on_request_body(
        self,
        execution_ctx: ExecutionContext,
        ctx: RequestContext,
        params: dict[str, Any],
    ) -> RequestAction: ...

Returns UpstreamRequestModifications | ImmediateResponse | None

ResponseHeaderPolicy

Called before the response body is read.

class ResponseHeaderPolicy(Policy, ABC):
    @abstractmethod
    def on_response_headers(
        self,
        execution_ctx: ExecutionContext,
        ctx: ResponseHeaderContext,
        params: dict[str, Any],
    ) -> ResponseHeaderAction: ...

Returns DownstreamResponseHeaderModifications | ImmediateResponse | None

ResponsePolicy

Called once with the complete buffered response body.

class ResponsePolicy(Policy, ABC):
    @abstractmethod
    def on_response_body(
        self,
        execution_ctx: ExecutionContext,
        ctx: ResponseContext,
        params: dict[str, Any],
    ) -> ResponseAction: ...

Returns DownstreamResponseModifications | ImmediateResponse | None

StreamingRequestPolicy

Extends RequestPolicy. Called chunk-by-chunk for the request body.

class StreamingRequestPolicy(RequestPolicy, ABC):
    @abstractmethod
    def needs_more_request_data(self, accumulated: bytes) -> bool: ...

    @abstractmethod
    def on_request_body_chunk(
        self,
        execution_ctx: ExecutionContext,
        ctx: RequestStreamContext,
        chunk: StreamBody,
        params: dict[str, Any],
    ) -> StreamingRequestAction: ...

Returns ForwardRequestChunk | None

StreamingResponsePolicy

Extends ResponsePolicy. Called chunk-by-chunk for the response body.

class StreamingResponsePolicy(ResponsePolicy, ABC):
    @abstractmethod
    def needs_more_response_data(self, accumulated: bytes) -> bool: ...

    @abstractmethod
    def on_response_body_chunk(
        self,
        execution_ctx: ExecutionContext,
        ctx: ResponseStreamContext,
        chunk: StreamBody,
        params: dict[str, Any],
    ) -> StreamingResponseAction: ...

Returns ForwardResponseChunk | TerminateResponseChunk | None
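
The interplay between needs_more_response_data and on_response_body_chunk can be pictured with a small simulation. It assumes the gateway keeps accumulating bytes while the predicate returns True and dispatches the accumulated data once it returns False or the stream ends. That dispatch loop, the dispatch_chunks helper, and the blank-line event boundary are illustrative assumptions, not the executor's documented behavior.

```python
from collections.abc import Callable, Iterable


def needs_more_data(accumulated: bytes) -> bool:
    """Keep buffering until the data ends on a blank-line event boundary."""
    return not accumulated.endswith(b"\n\n")


def dispatch_chunks(
    segments: Iterable[bytes],
    needs_more: Callable[[bytes], bool],
) -> list[bytes]:
    """Hypothetical gateway loop: group raw segments into dispatched chunks."""
    dispatched: list[bytes] = []
    accumulated = b""
    for segment in segments:
        accumulated += segment
        if not needs_more(accumulated):
            dispatched.append(accumulated)
            accumulated = b""
    if accumulated:
        # End of stream: flush whatever is left, even if it is incomplete.
        dispatched.append(accumulated)
    return dispatched


wire = [b"data: he", b"llo\n\n", b"data: bye\n\n", b"data: par"]
chunks = dispatch_chunks(wire, needs_more_data)
print(chunks)
```

Under these assumptions, a predicate that always returns False (as in the StreamingLogger quick-start example) hands every segment to the chunk hook as soon as it arrives.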


Processing mode

The mode() method declares which pipeline phases your policy wants to be called for. The gateway uses this to skip unnecessary work.

@dataclass(slots=True)
class ProcessingMode:
    request_header_mode:  HeaderProcessingMode = HeaderProcessingMode.SKIP
    request_body_mode:    BodyProcessingMode   = BodyProcessingMode.SKIP
    response_header_mode: HeaderProcessingMode = HeaderProcessingMode.SKIP
    response_body_mode:   BodyProcessingMode   = BodyProcessingMode.SKIP

HeaderProcessingMode

| Value | Meaning |
|---|---|
| SKIP | Do not invoke the header hook for this policy |
| PROCESS | Invoke the header hook |

BodyProcessingMode

| Value | Meaning |
|---|---|
| SKIP | Do not read the body for this policy |
| BUFFER | Buffer the entire body and call the body hook once |
| STREAM | Call the chunk hook for each body segment |

Actions reference

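Return an action dataclass from your policy hook. From header and buffered-body hooks, None passes the message through unchanged; from streaming hooks, None drops the chunk (see Streaming actions).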

Request actions

| Dataclass | Effect |
|---|---|
| UpstreamRequestHeaderModifications | Modify headers / path / method / query params before forwarding to upstream |
| UpstreamRequestModifications | Modify body and headers / path / method / query params before forwarding |
| ImmediateResponse | Short-circuit the request and return a response to the client immediately |
| None | Pass through without modifications |

Response actions

| Dataclass | Effect |
|---|---|
| DownstreamResponseHeaderModifications | Modify response headers before sending to client |
| DownstreamResponseModifications | Modify response body and headers before sending to client |
| ImmediateResponse | Replace the upstream response with a synthetic one |
| None | Pass through without modifications |

Streaming actions

| Dataclass | Effect |
|---|---|
| ForwardRequestChunk | Forward the (optionally rewritten) chunk upstream |
| ForwardResponseChunk | Forward the (optionally rewritten) chunk downstream |
| TerminateResponseChunk | Send a final chunk downstream and end the stream |
| None | Drop the chunk (use with care) |

ImmediateResponse fields

@dataclass(slots=True)
class ImmediateResponse:
    status_code: int = 500
    headers: dict[str, str] = field(default_factory=dict)
    body: bytes | None = None
    analytics_metadata: dict[str, Any] = field(default_factory=dict)
    dynamic_metadata: dict[str, dict[str, Any]] = field(default_factory=dict)
    analytics_header_filter: DropHeaderAction = field(default_factory=DropHeaderAction)

Context types

ExecutionContext

Injected into every hook. Contains tracing information and a cancellation check.

@dataclass(slots=True)
class ExecutionContext:
    request_id: str
    phase: ExecutionPhase
    deadline: datetime | None = None
    route_name: str = ""
    policy_name: str = ""
    policy_version: str = ""
    trace_id: str | None = None
    span_id: str | None = None

    def is_cancelled(self) -> bool: ...  # check before expensive operations

SharedContext

Accessible via ctx.shared on every context object.

@dataclass(slots=True)
class SharedContext:
    project_id: str
    request_id: str
    api_id: str
    api_name: str
    api_version: str
    api_kind: str
    api_context: str
    operation_path: str
    auth_context: AuthContext | None
    metadata: dict[str, Any]  # cross-policy mutable bag

AuthContext

Populated when authentication has run upstream of your policy.

@dataclass(slots=True)
class AuthContext:
    authenticated: bool
    authorized: bool
    auth_type: str      # e.g. "JWT", "APIKey"
    subject: str
    issuer: str
    audience: list[str]
    scopes: dict[str, bool]
    credential_id: str
    properties: dict[str, str]
    previous: AuthContext | None  # previous auth layer result

Headers

Read-only multi-value header wrapper. All lookups are case-insensitive.

headers = ctx.headers

headers.get("content-type")      # -> list[str]
headers.has("authorization")     # -> bool
headers.get_all()                 # -> dict[str, list[str]]

for name, values in headers.iterate():
    print(name, values)
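
To make the lookup semantics concrete, here is a minimal stand-in that behaves the way the calls above suggest: names are matched case-insensitively and every header maps to a list of values. It is not the SDK's implementation. In particular, returning an empty list for a missing header and merging names that differ only in case are assumptions.

```python
from collections.abc import Iterator, Mapping


class Headers:
    """Illustrative stand-in: read-only, multi-value, case-insensitive."""

    def __init__(self, raw: Mapping[str, list[str]]) -> None:
        # Normalize names to lowercase once so lookups can ignore case.
        self._values: dict[str, list[str]] = {}
        for name, values in raw.items():
            self._values.setdefault(name.lower(), []).extend(values)

    def get(self, name: str) -> list[str]:
        return list(self._values.get(name.lower(), []))

    def has(self, name: str) -> bool:
        return name.lower() in self._values

    def get_all(self) -> dict[str, list[str]]:
        # Return copies so callers cannot mutate the wrapper's state.
        return {name: list(values) for name, values in self._values.items()}

    def iterate(self) -> Iterator[tuple[str, list[str]]]:
        yield from self.get_all().items()


headers = Headers(
    {"Content-Type": ["application/json"], "Accept": ["a/b"], "accept": ["c/d"]}
)
print(headers.get("CONTENT-TYPE"), headers.has("authorization"))
```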

Versioned imports

The top-level package always re-exports the latest stable contract. Use these imports in most cases:

from apip_sdk_core import RequestPolicy, ProcessingMode, ImmediateResponse

If you need to pin to a specific contract version (e.g. in a library):

from apip_sdk_core.policy.v1alpha2 import RequestPolicy, ProcessingMode

The current stable contract is v1alpha2.


Development

Build

python -m build

Run tests

python -m unittest discover -s tests

Type-check

mypy src/

License

Apache License 2.0. See LICENSE for details.

© 2026 WSO2 LLC.

