Skip to main content

Add your description here

Project description

crow-cli

crow-cli is an Agent Client Protocol (ACP) native agent implementation that serves as the core execution engine for the Crow agent framework.

System Requirements

Platform Requirements
Linux glibc 2.35+ (Ubuntu 22.04+, Debian 12+, or equivalent)
macOS macOS 13+ (Ventura) - Intel & Apple Silicon
Windows Windows 10+ (64-bit)

Note: Linux binaries are built on Ubuntu 22.04 for maximum compatibility. If you're on an older distribution (Ubuntu 20.04, Debian 11, etc.), you'll need to build from source.

Installation

# Ensure you're in the correct project directory
git clone https://github.com/crow-cli/crow-cli.git
uv venv
# Install dependencies using uv
uv --project /path/to/crow/crow-cli sync

Or run directly:

uvx crow-cli --help

If you like having it available globally, you can install it using pip:

uv tool install crow-cli --python 3.14

Quick Start

uvx crow-cli init

Run Programmatically

import asyncio
from crow_cli.agent.main import agent_run

async def main():
    await agent_run()

if __name__ == "__main__":
    asyncio.run(main())

Configuration

Configuration lives in ~/.crow/config.yaml. See the configuration section below for details.

Features

1. ACP Protocol Native

  • Implements all ACP agent endpoints (initialize, new_session, load_session, prompt, cancel)
  • Full streaming support for token-by-token responses
  • Session persistence to SQLite database

2. MCP Tool Integration

  • Automatically discovers tools from connected MCP servers
  • Supports both MCP and ACP-native tool execution
  • Tool execution with progress updates

3. Streaming ReAct Loop

  • Real-time streaming of thinking tokens (for reasoning models)
  • Content token streaming
  • Tool call progress updates (pending → in_progress → completed/failed)

4. Cancellation Support

  • Task-based cancellation: Uses asyncio.Task.cancel() to immediately interrupt the LLM stream
  • State accumulator: Preserves partial thinking/content on cancellation
  • Safe history: Never persists tool calls on cancellation (avoids breaking conversation history)
  • Clean propagation: CancelledError propagates through the entire async stack

5. ACP Terminal Support

When the ACP client supports terminals (clientCapabilities.terminal: true):

  • Uses ACP-native terminals instead of MCP terminal calls
  • Better terminal display in the client
  • Live output streaming
  • Proper terminal lifecycle management

6. JSON Repair for Tool Calls

  • Automatically validates and repairs malformed JSON in tool call arguments
  • Critical for models like qwen3.5-plus that may produce incomplete JSON during streaming
  • Falls back to empty object {} if repair fails
  • Prevents poisoned conversation history from breaking future API calls

Built-in Tools

The agent automatically discovers and registers tools from connected MCP servers:

  • crow-mcp_terminal - Execute shell commands in the workspace
  • crow-mcp_write - Write content to files
  • crow-mcp_read - Read files with line numbers
  • crow-mcp_edit - Fuzzy string replacement in files
  • crow-mcp_web_search - Search the web using a search engine
  • crow-mcp_web_fetch - Fetch URL content as markdown

Architecture

High-Level Overview

┌──────────────────────────────────────────────────────────────────────────┐
│                           CROW-CLI ARCHITECTURE                           │
└──────────────────────────────────────────────────────────────────────────┘

┌─────────────────┐         ┌─────────────────┐         ┌─────────────────┐
│   client/       │  ───→   │   agent/        │  ───→   │   agent/        │
│   main.py       │         │   main.py       │         │   react.py      │
│                 │         │                 │         │                 │
│ CrowClient      │         │ AcpAgent        │         │ react_loop()    │
│ .prompt()       │         │ .prompt()       │         │                 │
│                 │         │                 │         │ 6 methods:      │
│                 │         │ ┌─────────────┐ │         │ • send_request  │
│                 │         │ │_prompt_tasks│ │         │ • process_chunk │
│                 │         │ │_cancel_events││         │ • process_tool..│
│                 │         │ │_state_accum..││         │ • process_resp..│
│                 │         │ └─────────────┘ │         │ • execute_tools │
│                 │         │                 │         │ • react_loop    │
└─────────────────┘         └─────────────────┘         └─────────────────┘
       │                           │                           │
       │  ACP Protocol             │  asyncio.Task             │
       │  (session_update)         │  cancellation             │
       │                           │                           │
       ▼                           ▼                           ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                         STREAMING FLOW                                   │
│                                                                          │
│  LLM Stream → process_chunk() → yield chunks → session_update() → UI    │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

The ReAct Loop - 6 Methods

┌──────────────────────────────────────────────────────────────────────────┐
│                        REACT_LOOP() - THE ORCHESTRATOR                   │
│                                                                          │
│  for turn in range(max_turns):                                          │
│      ┌─────────────────────────────────────────────────────────────┐    │
│      │  1. send_request()                                          │    │
│      │     - POST to LLM API                                       │    │
│      │     - Retry logic (exponential backoff)                     │    │
│      │     - Returns: async stream generator                       │    │
│      └─────────────────────────────────────────────────────────────┘    │
│                          │                                               │
│                          ▼                                               │
│      ┌─────────────────────────────────────────────────────────────┐    │
│      │  2. process_response()                                      │    │
│      │     - Iterates over stream                                  │    │
│      │     - Calls process_chunk() for each chunk                  │    │
│      │     - Yields: (msg_type, token)                             │    │
│      │                                                             │    │
│      │     ┌───────────────────────────────────────────────────┐   │    │
│      │     │  2a. process_chunk()                              │   │    │
│      │     │     - Parse delta from chunk                      │   │    │
│      │     │     - Accumulate: thinking, content, tool_calls   │   │    │
│      │     │     - Return: new_token for yielding              │   │    │
│      │     └───────────────────────────────────────────────────┘   │    │
│      │                                                             │    │
│      │     ┌───────────────────────────────────────────────────┐   │    │
│      │     │  2b. process_tool_call_inputs()                   │   │    │
│      │     │     - Called at end of stream                     │   │    │
│      │     │     - Validate JSON arguments                     │   │    │
│      │     │     - Repair malformed JSON (qwen3.5-plus fix)    │   │    │
│      │     │     - Return: list[tool_call dicts]               │   │    │
│      │     └───────────────────────────────────────────────────┘   │    │
│      └─────────────────────────────────────────────────────────────┘    │
│                          │                                               │
│                          ▼                                               │
│      ┌─────────────────────────────────────────────────────────────┐    │
│      │  3. execute_tool_calls()                                    │    │
│      │     - Route to: ACP terminal / MCP tools                    │    │
│      │     - Execute: read, write, edit, terminal, custom          │    │
│      │     - Return: list[tool_results]                            │    │
│      └─────────────────────────────────────────────────────────────┘    │
│                          │                                               │
│                          ▼                                               │
│      ┌─────────────────────────────────────────────────────────────┐    │
│      │  4. session.add_assistant_response()                        │    │
│      │     - Persist: thinking + content + tool_calls              │    │
│      │                                                             │    │
│      │  5. session.add_tool_response()                             │    │
│      │     - Persist: tool_results                                 │    │
│      └─────────────────────────────────────────────────────────────┘    │
│                                                                          │
│  Repeat until: no tool_calls → yield final_history → return             │
└──────────────────────────────────────────────────────────────────────────┘

Method Responsibilities

Method Purpose
send_request() HTTP to LLM with retry logic (exponential backoff)
process_chunk() Parse single streaming delta from LLM
process_tool_call_inputs() Validate/repair tool call JSON arguments
process_response() Orchestrate full stream, yield chunks
execute_tool_calls() Route & execute tools via MCP/ACP
react_loop() Main loop, ties it all together

Cancellation System

Evolution: Old vs New

OLD APPROACH ❌ - Scattered cancel event checks:

async def react_loop(..., cancel_event: asyncio.Event):
    for turn in range(max_turns):
        response = await send_request(...)
        async for chunk in response:
            if cancel_event.is_set():  # Check here
                break
            yield chunk
        
        # Check again after tool execution
        if cancel_event.is_set():  # Check here too
            break

Problems:

  • Checks scattered everywhere (easy to miss one)
  • Doesn't actually interrupt the LLM stream
  • Race conditions between checks

NEW APPROACH ✅ - Task-based cancellation:

# In agent/main.py - AcpAgent.prompt()
async def _execute_turn():
    async for chunk in react_loop(...):
        await self._conn.session_update(...)

task = asyncio.create_task(_execute_turn())
self._prompt_tasks[session_id] = task

try:
    return await task
except asyncio.CancelledError:
    return PromptResponse(stop_reason="cancelled")

# In agent/main.py - AcpAgent.cancel()
async def cancel(session_id: str):
    task = self._prompt_tasks.get(session_id)
    if task and not task.done():
        task.cancel()  # Forcefully interrupts LLM stream!

Benefits:

  • Single cancellation point (task.cancel())
  • Actually interrupts the LLM stream (asyncio magic)
  • No scattered checks to miss
  • Clean exception propagation
  • State accumulator preserves partial progress

Cancellation Sequence

User          Client           AcpAgent           react_loop          LLM
  │              │                  │                   │               │
  │  [Ctrl+C]    │                  │                   │               │
  │─────────────>│                  │                   │               │
  │              │                  │                   │               │
  │              │  cancel()        │                   │               │
  │              │─────────────────>│                   │               │
  │              │                  │                   │               │
  │              │                  │  task.cancel()    │               │
  │              │                  │──────────────────>│               │
  │              │                  │                   │               │
  │              │                  │                   │  CancelledError
  │              │                  │                   │<──────────────
  │              │                  │                   │               │
  │              │                  │                   │  # Don't persist
  │              │                  │                   │  # tool calls!
  │              │                  │                   │  session.add_..
  │              │                  │                   │  (empty tools)
  │              │                  │                   │               │
  │              │                  │  CancelledError   │               │
  │              │                  │<──────────────────│               │
  │              │                  │                   │               │
  │              │  PromptResponse  │                   │               │
  │              │  (cancelled)     │                   │               │
  │              │<─────────────────│                   │               │
  │              │                  │                   │               │
  │  "Cancelled" │                  │                   │               │
  │<─────────────│                  │                   │               │

Key Cancellation Insights

  1. asyncio.Task.cancel() is like yanking the power cord - it sends CancelledError through the entire async stack at the exact point where it's blocked waiting for I/O.

  2. State accumulator preserves partial thinking/content so we can persist something even when cancelled mid-stream.

  3. NEVER persist tool calls on cancellation - Tools weren't executed, so no tool responses exist in history. Next API call would fail with: "An assistant message with tool_calls must be followed by tool messages responding to each tool_call_id"

  4. CancelledError propagates up through the entire async call stack, interrupting at every level.

State Accumulator Pattern

Purpose: Preserve partial progress when cancellation hits mid-stream.

# In AcpAgent.__init__()
self._state_accumulators: dict[str, dict] = {}
# session_id → {"thinking": [], "content": [], "tool_calls": {}}

# In AcpAgent.prompt()
self._state_accumulators[session_id] = {
    "thinking": [],
    "content": [],
    "tool_calls": {},
}

# In process_response()
state_accumulator.update({
    "thinking": thinking,
    "content": content,
    "tool_calls": tool_calls,
})

async for chunk in response:
    thinking, content, tool_calls, new_token = process_chunk(...)
    state_accumulator["thinking"] = thinking  # Update
    state_accumulator["content"] = content    # Update
    state_accumulator["tool_calls"] = tool_calls  # Update

# In react_loop() cancellation handler
except asyncio.CancelledError:
    # state_accumulator has partial progress!
    session.add_assistant_response(
        state_accumulator["thinking"],  # What we got
        state_accumulator["content"],   # What we got
        [],  # NEVER tool calls!
    )

Tool Calling Stream Flow

LLM Response Stream:
┌─────────────────────────────────────────────────────────────────────────┐
│  Chunk 1: {"delta": {"content": "Let me"}}                             │
│  Chunk 2: {"delta": {"content": " check that"}}                        │
│  Chunk 3: {"delta": {"tool_calls": [{"index": 0,                       │
│               "function": {"name": "read"}}]}}                         │
│  Chunk 4: {"delta": {"tool_calls": [{"index": 0,                       │
│               "function": {"arguments": "{\"path\":"}}]}}              │
│  Chunk 5: {"delta": {"tool_calls": [{"index": 0,                       │
│               "function": {"arguments": " \"/tmp/test.txt\"}"}}]}}     │
│  Chunk 6: {"delta": {}, "usage": {...}}  ← END                         │
└─────────────────────────────────────────────────────────────────────────┘
                          │
                          ▼
process_chunk() accumulates:
┌─────────────────────────────────────────────────────────────────────────┐
│  content = ["Let me", " check that"]                                    │
│  tool_calls = {                                                         │
│      0: {                                                               │
│          "id": "call_abc123",                                           │
│          "function_name": "read",                                       │
│          "arguments": ["{\"path\":", " \"/tmp/test.txt\"}"]            │
│      }                                                                  │
│  }                                                                      │
└─────────────────────────────────────────────────────────────────────────┘
                          │
                          ▼
process_tool_call_inputs() validates/repairs:
┌─────────────────────────────────────────────────────────────────────────┐
│  arguments_str = "".join(tool_calls[0]["arguments"])                    │
│              = "{\"path\": \"/tmp/test.txt\"}"                          │
│                                                                         │
│  try:                                                                   │
│      json.loads(arguments_str)  # VALID!                                │
│  except:                                                                │
│      # Repair logic for qwen3.5-plus                                    │
│      # (add missing braces, fallback to {})                             │
│                                                                         │
│  tool_call_inputs = [{                                                  │
│      "id": "call_abc123",                                               │
│      "type": "function",                                                │
│      "function": {                                                      │
│          "name": "read",                                                │
│          "arguments": "{\"path\": \"/tmp/test.txt\"}"                  │
│      }                                                                  │
│  }]                                                                     │
└─────────────────────────────────────────────────────────────────────────┘
                          │
                          ▼
execute_tool_calls() routes and executes:
┌─────────────────────────────────────────────────────────────────────────┐
│  if tool_name == "read" and use_acp_read:                               │
│      result = execute_acp_read(...)                                     │
│  elif tool_name == "edit":                                              │
│      result = execute_acp_edit(...)                                     │
│  else:                                                                  │
│      result = execute_acp_tool(...)  # MCP fallback                     │
│                                                                         │
│  tool_results = [{                                                      │
│      "role": "tool",                                                    │
│      "tool_call_id": "call_abc123",                                     │
│      "content": "file contents here..."                                 │
│  }]                                                                     │
└─────────────────────────────────────────────────────────────────────────┘

Session Management

Creating a New Session

# When connecting via ACP, a new session is created automatically
# with the working directory and MCP servers provided by the client

Loading an Existing Session

# Sessions persist to the database and can be loaded by ID
# The load_session endpoint handles this automatically

Session Data Storage

Sessions are stored in SQLite with three main tables:

  • Prompt - System prompt templates (Jinja2)
  • Session - Session metadata (config, tools, model, cwd)
  • Message - Conversation messages (one row = one message, JSON-serialized)

Usage with ACP Clients

crow-cli is designed to work with any ACP-compatible client:

// In Zed
{
  "agent_servers": {
    "crow-cli": {
      "type": "custom",
      "command": "uvx",
      "args": ["crow-cli", "acp"]
    }
  }
}

ACP Client Capabilities

The agent automatically detects and uses client capabilities:

Capability When Enabled Behavior
terminal Client supports ACP terminals Uses ACP-native terminals
fs.write_text_file Client supports file writing Uses ACP file write
fs.read_text_file Client supports file reading Uses ACP file read

Project Structure

crow-cli/
├── src/crow_cli/
│   ├── __init__.py
│   ├── agent/
│   │   ├── __init__.py
│   │   ├── compact.py        # Conversation compaction
│   │   ├── configure.py      # Agent configuration
│   │   ├── context.py        # Context providers (directory tree, file fetching)
│   │   ├── db.py             # SQLAlchemy database models
│   │   ├── llm.py            # LLM client configuration
│   │   ├── logger.py         # Logging utilities
│   │   ├── main.py           # Agent entry point (AcpAgent class)
│   │   ├── mcp_client.py     # MCP client creation + tool extraction
│   │   ├── prompt.py         # Prompt building
│   │   ├── react.py          # ReAct loop implementation (6 methods)
│   │   ├── session.py        # Session management + persistence
│   │   ├── tools.py          # Tool execution functions
│   │   └── prompts/          # Jinja2 system prompt templates
│   │       └── system_prompt.jinja2
│   ├── cli/
│   │   ├── __init__.py
│   │   ├── init_cmd.py       # `crow init` command
│   │   └── main.py           # CLI entry point
│   └── client/
│       ├── __init__.py
│       └── main.py           # Programmatic client (CrowClient)
├── config/
│   ├── compose.yaml          # Docker compose for services
│   ├── config.yaml           # Default configuration
│   ├── .env.example          # Environment variables template
│   ├── searxng/
│   │   └── settings.yml      # SearXNG search config
│   └── prompts/              # Override prompts (user customization)
│       └── system_prompt.jinja2
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_agent_init.py
│   └── unit/
│       └── test_session.py
├── examples/
│   ├── mc_escher_loop.py
│   └── quick_test.py
├── pyproject.toml
├── README.md
├── TODO.md
└── run_tests.sh

Development

Running Tests

# From the project root
uv run --project /path/to/crow-cli pytest crow-cli/tests/

Building

# Build the package
uv build --project /path/to/crow-cli

# Install locally
pip install --force-reinstall ./crow-cli/dist/*.whl

Troubleshooting

Connection Issues

If the agent can't connect to MCP servers:

  1. Verify MCP server config in ~/.crow/config.yaml
  2. Check that the MCP server path is correct
  3. Ensure the server is executable

Session Loading Failures

If sessions fail to load:

  1. Check database exists: ls ~/.crow/crow.db
  2. Verify database permissions: chmod 644 ~/.crow/crow.db
  3. Check session ID exists in database

Terminal Not Working

If ACP terminals aren't working:

  1. Check client capabilities: clientCapabilities.terminal should be true
  2. Verify MCP terminal fallback is configured
  3. Check terminal command is valid in the workspace directory

Tool Call JSON Errors

If you see errors about malformed tool call arguments:

  1. This is handled automatically by process_tool_call_inputs()
  2. The function attempts to repair common JSON issues (missing braces/brackets)
  3. Falls back to empty object {} if repair fails
  4. Check logs for "Malformed tool arguments" warnings

License

Apache-2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

crow_cli-0.1.22.tar.gz (183.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

crow_cli-0.1.22-py3-none-any.whl (80.6 kB view details)

Uploaded Python 3

File details

Details for the file crow_cli-0.1.22.tar.gz.

File metadata

  • Download URL: crow_cli-0.1.22.tar.gz
  • Upload date:
  • Size: 183.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for crow_cli-0.1.22.tar.gz
Algorithm Hash digest
SHA256 d8418041d07a98ac8c8d0c96ba848d84e82eebdbdf511d5850bf0747f744780a
MD5 ab302c2b50abb612c904bcf9e94c98e0
BLAKE2b-256 37bdc7e1c11650a8c725badd02bf41ddd7f28775883f45f6fccc0a31f5a5877d

See more details on using hashes here.

File details

Details for the file crow_cli-0.1.22-py3-none-any.whl.

File metadata

  • Download URL: crow_cli-0.1.22-py3-none-any.whl
  • Upload date:
  • Size: 80.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for crow_cli-0.1.22-py3-none-any.whl
Algorithm Hash digest
SHA256 f25924528a200e15a8bfe363e2be1f800b42e83f2a52a5084b12c3121eec9290
MD5 2971778ffde69adab7f87000bcd53357
BLAKE2b-256 79e8d18f56a31e17ac162e743cf0c587fb0bd4edf7ad51eefaf2c08942b2589c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page