Python SDK for Akasha — The Shared Cognitive Fabric for Intelligent Agent Systems
Project description
Akasha Python SDK
The official Python client for Akasha — The Shared Cognitive Fabric for Intelligent Agent Systems.
Installation
pip install akasha-client
Quick Start
from akasha import AkashaHttpClient
# Connect (auto-TLS with self-signed certs → verify_ssl=False for dev)
client = AkashaHttpClient(
"https://localhost:7777",
api_key="ak_your_api_key",
verify_ssl=False,
)
# Write a record
record = client.put("agents/planner/state", {
"status": "thinking",
"task": "summarize document",
"confidence": 0.85,
})
print(f"Created: {record.path} (v{record.version})")
# Read it back
state = client.get("agents/planner/state")
print(f"Status: {state.value['status']}")
# Query with glob patterns
all_states = client.query("agents/*/state")
for r in all_states:
print(f" {r.path}: {r.value}")
# Delete
client.delete("agents/planner/state")
client.close()
CAS (Compare-And-Swap) — Optimistic Concurrency
Prevent lost updates when multiple agents write to the same path:
from akasha import AkashaHttpClient, CasConflictError
client = AkashaHttpClient("https://localhost:7777", verify_ssl=False)
# Read current version
record = client.get("shared/counter")
try:
# Only update if nobody else changed it since our read
updated = client.put_cas(
"shared/counter",
{"count": record.value["count"] + 1},
expected_version=record.version,
)
print(f"Updated to v{updated.version}")
except CasConflictError as e:
print(f"Conflict! Expected v{e.expected_version}, got v{e.actual_version}")
print(f"Current value: {e.current}")
Memory Layers
Akasha organizes knowledge into four cognitive layers:
from akasha import AkashaHttpClient, MemoryLayer
client = AkashaHttpClient("https://localhost:7777", verify_ssl=False)
# Working memory (volatile, 30 min TTL)
client.put(
f"{MemoryLayer.WORKING.prefix}agent-1/scratch",
{"current_task": "analyze"},
ttl_seconds=MemoryLayer.WORKING.default_ttl_seconds,
)
# Episodic memory (event log, 24h TTL)
client.put(
f"{MemoryLayer.EPISODIC.prefix}agent-1/events/2024-01-15",
{"action": "summarized", "input_tokens": 4500},
ttl_seconds=MemoryLayer.EPISODIC.default_ttl_seconds,
)
# Semantic memory (permanent knowledge)
client.put(
f"{MemoryLayer.SEMANTIC.prefix}domain/users/preferences",
{"theme": "dark", "language": "es"},
)
# Query all episodic memories
episodes = client.query(MemoryLayer.EPISODIC.glob_all)
Authentication
# API Key (recommended for agents)
client = AkashaHttpClient(
"https://akasha.example.com",
api_key="ak_live_abc123",
)
# JWT Token (for user sessions)
client = AkashaHttpClient(
"https://akasha.example.com",
token="eyJhbGciOiJIUzI1NiIs...",
)
Context Manager
with AkashaHttpClient("https://localhost:7777", verify_ssl=False) as client:
client.put("test/hello", {"message": "world"})
record = client.get("test/hello")
print(record.value)
# Connection auto-closed
Async Support
import asyncio
from akasha import AsyncAkashaClient # gRPC-based async client
async def main():
async with AsyncAkashaClient("localhost:50051") as client:
await client.put("agents/worker/status", {"busy": True})
record = await client.get("agents/worker/status")
print(record.value)
asyncio.run(main())
API Reference
AkashaHttpClient
| Method | Description |
|---|---|
put(path, value, *, ttl_seconds, tags) |
Write a record |
put_cas(path, value, *, expected_version, ...) |
Write with CAS concurrency control |
get(path) → Record | None |
Read a record |
delete(path) → bool |
Delete a record |
query(pattern, *, limit) → list[Record] |
Glob query |
list_agents() → list[dict] |
List registered agents |
tree() → dict |
Full state tree snapshot |
health() → dict |
Health check |
metrics() → dict |
Server metrics |
Record
| Field | Type | Description |
|---|---|---|
path |
str |
Hierarchical path |
value |
Any |
Stored value (auto JSON) |
version |
int |
Monotonic version counter |
created_at |
datetime |
Creation timestamp |
updated_at |
datetime |
Last update timestamp |
ttl_seconds |
float | None |
Time-to-live |
tags |
dict[str, str] |
Key-value metadata |
CasConflictError
| Field | Type | Description |
|---|---|---|
expected_version |
int |
Version you expected |
actual_version |
int |
Server's current version |
current |
dict |
Current record data |
Requirements
- Python ≥ 3.10
httpx≥ 0.27 (HTTP client)grpcio≥ 1.60 (gRPC client)msgpack≥ 1.0 (serialization)
Links
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
akasha_client-1.0.9.tar.gz
(19.5 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file akasha_client-1.0.9.tar.gz.
File metadata
- Download URL: akasha_client-1.0.9.tar.gz
- Upload date:
- Size: 19.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b0fcb0ef926235f447d1756854c14b4e092b670e7bc587ecc8069c448b771ac2
|
|
| MD5 |
17eacbd91c096e15855a47b3b1b8b293
|
|
| BLAKE2b-256 |
120b6b5c8f41ef5f763f6e8e1b478590a364e380756f59442be5c25a536c9b4f
|
File details
Details for the file akasha_client-1.0.9-py3-none-any.whl.
File metadata
- Download URL: akasha_client-1.0.9-py3-none-any.whl
- Upload date:
- Size: 24.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8ec34487781939e88e7dbfc1bee95cee918f30d6778a004667cc1057d01d5727
|
|
| MD5 |
96cb8be41135c8b8bcfa236cd3aad778
|
|
| BLAKE2b-256 |
1d575b8399f2f39184b00861ecf45c5b3e9b3bcb2e99a14a9734467172dbd212
|