SLIM-powered messaging
Pattern Agentic Messaging
An async SLIM wrapper with a FastAPI-like interface
Installation
pip install pattern_agentic_messaging
Usage
Server
Route incoming messages to decorated handler functions based on a discriminator field such as type:
from pattern_agentic_messaging import PASlimApp, PASlimConfig
from .models import QuestionRequest, StatusRequest, AnswerResponse

config = PASlimConfig(
    local_name="org/ns/server/instance1",
    endpoint="https://slim.example.com",
    auth_secret="shared-secret",
    message_discriminator="type"
)
app = PASlimApp(config)

@app.on_session_connect
async def on_connect(session):
    # Create one agent per session and keep it in the session context.
    agent = await create_agent(...)
    session.context = {
        "agent": agent
    }

@app.on_message
async def handle_prompt(session, msg: QuestionRequest):
    # Look up the agent stored by on_connect.
    agent = session.context.get("agent")
    # QuestionRequest carries the question text in its `prompt` field.
    response = await agent.ask(msg.prompt)
    await session.send(AnswerResponse(answer=response))

@app.on_message
async def handle_status(session, msg: StatusRequest):
    await session.send({"type": "status", "value": "ready"})

@app.on_message
async def handle_other(session, msg):
    # Untyped handler, used here as the fallback for unrecognized messages.
    await session.send({"error": f"Unknown message type: {msg}"})

app.run()
Use PASlimConfigGroup to create a group channel.
The message models are Pydantic models. Each one must declare a Literal field named after the discriminator:
from typing import Literal

from pydantic import BaseModel

class QuestionRequest(BaseModel):
    # The Literal value is what the discriminator is matched against.
    type: Literal["question"] = "question"
    prompt: str

class StatusRequest(BaseModel):
    type: Literal["status"] = "status"

class AnswerResponse(BaseModel):
    type: Literal["answer"] = "answer"
    answer: str
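To make the routing idea concrete, here is a minimal standard-library sketch of how a discriminator could select a handler. It is not the library's implementation: `Router`, its `dispatch` method, and the use of dataclasses in place of Pydantic models are all assumptions made for illustration, and a plain list stands in for the session.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Literal, get_args, get_type_hints


@dataclass
class QuestionRequest:
    prompt: str
    type: Literal["question"] = "question"


@dataclass
class StatusRequest:
    type: Literal["status"] = "status"


class Router:
    """Hypothetical stand-in for discriminator-based routing."""

    def __init__(self, discriminator: str = "type") -> None:
        self.discriminator = discriminator
        self.handlers = {}    # discriminator value -> (model, handler)
        self.fallback = None  # handler whose msg parameter has no annotation

    def on_message(self, func):
        hints = get_type_hints(func)
        params = list(inspect.signature(func).parameters)
        model = hints.get(params[1])  # annotation of the second (msg) parameter
        if model is None:
            self.fallback = func
        else:
            # Read the Literal value declared on the model's discriminator field.
            tag = get_args(get_type_hints(model)[self.discriminator])[0]
            self.handlers[tag] = (model, func)
        return func

    async def dispatch(self, session, raw: dict):
        entry = self.handlers.get(raw.get(self.discriminator))
        if entry is None:
            if self.fallback is None:
                raise LookupError(f"no handler for {raw!r}")
            return await self.fallback(session, raw)
        model, handler = entry
        return await handler(session, model(**raw))


router = Router()


@router.on_message
async def handle_question(session, msg: QuestionRequest):
    session.append(f"answer to {msg.prompt}")


@router.on_message
async def handle_status(session, msg: StatusRequest):
    session.append("ready")


@router.on_message
async def handle_other(session, msg):
    session.append(f"unknown: {msg}")


async def demo():
    outbox = []  # a plain list stands in for the session
    await router.dispatch(outbox, {"type": "question", "prompt": "Hi"})
    await router.dispatch(outbox, {"type": "status"})
    await router.dispatch(outbox, {"type": "bogus"})
    return outbox


outbox = asyncio.run(demo())
```

The sketch keys handlers on the Literal value of the model's discriminator field, which is why each model needs that field. A message whose discriminator matches no model goes to the untyped handler.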
Client
Connect to a specific peer:
import asyncio

from pattern_agentic_messaging import PASlimApp, PASlimConfig

config = PASlimConfig(
    local_name="org/ns/client/instance1",
    endpoint="https://slim.example.com",
    auth_secret="shared-secret"
)

async def main():
    async with PASlimApp(config) as app:
        async with await app.connect("org/ns/server/instance1") as session:
            # "question" matches the Literal on the server's QuestionRequest model.
            await session.send({"type": "question", "prompt": "Hello world!"})
            # Print every message the peer sends back.
            async for msg in session:
                print(f"RECEIVED: {msg}")

asyncio.run(main())
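The client loop above keeps reading until the session ends. For a one-shot request and reply, a small helper can wait for the first message with a timeout. The sketch below assumes only that a session is an async iterable, as the loop above suggests. `first_reply` and `fake_session` are hypothetical names, and the fake session replaces a real SLIM connection so the example runs on its own.

```python
import asyncio


async def first_reply(session, timeout: float):
    """Return the first message from an async-iterable session, or time out."""

    async def _next():
        async for msg in session:
            return msg
        return None  # session ended without sending anything

    return await asyncio.wait_for(_next(), timeout)


async def fake_session(delay: float, msg):
    """Hypothetical stand-in for a session: yields one message after a delay."""
    await asyncio.sleep(delay)
    yield msg


async def demo():
    # A reply that arrives in time is returned as-is.
    fast = await first_reply(
        fake_session(0.0, {"type": "answer", "answer": "hi"}), timeout=1.0
    )
    # A reply that arrives too late raises asyncio.TimeoutError.
    try:
        await first_reply(fake_session(1.0, {"type": "answer"}), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast, timed_out


fast, timed_out = asyncio.run(demo())
```

With a real session, the call would be `await first_reply(session, timeout=...)` placed after `session.send(...)`.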
Alternatively, join a group channel:
import asyncio

from pattern_agentic_messaging import PASlimApp, PASlimConfig

config = PASlimConfig(
    local_name="org/ns/participant/p1",
    endpoint="https://slim.example.com",
    auth_secret="shared-secret"
)

async def main():
    async with PASlimApp(config) as app:
        async with await app.join_channel() as session:
            async for msg in session:
                print(f"Channel message: {msg}")
                # Acknowledge each channel message.
                await session.send({"type": "response", "msg": "received"})

asyncio.run(main())
Low-level usage
The API behind the decorator pattern can be used directly:
import asyncio

from pattern_agentic_messaging import PASlimApp, PASlimConfig

config = PASlimConfig(
    local_name="org/ns/server/instance1",
    endpoint="https://slim.example.com",
    auth_secret="shared-secret"
)

async def main():
    async with PASlimApp(config) as app:
        # Iterating the app yields (session, message) pairs.
        async for session, msg in app:
            # Reject anything that is not a dict with a "prompt" key.
            if not isinstance(msg, dict) or "prompt" not in msg:
                await session.send({"error": "Invalid format"})
                continue
            # process() is a placeholder for your own coroutine.
            result = await process(msg["prompt"])
            await session.send({"result": result})

asyncio.run(main())
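The validate-then-reply loop above can be exercised without a SLIM endpoint by feeding it a fake stream of (session, msg) pairs. In this sketch `FakeSession`, `fake_app`, `serve`, and the upper-casing `process` are hypothetical stand-ins, not part of the library. Only the loop body mirrors the example above.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in that records what would be sent to the peer."""

    def __init__(self):
        self.sent = []

    async def send(self, payload):
        self.sent.append(payload)


async def fake_app(session, messages):
    """Async generator yielding (session, msg) pairs, like iterating the app."""
    for msg in messages:
        yield session, msg


async def process(prompt: str) -> str:
    # Placeholder for real work.
    return prompt.upper()


async def serve(app):
    # Same loop body as the low-level example above.
    async for session, msg in app:
        if not isinstance(msg, dict) or "prompt" not in msg:
            await session.send({"error": "Invalid format"})
            continue
        result = await process(msg["prompt"])
        await session.send({"result": result})


session = FakeSession()
incoming = [{"prompt": "hello"}, "garbage", {"x": 1}]
asyncio.run(serve(fake_app(session, incoming)))
```

Keeping the loop in a function that takes the app as an argument is one way to unit-test message handling separately from the transport.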
Download files
File details
Details for the file pattern_agentic_messaging-0.9.0.tar.gz.
File metadata
- Download URL: pattern_agentic_messaging-0.9.0.tar.gz
- Upload date:
- Size: 13.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ce30769380b18d72c105d7da5c844932e8cf1f3f01ad73a7eab33e4ea72dcfee |
| MD5 | d4e14a83cfadc86259286d15c4cf11de |
| BLAKE2b-256 | 7145a0683b20e2f0f2c8ecba7f50f325433c361dd1b3ea7213a78f85e67f868c |
File details
Details for the file pattern_agentic_messaging-0.9.0-py3-none-any.whl.
File metadata
- Download URL: pattern_agentic_messaging-0.9.0-py3-none-any.whl
- Upload date:
- Size: 11.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 54577972514e38714c3acabdb00922cf0255ba0b408a22bda34f68de370746b9 |
| MD5 | 8d82d1e7bbe022ab1a2875d3257abf1f |
| BLAKE2b-256 | b97d497a5f0bb2323393331eefd062f7473f37e5aac688faaa59259eba7d6718 |