
Context compression middleware for AI agent frameworks — automatically summarizes conversation history when token limits are approached


🗜️ AzureAICommunity - Agent - Context Compression Middleware

Automatic conversation history compression middleware for AI agent applications built on the Agent Framework.


**Keep long multi-turn conversations running without hitting context-window limits.**

Getting Started · Configuration · Usage · Contributing


Overview

agentaicommunity-agent-context-compression is a plug-and-play context management layer for AI agent pipelines built on agent-framework. It counts the tokens in the conversation history before each LLM call, and when the count approaches your configured limit it automatically summarizes the older messages — keeping the session alive without hitting context-window errors.
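
The trigger condition itself is simple arithmetic; a minimal sketch (trigger_point is a hypothetical helper for illustration, not part of the package API):

```python
def trigger_point(max_tokens: int, trigger_ratio: float) -> int:
    # Compression fires once the tiktoken count of the history crosses this value.
    return int(max_tokens * trigger_ratio)

trigger_point(8000, 0.80)    # 6400 with the defaults shown below
trigger_point(128000, 0.90)  # 115200 for a larger context window
```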


✨ Features

🗜️ Automatic compression — fires transparently when the token threshold is hit
⚙️ Configurable trigger — set max_tokens and trigger_ratio to match your model's context window
🔒 Keep recent messages — keep_last_messages preserves the tail of the conversation verbatim
🔧 Tool-call aware — keeps assistant + tool message pairs together during the split
🔔 Block or allow — on_threshold_reached callback lets you log, alert, or stop the request
📝 Structured logging — pass your own logging.Logger; no print() calls
📊 Token usage tracking — last_usage on the middleware instance after each call (both modes)
🌊 Streaming support — works transparently with stream=True; usage captured via framework hooks
🔌 Provider-agnostic — works with any agent-framework LLM client (Ollama, OpenAI, Azure, etc.)
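
The tool-call-aware split can be pictured like this — a hedged sketch assuming a simple role-tagged message shape; split_history is hypothetical, not the package's internal function:

```python
def split_history(messages: list[dict], keep_last: int = 8) -> tuple[list, list]:
    """Split into (to_summarize, kept_tail) without stranding a tool result."""
    cut = max(0, len(messages) - keep_last)
    # If the tail would start on a tool result, walk the cut back so the
    # assistant message that issued the tool call stays with its result.
    while cut > 0 and messages[cut].get("role") == "tool":
        cut -= 1
    return messages[:cut], messages[cut:]

history = [
    {"role": "user", "content": "Weather in Paris?"},
    {"role": "assistant", "content": "", "tool_calls": ["get_weather"]},
    {"role": "tool", "content": "18°C, sunny"},
    {"role": "assistant", "content": "It's 18°C and sunny."},
]
older, kept = split_history(history, keep_last=2)
# kept begins at the assistant tool call, not at an orphaned tool result
```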

📦 Installation

pip install agentaicommunity-agent-context-compression

Or install from source:

cd ContextCompression
pip install -e .

🚀 Quick Start

Non-streaming

import asyncio
import logging
from agent_framework.ollama import OllamaChatClient
from context_compression import ContextCompressionMiddleware, TokenThresholdReachedError

logging.basicConfig(level=logging.INFO)

summarizer = ContextCompressionMiddleware(
    llm_client=OllamaChatClient(model="gemma3:4b"),  # LLM used to write the summary
    max_tokens=8000,        # compress when history approaches this size
    trigger_ratio=0.80,     # fire at 80% = 6400 tokens
    keep_last_messages=8,   # always keep the 8 most recent messages verbatim
    logger=logging.getLogger("ContextCompression"),
)

agent = OllamaChatClient(model="gemma3:4b").as_agent(
    name="MyAgent",
    instructions="You are a helpful assistant.",
    middleware=[summarizer],
)
session = agent.create_session()

async def main():
    for message in ["Hi, my name is Vinoth.", "I work in Python.", "What is my name?"]:
        response = await agent.run(message, session=session)
        print(response.text)

asyncio.run(main())

Streaming

async def main():
    messages = ["Hi, my name is Vinoth.", "I work in Python.", "What is my name?"]
    for message in messages:
        stream = agent.run(message, session=session, stream=True)
        async for update in stream:
            chunk = getattr(update, "text", None)
            if chunk:
                print(chunk, end="", flush=True)
        print()
        await stream.get_final_response()  # finalizes stream and populates last_usage

asyncio.run(main())

🧑‍💻 Usage

Threshold Callback Payload

Every call to on_threshold_reached receives a dict:

{
    "tokens_used":    87,   # current history token count
    "max_tokens":    100,   # your configured max
    "trigger_tokens": 75,   # the threshold that was crossed
}

Return True → compression proceeds normally.
Return False → request is blocked and TokenThresholdReachedError is raised.
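
For example, a callback that logs the crossing and always allows compression — a sketch; the payload keys are exactly those shown above:

```python
import logging

log = logging.getLogger("ContextCompression")

def on_threshold(info: dict) -> bool:
    # Record how close the history is to the hard limit, then allow compression.
    log.warning(
        "History at %d/%d tokens (trigger was %d)",
        info["tokens_used"], info["max_tokens"], info["trigger_tokens"],
    )
    return True  # returning False here would raise TokenThresholdReachedError

on_threshold({"tokens_used": 87, "max_tokens": 100, "trigger_tokens": 75})  # True
```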

Token Usage After Each Call

last_usage is populated after every call — both streaming and non-streaming:

# Non-streaming
response = await agent.run("Hello", session=session)

# Streaming
stream = agent.run("Hello", session=session, stream=True)
async for update in stream:
    pass
await stream.get_final_response()

# Either way, last_usage is populated:
u = summarizer.last_usage
print(u["input_token_count"])   # tokens sent to LLM
print(u["output_token_count"])  # tokens in the response
print(u["total_token_count"])   # input + output
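
A small helper for surfacing this in logs (format_usage is hypothetical; the dict keys are the documented last_usage keys):

```python
def format_usage(usage: dict) -> str:
    # Summarize one call's token usage using the last_usage keys.
    return (f"in={usage['input_token_count']} "
            f"out={usage['output_token_count']} "
            f"total={usage['total_token_count']}")

format_usage({"input_token_count": 120, "output_token_count": 30, "total_token_count": 150})
# 'in=120 out=30 total=150'
```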

Handling TokenThresholdReachedError

from context_compression import ContextCompressionMiddleware, TokenThresholdReachedError

try:
    response = await agent.run(message, session=session)
except TokenThresholdReachedError as e:
    print(f"Blocked: {e}")
    # handle gracefully — notify user, end session, etc.

⚙️ Configuration

ContextCompressionMiddleware

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| llm_client | any LLM client | required | Client used to generate the summary (can be a smaller/cheaper model) |
| max_tokens | int | 8000 | History size limit (tiktoken count) |
| trigger_ratio | float | 0.80 | Compression fires at max_tokens × trigger_ratio |
| keep_last_messages | int | 8 | Number of recent messages to keep verbatim after compression |
| model_encoding | str | "cl100k_base" | tiktoken encoding for token counting |
| on_threshold_reached | Callable[[dict], bool] | None | Callback fired at threshold; return True to compress, False to block |
| logger | logging.Logger | None | Your logger; falls back to logging.getLogger(__name__) |

Blocking runaway sessions

def my_callback(info: dict) -> bool:
    if info["tokens_used"] > 500_000:
        return False   # block — raises TokenThresholdReachedError
    return True        # allow compression

middleware = ContextCompressionMiddleware(
    ...,
    on_threshold_reached=my_callback,
)

Provider Compatibility:
Works with any LLM client that implements the agent-framework ChatClient interface.


🤝 Contributing

Contributions are welcome! Please open an issue to discuss what you'd like to change before submitting a pull request.

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/my-feature)
  3. Commit your changes (git commit -m 'Add my feature')
  4. Push to the branch (git push origin feature/my-feature)
  5. Open a Pull Request

👤 Author

Built and maintained by Vinoth Rajendran.


📄 License

MIT — see LICENSE for details.
