Utilities used in Kaia project
Project description
Table of contents
foundation_kaia
foundation_kaia is a utility library that provides the core building blocks used across the kaia ecosystem:
networking/RPC via marshalling, subprocess isolation via fork, template rendering via prompters,
and various shared utilities.
Fork
Fork is a small utility to run the functions in a different subprocess: particularly the web-servers to ensure full isolation.
You can define the callable, and then use Fork like this:
from foundation_kaia import Fork
with Fork(callable):
# Do useful work
pass
There are also .start() and .terminate() methods if you do not want to use the context manager.
Mostly, Fork is used for unit testing and the documentation purposes, so, with the context manager.
Marshalling
Submodule marshalling organizes remote procedure call for python code via HTTP/websockets protocol and simple
comminucation format that is easy to reproduce outside of python.
Essentials
marshalling starts with defining the interface with the desired functionality:
from foundation_kaia.marshalling import service, endpoint
from typing import Any
@service
class IMyService:
@endpoint
def select(self, id: str, data: list[dict[str, Any]], drop_duplicates: bool|None = None) -> list[str]:
...
This interface needs to be implemented in the service:
class MyService(IMyService):
def select(self, id: str, data: list[dict[str, Any]], drop_duplicates: bool|None = None) -> list[str]:
result = []
for element in data:
if element['id'] == id:
if drop_duplicates and element['text'] in result:
continue
result.append(element['text'])
return result
This is enough to create a web-server that will serve MyService, exposing it's methods (marked with @endpoint)
as the server's endpoints. Also, you can create an API that will implement IMyService, therefore providing a direct
replacement for the direct call of IMyService methods:
from foundation_kaia.marshalling import ApiCall, Server, ServiceComponent, ApiUtils
from foundation_kaia.fork import Fork
PORT = 14000
class MyApi(IMyService):
def __init__(self, base_url: str):
ApiCall.define_endpoints(self, base_url, IMyService)
if __name__ == '__main__':
server = Server(PORT, ServiceComponent(MyService()))
with Fork(server):
base_url = f"http://localhost:{PORT}"
ApiUtils.wait_for_reply(base_url, 10)
api = MyApi(base_url)
result = api.select(
'test',
[
{'id': 'test', 'text': 'example 1'},
{'id': 'test', 'text': 'example 2'},
{'id': 'test', 'text': 'example 1'},
{'id': 'test_1', 'text': 'example 3'},
],
True
)
After this, result will be:
['example 1', 'example 2']
Calling server outside of Python
The server can be called without API and python, as a plain HTTP server. The parameters should be passed as:
- primitives, non-nullable: as a part of the url path in the order of appearence in the method
- primitives, nullable: as a part of the query string
- others: in JSON body.
If you do not wish these complications, you may use force_json_params parameter of @endpoint decorator.
Now, let's call out server:
import requests
if __name__ == '__main__':
server = Server(PORT, ServiceComponent(MyService()))
with Fork(server):
ApiUtils.wait_for_reply(f"http://localhost:{PORT}", 10)
response = requests.post(
f"http://localhost:{PORT}/my-service/select/test/?drop_duplicates=True",
json=dict(data=[
{'id': 'test', 'text': 'example 1'},
{'id': 'test', 'text': 'example 2'},
{'id': 'test', 'text': 'example 1'},
{'id': 'test_1', 'text': 'example 3'},
]))
response.raise_for_status()
response.json() will be:
['example 1', 'example 2']
To get a reference of the services, endpoints, as well as JsonSchema for the passing jsons, please use this:
from foundation_kaia.marshalling.documenter import ApiDocumentation
documentation = ApiDocumentation.parse(MyApi)
For instance, documentation.services['my-service'].endpoints['select'] is:
EndpointDocumentation(name='select',
docstring=None,
url_template='/my-service/select/<id>?drop_duplicates=<drop_duplicates>',
json_schema=JsonSchema(schema={'properties': {'data': {'items': {'additionalProperties': {},
'type': 'object'},
'type': 'array'}},
'type': 'object'},
defs={}),
file_params=[],
return_type='list',
return_is_file=False)
Files
Marshalling can handle incoming files in two different ways:
- Normal files: collected in the buffer at the server's side, and then delivered to the method as blob of bytes. The arguments that have
bytesin their type annotation will be such normal files. The more practical way is to useFileLikeannotation, that allows you to send Path, str, bytes andFileobjects. - Streams: transfered bit-by-bit and delivered to the method as
Iterable[bytes], so they can be processed bit by bit. The arguments with Iterable[bytes] in their annotation will be streaming files. Iterable[bytes]|FileLike is also a streaming file.
Depending on the combination of the arguments, following structures in the requests' body are possible:
- Only json-parameters, no file: the body will be JSON (Content-Type will be set application/json)
- A single file, normal or stream: the body will be this file (Content-Type will be set to application/octet-stream)
- A mix of normal files and json-parameters: multipart-form containing
json_argumentspart with json and the files - Other combinations are not possible in HTTP protocol, but can be used in WebSockets. Also, the structure when the endpoint consumes a file bit-by-bit and produces the output file bit-by-bit, is not possible in HTTP protocol.
If the output of the endpoint is file, it is always streamed.
Let's define a service that demonstrates all the HTTP-compatible combinations, using them with requests library. API doesn't really need an additional demonstration as all of these cases are processed under the hood with API remaning the drop-in replacement for the service in the codebase.
We will use force_json_params=True to ensure the arguments end up in the JSON, not in the query params or url.
We will also use verify_abstract=False argument to merge interface with the service. It's not normally needed as the clear separation between the interface and the service is important, but in the testing environment, it's handy to unite them in one class.
FileLikeHandler is a type-hint-friendly and convenient class that will transform all the files to the bytes at the server's side.
from foundation_kaia.marshalling import service, endpoint, FileLike, FileLikeHandler
from collections.abc import Iterable
@service
class IFileService:
@endpoint(force_json_params=True, verify_abstract=False)
def json_only(self, name: str) -> str:
return name
@endpoint(verify_abstract=False)
def single_file(self, file: FileLike|Iterable[bytes]) -> int:
return len(FileLikeHandler.to_bytes(file))
@endpoint(force_json_params=True, verify_abstract=False)
def json_and_file(self, name: str, file: FileLike) -> int:
return len(FileLikeHandler.to_bytes(file))
@endpoint(force_json_params=True, verify_abstract=False)
def download_file(self, content: str) -> Iterable[bytes]:
encoded = content.encode('utf-8')
chunk_size = 4
for i in range(0, len(encoded), chunk_size):
yield encoded[i:i + chunk_size]
Now let's run the server and call each endpoint with requests to see the body structures.
import json
import requests
from foundation_kaia.marshalling import Server, ServiceComponent, ApiUtils
from foundation_kaia.fork import Fork
PORT = 14010
if __name__ == '__main__':
server = Server(PORT, ServiceComponent(IFileService()))
with Fork(server):
ApiUtils.wait_for_reply(f"http://localhost:{PORT}", 10)
JSON-only: the data sent is a JSON with the required arguments.
response = requests.post(
f"http://localhost:{PORT}/file-service/json-only",
json={"name": "hello"},
)
response.raise_for_status()
response.json() is:
'hello'
Single file / stream:
response = requests.post(
f"http://localhost:{PORT}/file-service/single-file",
data=b"hello",
)
response.raise_for_status()
response.json() is the total byte count:
5
JSON and file
response = requests.post(
f"http://localhost:{PORT}/file-service/json-and-file",
files=[
('json_arguments', json.dumps({"name": "report"})),
('file', b'hello'),
],
)
response.raise_for_status()
response.json() is:
5
Downloading a file: the response body is a stream of bytes regardless of how many chunks
the server yields. Use stream=True and iter_content to consume it, or simply read
response.content to collect everything at once.
response = requests.post(
f"http://localhost:{PORT}/file-service/download-file",
json={"content": "hello"},
stream=True,
)
response.raise_for_status()
response.content reassembles all chunks into the original bytes:
b'hello'
WebSockets and Streaming
For endpoints that produce output incrementally — audio synthesis, token-by-token LLM responses,
or any long-running computation — use the @websocket decorator instead of @endpoint.
WebSocket endpoints share the same parameter conventions as HTTP endpoints (primitives in the URL
path, nullable primitives in the query string, JSON/file body), but they additionally support:
- Streaming output: the return type is
Iterable[bytes],Iterable[str], orIterable[T]for any serialisableT. - Streaming input combined with other parameters:
Iterable[bytes]next to JSON orFileLikearguments. Still, there can be only one incoming stream. - Simultaneous streaming input and streaming output: a full-duplex round-trip over a single WebSocket connection.
Let's define a service that exercises all three variants of the streaming output:
from collections.abc import Iterable
from dataclasses import dataclass
from foundation_kaia.marshalling import service, websocket, Server, ServiceComponent, ApiUtils, Serializer, SerializationContext
from foundation_kaia.fork import Fork
@dataclass
class Token:
text: str
index: int
@service
class IStreamingService:
@websocket(force_json_params=True, verify_abstract=False)
def binary_chunks(self, payload: str) -> Iterable[bytes]:
data = payload.encode('utf-8')
for i in range(0, len(data), 3):
yield data[i:i + 3]
@websocket(force_json_params=True, verify_abstract=False)
def text_lines(self, count: int) -> Iterable[str]:
for i in range(count):
yield f"line:{i}"
@websocket(force_json_params=True, verify_abstract=False)
def token_stream(self, prompt: str) -> Iterable[Token]:
for idx, word in enumerate(prompt.split()):
yield Token(text=word, index=idx)
The wire protocol over WebSocket is straightforward:
- The client sends
{"type": "args", "json": {...}}with the JSON-serialized parameters. - The server replies with
{"type": "stream"}to signal that a stream follows. - For
Iterable[bytes]: the server sends raw binary frames, then{"type": "end"}. - For
Iterable[str]andIterable[T]: the server sends{"type": "item", "value": ...}text frames, then{"type": "end"}.
Let's demonstrate each variant using the websocket-client library directly:
import json
import websocket as ws_client
PORT = 14020
if __name__ == '__main__':
server = Server(PORT, ServiceComponent(IStreamingService()))
with Fork(server):
ApiUtils.wait_for_reply(f"http://localhost:{PORT}", 10)
Iterable[bytes] — binary frames, then {"type": "end"}:
ws = ws_client.create_connection(f"ws://localhost:{PORT}/streaming-service/binary-chunks")
ws.send(json.dumps({"type": "args", "json": {"payload": "hello world"}}))
assert json.loads(ws.recv())["type"] == "stream"
chunks = []
while True:
opcode, data = ws.recv_data()
if opcode == ws_client.ABNF.OPCODE_BINARY:
chunks.append(data)
else:
if json.loads(data.decode())["type"] == "end":
break
ws.close()
Reassembled bytes:
b'hello world'
Iterable[str] — {"type": "item", "value": "..."} text frames:
ws = ws_client.create_connection(f"ws://localhost:{PORT}/streaming-service/text-lines")
ws.send(json.dumps({"type": "args", "json": {"count": 3}}))
assert json.loads(ws.recv())["type"] == "stream"
lines = []
while True:
msg = json.loads(ws.recv())
if msg["type"] == "end":
break
lines.append(msg["value"])
ws.close()
Collected lines:
['line:0', 'line:1', 'line:2']
Iterable[T] — same text frames but value is the JSON-serialised dataclass.
Use Serializer to deserialise each item back to the original type:
token_serializer = Serializer.parse(Token)
ctx = SerializationContext()
ws = ws_client.create_connection(f"ws://localhost:{PORT}/streaming-service/token-stream")
ws.send(json.dumps({"type": "args", "json": {"prompt": "hello world"}}))
assert json.loads(ws.recv())["type"] == "stream"
tokens = []
while True:
msg = json.loads(ws.recv())
if msg["type"] == "end":
break
tokens.append(token_serializer.from_json(msg["value"], ctx))
ws.close()
Collected tokens:
[Token(text='hello', index=0), Token(text='world', index=1)]
Serialization
foundation_kaia uses its own serialization, that allows you to convert to and from JSON complex data structures.
Depending on the type annotations, str can be deserialized to a string, a timestamp, an Enum and so on.
dict can be deserialized to a dictionary, a dataclass or a sqlalchemy record.
The entry point is Serializer.parse(type), which builds a serializer for any supported type.
Call .to_json(value) to produce a JSON-compatible value and .from_json(json_value) to restore the
original Python object.
There are several extensions for the plain JSON objects that are supported in this module.
Dataclasses
from dataclasses import dataclass
from foundation_kaia.marshalling import Serializer
@dataclass
class Point:
x: int
y: int
if __name__ == '__main__':
point = Serializer.parse(Point).from_json({'x': 1, 'y': 2})
The parsed value is:
Point(x=1, y=2)
point_list = Serializer.parse(list[Point]).from_json([{'x': 1, 'y': 2}, {'x': 3, 'y': 4}])
The parsed list is:
[Point(x=1, y=2), Point(x=3, y=4)]
Additional primitives
Enums, datetime and Path are serialized as strings and deserialized as the objects of the corresponding type. Enums are serialized and deserialized by their name, not their value.
from enum import Enum
from datetime import datetime
from pathlib import Path
class Color(Enum):
RED = 'red'
GREEN = 'green'
@dataclass
class Event:
color: Color
when: datetime
log_path: Path
if __name__ == '__main__':
event = Serializer.parse(Event).from_json({
"color": "GREEN",
"when": "2025-01-15T12:00:00",
"log_path": "/var/log/app.log",
})
The deserialized value is:
Event(color=<Color.GREEN: 'green'>,
when=datetime.datetime(2025, 1, 15, 12, 0),
log_path=PosixPath('/var/log/app.log'))
Bytes
Bytes fields are serialized as a tagged object with a base-64 encoded value.
@dataclass
class Blob:
name: str
content: bytes
if __name__ == '__main__':
blob = Serializer.parse(Blob).from_json({'name': 'file.bin', 'content': {'@type': 'bytes', 'value': 'AAECAw=='}})
The parsed value is:
Blob(name='file.bin', content=b'\x00\x01\x02\x03')
SQLAlchemy entities
SQLAlchemy mapped classes are serialized and deserialized using their column attributes.
from sqlalchemy.orm import declarative_base, Mapped, mapped_column
Base = declarative_base()
class UserRecord(Base):
__tablename__ = 'user'
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column()
score: Mapped[float | None] = mapped_column(nullable=True)
if __name__ == '__main__':
user = Serializer.parse(UserRecord).from_json({'id': 1, 'name': 'alice', 'score': 9.5})
The parsed fields are:
{'id': 1, 'name': 'alice', 'score': 9.5}
Polymorphism
When a dataclass field is typed as a base class but holds a subclass at runtime, the serializer
adds @type and @subtype tags so the exact subclass can be reconstructed on deserialization.
When the field holds an exact instance of the declared type, no extra tags are added.
@dataclass
class Shape:
color: str
@dataclass
class Circle(Shape):
radius: float
@dataclass
class Canvas:
title: str
background: Shape
from foundation_kaia.marshalling.serialization.tools import TypeTools
if __name__ == '__main__':
canvas = Serializer.parse(Canvas).from_json({
'title': 'art',
'background': {
'@type': 'dataclass',
'@subtype': TypeTools.type_to_full_name(Circle),
'color': 'red',
'radius': 5.0,
},
})
The background field is reconstructed as the exact subclass; its type name is:
'Circle'
Unsupported data
In case the type is not supported, it will be serialized with pickle and passed as base-64 encoded bytes. This allows the serialization/deserialization in itself to be complete and handle the cases that are not covered by supported types. So, if you want to keep your endpoints compatible with non-python APIs, you need to use only the supported types in them.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file kaia_foundation-4.9.91.tar.gz.
File metadata
- Download URL: kaia_foundation-4.9.91.tar.gz
- Upload date:
- Size: 65.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a3476ebeea5084a3a9cb78a16ca19bd8623967a283d19898b35ccabcae3b0f5a
|
|
| MD5 |
b9a34cb5c8afa28a73e28d5ba969d045
|
|
| BLAKE2b-256 |
58b91c6804f95b18c3f3b63ee1e6b0db056cba40e1a4897dbe51ea7bb2d8d7ee
|
File details
Details for the file kaia_foundation-4.9.91-py3-none-any.whl.
File metadata
- Download URL: kaia_foundation-4.9.91-py3-none-any.whl
- Upload date:
- Size: 99.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
89af2710f1d6b988d545eecd24cc3240ab628a166ba2bfe1fcb6f682f579c367
|
|
| MD5 |
e19a3d8e4c6220064f9c8aa153b8ac98
|
|
| BLAKE2b-256 |
af581e204405c7302650a87fea2411a4eebaa295e7a5a718a042ee4d2dba53b3
|