Skip to main content

High-performance LLM client with batch processing, caching, and checkpoint recovery

Project description

flexllm

High-Performance LLM Client for Production
Batch processing with checkpoint recovery, response caching, load balancing, and cost tracking

PyPI version License pypi downloads


Why flexllm?

Built for production batch processing at scale.

from flexllm import LLMClient

client = LLMClient(base_url="https://api.openai.com/v1", model="gpt-4", api_key="...")

# Process 100k requests with automatic checkpoint recovery
# Interrupted at 50k? Just restart - it continues from 50,001
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",  # Progress saved here
    show_progress=True,
    track_cost=True,  # Real-time cost display
)

Scale out across multiple endpoints with zero code change.

from flexllm import LLMClientPool

# Same API, multiple GPU nodes — faster endpoints automatically handle more tasks
pool = LLMClientPool(
    endpoints=[
        {"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50},
        {"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20},
        {"base_url": "http://gpu3:8000/v1", "model": "qwen"},
    ],
    fallback=True,  # Auto-switch on endpoint failure
)

results = await pool.chat_completions_batch(messages_list, output_jsonl="results.jsonl")

Features

Feature Description
Checkpoint Recovery Batch jobs auto-resume from interruption - process millions of requests safely
Multi-Endpoint Pool Distribute tasks across GPU nodes with shared-queue dynamic balancing and automatic failover
Response Caching Built-in caching with TTL and IPC multi-process sharing
Cost Tracking Real-time cost monitoring with budget control
High-Performance Async Fine-grained concurrency control, QPS limiting, and streaming
Multi-Provider Supports OpenAI-compatible APIs, Gemini, Claude

Installation

pip install flexllm

# With all features
pip install flexllm[all]

Claude Code Integration

Enable Claude Code to use flexllm for LLM API calls, batch processing, and more:

flexllm install-skill

After installation, Claude Code gains the ability to use flexllm across all your projects.


Quick Start

Basic Usage

from flexllm import LLMClient

# Recommended: use context manager for proper resource cleanup
async with LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key"
) as client:
    # Async call
    response = await client.chat_completions([
        {"role": "user", "content": "Hello!"}
    ])

# Sync version (also supports context manager)
with LLMClient(model="gpt-4", base_url="...", api_key="...") as client:
    response = client.chat_completions_sync([
        {"role": "user", "content": "Hello!"}
    ])

# Get token usage
result = await client.chat_completions(
    messages=[{"role": "user", "content": "Hello!"}],
    return_usage=True,  # Returns ChatCompletionResult with usage info
)
print(f"Tokens: {result.usage}")  # {'prompt_tokens': 10, 'completion_tokens': 5, ...}

Batch Processing with Checkpoint Recovery

Process millions of requests safely. If interrupted, just restart - it continues from where it left off.

messages_list = [
    [{"role": "user", "content": f"Question {i}"}]
    for i in range(100000)
]

# Interrupted at 50,000? Re-run and it continues from 50,001.
results = await client.chat_completions_batch(
    messages_list,
    output_jsonl="results.jsonl",  # Progress saved here
    show_progress=True,
)

Multi-Endpoint Pool

Distribute batch tasks across multiple GPU nodes / API endpoints. Faster endpoints automatically handle more tasks via a shared queue model, with automatic failover and health monitoring.

LLMClient and LLMClientPool share the same API. Single endpoint → use LLMClient; multiple endpoints → use LLMClientPool.

from flexllm import LLMClientPool

pool = LLMClientPool(
    endpoints=[
        # Each endpoint can have independent rate limits
        {"base_url": "http://gpu1:8000/v1", "model": "qwen", "concurrency_limit": 50, "max_qps": 100},
        {"base_url": "http://gpu2:8000/v1", "model": "qwen", "concurrency_limit": 20, "max_qps": 50},
        {"base_url": "http://gpu3:8000/v1", "model": "qwen"},
    ],
    fallback=True,               # Auto-switch on endpoint failure
    failure_threshold=3,         # Mark unhealthy after 3 consecutive failures
    recovery_time=60.0,          # Try to recover after 60 seconds
)

# Single request — automatic failover across endpoints
result = await pool.chat_completions(messages)

# Distributed batch — shared queue, dynamic load balancing, checkpoint recovery
results = await pool.chat_completions_batch(
    messages_list,
    distribute=True,
    output_jsonl="results.jsonl",
    track_cost=True,
)

# Streaming with failover
async for chunk in pool.chat_completions_stream(messages):
    print(chunk, end="", flush=True)

Highlights:

  • Shared Queue: Faster endpoints automatically pull more tasks — no manual tuning needed
  • Automatic Failover: Failed requests retry on healthy endpoints; unhealthy nodes auto-recover
  • Per-Endpoint Config: Independent concurrency_limit and max_qps for each endpoint
  • Full Feature Support: Checkpoint recovery, caching, cost tracking all work with Pool

Response Caching

from flexllm import LLMClient, ResponseCacheConfig

client = LLMClient(
    model="gpt-4",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    cache=ResponseCacheConfig(enabled=True, ttl=3600),  # 1 hour TTL
)

# First call: API request (~2s, ~$0.01)
result1 = await client.chat_completions(messages)

# Second call: Cache hit (~0.001s, $0)
result2 = await client.chat_completions(messages)

Cost Tracking

# Track costs during batch processing
results, cost_report = await client.chat_completions_batch(
    messages_list,
    return_cost_report=True,
)
print(f"Total cost: ${cost_report.total_cost:.4f}")

# Real-time cost display in progress bar
results = await client.chat_completions_batch(
    messages_list,
    track_cost=True,  # Shows 💰 $0.0012 in progress bar
)

Streaming

# Token-by-token streaming
async for chunk in client.chat_completions_stream(messages):
    print(chunk, end="", flush=True)

# Batch streaming - process results as they complete
async for result in client.iter_chat_completions_batch(messages_list):
    process(result)

Thinking Mode (Reasoning Models)

Unified interface for DeepSeek-R1, Qwen3, Claude extended thinking, Gemini thinking.

result = await client.chat_completions(
    messages,
    thinking=True,      # Enable thinking
    return_raw=True,
)

# Unified parsing across all providers
parsed = client.parse_thoughts(result.data)
print("Thinking:", parsed["thought"])
print("Answer:", parsed["answer"])

Tool Calls (Function Calling)

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get weather information",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
    },
}]

result = await client.chat_completions(
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    tools=tools,
    return_usage=True,
)

if result.tool_calls:
    for call in result.tool_calls:
        print(f"Call: {call.function['name']}({call.function['arguments']})")

CLI

# Quick ask
flexllm ask "What is Python?"

# Interactive chat
flexllm chat

# Batch processing with cost tracking
flexllm batch input.jsonl -o output.jsonl --track-cost
flexllm batch input.jsonl -o output.jsonl -n 5           # First 5 records only
flexllm batch data.jsonl -o out.jsonl -uf text -sf sys   # Custom field names

# Model management
flexllm list              # Configured models
flexllm models            # Remote available models
flexllm set-model gpt-4   # Set default model
flexllm test              # Test connection
flexllm init              # Initialize config file

# Utilities
flexllm pricing gpt-4     # Query model pricing
flexllm credits           # Check API key balance
flexllm mock              # Start mock LLM server for testing

Configuration

Config file location: ~/.flexllm/config.yaml

See config.example.yaml for a comprehensive configuration example with all available options, or config.quickstart.yaml for a minimal quick-start template.

# Default model
default: "gpt-4"

# Global system prompt (applied to all commands unless overridden)
system: "You are a helpful assistant."

# Global user content template (applied to all user messages unless overridden)
# Use {content} as placeholder for original user content
# user_template: "{content}/detail"

# Model list
models:
  - id: gpt-4
    name: gpt-4
    provider: openai
    base_url: https://api.openai.com/v1
    api_key: your-api-key
    system: "You are a GPT-4 assistant."  # Model-specific system prompt (optional)

  - id: local-finetuned
    name: local-finetuned
    provider: openai
    base_url: http://localhost:8000/v1
    api_key: EMPTY
    user_template: "{content}/detail"  # Model-specific user template for fine-tuned models (optional)

  - id: local-ollama
    name: local-ollama
    provider: openai
    base_url: http://localhost:11434/v1
    api_key: EMPTY

# Batch command config (optional)
batch:
  concurrency: 20
  cache: true
  track_cost: true
  system: "You are a batch processing assistant."  # Batch-specific system prompt (optional)
  # user_template: "[INST]{content}[/INST]"  # Batch-specific user template (optional)

System prompt priority (higher priority overrides lower):

  1. CLI argument (-s/--system)
  2. Batch config (batch.system)
  3. Model config (models[].system)
  4. Global config (system)

User template priority (higher priority overrides lower):

  1. CLI argument (--user-template)
  2. Batch config (batch.user_template)
  3. Model config (models[].user_template)
  4. Global config (user_template)

User template uses {content} as placeholder for original user content. Useful for fine-tuned models requiring specific prompt formats (e.g., "{content}/detail", "[INST]{content}[/INST]").

Environment variables (higher priority than config file):

  • FLEXLLM_BASE_URL / OPENAI_BASE_URL
  • FLEXLLM_API_KEY / OPENAI_API_KEY
  • FLEXLLM_MODEL / OPENAI_MODEL

Architecture

flexllm/
├── clients/           # All client implementations
│   ├── base.py        # Abstract base class (LLMClientBase)
│   ├── llm.py         # Unified entry point (LLMClient)
│   ├── openai.py      # OpenAI-compatible backend
│   ├── gemini.py      # Google Gemini backend
│   ├── claude.py      # Anthropic Claude backend
│   ├── pool.py        # Multi-endpoint load balancer
│   └── router.py      # Provider routing strategies
├── pricing/           # Cost estimation and tracking
│   ├── cost_tracker.py
│   └── token_counter.py
├── cache/             # Response caching with IPC
├── async_api/         # High-performance async engine
└── msg_processors/    # Multi-modal message processing

The architecture follows a simple layered design:

LLMClient (single endpoint)  /  LLMClientPool (multi-endpoint)
    │                                  │
    │                                  ├── ProviderRouter (round_robin)
    │                                  ├── Health Monitor (failure threshold + auto recovery)
    │                                  └── Shared Task Queue (dynamic load balancing)
    │                                  │
    └──────────── Backend Clients ─────┘
                    ├── OpenAIClient
                    ├── GeminiClient
                    └── ClaudeClient
                            │
                            └── LLMClientBase (Abstract - 4 methods to implement)
                                    │
                                    ├── ConcurrentRequester (Async engine)
                                    ├── ResponseCache (Caching layer)
                                    └── CostTracker (Cost monitoring)

API Reference

LLMClient

LLMClient(
    provider: str = "auto",        # "auto", "openai", "gemini", "claude"
    model: str,                    # Model name
    base_url: str = None,          # API base URL (required for openai)
    api_key: str = "EMPTY",        # API key
    cache: ResponseCacheConfig,    # Cache config
    concurrency_limit: int = 10,   # Max concurrent requests
    max_qps: float = None,         # Max requests per second
    retry_times: int = 3,          # Retry count on failure
    timeout: int = 120,            # Request timeout (seconds)
)

Main Methods

Method Description
chat_completions(messages) Single async request
chat_completions_sync(messages) Single sync request
chat_completions_batch(messages_list) Batch async with checkpoint
iter_chat_completions_batch(messages_list) Streaming batch results
chat_completions_stream(messages) Token-by-token streaming

License

Apache 2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flexllm-0.5.3.tar.gz (173.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flexllm-0.5.3-py3-none-any.whl (159.2 kB view details)

Uploaded Python 3

File details

Details for the file flexllm-0.5.3.tar.gz.

File metadata

  • Download URL: flexllm-0.5.3.tar.gz
  • Upload date:
  • Size: 173.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flexllm-0.5.3.tar.gz
Algorithm Hash digest
SHA256 27663b7e17b35b8754d6a089a0ef69c047405734b00780dfb19051ec3da1e160
MD5 be091c3418b17b418e7bd4a64fa22ea3
BLAKE2b-256 845ee7a2b23387e8b0c506a98d316ff0f18db5523b2fb3620b069045efc670d1

See more details on using hashes here.

Provenance

The following attestation bundles were made for flexllm-0.5.3.tar.gz:

Publisher: python-publish.yml on KenyonY/flexllm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file flexllm-0.5.3-py3-none-any.whl.

File metadata

  • Download URL: flexllm-0.5.3-py3-none-any.whl
  • Upload date:
  • Size: 159.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for flexllm-0.5.3-py3-none-any.whl
Algorithm Hash digest
SHA256 0704968ef8f10c9acbb956297d8fe2f65a2468a17929e592683ad975c124046b
MD5 ef86fb55834fdef514885d079a356740
BLAKE2b-256 f05e15c738842c9840f2b48e85ca8bf28d00956ab4d1cd07af8b8b2529e075e8

See more details on using hashes here.

Provenance

The following attestation bundles were made for flexllm-0.5.3-py3-none-any.whl:

Publisher: python-publish.yml on KenyonY/flexllm

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page