castellum

AI pipeline orchestration with automatic async I/O, CPU, and GPU management.

Install

pip install castellum
# With OpenAI support:
pip install "castellum[openai]"
# With Anthropic support:
pip install "castellum[anthropic]"
# Everything:
pip install "castellum[all]"

Quick start

from castellum import ai_task, pipeline, Runtime
from castellum.backends.remote.openai import OpenAIClient

client = OpenAIClient()

@ai_task(kind="preprocess")
def chunk_docs(docs: list[str]) -> list[str]:
    return [c for doc in docs for c in doc.split("\n\n")]

@ai_task(kind="embedding", device="cuda", batchable=True, max_batch_size=128)
def embed_chunks(chunks: list[str]) -> list[list[float]]:
    # local_embedder is not defined in this snippet; supply your own embedding model.
    return local_embedder.encode(chunks)

@ai_task(kind="llm_remote", model="gpt-4.1-mini")
async def answer(question: str, context: str) -> str:
    return await client.chat(
        model="gpt-4.1-mini",
        messages=[{"role": "user", "content": f"{context}\n\n{question}"}],
    )

@pipeline
async def rag_pipeline(docs: list[str], question: str) -> str:
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
    context = "\n".join(str(v) for v in vecs[:5])
    return await answer(question, context)

runtime = Runtime(
    max_cpu_workers=8,
    max_gpu_workers=2,
    max_concurrent_remote_calls=10,
    tokens_per_minute_limit=80_000,
)

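# Top-level await: run this inside an async function (e.g. via asyncio.run)
# or an asyncio REPL, with docs and question already defined.
result = await runtime.run(rag_pipeline, docs, question)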
await runtime.aclose()

# Or stream tokens from an LLM task:
@ai_task(kind="llm_remote", model="gpt-4.1-mini", stream=True)
async def stream_answer(question: str, context: str):
    async for chunk in client.stream_chat(...):
        yield chunk

@pipeline
async def streaming_pipeline(docs: list[str], question: str):
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
    async for token in stream_answer(question, str(vecs[:5])):
        yield token

# As above, this block must run inside an async function or an asyncio REPL.
async with Runtime() as runtime:
    gen = await runtime.run(streaming_pipeline, docs, "What?")
    async for token in gen:
        print(token, end="", flush=True)
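
For intuition about what a limit such as max_concurrent_remote_calls implies, the sketch below caps the number of in-flight coroutines with an asyncio.Semaphore, using only the standard library. It is not castellum's implementation and does not use its API; bounded_gather, InFlightCounter, and fake_remote_call are hypothetical names introduced purely for illustration.

```python
import asyncio


async def bounded_gather(coros, limit: int):
    """Run coroutines concurrently with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(coro):
        # Each coroutine waits here until a slot is free.
        async with semaphore:
            return await coro

    # asyncio.gather returns results in the same order as its inputs.
    return await asyncio.gather(*(run_one(c) for c in coros))


class InFlightCounter:
    """Tracks how many calls are currently running and the peak observed."""

    def __init__(self) -> None:
        self.current = 0
        self.peak = 0


async def fake_remote_call(i: int, counter: InFlightCounter) -> int:
    # Hypothetical stand-in for a remote LLM call.
    counter.current += 1
    counter.peak = max(counter.peak, counter.current)
    await asyncio.sleep(0.01)  # simulated network latency
    counter.current -= 1
    return i * 2


def demo(n: int = 10, limit: int = 3):
    counter = InFlightCounter()
    calls = [fake_remote_call(i, counter) for i in range(n)]
    results = asyncio.run(bounded_gather(calls, limit))
    return results, counter.peak
```

Calling demo(10, 3) returns the ten results in input order along with the peak number of simultaneous calls, which never exceeds the limit. A tokens-per-minute budget would need additional bookkeeping that this sketch does not attempt.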

Core concepts

| Concept | Description |
| --- | --- |
| @ai_task | Decorates a function with scheduling metadata (kind, device, batchable, stream, etc.) |
| @pipeline | Decorates an async function or async generator as an orchestrated flow |
| task.map(items) | Fans out a task over a collection with automatic batching |
| stream=True | Tasks that yield tokens bypass the scheduler; callers consume them via async for |
| Runtime | Wires the event loop, thread pool, GPU workers, rate limiters, and retry policy |
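
To make "fan out with batching" concrete, here is a standard-library sketch of the general idea: split the items into fixed-size batches, process the batches concurrently, and flatten the results back into input order. It is an illustration only and makes no claim about how task.map works internally; split_batches, map_in_batches, and fake_embed are hypothetical names.

```python
import asyncio
from collections.abc import Awaitable, Callable, Sequence

Vector = list[float]
BatchFn = Callable[[list[str]], Awaitable[list[Vector]]]


def split_batches(items: Sequence[str], batch_size: int) -> list[list[str]]:
    """Split items into consecutive batches of at most batch_size elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [list(items[i:i + batch_size]) for i in range(0, len(items), batch_size)]


async def map_in_batches(fn: BatchFn, items: Sequence[str], batch_size: int) -> list[Vector]:
    """Apply an async batch function to every batch and flatten the results."""
    batches = split_batches(items, batch_size)
    # One call per batch, all scheduled concurrently; order is preserved.
    per_batch = await asyncio.gather(*(fn(batch) for batch in batches))
    return [vec for batch_result in per_batch for vec in batch_result]


async def fake_embed(batch: list[str]) -> list[Vector]:
    # Hypothetical embedder: one 1-dimensional "vector" per text (its length).
    await asyncio.sleep(0)
    return [[float(len(text))] for text in batch]


def demo() -> list[Vector]:
    chunks = ["a", "bb", "ccc", "dddd", "eeeee"]
    return asyncio.run(map_in_batches(fake_embed, chunks, batch_size=2))
```

With five chunks and a batch size of 2, the batch function is called three times (sizes 2, 2, and 1) and the caller still receives one vector per chunk, in the original order.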

Running tests

# Install with dev dependencies
pip install "castellum[dev]"

# Run all tests
pytest

# Run remote backend tests (requires an API endpoint)
export OPENAI_BASE_URL=http://localhost:20128/v1
export OPENAI_API_KEY=sk-...
pytest tests/backends/

License

MIT
