Skip to main content

Python SDK for Compresr - Intelligent prompt compression service

Project description

Compresr Python SDK

Intelligent context compression service to optimize LLM costs and performance. Reduce your LLM API costs by 30-70% through intelligent context compression.

Installation

pip install compresr

Quick Start

API Key Setup

Get your API key from compresr.ai:

  1. Create an account at compresr.ai
  2. Navigate to Dashboard → API Keys
  3. Click "Create New Key" and copy it (shown only once!)

Two Client Types

1. CompressionClient — Token-Level Compression

Intelligently removes less important tokens while preserving meaning. Best for compressing raw, unstructured text like system prompts, documents, or retrieved passages before sending to an LLM. Supports a tunable compression_ratio to control how aggressively to compress.

from compresr import CompressionClient

client = CompressionClient(api_key="cmp_your_api_key")

# Agnostic compression (espresso_v1, default) — no query needed
result = client.compress(
    context="Your very long context that needs compression...",
)

print(f"Original: {result.data.original_tokens} tokens")
print(f"Compressed: {result.data.compressed_tokens} tokens")
print(f"Saved: {result.data.tokens_saved} tokens")

# Query-specific compression (latte_v1) — query REQUIRED
result = client.compress(
    context="Python was created in 1991. JavaScript in 1995. Java in 1995.",
    query="Who created Python?",
    compression_model_name="latte_v1",
)

print(f"Compressed (query-relevant): {result.data.compressed_context}")

2. FilterClient — Chunk-Level Filtering

Keeps or drops entire chunks based on query relevance — no partial rewriting. Best for RAG pipelines where your retriever returns many chunks and you want to discard the irrelevant ones before stuffing them into a prompt. Fast and binary: a chunk is either in or out.

from compresr import FilterClient

client = FilterClient(api_key="cmp_your_api_key")

# Query is always required
result = client.filter(
    chunks=[
        "Python is a programming language created by Guido van Rossum.",
        "JavaScript was created by Brendan Eich for web browsers.",
        "Java was developed by James Gosling at Sun Microsystems.",
    ],
    query="Who created Python?",
)

print(f"Filtered chunks: {result.data.compressed_context}")
print(f"Saved: {result.data.tokens_saved} tokens")

Integration with OpenAI

Agnostic compression:

from compresr import CompressionClient
from openai import OpenAI

compresr = CompressionClient(api_key="cmp_xxx")
openai_client = OpenAI(api_key="sk-xxx")

compressed = compresr.compress(
    context="Your long system prompt or document...",
)

response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": compressed.data.compressed_context},
        {"role": "user", "content": "Analyze this data..."}
    ]
)

print(f"Saved {compressed.data.tokens_saved} tokens!")

Query-specific compression (RAG/QA):

from compresr import CompressionClient
from openai import OpenAI

compresr = CompressionClient(api_key="cmp_xxx")
openai_client = OpenAI(api_key="sk-xxx")

user_question = "What is machine learning?"

compressed = compresr.compress(
    context="Retrieved documents with lots of information...",
    query=user_question,
    compression_model_name="latte_v1",
)

response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": compressed.data.compressed_context},
        {"role": "user", "content": user_question}
    ]
)

Chunk filtering for RAG pipelines:

from compresr import FilterClient
from openai import OpenAI

filter_client = FilterClient(api_key="cmp_xxx")
openai_client = OpenAI(api_key="sk-xxx")

user_question = "What is Python?"
retrieved_chunks = ["Chunk about Python...", "Chunk about Java...", "Chunk about ML..."]

filtered = filter_client.filter(
    chunks=retrieved_chunks,
    query=user_question,
)

response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "\n".join(filtered.data.compressed_context)},
        {"role": "user", "content": user_question}
    ]
)

Streaming Support

Both clients support real-time streaming:

from compresr import CompressionClient, FilterClient

# Compression streaming
client = CompressionClient(api_key="cmp_your_api_key")
for chunk in client.compress_stream(
    context="Your long context...",
):
    print(chunk.content, end="", flush=True)

# Query-specific compression streaming
for chunk in client.compress_stream(
    context="Your long context...",
    query="What is important here?",
    compression_model_name="latte_v1",
):
    print(chunk.content, end="", flush=True)

# Filter streaming
filter_client = FilterClient(api_key="cmp_your_api_key")
for chunk in filter_client.filter_stream(
    chunks=["Chunk 1...", "Chunk 2..."],
    query="What is relevant?",
):
    print(chunk.content, end="", flush=True)

Async Support

Both clients support async/await:

import asyncio
from compresr import CompressionClient, FilterClient

async def main():
    # Compression async
    client = CompressionClient(api_key="cmp_your_api_key")
    result = await client.compress_async(
        context="Your context...",
    )

    # Query-specific compression async
    result = await client.compress_async(
        context="Your context...",
        query="What matters here?",
        compression_model_name="latte_v1",
    )

    # Filter async
    filter_client = FilterClient(api_key="cmp_your_api_key")
    filtered = await filter_client.filter_async(
        chunks=["Chunk 1...", "Chunk 2..."],
        query="What is relevant?",
    )

    await client.close()
    await filter_client.close()

asyncio.run(main())

API Reference

Client Initialization

from compresr import CompressionClient, FilterClient

# Token-level compression
client = CompressionClient(
    api_key="cmp_your_api_key",  # Required
    timeout=30                    # Optional: request timeout in seconds
)

# Chunk-level filtering
filter_client = FilterClient(
    api_key="cmp_your_api_key",  # Required
    timeout=30                    # Optional: request timeout in seconds
)

CompressionClient Methods

Method Description
compress() Compress context (latte_v1 requires query param)
compress_async() Async compress
compress_stream() Stream compression

FilterClient Methods

Method Description
filter() Filter chunks by query relevance (query REQUIRED)
filter_async() Async filter
filter_stream() Stream filtered chunks

Response Structure

# CompressResult (same for both clients)
result.data.original_context      # Original input
result.data.compressed_context    # Compressed/filtered output
result.data.original_tokens       # Token count before
result.data.compressed_tokens     # Token count after
result.data.actual_compression_ratio  # Achieved ratio (0-1)
result.data.tokens_saved          # Tokens saved
result.data.duration_ms           # Processing time

Available Models

Compression Models (CompressionClient)

Model Query Compression Ratio Best For
espresso_v1 Not needed Supported System prompts, documents, general context — when there is no specific question
latte_v1 Required Supported RAG / QA — compress retrieved text while preserving answer-relevant tokens

Filter Models (FilterClient)

Model Query Compression Ratio Best For
coldbrew_v1 Required Not supported RAG chunk selection — quickly drop irrelevant chunks from a retrieval set

Error Handling

from compresr import CompressionClient, FilterClient
from compresr.exceptions import (
    CompresrError,
    AuthenticationError,
    RateLimitError,
    ValidationError,
)

client = CompressionClient(api_key="cmp_your_api_key")

try:
    result = client.compress(context="Your context...")
except AuthenticationError:
    print("Invalid API key")
except RateLimitError:
    print("Rate limit exceeded")
except ValidationError as e:
    print(f"Invalid request: {e}")
except CompresrError as e:
    print(f"API error: {e}")

Requirements

  • Python 3.9+
  • httpx >= 0.27.0
  • pydantic >= 2.10.0

License

Proprietary License

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

compresr-2.0.1.tar.gz (17.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

compresr-2.0.1-py3-none-any.whl (18.9 kB view details)

Uploaded Python 3

File details

Details for the file compresr-2.0.1.tar.gz.

File metadata

  • Download URL: compresr-2.0.1.tar.gz
  • Upload date:
  • Size: 17.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.11

File hashes

Hashes for compresr-2.0.1.tar.gz
Algorithm Hash digest
SHA256 01e78b5ea97cf534609d0625b8133baac5a8fed0879a9d725c1064b7ff40dcda
MD5 595b6ae6d73195bc783f3d4b2eac5f9a
BLAKE2b-256 5715d8050a9b685331e6af536819cadc6c9e92e779e5ef2ada6c073f27c96be7

See more details on using hashes here.

File details

Details for the file compresr-2.0.1-py3-none-any.whl.

File metadata

  • Download URL: compresr-2.0.1-py3-none-any.whl
  • Upload date:
  • Size: 18.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.11

File hashes

Hashes for compresr-2.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9041d873451d68af9110af2230d109ca816173b7db5228a31d6d5d08235506e2
MD5 d29146e11b735bbcb9a7dd327b464d96
BLAKE2b-256 2d51c110ab4215f7af284da55895d0fa8405add6a073f9aae2d5f2eaa4deb209

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page