A pluggable, non-blocking asynchronous task queue and MCP server for AI Agents

AgyQueue: Non-Blocking Asynchronous Task Execution for AI Agents

AgyQueue is a lightweight, non-blocking asynchronous task execution framework built for AI agents. It allows agents to offload long-running operations—such as code generation, compilation, testing, and validation—to background workers while keeping client conversation threads responsive.


🚀 Core Capabilities

  1. Asynchronous Task Offloading: Agents submit tasks via MCP tool calls or REST API requests and immediately receive a task_id, so control returns to the user's chat session while the work continues in the background.
  2. Resilience & Reconnection: Task status is saved in a persistent database store, allowing clients to query progress or fetch final outputs even after network drops or worker crashes.
  3. Interactive Console UI: Built-in dark mode dashboard that visualizes enqueued runs, progress bars, vertical event history timelines (e.g. WorkflowExecutionStarted, WorkflowTaskScheduled), and aggregated markdown reports.
  4. Production-Oriented Design:
    • Threaded Connection Pooling: Uses ThreadedConnectionPool for PostgreSQL backend tasks, preventing socket exhaustion under high concurrent query volumes.
    • Graceful Shutdown: Catches SIGTERM / SIGINT signals, allowing workers to finish their active task run cleanly before eviction.
    • Workspace Isolation: Spawns copy-on-write scratch worktree folders for subprocess executions, ensuring zero file-system cross-interference.
  5. Configurable Multi-Channel Notifications: Real-time push notifications over Slack webhook and SMTP Email when tasks hit terminal states.
  6. Model Context Protocol (MCP) Support: Serves as a standard stdio/sse MCP server compatible with Cursor, Claude Desktop, Claude Code, and Copilot CLI.
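
The first two capabilities (an immediate task_id, and status that survives a client reconnect) can be illustrated with a minimal sketch. This is not AgyQueue's implementation: the table schema, the function names submit and check, and the QUEUED and FAILED status names are assumptions made for illustration, and a thread stands in for a separate worker process.

```python
import sqlite3
import threading
import uuid


def open_store(path):
    """Open (or create) the task table; this schema is illustrative only."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks ("
        "task_id TEXT PRIMARY KEY, status TEXT, output TEXT)"
    )
    conn.commit()
    return conn


def _run(path, task_id, work):
    """Worker side: execute the callable and persist the terminal state."""
    try:
        status, output = "COMPLETED", str(work())
    except Exception as exc:
        status, output = "FAILED", repr(exc)
    conn = open_store(path)  # each thread opens its own connection
    conn.execute(
        "UPDATE tasks SET status = ?, output = ? WHERE task_id = ?",
        (status, output, task_id),
    )
    conn.commit()
    conn.close()


def submit(path, work):
    """Record the task, start it in the background, return the id at once."""
    task_id = str(uuid.uuid4())
    conn = open_store(path)
    conn.execute("INSERT INTO tasks VALUES (?, 'QUEUED', NULL)", (task_id,))
    conn.commit()
    conn.close()
    thread = threading.Thread(target=_run, args=(path, task_id, work))
    thread.start()
    return task_id, thread


def check(path, task_id):
    """Client side: a fresh connection stands in for a reconnecting client."""
    conn = open_store(path)
    row = conn.execute(
        "SELECT status, output FROM tasks WHERE task_id = ?", (task_id,)
    ).fetchone()
    conn.close()
    return row
```

Because the caller only holds the task_id, it can drop its connection and later call check from a new process; the state lives in the database file rather than in the client session.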

🛠️ Architecture

AgyQueue supports both hybrid cloud-agnostic execution flows and fully-managed Google Cloud configurations:

1. Cloud-Agnostic Core Flow

AgyQueue Cloud-Agnostic Architecture

2. Google Cloud Native Production Deployment

AgyQueue GCP System Architecture

Google Cloud & Google AI Managed Services Mapping

When migrating from local development/fallback to production cloud scaling, AgyQueue integrates natively with Google Cloud managed services:

Component | Dev Fallback | Production GCP Service
Agent Reasoning Core | Local LLM | Vertex AI Gemini API (Gemini 2.5 Flash / Pro)
API Server Compute | Local Python Process | Google Cloud Run (Event-driven serverless container)
Worker Node Compute | Local Daemon Process | Google Kubernetes Engine (GKE) or Cloud Run Jobs
Task Message Broker | SQLite (FIFO Table) | Google Cloud Pub/Sub or Cloud Memorystore for Redis
Persistent Task Store | SQLite Database | Google Cloud SQL for PostgreSQL or Cloud Spanner
Workspace Repos | Local Directory | Google Cloud Storage (GCS) Buckets
Traffic Router | Direct Port Binding | Google Cloud Load Balancing (HTTPS Layer 7 Router)

📦 Installation & Setup

Install as Python Package (Local Development)

You can install AgyQueue locally in editable mode for development and testing:

python3 -m venv .venv
source .venv/bin/activate
pip install -e .
cp .env.example .env

Install from PyPI

To install the published release directly from PyPI:

pip install agyqueue

🏁 Quick Start

1. Run Standalone Dev Demo

The demo spins up a local server, enqueues an SRE compliance check and a FastAPI code generator workflow, executes them in isolated directories, and outputs the final report:

python examples/demo.py
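
The idea behind the isolated directories can be sketched as follows. This is an illustration rather than the demo's code: run_isolated is a hypothetical helper, and it uses a plain shutil.copytree copy in place of the copy-on-write or git worktree mechanisms the project describes.

```python
import shutil
import subprocess
import tempfile
from pathlib import Path


def run_isolated(source_dir, command):
    """Copy source_dir into a fresh scratch folder and run command there.

    The subprocess only ever sees the copy, so nothing it writes can
    touch the original tree or another task's scratch folder.
    """
    scratch = Path(tempfile.mkdtemp(prefix="scratch-"))
    workdir = scratch / "workspace"
    shutil.copytree(source_dir, workdir)
    try:
        result = subprocess.run(
            command, cwd=workdir, capture_output=True, text=True, check=False
        )
        return result.returncode, result.stdout
    finally:
        # Remove the scratch copy whether or not the command succeeded.
        shutil.rmtree(scratch, ignore_errors=True)
```

A full copy is simple but costs time and disk on large repositories, which is the usual reason to prefer copy-on-write or worktree-based isolation in practice.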

2. Open the Web Console Dashboard

Start the standalone SSE server and background worker:

# Terminal 1: Start Server
export AGYQUEUE_TRANSPORT=sse
python -m agyqueue.mcp_server

# Terminal 2: Start Worker
python -m agyqueue.worker

Navigate to http://localhost:8000/dashboard to submit runs, inspect progress, and view timeline event history logs.


📁 Examples & Client Integrations

The examples/ folder contains detailed client scripts for every connection method.

🔌 Connecting to Agentic Frameworks

1. Google Agent Development Kit (ADK) & Agent Engine

The Google ADK allows Python-defined agents to be deployed to Agent Engine (Vertex AI Reasoning Engine). Import and register AgyQueue's functional tools directly:

from google.adk.agents import Agent
from agyqueue.client import submit_async_task, check_task_progress, get_task_output, cancel_running_task

# Define the coordinator agent
orchestrator_agent = Agent(
    name="deployment_coordinator",
    model="gemini-2.5-flash",
    instruction=(
        "You coordinate SRE compliance checks and API code generation workloads. "
        "Use submit_async_task to spawn background jobs. Do not wait for jobs in a loop. "
        "Return the task_id to the user/orchestrator so that progress can be tracked asynchronously."
    ),
    tools=[
        submit_async_task,
        check_task_progress,
        get_task_output,
        cancel_running_task
    ]
)

2. Registering with Gemini Enterprise & Agent Registry

Expose the deployed URL to Gemini Enterprise using the agents-cli tool:

agents-cli publish gemini-enterprise \
  --name "agyqueue-service" \
  --description "Exposes asynchronous background task execution, cancellation, and progress tracking tools." \
  --url "https://agyqueue-server-dev-xxxx-uc.a.run.app/sse" \
  --type "mcp"

💻 Local IDE Integration (MCP)

Claude Desktop Configuration

Add the following to your claude_desktop_config.json (located in ~/Library/Application Support/Claude/ on macOS):

{
  "mcpServers": {
    "agyqueue": {
      "command": "python",
      "args": ["-m", "agyqueue.mcp_server"],
      "env": {
        "PYTHONPATH": "/absolute/path/to/your/agyqueue/repo",
        "AGYQUEUE_TRANSPORT": "stdio",
        "AGYQUEUE_DB_PATH": "/absolute/path/to/your/agyqueue/repo/agyqueue.db"
      }
    }
  }
}

Cursor Configuration

  1. Open Cursor Settings -> Features -> MCP.
  2. Click + Add New MCP Server.
  3. Configure:
    • Name: AgyQueue
    • Type: command
    • Command: PYTHONPATH=. .venv/bin/python -m agyqueue.mcp_server (relative to your repo path).

VS Code Configuration (Cline / Roo Code Extensions)

Append the AgyQueue stdio server configuration to cline_mcp_settings.json:

{
  "mcpServers": {
    "agyqueue": {
      "command": "python",
      "args": ["-m", "agyqueue.mcp_server"],
      "env": {
        "PYTHONPATH": "/absolute/path/to/your/agyqueue/repo",
        "AGYQUEUE_TRANSPORT": "stdio",
        "AGYQUEUE_DB_PATH": "/absolute/path/to/your/agyqueue/repo/agyqueue.db"
      }
    }
  }
}

Claude Code CLI Integration

Add AgyQueue as a local tool under the "mcpServers" dictionary in ~/.config/claude-code/mcp.json.
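
Since the client configurations above share the same "mcpServers" shape, the entry can also be merged into an existing file programmatically instead of by hand. The helper below is a hypothetical convenience sketch and not part of AgyQueue; it assumes the target file is plain JSON and leaves any other configured servers untouched.

```python
import json
from pathlib import Path


def add_agyqueue_server(config_path, repo_path):
    """Insert or update the "agyqueue" entry, keeping other servers intact."""
    config_path = Path(config_path)
    config = {}
    if config_path.exists():
        text = config_path.read_text(encoding="utf-8")
        if text.strip():
            config = json.loads(text)
    servers = config.setdefault("mcpServers", {})
    servers["agyqueue"] = {
        "command": "python",
        "args": ["-m", "agyqueue.mcp_server"],
        "env": {
            "PYTHONPATH": str(repo_path),
            "AGYQUEUE_TRANSPORT": "stdio",
            "AGYQUEUE_DB_PATH": str(Path(repo_path) / "agyqueue.db"),
        },
    }
    config_path.write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8")
    return config
```

Pass the absolute path of your repository checkout as repo_path, matching the placeholder paths in the JSON snippets above.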


🌳 Multi-Agent Tree-Based Orchestration Pattern

AgyQueue natively supports parent-child task aggregation. When an orchestrator agent splits a large request into parallel child workloads, it submits the subtasks with a parent_id parameter:

  1. Task Submission:
    • The orchestrator spawns subagents by submitting child tasks referencing the parent ID.
    • The parent task status transitions to WAITING.
  2. Worker Isolation:
    • Background workers pick up and execute child tasks in isolated workspaces (git_worktree or copy-on-write temp folders) in parallel.
  3. Automatic Resumption:
    • As soon as the final sibling completes, the worker recognizes that all siblings are done and automatically re-queues the parent task.
    • The orchestrator wakes up, reads the logs of all completed subtasks, aggregates the results, and marks the parent task as COMPLETED.
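
The three steps above can be sketched with a small in-memory model. The TreeQueue class, its method names, and the QUEUED and RUNNING statuses are assumptions for illustration; only parent_id, WAITING, and COMPLETED come from the description above, and the real system persists this state rather than holding it in a dict.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    task_id: str
    parent_id: Optional[str] = None
    status: str = "QUEUED"
    result: Optional[str] = None


class TreeQueue:
    def __init__(self):
        self.tasks = {}       # task_id -> Task, in submission order
        self.ready = deque()  # ids waiting for a worker

    def submit(self, task_id, parent_id=None):
        self.tasks[task_id] = Task(task_id, parent_id)
        self.ready.append(task_id)
        if parent_id is not None:
            # Step 1: the parent is parked until its children finish.
            self.tasks[parent_id].status = "WAITING"
        return task_id

    def take(self):
        """Step 2: a worker claims the next ready task."""
        task_id = self.ready.popleft()
        self.tasks[task_id].status = "RUNNING"
        return task_id

    def _children(self, parent_id):
        return [t for t in self.tasks.values() if t.parent_id == parent_id]

    def complete_child(self, task_id, result):
        """Step 3: finishing the last sibling re-queues the parent."""
        child = self.tasks[task_id]
        child.status, child.result = "COMPLETED", result
        siblings = self._children(child.parent_id)
        if child.parent_id is not None and all(
            s.status == "COMPLETED" for s in siblings
        ):
            self.tasks[child.parent_id].status = "QUEUED"
            self.ready.append(child.parent_id)

    def aggregate(self, parent_id):
        """The resumed parent collects child results and completes."""
        parent = self.tasks[parent_id]
        parent.result = "\n".join(
            f"{c.task_id}: {c.result}" for c in self._children(parent_id)
        )
        parent.status = "COMPLETED"
        return parent.result
```

The completion check runs on the worker side each time a child finishes, so the orchestrator never has to poll its children.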

🐳 Docker Compose Deployment (Redis + SSE Mode)

To run the complete system with Redis queues and PostgreSQL stores:

  1. Build and Run:
    docker compose up --build
    
  2. Verify over SSE:
    python examples/mcp_sse_client.py
    

☁️ Cloud Deployment (Cloud Run & GKE)

For Google Cloud production environments, configuration templates are located in deployment/:

  • Cloud Run: Serverless container setups with Memorystore Redis and Cloud SQL PostgreSQL.
  • GKE: Scalable Kubernetes microservices with Horizontal Pod Autoscalers (HPA).

🔔 Slack & Email Notifications

To configure alerts on task completion or failure, set the following environment variables:

# Enable channels (comma-separated list)
export AGYQUEUE_NOTIFICATIONS="slack,email"

# 1. Slack Webhook Configuration
export SLACK_WEBHOOK_URL="https://example.com/slack-webhook-url"

# 2. Email SMTP Configuration
export SMTP_HOST="smtp.gmail.com"
export SMTP_PORT="587"
export SMTP_USER="your-email@gmail.com"
export SMTP_PASSWORD="your-app-password"
export SMTP_FROM="noreply@agyqueue.internal"
export SMTP_TO="recipient-alert-inbox@domain.com"
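
A sketch of how a process might consume these variables with the Python standard library. The function names and message wording are assumptions and AgyQueue's own notifier may differ; the {"text": ...} JSON body is the standard Slack incoming-webhook payload, and STARTTLS is the usual mode for SMTP on port 587.

```python
import json
import os
import smtplib
import urllib.request
from email.message import EmailMessage


def enabled_channels(env=os.environ):
    """Parse the comma-separated AGYQUEUE_NOTIFICATIONS list."""
    raw = env.get("AGYQUEUE_NOTIFICATIONS", "")
    return [name.strip().lower() for name in raw.split(",") if name.strip()]


def build_slack_payload(task_id, status):
    """JSON body for a Slack incoming webhook."""
    return json.dumps({"text": f"Task {task_id} finished with status {status}"})


def build_email(task_id, status, env=os.environ):
    """Compose the alert email from the SMTP_FROM / SMTP_TO settings."""
    message = EmailMessage()
    message["From"] = env["SMTP_FROM"]
    message["To"] = env["SMTP_TO"]
    message["Subject"] = f"AgyQueue task {task_id}: {status}"
    message.set_content(f"Task {task_id} reached terminal state {status}.")
    return message


def send_slack(task_id, status, env=os.environ):
    """POST the payload to SLACK_WEBHOOK_URL and return the HTTP status."""
    request = urllib.request.Request(
        env["SLACK_WEBHOOK_URL"],
        data=build_slack_payload(task_id, status).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


def send_email(message, env=os.environ):
    """Deliver over SMTP, upgrading the connection with STARTTLS first."""
    with smtplib.SMTP(env["SMTP_HOST"], int(env["SMTP_PORT"])) as smtp:
        smtp.starttls()
        smtp.login(env["SMTP_USER"], env["SMTP_PASSWORD"])
        smtp.send_message(message)
```

Keeping message construction separate from delivery lets the formatting be checked without a network connection.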

🚀 Publishing to PyPI

Step 1: Install Build Tools

pip install --upgrade build twine

Step 2: Build the Distribution Package

python -m build

Step 3: Upload to PyPI

Run twine to upload the built distributions to PyPI under your account:

python -m twine upload dist/*

Once uploaded, anyone can install and run the AgyQueue CLI globally:

pip install agyqueue
