castellum
AI pipeline orchestration with automatic async I/O, CPU, and GPU management.
Install
pip install castellum
# With OpenAI support:
pip install "castellum[openai]"
# With Anthropic support:
pip install "castellum[anthropic]"
# Everything:
pip install "castellum[all]"
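After installing, it can be handy to confirm which optional backends are importable. The sketch below uses only the standard library. It assumes that the `openai` and `anthropic` extras pull in packages with those same import names, which this README does not state, and `available_backends` is a hypothetical helper, not part of castellum.

```python
from importlib.util import find_spec


def available_backends() -> dict[str, bool]:
    """Hypothetical helper: report which optional SDKs can be imported."""
    # Assumption: the extras install top-level packages named `openai` and `anthropic`.
    return {name: find_spec(name) is not None for name in ("openai", "anthropic")}


backends = available_backends()
print(backends)
```

A `False` entry would suggest that the matching extra has not been installed in the current environment.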
Quick start
import asyncio

from castellum import ai_task, pipeline, Runtime
from castellum.backends.remote.openai import OpenAIClient

client = OpenAIClient()

@ai_task(kind="preprocess")
def chunk_docs(docs: list[str]) -> list[str]:
    # Split every document into paragraph-sized chunks.
    return [c for doc in docs for c in doc.split("\n\n")]

@ai_task(kind="embedding", device="cuda", batchable=True, max_batch_size=128)
def embed_chunks(chunks: list[str]) -> list[list[float]]:
    # local_embedder is not defined in this snippet; supply your own embedding model.
    return local_embedder.encode(chunks)

@ai_task(kind="llm_remote", model="gpt-4.1-mini")
async def answer(question: str, context: str) -> str:
    return await client.chat(
        model="gpt-4.1-mini",
        messages=[{"role": "user", "content": f"{context}\n\n{question}"}],
    )

@pipeline
async def rag_pipeline(docs: list[str], question: str) -> str:
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
    context = "\n".join(str(v) for v in vecs[:5])
    return await answer(question, context)

async def main(docs: list[str], question: str) -> str:
    runtime = Runtime(
        max_cpu_workers=8,
        max_gpu_workers=2,
        max_concurrent_remote_calls=10,
        tokens_per_minute_limit=80_000,
    )
    try:
        return await runtime.run(rag_pipeline, docs, question)
    finally:
        # Close the runtime even if the pipeline raises.
        await runtime.aclose()

# docs and question are your own inputs.
result = asyncio.run(main(docs, question))
# Or stream tokens from an LLM task:
@ai_task(kind="llm_remote", model="gpt-4.1-mini", stream=True)
async def stream_answer(question: str, context: str):
    async for chunk in client.stream_chat(...):
        yield chunk

@pipeline
async def streaming_pipeline(docs: list[str], question: str):
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
    async for token in stream_answer(question, str(vecs[:5])):
        yield token

# Run from an event loop, e.g. asyncio.run(stream_main(docs)).
async def stream_main(docs: list[str]) -> None:
    async with Runtime() as runtime:
        gen = await runtime.run(streaming_pipeline, docs, "What?")
        async for token in gen:
            print(token, end="", flush=True)
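To make the ideas behind `embed_chunks.map(..., preferred_batch_size=64)` and `max_concurrent_remote_calls` more concrete, here is a minimal standalone sketch of batched fan-out with bounded concurrency, using only `asyncio`. It is not castellum's implementation; `batched_map` and `fake_embed` are hypothetical names chosen for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def batched_map(
    fn: Callable[[list[str]], Awaitable[list[int]]],
    items: Sequence[str],
    batch_size: int,
    max_concurrency: int,
) -> list[int]:
    """Hypothetical helper: apply fn to fixed-size batches, a few at a time."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_batch(batch: list[str]) -> list[int]:
        async with semaphore:  # at most max_concurrency batches in flight
            return await fn(batch)

    batches = [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]
    # gather returns results in input order, so outputs line up with items.
    results = await asyncio.gather(*(run_batch(b) for b in batches))
    return [out for batch_out in results for out in batch_out]


async def fake_embed(batch: list[str]) -> list[int]:
    """Stand-in for an embedding call: returns the length of each string."""
    await asyncio.sleep(0)
    return [len(s) for s in batch]


lengths = asyncio.run(
    batched_map(fake_embed, ["a", "bb", "ccc", "dddd", "eeeee"], batch_size=2, max_concurrency=2)
)
print(lengths)
```

The semaphore caps how many batches run at once, while the batch size controls how many items each call receives. A real scheduler would likely also account for device placement and rate limits, which this sketch leaves out.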
Core concepts
| Concept | Description |
|---|---|
| `@ai_task` | Decorates a function with scheduling metadata (`kind`, `device`, `batchable`, `stream`, etc.) |
| `@pipeline` | Decorates an async function or async generator as an orchestrated flow |
| `task.map(items)` | Fans a task out over a collection with automatic batching |
| `stream=True` | Tasks that yield tokens bypass the scheduler; callers consume them via `async for` |
| `Runtime` | Wires the event loop, thread pool, GPU workers, rate limiters, and retry policy |
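As a rough illustration of what "decorating a function with scheduling metadata" can look like, the sketch below attaches a small metadata record to a function without changing its behavior. This is an assumption about the general pattern, not castellum's actual code; `TaskMeta`, `sketch_task`, and the `task_meta` attribute are hypothetical names.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class TaskMeta:
    """Hypothetical container for scheduling hints."""
    kind: str
    device: str = "cpu"
    batchable: bool = False
    stream: bool = False


def sketch_task(kind: str, **hints: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator factory: records hints, leaves the call untouched."""
    meta = TaskMeta(kind=kind, **hints)

    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)

        wrapper.task_meta = meta  # a scheduler could read this to route the call
        return wrapper

    return decorate


@sketch_task(kind="embedding", device="cuda", batchable=True)
def embed(chunks: list[str]) -> list[int]:
    # Stand-in body: returns string lengths instead of real embeddings.
    return [len(c) for c in chunks]


print(embed.task_meta)
print(embed(["ab", "cde"]))
```

Keeping the metadata separate from the function body means that a runtime can decide where and how to run a task (CPU pool, GPU worker, remote call) without the task itself knowing about scheduling.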
Running tests
# Install with dev dependencies
pip install "castellum[dev]"
# Run all tests
pytest
# Run remote backend tests (requires API endpoint)
export OPENAI_BASE_URL=http://localhost:20128/v1
export OPENAI_API_KEY=sk-...
pytest tests/backends/
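Because the remote backend tests depend on the two environment variables above, a small guard can report which ones are missing before any network call is attempted. This is a sketch using only the standard library; `missing_remote_vars` is a hypothetical helper and is not part of castellum's test suite.

```python
import os
from typing import Mapping

REQUIRED_VARS = ("OPENAI_BASE_URL", "OPENAI_API_KEY")


def missing_remote_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Hypothetical helper: list required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]


print(missing_remote_vars({"OPENAI_BASE_URL": "http://localhost:20128/v1"}))
```

In a test module, the result could feed `pytest.mark.skipif(bool(missing_remote_vars()), reason="remote endpoint not configured")` so that remote tests are skipped rather than failing when no endpoint is available.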
License
MIT