
TrivialAI

(A trivial set of API bindings for AI models, because I'd like them to be easy to use)

Install

pip install trivialai
# Optional: HTTP/2 for OpenAI/Anthropic
# pip install "trivialai[http2]"
# Optional: AWS Bedrock support (via boto3)
# pip install "trivialai[bedrock]"

Requirements

  • Python ≥ 3.10 (the codebase uses X | Y type unions).
  • Uses httpx for HTTP-based providers and boto3 for Bedrock.

Quick start

>>> import os
>>> from trivialai import claude, gcp, ollama, chatgpt, bedrock

Synchronous usage

Ollama

>>> client = ollama.Ollama("gemma2:2b", "http://localhost:11434/")
# or ollama.Ollama("deepseek-coder-v2:latest", "http://localhost:11434/")
# or ollama.Ollama("mannix/llama3.1-8b-abliterated:latest", "http://localhost:11434/")
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hi there—platypus!"
>>> client.generate_json("sys msg", "Return {'name': 'Platypus'} as JSON").content
{'name': 'Platypus'}

Claude (Anthropic API)

>>> client = claude.Claude("claude-3-5-sonnet-20240620", os.environ["ANTHROPIC_API_KEY"])
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"

GCP (Vertex AI)

>>> client = gcp.GCP("gemini-1.5-flash-001", "/path/to/gcp_creds.json", "us-central1")
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"

ChatGPT (OpenAI API)

>>> client = chatgpt.ChatGPT("gpt-4o-mini", os.environ["OPENAI_API_KEY"])
>>> client.generate("sys msg", "Say hi with 'platypus'.").content
"Hello, platypus!"

AWS Bedrock (Claude / Llama / Nova / etc.)

Bedrock support is provided via the Bedrock client, which implements the same LLMMixin interface as the others.

1) One-time AWS setup

  1. Enable Bedrock + model access in a Bedrock-supported region.
  2. Ensure your IAM user/role can call Bedrock runtime APIs (bedrock:Converse*, bedrock:InvokeModel*, etc).
  3. Provide credentials via the normal AWS credential chain (aws configure, env vars, instance role) or explicit keys.
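
For a quick local sanity check, assuming you aren't using aws configure or an instance role, the standard boto3 environment variables are enough (nothing here is TrivialAI-specific):

import os

# picked up automatically by boto3's default credential chain
os.environ.setdefault("AWS_ACCESS_KEY_ID", "...")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "...")
os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")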

2) Choosing the right model_id

Bedrock distinguishes between:

  • Foundation model IDs, like: anthropic.claude-3-5-sonnet-20241022-v2:0
  • Inference profile IDs, which are region-prefixed, like: us.anthropic.claude-3-5-sonnet-20241022-v2:0

Some models/regions require using the inference profile ID. If you see a validation error about on-demand throughput, switch to the region-prefixed ID.
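
Both forms go in the same model_id slot. A sketch, using the constructor from the demo below:

# Foundation model ID: may raise a ValidationException about on-demand throughput in some regions.
client = bedrock.Bedrock(model_id="anthropic.claude-3-5-sonnet-20241022-v2:0", region="us-east-1")

# Region-prefixed inference profile ID: use this form if you hit that error.
client = bedrock.Bedrock(model_id="us.anthropic.claude-3-5-sonnet-20241022-v2:0", region="us-east-1")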

3) Minimal Bedrock demo

from trivialai import bedrock

client = bedrock.Bedrock(
    model_id="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
    region="us-east-1",
)

res = client.generate(
    "This is a test message. Make sure your reply contains the word 'margarine'",
    "Hello there! Can you hear me?"
)
print(res.content)

res_json = client.generate_json(
    "You are a JSON-only assistant.",
    "Return {'name':'Platypus'} as JSON."
)
print(res_json.content)

Streaming (NDJSON-style events) via BiStream

All providers expose a common streaming shape via stream(...).

Important: stream(...) returns a BiStream (as do the helpers stream_checked(...) and stream_json(...)), which supports both:

  • sync iteration (for ev in ...)
  • async iteration (async for ev in ...)

You usually don’t need to call provider-specific astream(...) anymore.

Event schema

A streaming LLM yields NDJSON-style events:

  • {"type":"start", "provider":"<ollama|openai|anthropic|gcp|bedrock>", "model":"..."}

  • {"type":"delta", "text":"...", "scratchpad":"..."}

    • For Ollama, scratchpad may contain model “thinking” extracted from <think>…</think>.
    • For other providers, scratchpad is typically "" in deltas.
  • {"type":"end", "content":"...", "scratchpad": <str|None>, "tokens": <int|None>}

  • {"type":"error", "message":"..."}

On top of that, stream_checked(...) / stream_json(...) append a final parse event:

  • {"type":"final", "ok": true|false, "parsed": ..., "error": ..., "raw": ...}

Example: streaming (sync)

client = ollama.Ollama("gemma2:2b", "http://localhost:11434/")

for ev in client.stream("sys", "Explain, think step-by-step."):
    if ev["type"] == "delta":
        print(ev["text"], end="")
    elif ev["type"] == "end":
        print("\n-- scratchpad --")
        print(ev["scratchpad"])

Example: streaming + parse-at-end

from trivialai.util import loadch

for ev in client.stream_checked(loadch, "sys", "Return a JSON object gradually."):
    if ev["type"] in {"start", "delta", "end"}:
        # UI updates
        pass
    elif ev["type"] == "final":
        print("Parsed JSON:", ev["parsed"])

Shortcut:

for ev in client.stream_json("sys", "Return {'name':'Platypus'} as JSON."):
    if ev["type"] == "final":
        print("Parsed:", ev["parsed"])

Example: streaming (async)

# inside an async function (e.g. under asyncio.run):
async for ev in client.stream("sys", "Stream something."):
    ...

BiStream: one stream interface for sync + async

from trivialai.bistream import BiStream

What it wraps

BiStream[T] can wrap:

  • a sync Iterable[T] (generator/list/range/…)
  • an async AsyncIterable[T] (async generator/…)
  • another BiStream[T]

…and exposes both iterator interfaces.

Key behavior (new / important)

  • Single-consumer: it’s a stream, not a list. Once consumed, it’s exhausted.

  • Mode-locked: a given instance may be consumed either sync or async. If you start consuming it sync, you can’t later consume the same instance async (and vice versa). This prevents subtle “half-sync / half-async” bugs.

  • Bridging behavior:

    • async → sync: driven by a dedicated background event loop thread (used only for bridging).
    • sync → async: an async wrapper calls next() inside the event loop thread; if a next() blocks, the loop is blocked and BiStream will log a warning once.

Construction notes

  • BiStream.ensure(x) returns x unchanged if it’s already a BiStream.
  • BiStream(other_bistream) shares the same underlying iterators, so consumption progress is shared.
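
A minimal sketch of the wrap-and-bridge behavior (the event dicts here are made up for illustration):

from trivialai.bistream import BiStream

async def agen():
    yield {"type": "delta", "text": "hi"}
    yield {"type": "end", "content": "hi"}

stream = BiStream(agen())

for ev in stream:  # sync iteration over an async source: bridged on the background loop thread
    print(ev)

# `stream` is now exhausted and mode-locked to sync; wrap a fresh iterable to consume again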

Chaining streams with then / branch / “merge”

TrivialAI uses a small set of mode-preserving combinators to build pipelines without caring whether you’re in sync or async code.

then(...): append a follow-up stream after upstream terminates

then is termination-driven (not event-driven):

  • it yields all upstream events unchanged
  • when upstream ends, it calls your function exactly once: fn(done)
  • it then yields events from the returned follow-up stream (if any)

done is the upstream iterator’s return value if the underlying generator explicitly returns something; otherwise it’s usually None.

Pseudocode example: append a “summary” stage after streaming completes

base = client.stream("sys", "Answer, streaming.")

pipeline = base.then(lambda done: [
    {"type": "note", "text": "stream ended"},
    {"type": "done", "ok": True},
])

for ev in pipeline:
    handle(ev)

Pseudocode example: append parsing/validation stage

def parse_after_end(_done):
    # returns a stream (iterable) of extra events
    yield {"type": "final", "ok": True, "parsed": compute_structured_result()}

pipeline = client.stream("sys", "Return JSON gradually.").then(parse_after_end)

branch(...): fan-out work over items, then fan-in via .sequence() or .interleave()

There are two entry points:

  • Free function: bistream.branch(src_items, mk_stream)

    • Takes an items stream and produces a FanOut (a “stream of branch streams”).
  • Method: BiStream.branch(items, per_item, ...)

    • “Gated” fan-out: it treats self as a prefix that is drained first, then it fans out items.
    • This is useful when you want “do A, then spawn B tasks”.

A FanOut is not an event stream yet — it must be fanned back in:

  • .sequence() — run branches one-by-one, preserving order.
  • .interleave(concurrency=...) — run branches concurrently and merge events as they arrive.

Pseudocode: fan out per-document work and merge

base = client.stream("sys", "Start by describing the plan.")
docs = ["doc1", "doc2", "doc3"]

def per_doc(doc):
    # return a stream for each doc (sync or async)
    return client.stream("sys", f"Summarize: {doc}")

# gated fan-out: stream base first, then start doc work
fan = base.branch(docs, per_doc)

# merge strategy 1: sequential (stable ordering)
merged = fan.sequence()

for ev in merged:
    handle(ev)

Pseudocode: concurrent merge with interleave

fan = base.branch(docs, per_doc)

# merge strategy 2: interleaved (best throughput / earliest partials)
merged = fan.interleave(concurrency=8)

async for ev in merged:
    handle(ev)

Notes:

  • interleave(...) is implemented as an async merge internally. If you iterate it synchronously, BiStream will bridge it for you via the background loop.
  • If any branch raises, interleave cancels the other branches and re-raises.

“Merge” in this library

There isn’t a separate merge(...) function: the fan-in step is the merge:

  • fan.sequence() == merge sequentially
  • fan.interleave(...) == merge concurrently / interleaved

Extra helpers you’ll see in pipelines

tap(...): side effects without changing events

stream = client.stream("sys", "Stream.").tap(lambda ev: log(ev))

Optional filters:

  • focus(ev) -> bool: only tap matching events
  • ignore(ev) -> bool: tap everything except matching events
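
Assuming the filters are passed as keyword arguments to tap(...), logging only deltas might look like:

stream = client.stream("sys", "Stream.").tap(
    lambda ev: print(ev["text"], end=""),
    focus=lambda ev: ev["type"] == "delta",
)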

repeat_until(...): loop a stream-producing step with an event-based stop

Useful for “agent loops” that keep running steps until a “final”/“conclusion”/etc appears.

from trivialai.bistream import repeat_until, isType

looped = repeat_until(
    src=client.stream("sys", "First attempt..."),
    step=lambda driver: client.stream("sys", f"Next attempt, based on {driver}..."),
    stop=isType("final"),
    max_iters=10,
)

repeat_until makes a best-effort attempt to close underlying iterators on early exit, exceptions, and consumer abort.


Embeddings

from trivialai.embedding import OllamaEmbedder

embed = OllamaEmbedder(model="nomic-embed-text", server="http://localhost:11434")
vec = embed("hello world")
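
Assuming embed(...) returns a flat sequence of floats, as the example suggests, a quick cosine-similarity check needs nothing beyond the standard library:

import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

print(cosine(embed("hello world"), embed("hello there")))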

Notes & compatibility

  • Dependencies: httpx replaces requests. Use httpx[http2] if you want HTTP/2 for OpenAI/Anthropic. Use boto3 for AWS Bedrock.

  • Scratchpad:

    • Ollama may surface <think> content as scratchpad deltas and a final scratchpad string.
    • Other providers usually emit scratchpad="" in deltas and None in the final end.
  • GCP/Vertex AI: streaming may fall back to a single final chunk unless a native streaming provider implementation is present.

  • BiStream: single-use and single-consumer by design — don’t try to consume the same instance concurrently from multiple tasks.
