High-performance LLM client with batch processing, caching, and checkpoint recovery
flexllm
High-Performance LLM Client for Production
Batch processing with checkpoint recovery, response caching, load balancing, and cost tracking
Why flexllm?
Built for production batch processing at scale.
from flexllm import LLMClient
client = LLMClient(base_url="https://api.openai.com/v1", model="gpt-4", api_key="...")
# Process 100k requests with automatic checkpoint recovery
# Interrupted at 50k? Just restart - it continues from 50,001
results = await client.chat_completions_batch(
messages_list,
output_jsonl="results.jsonl", # Progress saved here
show_progress=True,
track_cost=True, # Real-time cost display
)
Scale out across multiple endpoints with zero code change.
from flexllm import LLMClient
# Same LLMClient API, just pass endpoints for multi-node
client = LLMClient(
endpoints=[
{"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50},
{"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20},
{"base_url": "http://gpu3:8000/v1", "model": "qwen"},
],
fallback=True, # Auto-switch on endpoint failure
)
results = await client.chat_completions_batch(messages_list, output_jsonl="results.jsonl")
Features
| Feature | Description |
|---|---|
| Checkpoint Recovery | Batch jobs auto-resume from interruption - process millions of requests safely |
| Multi-Endpoint Pool | Distribute tasks across GPU nodes with shared-queue dynamic balancing and automatic failover |
| Response Caching | Built-in caching with TTL and IPC multi-process sharing |
| Cost Tracking | Real-time cost monitoring with budget control |
| High-Performance Async | Fine-grained concurrency control, QPS limiting, and streaming |
| Multi-Provider | Supports OpenAI-compatible APIs, Gemini, Claude |
| Multimodal Preprocessing | Auto-convert local files/URLs to base64 for image_url, video_url, audio_url, input_audio |
| Agent (Tool-Use Loop) | AgentClient with automatic tool calling, parallel execution, multi-turn chat, and built-in tools (read/write/edit/glob/grep/bash) |
Installation
pip install flexllm
# With all features
pip install flexllm[all]
Claude Code Integration
Enable Claude Code to use flexllm for LLM API calls, batch processing, and more:
flexllm install-skill
After installation, Claude Code gains the ability to use flexllm across all your projects.
Quick Start
Basic Usage
from flexllm import LLMClient
# Recommended: use context manager for proper resource cleanup
async with LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key"
) as client:
    # Async call
    response = await client.chat_completions([
        {"role": "user", "content": "Hello!"}
    ])
# Sync version (also supports context manager)
with LLMClient(model="gpt-4", base_url="...", api_key="...") as client:
    response = client.chat_completions_sync([
        {"role": "user", "content": "Hello!"}
    ])
# Get token usage
result = await client.chat_completions(
messages=[{"role": "user", "content": "Hello!"}],
return_usage=True, # Returns ChatCompletionResult with usage info
)
print(f"Tokens: {result.usage}") # {'prompt_tokens': 10, 'completion_tokens': 5, ...}
Batch Processing with Checkpoint Recovery
Process millions of requests safely. If interrupted, just restart - it continues from where it left off.
messages_list = [
[{"role": "user", "content": f"Question {i}"}]
for i in range(100000)
]
# Interrupted at 50,000? Re-run and it continues from 50,001.
results = await client.chat_completions_batch(
messages_list,
output_jsonl="results.jsonl", # Progress saved here
show_progress=True,
)
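The resume behaviour described above can be pictured with a minimal sketch. It assumes each finished request is appended to the JSONL file as one line that carries its input index. The record layout (index, output) and the helper names load_done and run_with_checkpoint are illustrative assumptions, not flexllm's actual file format or internals.

```python
import json
import os


def load_done(path):
    """Return the set of input indices already recorded in the JSONL file."""
    done = set()
    if not os.path.exists(path):
        return done
    with open(path, encoding="utf-8") as f:
        for line in f:
            try:
                done.add(json.loads(line)["index"])
            except (json.JSONDecodeError, KeyError, TypeError):
                continue  # skip blank or half-written lines from an interrupted run
    return done


def run_with_checkpoint(inputs, path, handler):
    """Process only inputs missing from `path`, appending each result as it finishes."""
    done = load_done(path)
    processed = 0
    with open(path, "a", encoding="utf-8") as f:
        for i, item in enumerate(inputs):
            if i in done:
                continue  # finished in an earlier run
            record = {"index": i, "output": handler(item)}
            f.write(json.dumps(record) + "\n")
            f.flush()  # push the record to the file before starting the next item
            processed += 1
    return processed
```

Calling run_with_checkpoint twice with the same path and inputs does no work the second time, because every index is already present in the file.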
Multi-Endpoint Pool
Distribute batch tasks across multiple GPU nodes / API endpoints. Faster endpoints automatically handle more tasks via a shared queue model, with automatic failover and health monitoring.
Single endpoint: pass model/base_url. Multiple endpoints: pass endpoints. Same LLMClient, same API.
from flexllm import LLMClient
client = LLMClient(
endpoints=[
# Each endpoint can have independent rate limits
{"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50, "max_qps": 100},
{"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20, "max_qps": 50},
{"base_url": "http://gpu3:8000/v1", "model": "qwen"},
],
fallback=True, # Auto-switch on endpoint failure
failure_threshold=3, # Mark unhealthy after 3 consecutive failures
recovery_time=60.0, # Try to recover after 60 seconds
)
# Single request — automatic failover across endpoints
result = await client.chat_completions(messages)
# Distributed batch — shared queue, dynamic load balancing, checkpoint recovery
results = await client.chat_completions_batch(
messages_list,
distribute=True,
output_jsonl="results.jsonl",
track_cost=True,
)
# Streaming with failover
async for chunk in client.chat_completions_stream(messages):
    print(chunk, end="", flush=True)
Highlights:
- Shared Queue: Faster endpoints automatically pull more tasks, so no manual tuning is needed
- Automatic Failover: Failed requests retry on healthy endpoints; unhealthy nodes auto-recover
- Per-Endpoint Config: Independent concurrency_limit and max_qps for each endpoint
- Full Feature Support: Checkpoint recovery, caching, and cost tracking all work with Pool
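Why a shared queue balances load without tuning can be shown with a small simulation. This is a sketch of the general pattern using asyncio.Queue, not flexllm's pool implementation; the worker and run_pool names and the sleep-based latency are assumptions made for illustration.

```python
import asyncio


async def worker(name, delay, queue, counts):
    """Pull tasks until the queue is empty; `delay` stands in for endpoint latency."""
    while True:
        try:
            queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        await asyncio.sleep(delay)  # simulated request
        counts[name] += 1


async def run_pool(num_tasks, endpoints):
    """Drain one shared queue with one worker per (name, delay) endpoint."""
    queue = asyncio.Queue()
    for i in range(num_tasks):
        queue.put_nowait(i)
    counts = {name: 0 for name, _ in endpoints}
    await asyncio.gather(*(worker(n, d, queue, counts) for n, d in endpoints))
    return counts
```

Running asyncio.run(run_pool(60, [("gpu1", 0.001), ("gpu2", 0.05)])) returns per-endpoint counts in which the lower-latency worker has completed most of the tasks, simply because it returns to the queue more often.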
Response Caching
from flexllm import LLMClient, ResponseCacheConfig
client = LLMClient(
model="gpt-4",
base_url="https://api.openai.com/v1",
api_key="your-api-key",
cache=ResponseCacheConfig(enabled=True, ttl=3600), # 1 hour TTL
)
# First call: API request (~2s, ~$0.01)
result1 = await client.chat_completions(messages)
# Second call: Cache hit (~0.001s, $0)
result2 = await client.chat_completions(messages)
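The cache-hit behaviour above can be approximated with a small in-memory TTL cache keyed by a hash of the request. This is a sketch under assumptions: the TTLCache class, its key scheme, and the injectable clock are hypothetical and do not describe how ResponseCacheConfig stores entries or shares them across processes.

```python
import hashlib
import json
import time


class TTLCache:
    """In-memory response cache keyed by a hash of the request payload."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._store = {}

    @staticmethod
    def make_key(model, messages):
        # sort_keys makes the key independent of dict key order
        payload = json.dumps({"model": model, "messages": messages}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # expired: drop the entry and report a miss
            return None
        return value

    def set(self, key, value):
        self._store[key] = (self.clock() + self.ttl, value)
```

A caller would check get before issuing the API request and call set after a successful response.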
Cost Tracking
# Track costs during batch processing
results, cost_report = await client.chat_completions_batch(
messages_list,
return_cost_report=True,
)
print(f"Total cost: ${cost_report.total_cost:.4f}")
# Real-time cost display in progress bar
results = await client.chat_completions_batch(
messages_list,
track_cost=True, # Shows 💰 $0.0012 in progress bar
)
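The arithmetic behind a cost figure is simple enough to sketch. The helpers below assume per-million-token pricing and the prompt_tokens / completion_tokens usage keys shown earlier; the function names and any prices passed in are illustrative, not flexllm's pricing table.

```python
def request_cost(usage, input_price_per_m, output_price_per_m):
    """Cost in dollars for one response, given per-million-token prices."""
    return (
        usage["prompt_tokens"] * input_price_per_m
        + usage["completion_tokens"] * output_price_per_m
    ) / 1_000_000


def total_cost(usages, input_price_per_m, output_price_per_m):
    """Sum the cost of many responses that share the same prices."""
    return sum(request_cost(u, input_price_per_m, output_price_per_m) for u in usages)
```

With hypothetical prices of $30 and $60 per million input and output tokens, a response with 10 prompt tokens and 5 completion tokens costs (10 × 30 + 5 × 60) / 1,000,000 = $0.0006.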
Streaming
# Token-by-token streaming
async for chunk in client.chat_completions_stream(messages):
    print(chunk, end="", flush=True)
# Batch streaming - process results as they complete
async for result in client.iter_chat_completions_batch(messages_list):
    process(result)
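Yielding batch results as they complete, rather than in submission order, is a standard asyncio pattern. The sketch below uses asyncio.as_completed with simulated requests; fake_request, iter_as_completed, and collect are hypothetical names, and nothing here is taken from the implementation of iter_chat_completions_batch.

```python
import asyncio


async def fake_request(index, delay):
    """Simulated request that finishes after `delay` seconds."""
    await asyncio.sleep(delay)
    return index


async def iter_as_completed(delays):
    """Yield request indices in completion order rather than submission order."""
    tasks = [asyncio.create_task(fake_request(i, d)) for i, d in enumerate(delays)]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def collect(delays):
    """Gather the yielded indices into a list for inspection."""
    return [i async for i in iter_as_completed(delays)]
```

The consumer can start post-processing the fastest responses while slower requests are still in flight.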
Thinking Mode (Reasoning Models)
Unified interface for DeepSeek-R1, Qwen3, Claude extended thinking, Gemini thinking.
result = await client.chat_completions(
messages,
thinking=True, # Enable thinking
return_raw=True,
)
# Unified parsing across all providers
parsed = client.parse_thoughts(result.data)
print("Thinking:", parsed["thought"])
print("Answer:", parsed["answer"])
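Some reasoning models emit their reasoning inline, wrapped in think tags, while others return it in a separate response field. As a rough illustration of the inline case only, the sketch below splits such a completion into a thought and an answer. The split_think helper is hypothetical and is not how parse_thoughts is implemented.

```python
import re

_THINK_RE = re.compile(r"<think>(.*?)</think>", re.DOTALL)


def split_think(text):
    """Split a completion into its <think>...</think> part and the remaining answer."""
    match = _THINK_RE.search(text)
    if match is None:
        return {"thought": "", "answer": text.strip()}
    answer = text[: match.start()] + text[match.end():]
    return {"thought": match.group(1).strip(), "answer": answer.strip()}
```

The returned dictionary uses the same thought and answer keys as the example above so the two can be compared side by side.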
Multimodal Preprocessing
Automatically convert local file paths and URLs to base64 data URIs. Supports images, videos, and audio — just pass local paths in your messages:
from flexllm.msg_processors import messages_preprocess
messages = [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": "/path/to/image.png"}},
{"type": "video_url", "video_url": {"url": "/path/to/video.mp4"}},
{"type": "input_audio", "input_audio": {"data": "/path/to/audio.wav", "format": "wav"}},
{"type": "text", "text": "Describe what you see and hear."},
],
}
]
# All local paths → base64 data URIs (async)
processed = await messages_preprocess(messages)
result = await client.chat_completions(processed)
| Content type | Source field | Output format |
|---|---|---|
| image_url | image_url.url | data:image/...;base64,… (with resize support) |
| video_url | video_url.url | data:video/...;base64,… |
| audio_url | audio_url.url | data:audio/...;base64,… |
| input_audio | input_audio.data | Raw base64 (no data: prefix, OpenAI format) |
Supported sources: local file paths, file:// URIs, HTTP/HTTPS URLs, existing data: URIs (passthrough).
Claude and Gemini clients automatically convert these to their native formats.
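The two output formats in the table can be reproduced with the standard library. This sketch covers local files only and omits image resizing and URL fetching; file_to_data_uri and file_to_raw_base64 are hypothetical helpers, not functions exported by flexllm.msg_processors.

```python
import base64
import mimetypes


def file_to_raw_base64(path):
    """Raw base64 with no prefix, as used for input_audio.data."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")


def file_to_data_uri(path, default_mime="application/octet-stream"):
    """data:<mime>;base64,<payload>, with the MIME type guessed from the file extension."""
    mime, _ = mimetypes.guess_type(path)
    return f"data:{mime or default_mime};base64,{file_to_raw_base64(path)}"
```

For a PNG file the data URI starts with data:image/png;base64, followed by the encoded bytes.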
Tool Calls (Function Calling)
tools = [{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather information",
"parameters": {
"type": "object",
"properties": {"location": {"type": "string"}},
"required": ["location"],
},
},
}]
result = await client.chat_completions(
messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
tools=tools,
return_usage=True,
)
if result.tool_calls:
    for call in result.tool_calls:
        print(f"Call: {call.function['name']}({call.function['arguments']})")
Agent (Tool-Use Loop)
AgentClient wraps LLMClient and handles the tool-calling loop automatically: LLM calls → execute tools → feed results back → repeat until done.
from flexllm import AgentClient, LLMClient
client = LLMClient(model="gpt-4", base_url="...", api_key="...")
agent = AgentClient(
client=client,
system="You are a helpful assistant.",
tools=[{...}], # OpenAI-format tool definitions
tool_executor=my_tool_fn, # (name, arguments_json) -> result
max_rounds=10,
)
# Stateless single task
result = await agent.run("Check the weather in Beijing")
# result.content, result.rounds, result.tool_calls, result.usage
# Stateful multi-turn chat (auto-maintains message history)
r1 = await agent.chat("Hello")
r2 = await agent.chat("Check the weather") # carries r1 context
agent.reset()
# Structured output with Pydantic
from pydantic import BaseModel
class Decision(BaseModel):
    action: str
    reason: str
result = await agent.run("Analyze this", response_format=Decision)
result.parsed # -> Decision(action="approve", reason="...")
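The my_tool_fn placeholder passed as tool_executor above is left undefined. One possible shape, following the (name, arguments_json) -> result convention, is a name-to-function dispatcher. The get_weather stub, the TOOLS registry, and the choice to return a JSON string (including for errors) are assumptions for illustration; the source does not state what result type AgentClient expects.

```python
import json


def get_weather(location):
    """Stub tool: a real implementation would call a weather service."""
    return {"location": location, "forecast": "sunny"}


TOOLS = {"get_weather": get_weather}


def my_tool_fn(name, arguments_json):
    """Dispatch a tool call by name; `arguments_json` is the JSON string from the model."""
    func = TOOLS.get(name)
    if func is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        kwargs = json.loads(arguments_json or "{}")
        return json.dumps(func(**kwargs))
    except (json.JSONDecodeError, TypeError) as exc:
        # Malformed JSON or arguments that do not match the tool signature
        return json.dumps({"error": str(exc)})
```

Returning errors as data rather than raising lets the model see the failure and correct its next call.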
CLI
# Quick ask
flexllm ask "What is Python?"
# Interactive chat
flexllm chat
# Batch processing with cost tracking
flexllm batch input.jsonl -o output.jsonl --track-cost
flexllm batch input.jsonl -o output.jsonl -n 5 # First 5 records only
flexllm batch data.jsonl -o out.jsonl -uf text -sf sys # Custom field names
# Model management
flexllm list # Configured models
flexllm models # Remote available models
flexllm set-model gpt-4 # Set default model
flexllm test # Test connection
flexllm init # Initialize config file
# Serve - wrap LLM as HTTP API (for fine-tuned model deployment)
flexllm serve -m qwen-finetuned -s "You are an assistant"
flexllm serve --thinking true -p 8000 -v # With thinking mode + request logging
# Agent mode with built-in tools
flexllm agent --tools code "Read main.py and analyze it" # Code tools (read/edit/glob/grep/bash)
flexllm agent --tools all "Create and modify files" # All tools (includes write)
flexllm agent --tools code -v "Debug the issue" # Verbose mode (show execution details)
flexllm chat --tools code # Interactive multi-turn agent
flexllm agent --tools shell,dtflow "Clean data.jsonl" # Legacy CLI tools
# Utilities
flexllm pricing gpt-4 # Query model pricing
flexllm credits # Check API key balance
flexllm mock # Start mock LLM server for testing
Configuration
Config file location: ~/.flexllm/config.yaml
See flexllm_config.example.yaml for a comprehensive configuration example with all available options, or flexllm_config.quickstart.yaml for a minimal quick-start template.
# Default model
default: "gpt-4"
# Global system prompt (applied to all commands unless overridden)
system: "You are a helpful assistant."
# Global user content template (applied to all user messages unless overridden)
# Use {content} as placeholder for original user content
# user_template: "{content}/detail"
# Model list
models:
  - id: gpt-4
    name: gpt-4
    provider: openai
    base_url: https://api.openai.com/v1
    api_key: your-api-key
    system: "You are a GPT-4 assistant." # Model-specific system prompt (optional)
  - id: local-finetuned
    name: local-finetuned
    provider: openai
    base_url: http://localhost:8000/v1
    api_key: EMPTY
    user_template: "{content}/detail" # Model-specific user template for fine-tuned models (optional)
    # Model params: any field beyond meta fields (id/name/provider/base_url/api_key/system/user_template)
    # is automatically passed through to the LLM API
    max_tokens: 512
    temperature: 0.3
  - id: local-ollama
    name: local-ollama
    provider: openai
    base_url: http://localhost:11434/v1
    api_key: EMPTY
# Batch command config (optional)
batch:
  concurrency: 20
  cache: true
  track_cost: true
  system: "You are a batch processing assistant." # Batch-specific system prompt (optional)
  # user_template: "[INST]{content}[/INST]" # Batch-specific user template (optional)
Model params priority (higher priority overrides lower):
- CLI argument (e.g., -t 0.5, --max-tokens 100)
- Batch config (batch command only, e.g., batch.temperature)
- Model config (e.g., models[].temperature, models[].max_tokens)
- Command defaults (e.g., chat/chat-web defaults: temperature=0.7, max_tokens=2048)
Any field in model config beyond the meta fields (id, name, provider, base_url, api_key, system, user_template) is treated as a model call parameter and automatically passed through to the LLM API.
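The priority order for model params amounts to a layered merge in which higher-priority layers overwrite lower ones. The sketch below illustrates that rule; resolve_params and the convention that None means "not set at this level" are assumptions, not flexllm's configuration loader.

```python
def resolve_params(command_defaults, model_cfg=None, batch_cfg=None, cli_args=None):
    """Merge parameter layers from lowest to highest priority."""
    merged = {}
    for layer in (command_defaults, model_cfg, batch_cfg, cli_args):
        for key, value in (layer or {}).items():
            if value is not None:  # None means "not set at this level"
                merged[key] = value
    return merged
```

For example, with command defaults of temperature=0.7 and max_tokens=2048, a model config of temperature=0.3 and max_tokens=512, and a CLI argument of temperature=0.5, the merge yields temperature=0.5 from the CLI and max_tokens=512 from the model config.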
System prompt priority (higher priority overrides lower):
- CLI argument (-s/--system)
- Batch config (batch.system)
- Model config (models[].system)
- Global config (system)
User template priority (higher priority overrides lower):
- CLI argument (--user-template)
- Batch config (batch.user_template)
- Model config (models[].user_template)
- Global config (user_template)
User template uses {content} as placeholder for original user content. Useful for fine-tuned models requiring specific prompt formats (e.g., "{content}/detail", "[INST]{content}[/INST]").
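Placeholder substitution of this kind can be sketched in a few lines. The apply_user_template helper is hypothetical; it uses str.replace rather than str.format so that other braces in the template or in the user content are left untouched, which is a design choice of this sketch and not a statement about flexllm's behaviour.

```python
def apply_user_template(template, content):
    """Substitute the {content} placeholder; return content unchanged if no template is set."""
    if not template:
        return content
    return template.replace("{content}", content)
```

Applied to the two formats mentioned above, "{content}/detail" turns "Summarize" into "Summarize/detail", and "[INST]{content}[/INST]" wraps the user text in the instruction markers.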
Environment variables (higher priority than config file):
- FLEXLLM_BASE_URL / OPENAI_BASE_URL
- FLEXLLM_API_KEY / OPENAI_API_KEY
- FLEXLLM_MODEL / OPENAI_MODEL
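A lookup that prefers environment variables over the config file might look like the sketch below. Two points are assumptions: the source does not say which of the FLEXLLM_ and OPENAI_ variables wins when both are set (this sketch checks names in the order given), and resolve_setting is a hypothetical helper, not part of flexllm.

```python
import os


def resolve_setting(names, config_value=None, environ=None):
    """Return the first non-empty environment variable in `names`, else the config value."""
    env = os.environ if environ is None else environ
    for name in names:
        value = env.get(name)
        if value:
            return value
    return config_value
```

A call such as resolve_setting(["FLEXLLM_MODEL", "OPENAI_MODEL"], config_value="gpt-4") falls back to the config value only when neither variable is set.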
Architecture
flexllm/
├── clients/ # All client implementations
│ ├── base.py # Abstract base class (LLMClientBase)
│ ├── llm.py # Unified entry point (LLMClient)
│ ├── openai.py # OpenAI-compatible backend
│ ├── gemini.py # Google Gemini backend
│ ├── claude.py # Anthropic Claude backend
│ ├── pool.py # Multi-endpoint load balancer
│ └── router.py # Provider routing strategies
├── agent/ # Agent layer (tool-use loop)
│ ├── client.py # AgentClient implementation
│ ├── types.py # AgentResult, ToolCallRecord
│ └── tools/ # Built-in tools (read/write/edit/glob/grep/bash)
├── cli/ # CLI commands and helpers
├── pricing/ # Cost estimation and tracking
├── serve.py # HTTP API server (flexllm serve)
├── cache/ # Response caching with IPC
├── async_api/ # High-performance async engine
└── msg_processors/ # Multi-modal message processing
The architecture follows a simple layered design:
AgentClient (tool-use loop, multi-turn chat, structured output)
│
└── LLMClient (single endpoint or multi-endpoint)
│ │
│ ├── ProviderRouter (round_robin)
│ ├── Health Monitor (failure threshold + auto recovery)
│ └── Shared Task Queue (dynamic load balancing)
│ │
└──────────── Backend Clients ─────┘
├── OpenAIClient
├── GeminiClient
└── ClaudeClient
│
└── LLMClientBase (Abstract - 4 methods to implement)
│
├── ConcurrentRequester (Async engine)
├── ResponseCache (Caching layer)
└── CostTracker (Cost monitoring)
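The Health Monitor entry in the diagram (failure threshold plus auto recovery) can be pictured as a small per-endpoint state tracker. The failure_threshold and recovery_time names mirror the LLMClient options shown earlier; the EndpointHealth class, its methods, and the injectable clock are assumptions for illustration rather than the code in pool.py.

```python
import time


class EndpointHealth:
    """Track consecutive failures and decide whether an endpoint may receive traffic."""

    def __init__(self, failure_threshold=3, recovery_time=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.clock = clock  # injectable so tests do not need to sleep
        self.failures = 0
        self.unhealthy_since = None

    def record_success(self):
        self.failures = 0
        self.unhealthy_since = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            # (Re)start the cooldown, including after a failed recovery attempt
            self.unhealthy_since = self.clock()

    def is_available(self):
        if self.unhealthy_since is None:
            return True
        # After the cooldown the endpoint is offered traffic again as a trial
        return self.clock() - self.unhealthy_since >= self.recovery_time
```

A pool would consult is_available before dispatching a request and call record_success or record_failure when the request finishes.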
API Reference
LLMClient
LLMClient(
provider: str = "auto", # "auto", "openai", "gemini", "claude"
model: str, # Model name
base_url: str = None, # API base URL (required for openai)
api_key: str = "EMPTY", # API key
cache: ResponseCacheConfig, # Cache config
concurrency_limit: int = 10, # Max concurrent requests
max_qps: float = None, # Max requests per second
retry_times: int = 3, # Retry count on failure
timeout: int = 120, # Request timeout (seconds)
)
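The effect of a concurrency limit can be demonstrated with asyncio.Semaphore, a common way to cap in-flight requests. This is a sketch of the idea behind the concurrency_limit option, not flexllm's async engine; run_limited and its simulated request are hypothetical.

```python
import asyncio


async def run_limited(num_requests, concurrency_limit, delay=0.01):
    """Run simulated requests with a cap on how many are in flight; return the observed peak."""
    semaphore = asyncio.Semaphore(concurrency_limit)
    in_flight = 0
    peak = 0

    async def one_request():
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(delay)  # stands in for the HTTP call
            in_flight -= 1

    await asyncio.gather(*(one_request() for _ in range(num_requests)))
    return peak
```

The returned peak never exceeds concurrency_limit, however many requests are submitted at once.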
Main Methods
| Method | Description |
|---|---|
| chat_completions(messages) | Single async request |
| chat_completions_sync(messages) | Single sync request |
| chat_completions_batch(messages_list) | Batch async with checkpoint |
| iter_chat_completions_batch(messages_list) | Streaming batch results |
| chat_completions_stream(messages) | Token-by-token streaming |
AgentClient
AgentClient(
client: LLMClient, # LLMClient instance (composition, not inheritance)
system: str = None, # System prompt
tools: list[dict] = None, # OpenAI-format tool definitions
tool_executor: Callable = None, # (name, arguments_json) -> result (sync or async)
max_rounds: int = 10, # Max tool-calling rounds per run
max_context_tokens: int = None, # Optional context window limit
)
| Method | Description |
|---|---|
| run(user_input) | Stateless single task with tool-use loop |
| chat(user_input) | Stateful multi-turn chat (auto-maintains history) |
| reset() | Clear conversation history |
Returns AgentResult with .content, .rounds, .tool_calls, .usage, .parsed.
License
Apache 2.0