

castellum

AI pipeline orchestration with automatic async I/O, CPU, and GPU management.

Install

pip install castellum
# With OpenAI support:
pip install "castellum[openai]"
# With Anthropic support:
pip install "castellum[anthropic]"
# Everything:
pip install "castellum[all]"

Quick start

from castellum import ai_task, pipeline, Runtime
from castellum.backends.remote.openai import OpenAIClient

client = OpenAIClient()

@ai_task(kind="preprocess")
def chunk_docs(docs: list[str]) -> list[str]:
    return [c for doc in docs for c in doc.split("\n\n")]

@ai_task(kind="embedding", device="cuda", batchable=True, max_batch_size=128)
def embed_chunks(chunks: list[str]) -> list[list[float]]:
    # local_embedder is your own embedding model; it is not defined in this example
    return local_embedder.encode(chunks)

@ai_task(kind="llm_remote", model="gpt-4.1-mini")
async def answer(question: str, context: str) -> str:
    return await client.chat(
        model="gpt-4.1-mini",
        messages=[{"role": "user", "content": f"{context}\n\n{question}"}],
    )

@pipeline
async def rag_pipeline(docs: list[str], question: str) -> str:
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
    # Placeholder context: a real RAG step would rank chunks against the question
    context = "\n".join(str(v) for v in vecs[:5])
    return await answer(question, context)

runtime = Runtime(
    max_cpu_workers=8,
    max_gpu_workers=2,
    max_concurrent_remote_calls=10,
    tokens_per_minute_limit=80_000,
)

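# docs (list[str]) and question (str) are your own inputs. Top-level await works in
# Jupyter or `python -m asyncio`; in a script, wrap this in an async main() and asyncio.run() it.
try:
    result = await runtime.run(rag_pipeline, docs, question)
finally:
    await runtime.aclose()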

# Or stream tokens from an LLM task:
@ai_task(kind="llm_remote", model="gpt-4.1-mini", stream=True)
async def stream_answer(question: str, context: str):
    async for chunk in client.stream_chat(...):
        yield chunk

@pipeline
async def streaming_pipeline(docs: list[str], question: str):
    chunks = chunk_docs(docs)
    vecs = await embed_chunks.map(chunks, preferred_batch_size=64)
    async for token in stream_answer(question, str(vecs[:5])):
        yield token

async with Runtime() as runtime:
    gen = await runtime.run(streaming_pipeline, docs, "What?")
    async for token in gen:
        print(token, end="", flush=True)
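The README doesn't say how Runtime enforces max_concurrent_remote_calls and tokens_per_minute_limit. As a mental model only, here is a minimal asyncio sketch of one common approach: a semaphore caps in-flight remote calls, and a sliding 60-second window caps the tokens reserved per minute. RemoteLimiter, fake_call, and the per-call `tokens=` estimate are hypothetical illustrations, not part of castellum's API.

```python
import asyncio
import time
from collections import deque

WINDOW_SECONDS = 60.0


class RemoteLimiter:
    """Hypothetical limiter: caps concurrent calls and tokens reserved per 60 s window."""

    def __init__(self, max_concurrent: int, tokens_per_minute: int) -> None:
        self._sem = asyncio.Semaphore(max_concurrent)
        self._tpm = tokens_per_minute
        self._spent: deque = deque()  # (monotonic timestamp, tokens) per reservation
        self._lock = asyncio.Lock()

    async def _reserve(self, tokens: int) -> None:
        if tokens > self._tpm:
            raise ValueError("request exceeds the per-minute token budget")
        while True:
            async with self._lock:
                now = time.monotonic()
                # Forget reservations that have aged out of the window.
                while self._spent and now - self._spent[0][0] >= WINDOW_SECONDS:
                    self._spent.popleft()
                if sum(t for _, t in self._spent) + tokens <= self._tpm:
                    self._spent.append((now, tokens))
                    return
                # Budget is full: sleep until the oldest reservation expires.
                delay = WINDOW_SECONDS - (now - self._spent[0][0])
            await asyncio.sleep(delay)

    async def call(self, coro_fn, *args, tokens: int):
        """Reserve an estimated token cost, then run coro_fn(*args) under the concurrency cap."""
        await self._reserve(tokens)
        async with self._sem:
            return await coro_fn(*args)


# Usage with a stand-in for a remote LLM request.
in_flight = 0
peak = 0


async def fake_call(i: int) -> int:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulated network latency
    in_flight -= 1
    return i * 2


async def main() -> list[int]:
    # Create the asyncio primitives inside the running event loop.
    limiter = RemoteLimiter(max_concurrent=3, tokens_per_minute=80_000)
    return await asyncio.gather(*(limiter.call(fake_call, i, tokens=500) for i in range(10)))


results = asyncio.run(main())
print(results, "peak in-flight calls:", peak)
```

Reserving tokens before acquiring the semaphore means a call that is waiting on the token budget does not hold a concurrency slot. A production limiter would likely also reconcile each estimate against the token count the provider actually reports.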

Core concepts

| Concept | Description |
|---|---|
| @ai_task | Decorates a function with scheduling metadata (kind, device, batchable, stream, etc.) |
| @pipeline | Decorates an async function or async generator as an orchestrated flow |
| task.map(items) | Fans out a task over a collection with automatic batching |
| stream=True | Marks a task that yields tokens; such tasks bypass the scheduler, and callers consume them with async for |
| Runtime | Wires together the event loop, thread pool, GPU workers, rate limiters, and retry policy |
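The quick start calls embed_chunks.map(chunks, preferred_batch_size=64) on a task declared with max_batch_size=128, but the README doesn't spell out how batches are formed. The sketch below is one plausible interpretation, assuming the effective batch size is the preferred size capped by max_batch_size and that batches run concurrently in worker threads so a blocking model call doesn't stall the event loop. map_batched and fake_embed are hypothetical names, not castellum functions.

```python
import asyncio
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def map_batched(
    fn: Callable[[list[T]], list[R]],
    items: Sequence[T],
    preferred_batch_size: int,
    max_batch_size: int,
) -> list[R]:
    """Hypothetical fan-out: split items into batches, run each batch in a worker
    thread, and flatten the per-batch results back into input order."""
    size = max(1, min(preferred_batch_size, max_batch_size))
    batches = [list(items[i : i + size]) for i in range(0, len(items), size)]
    # asyncio.to_thread keeps a blocking (CPU- or GPU-bound) fn off the event loop.
    per_batch = await asyncio.gather(*(asyncio.to_thread(fn, b) for b in batches))
    out: list[R] = []
    for batch, result in zip(batches, per_batch):
        if len(result) != len(batch):
            raise ValueError("batch function must return one result per input")
        out.extend(result)
    return out


def fake_embed(chunks: list[str]) -> list[list[float]]:
    """Stand-in for a real embedding model: one 2-d vector per chunk."""
    return [[float(len(c)), float(c.count(" "))] for c in chunks]


chunks = [f"chunk number {i}" for i in range(150)]
vecs = asyncio.run(map_batched(fake_embed, chunks, preferred_batch_size=64, max_batch_size=128))
print(len(vecs), vecs[0])
```

With 150 items and an effective size of 64, this forms batches of 64, 64, and 22. Threads suit model calls that release the GIL, as native inference code often does; pure-Python CPU work would need a process pool instead. Whether castellum makes the same choice is not stated here.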

Running tests

# Install with dev dependencies
pip install "castellum[dev]"

# Run all tests
pytest

# Run remote backend tests (requires an OpenAI-compatible API endpoint)
export OPENAI_BASE_URL=http://localhost:20128/v1
export OPENAI_API_KEY=sk-...
pytest tests/backends/

License

Unlicense — public domain. See LICENSE.

Download files


Source Distribution

castellum-0.2.1.tar.gz (74.1 kB)

Built Distribution

castellum-0.2.1-py3-none-any.whl (18.2 kB)

File details

Details for the file castellum-0.2.1.tar.gz.

File metadata

  • Download URL: castellum-0.2.1.tar.gz
  • Size: 74.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv 0.11.17 (uv publish, CI, Ubuntu 24.04)

File hashes

Hashes for castellum-0.2.1.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 87b287c08d4d53b48366ba69f6dc5d1d52e0af4369ed72e21a1dce61372e43a9 |
| MD5 | 1a5c649519434441591d511b6495ece8 |
| BLAKE2b-256 | 7ec4ccc12e45ff245cf83e6191b3e2c98a101b9f91e3ae3db3717ffb7afe6e31 |


File details

Details for the file castellum-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: castellum-0.2.1-py3-none-any.whl
  • Size: 18.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv 0.11.17 (uv publish, CI, Ubuntu 24.04)

File hashes

Hashes for castellum-0.2.1-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 59e0ec8d6d9eebd9c2b7007b9eea98a25e51be2364775a1a303b07ac5e744ce8 |
| MD5 | 70ecc9115b3109d04f688500f81eeb90 |
| BLAKE2b-256 | ae5993adcdc739bda1408b232006a052f6b9dbed8a12df2c0483247b301817fd |

