TrivialAI
(A trivial set of API bindings for AI models, because I'd like them to be easy to use.)
Install
pip install trivialai
# Optional: HTTP/2 for OpenAI/Anthropic
# pip install "trivialai[http2]"
# Optional: AWS Bedrock support (via boto3)
# pip install "trivialai[bedrock]"
Requirements
- Python ≥ 3.10 (the codebase uses X | Y type unions).
- Uses httpx for HTTP-based providers and boto3 for Bedrock.
Quick start
>>> import os
>>> from trivialai import claude, gcp, ollama, chatgpt, bedrock
Synchronous usage
Ollama
>>> client = ollama.Ollama("gemma2:2b", "http://localhost:11434/")
# or ollama.Ollama("deepseek-coder-v2:latest", "http://localhost:11434/")
# or ollama.Ollama("mannix/llama3.1-8b-abliterated:latest", "http://localhost:11434/")
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hi there—platypus!"
>>> client.generate_json("sys msg", "Return {'name': 'Platypus'} as JSON").content
{'name': 'Platypus'}
Claude (Anthropic API)
>>> client = claude.Claude("claude-3-5-sonnet-20240620", os.environ["ANTHROPIC_API_KEY"])
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"
GCP (Vertex AI)
>>> client = gcp.GCP("gemini-1.5-flash-001", "/path/to/gcp_creds.json", "us-central1")
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"
ChatGPT (OpenAI API)
>>> client = chatgpt.ChatGPT("gpt-4o-mini", os.environ["OPENAI_API_KEY"])
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"
AWS Bedrock (Claude / Llama / Nova / etc)
Bedrock support is provided via the Bedrock client, which implements the same LLMMixin interface as the others.
1) One-time AWS setup
- Enable Bedrock + model access in a Bedrock-supported region.
- Ensure your IAM user/role can call Bedrock runtime APIs (bedrock:Converse*, bedrock:InvokeModel*, etc).
- Provide credentials via the normal AWS credential chain (aws configure, env vars, instance role) or explicit keys.
2) Choosing the right model_id
Bedrock distinguishes between:
- Foundation model IDs, like: anthropic.claude-3-5-sonnet-20241022-v2:0
- Inference profile IDs, which are region-prefixed, like: us.anthropic.claude-3-5-sonnet-20241022-v2:0
Some models/regions require using the inference profile ID. If you see a validation error about on-demand throughput, switch to the region-prefixed ID.
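If you want to normalize IDs up front, a tiny helper can add the prefix only when it is missing. This helper is hypothetical (not part of trivialai), and the set of region-group prefixes shown is an assumption:

```python
def to_inference_profile(model_id: str, region_prefix: str = "us") -> str:
    """Prefix a foundation model ID with a region group (e.g. "us", "eu")
    to form the corresponding inference profile ID. Leaves IDs that
    already carry a region-group prefix untouched."""
    if model_id.split(".", 1)[0] in {"us", "eu", "apac"}:
        return model_id  # already an inference profile ID
    return f"{region_prefix}.{model_id}"

print(to_inference_profile("anthropic.claude-3-5-sonnet-20241022-v2:0"))
# us.anthropic.claude-3-5-sonnet-20241022-v2:0
```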
3) Minimal Bedrock demo
from trivialai import bedrock
client = bedrock.Bedrock(
model_id="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
region="us-east-1",
)
res = client.generate(
"This is a test message. Make sure your reply contains the word 'margarine'",
"Hello there! Can you hear me?"
)
print(res.content)
res_json = client.generate_json(
"You are a JSON-only assistant.",
"Return {'name':'Platypus'} as JSON."
)
print(res_json.content)
Streaming (NDJSON-style events) via BiStream
All providers expose a common streaming shape via stream(...).
Important: stream(...) (and helpers like stream_checked(...) / stream_json(...)) return a BiStream, which supports both:
- sync iteration (for ev in ...)
- async iteration (async for ev in ...)
You usually don’t need to call provider-specific astream(...) anymore.
Event schema
A streaming LLM yields NDJSON-style events:
- {"type":"start", "provider":"<ollama|openai|anthropic|gcp|bedrock>", "model":"..."}
- {"type":"delta", "text":"...", "scratchpad":"..."}
  - For Ollama, scratchpad may contain model "thinking" extracted from <think>…</think>.
  - For other providers, scratchpad is typically "" in deltas.
- {"type":"end", "content":"...", "scratchpad": <str|None>, "tokens": <int|None>}
- {"type":"error", "message":"..."}
On top of that, stream_checked(...) / stream_json(...) append a final parse event:
{"type":"final", "ok": true|false, "parsed": ..., "error": ..., "raw": ...}
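To make the schema concrete, here is a plain-Python sketch of a consumer that accumulates delta text and picks up the final parsed value. The event list is hand-written to mimic stream_json(...) output, not captured from a live provider:

```python
def consume(events):
    """Accumulate delta text; return (full_text, parsed_or_None)."""
    chunks, parsed = [], None
    for ev in events:
        if ev["type"] == "delta":
            chunks.append(ev["text"])
        elif ev["type"] == "error":
            raise RuntimeError(ev["message"])
        elif ev["type"] == "final" and ev["ok"]:
            parsed = ev["parsed"]
    return "".join(chunks), parsed

# Hand-written events mimicking stream_json(...) output:
events = [
    {"type": "start", "provider": "ollama", "model": "gemma2:2b"},
    {"type": "delta", "text": '{"name": ', "scratchpad": ""},
    {"type": "delta", "text": '"Platypus"}', "scratchpad": ""},
    {"type": "end", "content": '{"name": "Platypus"}', "scratchpad": None, "tokens": 7},
    {"type": "final", "ok": True, "parsed": {"name": "Platypus"},
     "error": None, "raw": '{"name": "Platypus"}'},
]
text, parsed = consume(events)
# text == '{"name": "Platypus"}', parsed == {"name": "Platypus"}
```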
Example: streaming (sync)
client = ollama.Ollama("gemma2:2b", "http://localhost:11434/")
for ev in client.stream("sys", "Explain, think step-by-step."):
if ev["type"] == "delta":
print(ev["text"], end="")
elif ev["type"] == "end":
print("\n-- scratchpad --")
print(ev["scratchpad"])
Example: streaming + parse-at-end
from trivialai.util import loadch
for ev in client.stream_checked(loadch, "sys", "Return a JSON object gradually."):
if ev["type"] in {"start", "delta", "end"}:
# UI updates
pass
elif ev["type"] == "final":
print("Parsed JSON:", ev["parsed"])
Shortcut:
for ev in client.stream_json("sys", "Return {'name':'Platypus'} as JSON."):
if ev["type"] == "final":
print("Parsed:", ev["parsed"])
Example: streaming (async)
async for ev in client.stream("sys", "Stream something."):
...
BiStream: one stream interface for sync + async
from trivialai.bistream import BiStream
What it wraps
BiStream[T] can wrap:
- a sync Iterable[T] (generator/list/range/…)
- an async AsyncIterable[T] (async generator/…)
- another BiStream[T]
…and exposes both iterator interfaces.
Key behavior (new / important)
- Single-consumer: it's a stream, not a list. Once consumed, it's exhausted.
- Mode-locked: a given instance may be consumed either sync or async. If you start consuming it sync, you can't later consume the same instance async (and vice versa). This prevents subtle "half-sync / half-async" bugs.
- Bridging behavior:
  - async → sync: driven by a dedicated background event loop thread (used only for bridging).
  - sync → async: an async wrapper calls next() inside the event loop thread; if a next() blocks, the loop is blocked and BiStream will log a warning once.
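The async → sync direction can be pictured with stdlib asyncio. This is an illustrative sketch of the bridging idea (a dedicated background loop thread driving __anext__), not BiStream's actual implementation:

```python
import asyncio
import threading

def bridge_to_sync(async_iterable):
    """Drain an async iterable from sync code by running a dedicated
    background event loop thread and scheduling __anext__ onto it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    ait = async_iterable.__aiter__()
    try:
        while True:
            fut = asyncio.run_coroutine_threadsafe(ait.__anext__(), loop)
            try:
                yield fut.result()  # blocks the sync caller, not the loop
            except StopAsyncIteration:
                break
    finally:
        loop.call_soon_threadsafe(loop.stop)

async def agen():
    for i in range(3):
        await asyncio.sleep(0)
        yield i

result = list(bridge_to_sync(agen()))
print(result)  # [0, 1, 2]
```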
Construction notes
- BiStream.ensure(x) returns x unchanged if it's already a BiStream.
- BiStream(other_bistream) shares the same underlying iterators, so consumption progress is shared.
Chaining streams with then / branch / “merge”
TrivialAI uses a small set of mode-preserving combinators to build pipelines without caring whether you’re in sync or async code.
then(...): append a follow-up stream after upstream terminates
then is termination-driven (not event-driven):
- it yields all upstream events unchanged
- when upstream ends, it calls your function exactly once: fn(done)
- it then yields events from the returned follow-up stream (if any)
done is the upstream iterator’s return value if the underlying generator explicitly returns something; otherwise it’s usually None.
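The done value is ordinary generator-return plumbing. Here is a minimal stdlib sketch of the then(...) idea (an illustration, not the library's implementation) using yield from, which captures a sub-generator's return value:

```python
def upstream():
    yield {"type": "delta", "text": "hi"}
    yield {"type": "end", "content": "hi"}
    return {"tokens": 2}  # becomes StopIteration.value

def then(gen, fn):
    """Yield upstream events, capture the generator's return value,
    then yield events from the follow-up stream fn produces."""
    done = yield from gen  # `done` is upstream's return value
    yield from fn(done)

events = list(then(upstream(), lambda done: [{"type": "note", "done": done}]))
# events[-1] == {"type": "note", "done": {"tokens": 2}}
```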
Pseudocode example: append a “summary” stage after streaming completes
base = client.stream("sys", "Answer, streaming.")
pipeline = base.then(lambda done: [
{"type": "note", "text": "stream ended"},
{"type": "done", "ok": True},
])
for ev in pipeline:
handle(ev)
Pseudocode example: append parsing/validation stage
def parse_after_end(_done):
# returns a stream (iterable) of extra events
yield {"type": "final", "ok": True, "parsed": compute_structured_result()}
pipeline = client.stream("sys", "Return JSON gradually.").then(parse_after_end)
branch(...): fan-out work over items, then fan-in via .sequence() or .interleave()
There are two entry points:
- Free function: bistream.branch(src_items, mk_stream)
  - Takes an items stream and produces a FanOut (a "stream of branch streams").
- Method: BiStream.branch(items, per_item, ...)
  - "Gated" fan-out: it treats self as a prefix that is drained first, then it fans out items.
  - This is useful when you want "do A, then spawn B tasks".
A FanOut is not an event stream yet; it must be fanned back in:
- .sequence(): run branches one-by-one, preserving order.
- .interleave(concurrency=...): run branches concurrently and merge events as they arrive.
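The interleaved fan-in can be pictured as a queue-based async merge. This stdlib sketch illustrates the idea (without concurrency limits or cancellation), not FanOut's actual implementation:

```python
import asyncio

async def interleave(branches):
    """Merge several async streams, yielding events as they arrive."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()

    async def pump(branch):
        try:
            async for ev in branch:
                await queue.put(ev)
        finally:
            await queue.put(DONE)  # signal this branch finished

    tasks = [asyncio.create_task(pump(b)) for b in branches]
    remaining = len(tasks)
    while remaining:
        ev = await queue.get()
        if ev is DONE:
            remaining -= 1
        else:
            yield ev

async def demo():
    async def branch(tag, n):
        for i in range(n):
            await asyncio.sleep(0)
            yield f"{tag}{i}"
    return [ev async for ev in interleave([branch("a", 2), branch("b", 2)])]

out = asyncio.run(demo())
print(sorted(out))  # ['a0', 'a1', 'b0', 'b1']
```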
Pseudocode: fan out per-document work and merge
base = client.stream("sys", "Start by describing the plan.")
docs = ["doc1", "doc2", "doc3"]
def per_doc(doc):
# return a stream for each doc (sync or async)
return client.stream("sys", f"Summarize: {doc}")
# gated fan-out: stream base first, then start doc work
fan = base.branch(docs, per_doc)
# merge strategy 1: sequential (stable ordering)
merged = fan.sequence()
for ev in merged:
handle(ev)
Pseudocode: concurrent merge with interleave
fan = base.branch(docs, per_doc)
# merge strategy 2: interleaved (best throughput / earliest partials)
merged = fan.interleave(concurrency=8)
async for ev in merged:
handle(ev)
Notes:
- interleave(...) is implemented as an async merge internally. If you iterate it synchronously, BiStream will bridge it for you via the background loop.
- If any branch raises, interleave cancels the other branches and re-raises.
“Merge” in this library
There isn’t a separate merge(...) function: the fan-in step is the merge:
- fan.sequence() merges sequentially
- fan.interleave(...) merges concurrently / interleaved
Extra helpers you’ll see in pipelines
tap(...): side effects without changing events
stream = client.stream("sys", "Stream.").tap(lambda ev: log(ev))
Optional filters:
- focus(ev) -> bool: only tap matching events
- ignore(ev) -> bool: tap everything except matching events
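The tap semantics can be sketched in plain Python (an illustration of the contract described above, not the library's code): events pass through unchanged while the side-effect function fires on matching ones:

```python
def tap(events, fn, focus=None, ignore=None):
    """Pass events through unchanged; call fn(ev) as a side effect on
    events matching focus (or, if ignore is given, not matching it)."""
    for ev in events:
        if (focus is None or focus(ev)) and not (ignore and ignore(ev)):
            fn(ev)
        yield ev

seen = []
out = list(tap(
    [{"type": "delta"}, {"type": "end"}],
    seen.append,
    focus=lambda ev: ev["type"] == "delta",
))
# out contains both events; seen contains only the delta
```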
repeat_until(...): loop a stream-producing step with an event-based stop
Useful for “agent loops” that keep running steps until a “final”/“conclusion”/etc appears.
from trivialai.bistream import repeat_until, isType
looped = repeat_until(
src=client.stream("sys", "First attempt..."),
step=lambda driver: client.stream("sys", f"Next attempt, based on {driver}..."),
stop=isType("final"),
max_iters=10,
)
repeat_until makes a best-effort attempt to close the underlying iterators on early exit and on exceptions/consumer abort.
Embeddings
from trivialai.embedding import OllamaEmbedder
embed = OllamaEmbedder(model="nomic-embed-text", server="http://localhost:11434")
vec = embed("hello world")
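Embedding vectors are typically compared with cosine similarity. Here is a stdlib sketch over toy vectors (the numbers are made up, not real embedder output):

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))

# Toy vectors standing in for embedder output:
v1, v2 = [1.0, 0.0, 1.0], [1.0, 1.0, 0.0]
print(round(cosine(v1, v2), 3))  # 0.5
```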
Notes & compatibility
- Dependencies: httpx replaces requests. Use httpx[http2] if you want HTTP/2 for OpenAI/Anthropic. Use boto3 for AWS Bedrock.
- Scratchpad:
  - Ollama may surface <think> content as scratchpad deltas and a final scratchpad string.
  - Other providers usually emit scratchpad="" in deltas and None in the final end.
- GCP/Vertex AI: streaming may fall back to a single final chunk unless a native streaming provider implementation is present.
- BiStream: single-use and single-consumer by design; don't try to consume the same instance concurrently from multiple tasks.