Typed Python interfaces for API Platform gateway policies
Project description
APIP SDK Core
Python interfaces for writing gateway policies on the WSO2 API Platform.
A policy is a plain Python class that the gateway executor loads at runtime. It receives a rich, fully-typed execution context and returns a structured action that the gateway applies to the live request or response—no configuration files, no generated stubs, no runtime dependencies.
Contents
- Requirements
- Installation
- Quick start
- Policy types
- Processing mode
- Actions reference
- Context types
- Versioned imports
- Development
- License
Requirements
- Python 3.10 or newer
- No third-party runtime dependencies
Installation
pip install apip-sdk-core
Quick start
1 · Add a request header
from typing import Any
from apip_sdk_core import (
ExecutionContext,
HeaderProcessingMode,
ProcessingMode,
RequestHeaderAction,
RequestHeaderContext,
RequestHeaderPolicy,
UpstreamRequestHeaderModifications,
)
class AddCorrelationId(RequestHeaderPolicy):
"""Injects an X-Correlation-ID header into every upstream request."""
def mode(self) -> ProcessingMode:
return ProcessingMode(
request_header_mode=HeaderProcessingMode.PROCESS,
)
def on_request_headers(
self,
execution_ctx: ExecutionContext,
ctx: RequestHeaderContext,
params: dict[str, Any],
) -> RequestHeaderAction:
return UpstreamRequestHeaderModifications(
headers_to_set={"x-correlation-id": execution_ctx.request_id},
)
2 · Validate and rewrite a buffered request body
import json
from typing import Any
from apip_sdk_core import (
BodyProcessingMode,
ExecutionContext,
ImmediateResponse,
ProcessingMode,
RequestAction,
RequestContext,
RequestPolicy,
UpstreamRequestModifications,
)
class EnforceJsonSchema(RequestPolicy):
"""Rejects requests that are not valid JSON objects."""
def mode(self) -> ProcessingMode:
return ProcessingMode(request_body_mode=BodyProcessingMode.BUFFER)
def on_request_body(
self,
execution_ctx: ExecutionContext,
ctx: RequestContext,
params: dict[str, Any],
) -> RequestAction:
raw = ctx.body.content if ctx.body else None
try:
data = json.loads(raw or b"{}")
except json.JSONDecodeError:
return ImmediateResponse(
status_code=400,
body=b'{"error":"request body must be valid JSON"}',
headers={"content-type": "application/json"},
)
# Optionally rewrite the body before forwarding
return UpstreamRequestModifications(body=json.dumps(data).encode())
3 · Stream and log response chunks
import logging
from typing import Any
from apip_sdk_core import (
BodyProcessingMode,
ExecutionContext,
ForwardResponseChunk,
ProcessingMode,
ResponseContext,
ResponseAction,
ResponseStreamContext,
StreamBody,
StreamingResponseAction,
StreamingResponsePolicy,
UpstreamRequestModifications,
)
logger = logging.getLogger(__name__)
class StreamingLogger(StreamingResponsePolicy):
"""Logs every streamed response chunk and passes it through unchanged."""
def mode(self) -> ProcessingMode:
return ProcessingMode(response_body_mode=BodyProcessingMode.STREAM)
def needs_more_response_data(self, accumulated: bytes) -> bool:
return False # dispatch every chunk immediately
def on_response_body(
self,
execution_ctx: ExecutionContext,
ctx: ResponseContext,
params: dict[str, Any],
) -> ResponseAction:
return None # fallback – never called in STREAM mode
def on_response_body_chunk(
self,
execution_ctx: ExecutionContext,
ctx: ResponseStreamContext,
chunk: StreamBody,
params: dict[str, Any],
) -> StreamingResponseAction:
logger.debug(
"chunk #%d eos=%s bytes=%d",
chunk.index,
chunk.end_of_stream,
len(chunk.chunk),
)
return ForwardResponseChunk(body=chunk.chunk)
Policy types
All policy interfaces live in apip_sdk_core (re-exported from apip_sdk_core.policy.v1alpha2).
| Interface | Hook called | Typical use |
|---|---|---|
RequestHeaderPolicy |
on_request_headers |
Auth, routing, header injection |
RequestPolicy |
on_request_body |
Body validation, transformation |
ResponseHeaderPolicy |
on_response_headers |
CORS, response header rewriting |
ResponsePolicy |
on_response_body |
Response body transformation |
StreamingRequestPolicy |
on_request_body_chunk |
Streaming body inspection |
StreamingResponsePolicy |
on_response_body_chunk |
Streaming response inspection / SSE |
Every interface extends the base Policy class:
class Policy(ABC):
@abstractmethod
def mode(self) -> ProcessingMode: ...
def close(self) -> None: ... # optional – release resources
RequestHeaderPolicy
Called before the request body is read.
class RequestHeaderPolicy(Policy, ABC):
@abstractmethod
def on_request_headers(
self,
execution_ctx: ExecutionContext,
ctx: RequestHeaderContext,
params: dict[str, Any],
) -> RequestHeaderAction: ...
Returns UpstreamRequestHeaderModifications | ImmediateResponse | None
RequestPolicy
Called once with the complete buffered request body.
class RequestPolicy(Policy, ABC):
@abstractmethod
def on_request_body(
self,
execution_ctx: ExecutionContext,
ctx: RequestContext,
params: dict[str, Any],
) -> RequestAction: ...
Returns UpstreamRequestModifications | ImmediateResponse | None
ResponseHeaderPolicy
Called before the response body is read.
class ResponseHeaderPolicy(Policy, ABC):
@abstractmethod
def on_response_headers(
self,
execution_ctx: ExecutionContext,
ctx: ResponseHeaderContext,
params: dict[str, Any],
) -> ResponseHeaderAction: ...
Returns DownstreamResponseHeaderModifications | ImmediateResponse | None
ResponsePolicy
Called once with the complete buffered response body.
class ResponsePolicy(Policy, ABC):
@abstractmethod
def on_response_body(
self,
execution_ctx: ExecutionContext,
ctx: ResponseContext,
params: dict[str, Any],
) -> ResponseAction: ...
Returns DownstreamResponseModifications | ImmediateResponse | None
StreamingRequestPolicy
Extends RequestPolicy. Called chunk-by-chunk for the request body.
class StreamingRequestPolicy(RequestPolicy, ABC):
@abstractmethod
def needs_more_request_data(self, accumulated: bytes) -> bool: ...
@abstractmethod
def on_request_body_chunk(
self,
execution_ctx: ExecutionContext,
ctx: RequestStreamContext,
chunk: StreamBody,
params: dict[str, Any],
) -> StreamingRequestAction: ...
Returns ForwardRequestChunk | None
StreamingResponsePolicy
Extends ResponsePolicy. Called chunk-by-chunk for the response body.
class StreamingResponsePolicy(ResponsePolicy, ABC):
@abstractmethod
def needs_more_response_data(self, accumulated: bytes) -> bool: ...
@abstractmethod
def on_response_body_chunk(
self,
execution_ctx: ExecutionContext,
ctx: ResponseStreamContext,
chunk: StreamBody,
params: dict[str, Any],
) -> StreamingResponseAction: ...
Returns ForwardResponseChunk | TerminateResponseChunk | None
Processing mode
The mode() method declares which pipeline phases your policy wants to be called for. The gateway uses this to skip unnecessary work.
@dataclass(slots=True)
class ProcessingMode:
request_header_mode: HeaderProcessingMode = HeaderProcessingMode.SKIP
request_body_mode: BodyProcessingMode = BodyProcessingMode.SKIP
response_header_mode: HeaderProcessingMode = HeaderProcessingMode.SKIP
response_body_mode: BodyProcessingMode = BodyProcessingMode.SKIP
HeaderProcessingMode
| Value | Meaning |
|---|---|
SKIP |
Do not invoke the header hook for this policy |
PROCESS |
Invoke the header hook |
BodyProcessingMode
| Value | Meaning |
|---|---|
SKIP |
Do not read the body for this policy |
BUFFER |
Buffer the entire body and call the body hook once |
STREAM |
Call the chunk hook for each body segment |
Actions reference
Return an action dataclass (or None to pass through unchanged) from your policy hook.
Request actions
| Dataclass | Effect |
|---|---|
UpstreamRequestHeaderModifications |
Modify headers / path / method / query params before forwarding to upstream |
UpstreamRequestModifications |
Modify body and headers / path / method / query params before forwarding |
ImmediateResponse |
Short-circuit the request and return a response to the client immediately |
None |
Pass through without modifications |
Response actions
| Dataclass | Effect |
|---|---|
DownstreamResponseHeaderModifications |
Modify response headers before sending to client |
DownstreamResponseModifications |
Modify response body and headers before sending to client |
ImmediateResponse |
Replace the upstream response with a synthetic one |
None |
Pass through without modifications |
Streaming actions
| Dataclass | Effect |
|---|---|
ForwardRequestChunk |
Forward the (optionally rewritten) chunk upstream |
ForwardResponseChunk |
Forward the (optionally rewritten) chunk downstream |
TerminateResponseChunk |
Send a final chunk downstream and end the stream |
None |
Drop the chunk (use with care) |
ImmediateResponse fields
@dataclass(slots=True)
class ImmediateResponse:
status_code: int = 500
headers: dict[str, str] = field(default_factory=dict)
body: bytes | None = None
analytics_metadata: dict[str, Any] = field(default_factory=dict)
dynamic_metadata: dict[str, dict[str, Any]] = field(default_factory=dict)
analytics_header_filter: DropHeaderAction = field(default_factory=DropHeaderAction)
Context types
ExecutionContext
Injected into every hook. Contains tracing information and a cancellation check.
@dataclass(slots=True)
class ExecutionContext:
request_id: str
phase: ExecutionPhase
deadline: datetime | None = None
route_name: str = ""
policy_name: str = ""
policy_version: str = ""
trace_id: str | None = None
span_id: str | None = None
def is_cancelled(self) -> bool: ... # check before expensive operations
SharedContext
Accessible via ctx.shared on every context object.
@dataclass(slots=True)
class SharedContext:
project_id: str
request_id: str
api_id: str
api_name: str
api_version: str
api_kind: str
api_context: str
operation_path: str
auth_context: AuthContext | None
metadata: dict[str, Any] # cross-policy mutable bag
AuthContext
Populated when authentication has run upstream of your policy.
@dataclass(slots=True)
class AuthContext:
authenticated: bool
authorized: bool
auth_type: str # e.g. "JWT", "APIKey"
subject: str
issuer: str
audience: list[str]
scopes: dict[str, bool]
credential_id: str
properties: dict[str, str]
previous: AuthContext | None # previous auth layer result
Headers
Read-only multi-value header wrapper. All lookups are case-insensitive.
headers = ctx.headers
headers.get("content-type") # -> list[str]
headers.has("authorization") # -> bool
headers.get_all() # -> dict[str, list[str]]
for name, values in headers.iterate():
print(name, values)
Versioned imports
The top-level package always re-exports the latest stable contract. Use these imports in most cases:
from apip_sdk_core import RequestPolicy, ProcessingMode, ImmediateResponse
If you need to pin to a specific contract version (e.g. in a library):
from apip_sdk_core.policy.v1alpha2 import RequestPolicy, ProcessingMode
The current stable contract is v1alpha2.
Development
Build
python -m build
Run tests
python -m unittest discover -s tests
Type-check
mypy src/
License
Apache License 2.0 — see LICENSE for details.
© 2026 WSO2 LLC.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file apip_sdk_core-0.1.0.tar.gz.
File metadata
- Download URL: apip_sdk_core-0.1.0.tar.gz
- Upload date:
- Size: 8.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1a974f3f9d5337c7689c675d8cc30808ed0998246b3ed3ed336688a0c642a174
|
|
| MD5 |
727fe78b5eeab55f091c419c63d25c21
|
|
| BLAKE2b-256 |
3e5fd8377e674df664f6262a4c6b749f756438964ada7e25027dee52cb9cd808
|
File details
Details for the file apip_sdk_core-0.1.0-py3-none-any.whl.
File metadata
- Download URL: apip_sdk_core-0.1.0-py3-none-any.whl
- Upload date:
- Size: 11.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
66dd29c592448ba7d48d5292a48da63d9327080c2c876e961b3ea3a4d2b6a1da
|
|
| MD5 |
18988d9510d7dbae34ea0e6dd47fb504
|
|
| BLAKE2b-256 |
c59b78ecf0967590bf00949057f152b5eff475f76592688a10b5dc5c9ec70e98
|