castellum

AI pipeline orchestration with automatic async I/O, CPU, and GPU management.

Install

pip install castellum
# With OpenAI support:
pip install "castellum[openai]"
# With Anthropic support:
pip install "castellum[anthropic]"
# Everything:
pip install "castellum[all]"

Quick start

from castellum import ai_task, pipeline, Runtime
from castellum.backends.remote.openai import OpenAIClient

client = OpenAIClient()

@ai_task(kind="preprocess")
def chunk_docs(docs: list[str]) -> list[str]:
    return [c for doc in docs for c in doc.split("\n\n")]

@ai_task(kind="embedding", device="cuda", batchable=True, max_batch_size=128)
def embed_chunks(chunks: list[str]) -> list[list[float]]:
    # local_embedder is not defined here; it is assumed to be an embedding model loaded elsewhere.
    return local_embedder.encode(chunks)

@ai_task(kind="llm_remote", model="gpt-4.1-mini")
async def answer(question: str, context: str) -> str:
    return await client.chat(
        model="gpt-4.1-mini",
        messages=[{"role": "user", "content": f"{context}\n\n{question}"}],
    )

@pipeline
async def rag_pipeline(docs: list[str], question: str) -> str:
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
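    # Toy context: the first five vectors are stringified; a real pipeline would pass retrieved chunk text.
    context = "\n".join(str(v) for v in vecs[:5])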
    return await answer(question, context)

runtime = Runtime(
    max_cpu_workers=8,
    max_gpu_workers=2,
    max_concurrent_remote_calls=10,
    tokens_per_minute_limit=80_000,
)

# Top-level await: run these lines inside an async function (or an asyncio-aware REPL).
result = await runtime.run(rag_pipeline, docs, question)
await runtime.aclose()

# Or stream tokens from an LLM task:
@ai_task(kind="llm_remote", model="gpt-4.1-mini", stream=True)
async def stream_answer(question: str, context: str):
    async for chunk in client.stream_chat(...):
        yield chunk

@pipeline
async def streaming_pipeline(docs: list[str], question: str):
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
    async for token in stream_answer(question, str(vecs[:5])):
        yield token

async with Runtime() as runtime:
    gen = await runtime.run(streaming_pipeline, docs, "What?")
    async for token in gen:
        print(token, end="", flush=True)
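
The Runtime in the quick start is configured with max_concurrent_remote_calls=10. As a rough illustration of what such a cap means, and not a description of how castellum implements it, the sketch below bounds concurrency with a plain asyncio.Semaphore. The names run_capped, fake_remote_call, and demo are hypothetical and exist only for this example.

```python
import asyncio


async def run_capped(coros, max_concurrent: int):
    """Await all coroutines, with at most max_concurrent running at once."""
    sem = asyncio.Semaphore(max_concurrent)

    async def guarded(coro):
        # Each coroutine must acquire a slot before it starts running.
        async with sem:
            return await coro

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(guarded(c) for c in coros))


async def fake_remote_call(i: int, state: dict) -> int:
    # Hypothetical stand-in for a remote LLM call; records peak concurrency.
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)
    state["active"] -= 1
    return i * 2


def demo(n: int = 20, cap: int = 3):
    """Run n fake calls under a cap and return (results, peak concurrency)."""
    state = {"active": 0, "peak": 0}

    async def main():
        calls = [fake_remote_call(i, state) for i in range(n)]
        return await run_capped(calls, cap)

    return asyncio.run(main()), state["peak"]
```

Calling demo(20, 3) returns the twenty results in input order, and the recorded peak never exceeds the cap of 3. A token-per-minute limit would need an additional time-based limiter on top of this; that part is not sketched here.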

Core concepts

Concept            Description
@ai_task           Decorates a function with scheduling metadata (kind, device, batchable, stream, etc.)
@pipeline          Decorates an async function or async generator as an orchestrated flow
task.map(items)    Fans out a task over a collection with automatic batching
stream=True        Tasks that yield tokens bypass the scheduler; callers consume them via async for
Runtime            Wires together the event loop, thread pool, GPU workers, rate limiters, and retry policy
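
To make "fan-out with automatic batching" concrete, here is a minimal sketch of the general idea using only the standard library. It is an assumption about the concept, not castellum's code: make_batches, batched_map, and toy_embed are hypothetical names, and the real task.map may schedule batches quite differently (for example on GPU workers).

```python
import asyncio
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def make_batches(items: Sequence[T], batch_size: int) -> list[Sequence[T]]:
    """Split items into consecutive slices of at most batch_size elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


async def batched_map(
    fn: Callable[[Sequence[T]], list[R]],
    items: Sequence[T],
    batch_size: int,
) -> list[R]:
    """Apply a batch function to items in chunks and flatten the results in order."""
    batches = make_batches(items, batch_size)
    # Run each batch in a worker thread so a blocking fn does not stall the event loop.
    per_batch = await asyncio.gather(*(asyncio.to_thread(fn, b) for b in batches))
    return [r for batch in per_batch for r in batch]


def toy_embed(chunks: Sequence[str]) -> list[list[float]]:
    # Hypothetical embedder: a one-dimensional "vector" holding the chunk length.
    return [[float(len(c))] for c in chunks]
```

For example, asyncio.run(batched_map(toy_embed, ["a", "bb", "ccc"], 2)) processes two batches and returns [[1.0], [2.0], [3.0]], with the output order matching the input order.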

Running tests

# Install with dev dependencies
pip install "castellum[dev]"

# Run all tests
pytest

# Run remote backend tests (requires API endpoint)
export OPENAI_BASE_URL=http://localhost:20128/v1
export OPENAI_API_KEY=sk-...
pytest tests/backends/

License

Unlicense — public domain. See LICENSE.
