
A trivial set of API bindings for AI models, because I'd like them to be easy to use.

TrivialAI

(A set of trivial bindings for AI models)

Install

pip install trivialai
# Optional: HTTP/2 for OpenAI/Anthropic
# pip install "trivialai[http2]"
# Optional: AWS Bedrock support (via boto3)
# pip install "trivialai[bedrock]"

Requirements

  • Python ≥ 3.10 (the codebase uses X | Y type unions).
  • Uses httpx for HTTP-based providers and boto3 for Bedrock.

Quick start

>>> import os  # needed for the os.environ lookups in the examples below
>>> from trivialai import claude, gcp, ollama, chatgpt, bedrock

Synchronous usage

Ollama

>>> client = ollama.Ollama("gemma2:2b", "http://localhost:11434/")
# or ollama.Ollama("deepseek-coder-v2:latest", "http://localhost:11434/")
# or ollama.Ollama("mannix/llama3.1-8b-abliterated:latest", "http://localhost:11434/")
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hi there—platypus!"
>>> client.generate_json("sys msg", "Return {'name': 'Platypus'} as JSON").content
{'name': 'Platypus'}

Claude (Anthropic API)

>>> client = claude.Claude("claude-3-5-sonnet-20240620", os.environ["ANTHROPIC_API_KEY"])
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"

GCP (Vertex AI)

>>> client = gcp.GCP("gemini-1.5-flash-001", "/path/to/gcp_creds.json", "us-central1")
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"

ChatGPT (OpenAI API)

>>> client = chatgpt.ChatGPT("gpt-4o-mini", os.environ["OPENAI_API_KEY"])
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"

AWS Bedrock (Claude / Llama / Nova / etc)

Bedrock support is provided via the Bedrock client, which implements the same LLMMixin interface as the others.

1) One-time AWS setup

  1. Enable Bedrock + model access in a Bedrock-supported region.
  2. Ensure your IAM user/role can call Bedrock runtime APIs (bedrock:Converse*, bedrock:InvokeModel*, etc).
  3. Provide credentials via the normal AWS credential chain (aws configure, env vars, instance role) or explicit keys.

2) Choosing the right model_id

Bedrock distinguishes between:

  • Foundation model IDs, like: anthropic.claude-3-5-sonnet-20241022-v2:0
  • Inference profile IDs, which are region-prefixed, like: us.anthropic.claude-3-5-sonnet-20241022-v2:0

Some models/regions require using the inference profile ID. If you see a validation error about on-demand throughput, switch to the region-prefixed ID.

3) Minimal Bedrock demo

from trivialai import bedrock

client = bedrock.Bedrock(
    model_id="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
    region="us-east-1",
)

res = client.generate(
    "This is a test message. Make sure your reply contains the word 'margarine'",
    "Hello there! Can you hear me?"
)
print(res.content)

res_json = client.generate_json(
    "You are a JSON-only assistant.",
    "Return {'name':'Platypus'} as JSON."
)
print(res_json.content)

Streaming (NDJSON-style events) via BiStream

All providers expose a common streaming shape via stream(...).

Important: stream(...) (and helpers like stream_checked(...) / stream_json(...)) return a BiStream, which supports both:

  • sync iteration (for ev in ...)
  • async iteration (async for ev in ...)

You usually don’t need to call provider-specific astream(...) anymore.

Event schema

A streaming LLM yields NDJSON-style events:

  • {"type":"start", "provider":"<ollama|openai|anthropic|gcp|bedrock>", "model":"..."}

  • {"type":"delta", "text":"...", "scratchpad":"..."}

    • For Ollama, scratchpad may contain model “thinking” extracted from <think>…</think>.
    • For other providers, scratchpad is typically "" in deltas.
  • {"type":"end", "content":"...", "scratchpad": <str|None>, "tokens": <int|None>}

  • {"type":"error", "message":"..."}

On top of that, stream_checked(...) / stream_json(...) append a final parse event:

  • {"type":"final", "ok": true|false, "parsed": ..., "error": ..., "raw": ...}
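As a rough illustration of how a consumer might fold these events into a result, the sketch below runs over a hand-written list of events shaped like the schema above. The collect helper and the sample events are hypothetical and not part of TrivialAI; in real use the events would come from client.stream(...).

```python
def collect(events):
    """Fold schema-shaped events into (content, scratchpad, error)."""
    text_parts = []
    scratch_parts = []
    content = None
    scratchpad = None
    error = None
    for ev in events:
        kind = ev.get("type")
        if kind == "delta":
            text_parts.append(ev.get("text", ""))
            scratch_parts.append(ev.get("scratchpad", ""))
        elif kind == "end":
            # The end event carries the full content; prefer it over the deltas.
            content = ev.get("content")
            scratchpad = ev.get("scratchpad")
        elif kind == "error":
            error = ev.get("message")
    if content is None:
        # No end event seen (for example after an error): fall back to deltas.
        content = "".join(text_parts)
    if scratchpad is None:
        scratchpad = "".join(scratch_parts) or None
    return content, scratchpad, error


# Hand-written events that mimic the documented shape.
sample_events = [
    {"type": "start", "provider": "ollama", "model": "gemma2:2b"},
    {"type": "delta", "text": "Hi, ", "scratchpad": ""},
    {"type": "delta", "text": "platypus!", "scratchpad": ""},
    {"type": "end", "content": "Hi, platypus!", "scratchpad": None, "tokens": None},
]
result = collect(sample_events)
```

Unknown event types are skipped on purpose, so the same loop keeps working if extra events such as "final" are appended.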

Example: streaming (sync)

client = ollama.Ollama("gemma2:2b", "http://localhost:11434/")

for ev in client.stream("sys", "Explain, think step-by-step."):
    if ev["type"] == "delta":
        print(ev["text"], end="")
    elif ev["type"] == "end":
        print("\n-- scratchpad --")
        print(ev["scratchpad"])

Example: streaming + parse-at-end

from trivialai.util import loadch

for ev in client.stream_checked(loadch, "sys", "Return a JSON object gradually."):
    if ev["type"] in {"start", "delta", "end"}:
        # UI updates
        pass
    elif ev["type"] == "final":
        print("Parsed JSON:", ev["parsed"])

Shortcut:

for ev in client.stream_json("sys", "Return {'name':'Platypus'} as JSON."):
    if ev["type"] == "final":
        print("Parsed:", ev["parsed"])
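To make the "parse at end" idea concrete, here is a minimal sketch of a generator that passes events through and then appends one final event. It is not the library's implementation: with_final_parse is a hypothetical name, and json.loads stands in for loadch, whose exact behavior is not described here.

```python
import json


def with_final_parse(events, parse=json.loads):
    """Yield every event unchanged, then one 'final' event with the parse result."""
    raw = None
    for ev in events:
        if ev.get("type") == "end":
            raw = ev.get("content")  # remember the full text to parse later
        yield ev
    try:
        final = {"type": "final", "ok": True, "parsed": parse(raw), "error": None, "raw": raw}
    except Exception as exc:  # any parse failure becomes ok=False
        final = {"type": "final", "ok": False, "parsed": None, "error": str(exc), "raw": raw}
    yield final


good = list(with_final_parse([{"type": "end", "content": '{"name": "Platypus"}'}]))
bad = list(with_final_parse([{"type": "end", "content": "not json"}]))
```

Because the parse only happens after the upstream is exhausted, UI code can render deltas as they arrive and treat the final event as the single source of truth for the structured result.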

Example: streaming (async)

async def main():  # async for must run inside a coroutine
    async for ev in client.stream("sys", "Stream something."):
        ...
# run it with: asyncio.run(main())

BiStream: one stream interface for sync + async

from trivialai.bistream import BiStream

What it wraps

BiStream[T] can wrap:

  • a sync Iterable[T] (generator/list/range/…)
  • an async AsyncIterable[T] (async generator/…)
  • another BiStream[T]

…and exposes both iterator interfaces.

Key behavior (important)

  • Single-consumer: it’s a stream, not a list. Once consumed, it’s exhausted.

  • Mode-locked: a given instance may be consumed either sync or async. If you start consuming it sync, you can’t later consume the same instance async (and vice versa). This prevents subtle “half-sync / half-async” bugs.

  • Bridging behavior:

    • async → sync: driven by a dedicated background event loop thread (used only for bridging).
    • sync → async: an async wrapper calls next() inside the event loop thread; if a next() blocks, the loop is blocked and BiStream will log a warning once.
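The async → sync direction can be sketched with the standard library alone. The helper below is an illustration of the general technique (a background event loop thread plus asyncio.run_coroutine_threadsafe), not BiStream's actual code; iter_sync and numbers are hypothetical names.

```python
import asyncio
import threading

_DONE = object()  # sentinel marking exhaustion of the async iterator


async def _next_or_done(iterator):
    """Fetch one item, mapping StopAsyncIteration to the sentinel."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


def iter_sync(async_iterable):
    """Consume an async iterable from synchronous code via a background loop."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    iterator = async_iterable.__aiter__()
    try:
        while True:
            # Schedule one step on the loop thread and block until it finishes.
            future = asyncio.run_coroutine_threadsafe(_next_or_done(iterator), loop)
            item = future.result()
            if item is _DONE:
                break
            yield item
    finally:
        # Stop and tear down the bridging loop, also on early exit.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


async def numbers():
    for n in range(3):
        await asyncio.sleep(0)
        yield n


bridged = list(iter_sync(numbers()))
```

This sketch starts one loop per call for simplicity; a shared, long-lived bridging loop (as the bullet above describes) avoids paying the thread start-up cost on every stream.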

Construction notes

  • BiStream.ensure(x) returns x unchanged if it’s already a BiStream.
  • BiStream(other_bistream) shares the same underlying iterators, so consumption progress is shared.

Chaining streams with then / map / mapcat / branch

TrivialAI uses a small set of mode-preserving combinators to build pipelines without caring whether you’re in sync or async code.

then(...): append a follow-up stage after upstream terminates

then is termination-driven (not event-driven):

  • yields all upstream events unchanged
  • when upstream ends, it calls your follow-up exactly once
  • yields all events from the returned follow-up stream (if any)

New behavior: your follow-up can be either:

  1. 0-arg: then(lambda: stream)
  2. 1-arg: then(lambda done: stream)

done is:

  • sync: StopIteration.value if the generator returns a value (else None)
  • async: first StopAsyncIteration arg if present (else None)
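The sync half of these rules can be sketched in a few lines of plain Python. This is an illustration under assumptions, not the library's implementation: then_sync is a hypothetical name, and the arity check via inspect.signature is just one plausible way to tell 0-arg from 1-arg follow-ups.

```python
import inspect


def then_sync(upstream, follow_up):
    """Yield upstream unchanged, then the follow-up stream (sync only)."""
    iterator = iter(upstream)
    done = None
    while True:
        try:
            item = next(iterator)
        except StopIteration as stop:
            done = stop.value  # a generator's return value, else None
            break
        yield item
    # Call the follow-up exactly once, passing `done` only if it accepts an argument.
    takes_arg = len(inspect.signature(follow_up).parameters) >= 1
    result = follow_up(done) if takes_arg else follow_up()
    if result is not None:
        yield from result


def gen():
    yield {"type": "delta", "text": "hi"}
    return {"tokens": 123}


with_arg = list(then_sync(gen(), lambda done: [{"type": "stats", "done": done}]))
no_arg = list(then_sync([1, 2], lambda: ["end"]))
```

A plain list has no return value, so a 1-arg follow-up after a list upstream would receive None.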

Pseudocode: append a constant postlude

base = client.stream("sys", "Answer, streaming.")

pipeline = base.then(lambda: [
    {"type": "note", "text": "stream ended"},
    {"type": "done", "ok": True},
])

for ev in pipeline:
    handle(ev)

Pseudocode: use done when you have it

def gen():
    yield {"type": "delta", "text": "hi"}
    return {"tokens": 123}

pipeline = BiStream(gen()).then(lambda done: [{"type": "stats", "done": done}])
# yields: delta, then stats

Pattern: parse/validate after end

def parse_after_end(_done):
    yield {"type": "final", "ok": True, "parsed": compute_structured_result()}

pipeline = client.stream("sys", "Return JSON gradually.").then(parse_after_end)

map(...): transform each event

map is the standard per-event transformation:

# prefix all delta text with ">> "
pipeline = client.stream("sys", "Stream.").map(
    lambda ev: (ev | {"text": ">> " + ev["text"]}) if ev.get("type") == "delta" else ev
)

This stays mode-preserving: sync in → sync out, async in → async out.
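One detail worth seeing in isolation is that the ev | {...} expression builds a new dict rather than mutating the event. The sketch below applies the same transformation with the built-in map over hand-written events; prefix_delta and the sample events are hypothetical.

```python
def prefix_delta(ev):
    """Return a copy of delta events with prefixed text; pass others through."""
    if ev.get("type") == "delta":
        return ev | {"text": ">> " + ev["text"]}  # dict union creates a new dict
    return ev


original = {"type": "delta", "text": "hello", "scratchpad": ""}
events = [{"type": "start"}, original, {"type": "end", "content": "hello"}]
mapped = list(map(prefix_delta, events))
```

Keeping the original event untouched matters when the same event object is also observed elsewhere, for example by a logging tap.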


mapcat(...): per-item stream expansion (flatMap), with optional concurrency

mapcat lets you turn each event/item into an entire stream and flatten the result.

  • mapcat(fn) defaults to sequential flattening (like sequence()).
  • mapcat(fn, concurrency=N) flattens by interleaving up to N active branches.
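Sequential flattening is the classic flatMap: finish the stream for one item before starting the next. A minimal sync-only sketch, with mapcat_sequential and per_file_events as hypothetical stand-ins for the real method and a real per-item stream:

```python
def mapcat_sequential(fn, items):
    """Expand each item into a stream and yield its events in item order."""
    for item in items:
        yield from fn(item)


def per_file_events(path):
    # Stand-in for a per-item stream such as an agent run on one file.
    yield {"type": "start", "file": path}
    yield {"type": "end", "file": path}


flat = list(mapcat_sequential(per_file_events, ["a.py", "b.py"]))
order = [ev["file"] for ev in flat]
```

Since the function is a generator, the stream for "b.py" is not even created until every event for "a.py" has been consumed.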

Pseudocode: expand “files” into per-file agent streams (sequential)

files = BiStream(["a.py", "b.py", "c.py"])

def per_file(path):
    return agent.streamed(f"Analyze {path}")

events = files.mapcat(per_file)  # sequential
for ev in events:
    handle(ev)

Pseudocode: concurrent interleaving (async-friendly)

files = BiStream(["a.py", "b.py", "c.py"])

def per_file(path):
    return agent.streamed(f"Analyze {path}")  # may be async stream

events = files.mapcat(per_file, concurrency=8)  # interleaved merge
async for ev in events:
    handle(ev)

Notes:

  • mapcat(..., concurrency>0) uses FanOut.interleave(...) internally.
  • If you consume the result synchronously, it will be bridged via the background loop (same as any async BiStream).

branch(...): fan-out, then fan-in via .sequence() / .interleave()

There are two entry points:

  • Free function: bistream.branch(src_items, mk_stream) → returns FanOut
  • Method: BiStream.branch(items, per_item, ...) → “gated” fan-out (drain prefix first)

A FanOut is not an event stream yet — it must be fanned back in:

  • .sequence() — run branches one-by-one, preserving order
  • .interleave(concurrency=...) — run branches concurrently and merge events as they arrive
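The difference between the two fan-in strategies is easiest to see in code. The following sketch shows one common way to interleave async streams with bounded concurrency, using an asyncio.Queue and a Semaphore. It is an assumption about the general technique, not FanOut's implementation; interleave, pump, and ticker are hypothetical names.

```python
import asyncio


async def interleave(streams, concurrency):
    """Merge async iterables, with at most `concurrency` running at once."""
    queue = asyncio.Queue()
    gate = asyncio.Semaphore(concurrency)
    done = object()  # sentinel: one branch has finished

    async def pump(stream):
        try:
            async with gate:  # wait for a free slot before starting
                async for item in stream:
                    await queue.put(item)
        finally:
            await queue.put(done)

    tasks = [asyncio.create_task(pump(s)) for s in streams]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item
    await asyncio.gather(*tasks)  # surface any branch exception


async def ticker(name, count):
    for i in range(count):
        await asyncio.sleep(0)  # give other branches a chance to run
        yield f"{name}{i}"


async def main():
    streams = [ticker("a", 2), ticker("b", 2), ticker("c", 2)]
    return [item async for item in interleave(streams, concurrency=2)]


merged = asyncio.run(main())
```

The merged order depends on scheduling, so only the set of events is predictable, not their interleaving. The sketch also does not cancel branches if the consumer stops early, which a production version would need to handle.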

Pseudocode: gated fan-out

base = client.stream("sys", "First: describe the plan.")
docs = ["doc1", "doc2", "doc3"]

def per_doc(doc):
    return client.stream("sys", f"Summarize: {doc}")

fan = base.branch(docs, per_doc)     # base is the prefix
merged = fan.interleave(concurrency=8)

for ev in merged:
    handle(ev)

Extra helpers you’ll see in pipelines

tap(...): side effects without changing events

stream = client.stream("sys", "Stream.").tap(lambda ev: log(ev))

Optional filters:

  • focus(ev) -> bool: only tap matching events
  • ignore(ev) -> bool: tap everything except matching events
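A sync-only sketch of how such a tap could behave, assuming focus and ignore are optional keyword predicates as the bullets suggest. tap_sync is a hypothetical name and the exact signature in TrivialAI may differ.

```python
def tap_sync(stream, fn, focus=None, ignore=None):
    """Call fn on selected events for its side effect; yield every event unchanged."""
    for ev in stream:
        focused = focus is None or focus(ev)
        ignored = ignore is not None and ignore(ev)
        if focused and not ignored:
            fn(ev)
        yield ev


seen = []
events = [{"type": "start"}, {"type": "delta", "text": "hi"}, {"type": "end"}]
passed = list(tap_sync(events, seen.append, focus=lambda ev: ev["type"] == "delta"))
```

The filters only decide which events reach fn; downstream consumers still receive the full, unmodified stream.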

repeat_until(...): loop a stream-producing step with an event-based stop

Useful for “agent loops” that keep running steps until a “final”/“conclusion”/etc appears.

from trivialai.bistream import repeat_until, is_type

looped = repeat_until(
    src=client.stream("sys", "First attempt..."),
    step=lambda driver: client.stream("sys", f"Next attempt, based on {driver}..."),
    stop=is_type("final"),
    max_iters=10,
)

On early exit, on exceptions, and when the consumer aborts, repeat_until makes a best-effort attempt to close the underlying iterators.


Embeddings

from trivialai.embedding import OllamaEmbedder

embed = OllamaEmbedder(model="nomic-embed-text", server="http://localhost:11434")
vec = embed("hello world")

Notes & compatibility

  • Dependencies: httpx replaces requests. Use httpx[http2] if you want HTTP/2 for OpenAI/Anthropic. Use boto3 for AWS Bedrock.

  • Scratchpad:

    • Ollama may surface <think> content as scratchpad deltas and a final scratchpad string.
    • Other providers usually emit scratchpad="" in deltas and None in the final end.
  • GCP/Vertex AI: streaming may fall back to a single final chunk unless a native streaming provider implementation is present.

  • BiStream: single-use and single-consumer by design — don’t try to consume the same instance concurrently from multiple tasks.
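For the scratchpad note above, the idea of separating <think> content from the visible answer can be sketched on a complete string. This is a non-streaming illustration with a hypothetical split_scratchpad helper; how the Ollama provider actually extracts thinking from deltas is not shown here.

```python
import re

THINK_RE = re.compile(r"<think>(.*?)</think>", re.DOTALL)


def split_scratchpad(raw):
    """Split raw model output into (content, scratchpad or None)."""
    thoughts = [m.strip() for m in THINK_RE.findall(raw)]
    content = THINK_RE.sub("", raw).strip()
    return content, ("\n".join(thoughts) or None)


with_think = split_scratchpad("<think>plan the greeting</think>Hello, platypus!")
without_think = split_scratchpad("Hello, platypus!")
```

Returning None when no thinking is present mirrors the documented end event, where scratchpad is None for providers that do not surface it.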
