Cogency

Streaming agents with stateless context assembly

Architecture

Cogency enables stateful agent execution through:

  1. Persist-then-rebuild: Write every LLM output event to storage immediately; rebuild context from storage on each execution
  2. Delimiter protocol: Explicit state signaling (§think, §call, §execute, §respond, §end)
  3. Stateless design: Agent and context assembly are pure functions; all state is externalized to storage

This eliminates stale-state bugs, enables crash recovery, and provides concurrency safety by treating storage as the single source of truth.
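
This loop can be sketched in a few lines. All names here (storage.append, storage.events, rebuild_context, execute_tool) are illustrative stand-ins, not Cogency's actual API:

async def run_turn(storage, llm, conversation_id, user_msg):
    # Persist the user message first: storage is the single source of truth.
    await storage.append(conversation_id, {"type": "user", "content": user_msg})
    done = False
    while not done:
        # Context is a pure function of stored events, so a crash between
        # iterations loses nothing that was already persisted.
        messages = rebuild_context(await storage.events(conversation_id))
        async for event in llm.stream(messages):
            await storage.append(conversation_id, event)  # persist immediately
            if event["type"] == "call":
                result = await execute_tool(event)
                await storage.append(conversation_id, result)
            elif event["type"] == "end":
                done = True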

Execution Modes

Resume: WebSocket session persists between tool calls

agent = Agent(llm="openai", mode="resume")
# Maintains LLM session, injects tool results without context replay
# Constant token usage per turn

Replay: Fresh HTTP request per iteration

agent = Agent(llm="openai", mode="replay")
# Rebuilds context from storage each iteration
# Context grows with conversation
# Universal LLM compatibility

Auto: Resume with fallback to Replay

agent = Agent(llm="openai", mode="auto")  # Default
# Uses WebSocket when available, falls back to HTTP

Token Efficiency

Resume mode maintains LLM session state, eliminating context replay on every tool call:

Turns   Replay (context replay)   Resume (session state)   Efficiency
8       31,200 tokens             6,000 tokens              5.2x
16      100,800 tokens            10,800 tokens             9.3x
32      355,200 tokens            20,400 tokens             17.4x
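
The table is consistent with a simple cost model (the ~1,200-token system prompt and ~600 tokens of new content per turn are inferred from the numbers, not official figures). Replay resends the whole history on every turn, so cumulative cost grows quadratically; resume sends context once and only per-turn deltas afterwards:

SYSTEM, PER_TURN = 1_200, 600  # inferred from the table above

def replay_tokens(n: int) -> int:
    # Turn i resends the system prompt plus i turns of accumulated history.
    return sum(SYSTEM + PER_TURN * i for i in range(1, n + 1))  # = 300*n**2 + 1500*n

def resume_tokens(n: int) -> int:
    # Context is sent once at connection; each turn adds only its delta.
    return SYSTEM + PER_TURN * n

for n in (8, 16, 32):
    print(n, replay_tokens(n), resume_tokens(n))
# 8 31200 6000
# 16 100800 10800
# 32 355200 20400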

Mathematical proof: docs/proof.md

Installation

pip install cogency
export OPENAI_API_KEY="your-key"

Usage

from cogency import Agent

agent = Agent(llm="openai")
async for event in agent("What files are in this directory?"):
    if event["type"] == "respond":
        print(event["content"])

Event Streaming

Semantic mode (default): Complete thoughts

async for event in agent("Debug this code", chunks=False):
    if event["type"] == "think":
        print(f"~ {event['content']}")
    elif event["type"] == "respond":
        print(f"> {event['content']}")

Token mode: Real-time streaming

async for event in agent("Debug this code", chunks=True):
    if event["type"] == "respond":
        print(event["content"], end="", flush=True)

Multi-turn Conversations

# Stateless (default)
async for event in agent("What's in this directory?"):
    if event["type"] == "respond":
        print(event["content"])

# Stateful with profile learning
async for event in agent(
    "Continue our code review",
    conversation_id="review_session",
    user_id="developer"  # For profile learning and multi-tenancy
):
    if event["type"] == "respond":
        print(event["content"])

Built-in Tools

  • Files: read, write, edit, list, find
  • Web: search, scrape
  • Memory: recall
  • System: shell
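
The agent invokes these itself through the delimiter protocol, and the calls surface in the event stream. A sketch, assuming "call" events carry the serialized invocation (the payload shape mirrors the storage records shown under Context Management):

from cogency import Agent

agent = Agent(llm="openai")  # built-in tools are available by default

async for event in agent("Find TODO comments in this repo"):
    if event["type"] == "call":
        print(f"tool call: {event['content']}")  # e.g. {"name": "find", ...}
    elif event["type"] == "respond":
        print(event["content"])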

Custom Tools

from cogency import Tool, ToolResult

class DatabaseTool(Tool):
    name = "query_db"
    description = "Execute SQL queries"
    
    async def execute(self, sql: str, user_id: str):
        # Your implementation
        return ToolResult(
            outcome="Query executed",
            content="Results..."
        )

agent = Agent(llm="openai", tools=[DatabaseTool()])

Configuration

agent = Agent(
    llm="openai",                    # or "gemini", "anthropic"
    mode="auto",                     # "resume", "replay", or "auto"
    storage=custom_storage,          # Custom Storage implementation
    identity="Custom agent identity",
    instructions="Additional context",
    tools=[CustomTool()],
    max_iterations=10,
    history_window=None,             # None = full history (default), int = sliding window
    profile=True,                    # Enable automatic user learning
    learn_every=5,                   # Profile update frequency
    debug=False
)

Context Management

Cogency uses conversational message assembly for natural LLM interaction:

Storage: Events stored as typed records (clean content, no delimiters)

{"type": "user", "content": "debug this"}
{"type": "think", "content": "checking logs"}
{"type": "call", "content": '{"name": "read", ...}'}

Assembly: Transforms to proper conversational structure

[
  {"role": "system", "content": "PROTOCOL + TOOLS"},
  {"role": "user", "content": "debug this"},
  {"role": "assistant", "content": "§think: checking logs\n§call: {...}\n§execute"},
  {"role": "user", "content": "§result: ..."}
]
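
Conceptually, assembly is a pure function from stored records to chat messages. A simplified sketch (the real assembler also merges consecutive assistant events into one message and appends §execute after calls, as shown above):

ASSISTANT_TYPES = {"think", "call", "respond"}  # assumption for this sketch

def assemble(system_prompt: str, records: list[dict]) -> list[dict]:
    messages = [{"role": "system", "content": system_prompt}]
    for r in records:
        if r["type"] == "user":
            messages.append({"role": "user", "content": r["content"]})
        elif r["type"] in ASSISTANT_TYPES:
            # Re-attach the delimiter that storage strips, e.g. "§think: ..."
            messages.append({"role": "assistant",
                             "content": f"§{r['type']}: {r['content']}"})
        elif r["type"] == "result":
            messages.append({"role": "user",
                             "content": f"§result: {r['content']}"})
    return messages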

Cost control with history_window:

  • history_window=None - Full conversation history (default)
  • history_window=20 - Last 20 messages (sliding window for cost control)
  • Custom compaction: Query storage directly and implement an app-level strategy (sketched below)
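
One app-level strategy: keep the last N messages verbatim and summarize the rest. The storage calls (load_messages, save_summary) and the summarize helper are hypothetical stand-ins for whatever your Storage implementation exposes:

async def compact(storage, conversation_id: str, keep_last: int = 20):
    # Hypothetical API: adapt to your Storage implementation.
    messages = await storage.load_messages(conversation_id)
    old, recent = messages[:-keep_last], messages[-keep_last:]
    if old:
        summary = await summarize(old)  # your summarizer, e.g. a cheap LLM call
        await storage.save_summary(conversation_id, summary)
    return recent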

Considerations:

  • Resume mode: Context sent once at connection, minimal impact
  • Replay mode: Context grows with conversation, windowing recommended for long sessions
  • Frontier models: Handle longer contexts better, can use None
  • Weaker models: May benefit from smaller windows (e.g., 10-20 messages)

Multi-Provider Support

agent = Agent(llm="openai")     # GPT-4o Realtime API (WebSocket)
agent = Agent(llm="gemini")     # Gemini Live (WebSocket)
agent = Agent(llm="anthropic")  # Claude (HTTP only)

Memory System

Passive profile: Automatic user preference learning

agent = Agent(llm="openai", profile=True)
# Learns patterns from interactions, embedded in system prompt

Active recall: Cross-conversation search

# Agent uses recall tool to query past interactions
§call: {"name": "recall", "args": {"query": "previous python debugging"}}
§execute
[SYSTEM: Found 3 previous debugging sessions...]
§respond: Based on your previous Python work...

Streaming Protocol

Agents signal execution state explicitly:

§think: I need to examine the code structure first
§call: {"name": "read", "args": {"file": "main.py"}}
§execute
[SYSTEM: Found syntax error on line 15]
§respond: Fixed the missing semicolon. Code runs correctly now.
§end

The parser detects delimiters, the accumulator handles tool execution, and the persister writes events to storage.
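
A minimal parser sketch, assuming delimiters arrive at the start of a line (the real parser must also handle delimiters split across streamed token chunks):

import re

DELIM = re.compile(r"^§(think|call|execute|respond|end):?\s?", re.MULTILINE)

def parse(text: str) -> list[dict]:
    # Split raw §-delimited output into typed events (simplified).
    matches = list(DELIM.finditer(text))
    events = []
    for m, nxt in zip(matches, matches[1:] + [None]):
        end = nxt.start() if nxt else len(text)
        events.append({"type": m.group(1), "content": text[m.end():end].strip()})
    return events

parse('§think: check logs\n§call: {"name": "read"}\n§execute\n')
# [{'type': 'think', 'content': 'check logs'},
#  {'type': 'call', 'content': '{"name": "read"}'},
#  {'type': 'execute', 'content': ''}]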

See docs/protocol.md for complete specification.

Documentation

  • docs/protocol.md: complete streaming protocol specification
  • docs/proof.md: token efficiency proof

License

Apache 2.0
