
A pluggable, non-blocking asynchronous task queue and MCP server for AI Agents


AgyQueue: Non-Blocking Asynchronous Task Execution for AI Agents


AgyQueue is a lightweight, non-blocking asynchronous task execution framework built for AI agents. It allows agents to offload long-running operations—such as code generation, compilation, testing, and validation—to background workers while keeping client conversation threads responsive.


🚀 Core Capabilities

  1. Asynchronous Task Offloading: Agents submit tasks via MCP tool calls or REST API requests and immediately receive a task_id, returning execution control back to the user chat session.
  2. Resilience & Reconnection: Task status is saved in a persistent database store, allowing clients to query progress or fetch final outputs even after network drops or worker crashes.
  3. Interactive Console UI: Built-in dark mode dashboard that visualizes enqueued runs, progress bars, vertical event history timelines (e.g. WorkflowExecutionStarted, WorkflowTaskScheduled), and aggregated markdown reports.
  4. Well-Architected Design:
    • Threaded Connection Pooling: Uses ThreadedConnectionPool for the PostgreSQL backend, preventing socket exhaustion under high concurrent query volumes.
    • Graceful Shutdown: Catches SIGTERM / SIGINT signals, allowing workers to finish their active task run cleanly before eviction.
    • Workspace Isolation: Spawns copy-on-write scratch worktree folders for subprocess executions, so that concurrent tasks do not interfere with each other's files.
  5. Configurable Multi-Channel Notifications: Real-time push notifications through a Slack webhook and SMTP email when tasks reach terminal states.
  6. Model Context Protocol (MCP) Support: Serves as a standard stdio/sse MCP server compatible with Cursor, Claude Desktop, Claude Code, and Copilot CLI.
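
The submit-then-poll behavior in capabilities 1 and 2 can be sketched with the standard library alone. This is a minimal illustration, not AgyQueue's implementation: the in-memory dictionary stands in for the persistent store, and the function names (submit, status, slow_square) and the QUEUED, RUNNING, and FAILED status names are assumptions made for the example.

```python
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for the persistent task store.
_tasks = {}
_lock = threading.Lock()
_executor = ThreadPoolExecutor(max_workers=2)


def _run(task_id, func, args):
    """Execute the task in a worker thread and record its terminal state."""
    with _lock:
        _tasks[task_id]["status"] = "RUNNING"
    try:
        result = func(*args)
        with _lock:
            _tasks[task_id].update(status="COMPLETED", output=result)
    except Exception as exc:
        with _lock:
            _tasks[task_id].update(status="FAILED", output=repr(exc))


def submit(func, *args):
    """Register the task and return its id without waiting for the result."""
    task_id = uuid.uuid4().hex
    with _lock:
        _tasks[task_id] = {"status": "QUEUED", "output": None}
    _executor.submit(_run, task_id, func, args)
    return task_id


def status(task_id):
    """Return a snapshot of the task record."""
    with _lock:
        return dict(_tasks[task_id])


def slow_square(n):
    time.sleep(0.2)  # stands in for a long-running build or test run
    return n * n


if __name__ == "__main__":
    tid = submit(slow_square, 7)  # returns immediately
    print("submitted", tid, status(tid)["status"])
    while status(tid)["status"] not in ("COMPLETED", "FAILED"):
        time.sleep(0.05)
    print(status(tid))
```

The caller holds only the task id, so it can drop the connection and ask for the status later. Surviving a process restart would additionally require the store to live on disk, which this sketch does not attempt.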

🛠️ Architecture

AgyQueue supports both hybrid, cloud-agnostic execution flows and fully managed Google Cloud configurations:

1. Cloud-Agnostic Core Flow

[Figure: AgyQueue Cloud-Agnostic Architecture]

2. Google Cloud Native Production Deployment

[Figure: AgyQueue GCP System Architecture]

Google Cloud & Google AI Managed Services Mapping

When migrating from local development/fallback to production cloud scaling, AgyQueue integrates natively with Google Cloud managed services:

| Component | Dev Fallback | Production GCP Service |
|---|---|---|
| Agent Reasoning Core | Local LLM | Vertex AI Gemini API (Gemini 2.5 Flash / Pro) |
| API Server Compute | Local Python Process | Google Cloud Run (Event-driven serverless container) |
| Worker Node Compute | Local Daemon Process | Google Kubernetes Engine (GKE) or Cloud Run Jobs |
| Task Message Broker | SQLite (FIFO Table) | Google Cloud Pub/Sub or Cloud Memorystore for Redis |
| Persistent Task Store | SQLite Database | Google Cloud SQL for PostgreSQL or Cloud Spanner |
| Workspace Repos | Local Directory | Google Cloud Storage (GCS) Buckets |
| Traffic Router | Direct Port Binding | Google Cloud Load Balancing (HTTPS Layer 7 Router) |
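
To make the "SQLite (FIFO Table)" dev fallback concrete, here is one way a FIFO broker table could look. The schema, column names, and function names are assumptions for illustration and are not taken from AgyQueue's source.

```python
import sqlite3

# Hypothetical schema: an auto-incrementing id gives first-in, first-out order.
SCHEMA = """
CREATE TABLE IF NOT EXISTS queue (
    id      INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    claimed INTEGER NOT NULL DEFAULT 0
)
"""


def open_queue(path=":memory:"):
    # isolation_level=None puts the connection in autocommit mode,
    # so transactions are started and ended explicitly below.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(SCHEMA)
    return conn


def enqueue(conn, payload):
    """Append a message and return its row id."""
    cur = conn.execute("INSERT INTO queue (payload) VALUES (?)", (payload,))
    return cur.lastrowid


def dequeue(conn):
    """Claim and return the oldest unclaimed row, or None if the queue is empty."""
    # BEGIN IMMEDIATE takes the write lock up front, so two workers
    # sharing the database file cannot claim the same row.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, payload FROM queue WHERE claimed = 0 ORDER BY id LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute("UPDATE queue SET claimed = 1 WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
        return row
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

Rows are marked as claimed rather than deleted, which keeps a history of what was processed. A production broker such as Pub/Sub or Redis replaces this table entirely.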

📦 Installation & Setup

Install as Python Package (Local Development)

You can install AgyQueue locally in editable mode for development and testing:

python3 -m venv .venv
source .venv/bin/activate
pip install -e .
cp .env.example .env

Install from PyPI

Install the released package directly from PyPI:

pip install agyqueue

🏁 Quick Start

1. Run Standalone Dev Demo

The demo spins up a local server, enqueues an SRE compliance check and a FastAPI code generator workflow, executes them in isolated directories, and prints the final report:

python examples/demo.py
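
The idea of executing work in isolated directories can be sketched as follows. This is an assumption-laden illustration: run_isolated is a hypothetical helper, and it uses a plain recursive copy into a temporary directory rather than the copy-on-write or git worktree mechanisms AgyQueue describes.

```python
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path


def run_isolated(source_dir, command):
    """Copy source_dir into a throwaway directory, run command there, return stdout."""
    with tempfile.TemporaryDirectory(prefix="agyqueue-scratch-") as scratch:
        workdir = Path(scratch) / "workspace"
        shutil.copytree(source_dir, workdir)  # plain copy; not copy-on-write
        completed = subprocess.run(
            command, cwd=workdir, capture_output=True, text=True, check=True
        )
        return completed.stdout


if __name__ == "__main__":
    # Demo: the subprocess writes a file, but only inside the scratch copy.
    source = Path(tempfile.mkdtemp())
    (source / "input.txt").write_text("hello")
    script = "import pathlib; pathlib.Path('out.txt').write_text('x'); print('done')"
    print(run_isolated(source, [sys.executable, "-c", script]).strip())
    print("source modified:", (source / "out.txt").exists())
```

Because each run gets its own copy, files written by one task never appear in the source directory or in another task's workspace.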

2. Open the Web Console Dashboard

Start the standalone SSE server and background worker:

# Terminal 1: Start Server
export AGYQUEUE_TRANSPORT=sse
python -m agyqueue.mcp_server

# Terminal 2: Start Worker
python -m agyqueue.worker

Navigate to http://localhost:8000/dashboard to submit runs, inspect progress, and view timeline event history logs.


📁 Examples & Client Integrations

The examples/ folder contains detailed client scripts for every connection method.


🔌 Connecting to Agentic Frameworks

1. Google Agent Development Kit (ADK) & Agent Engine

The Google ADK allows Python-defined agents to be deployed to Agent Engine (Vertex AI Reasoning Engine). Import and register AgyQueue's functional tools directly:

from google.adk.agents import Agent
from agyqueue.client import submit_async_task, check_task_progress, get_task_output, cancel_running_task

# Define the coordinator agent
orchestrator_agent = Agent(
    name="deployment_coordinator",
    model="gemini-2.5-flash",
    instruction=(
        "You coordinate SRE compliance checks and API code generation workloads. "
        "Use submit_async_task to spawn background jobs. Do not wait for jobs in a loop. "
        "Return the task_id to the user/orchestrator so that progress can be tracked asynchronously."
    ),
    tools=[
        submit_async_task,
        check_task_progress,
        get_task_output,
        cancel_running_task
    ]
)

2. Registering with Gemini Enterprise & Agent Registry

Expose the deployed URL to Gemini Enterprise using the agents-cli tool:

agents-cli publish gemini-enterprise \
  --name "agyqueue-service" \
  --description "Exposes asynchronous background task execution, cancellation, and progress tracking tools." \
  --url "https://agyqueue-server-dev-xxxx-uc.a.run.app/sse" \
  --type "mcp"

💻 Local IDE Integration (MCP)

Claude Desktop Configuration

Add the following to your claude_desktop_config.json (located in ~/Library/Application Support/Claude/ on macOS):

{
  "mcpServers": {
    "agyqueue": {
      "command": "python",
      "args": ["-m", "agyqueue.mcp_server"],
      "env": {
        "PYTHONPATH": "/absolute/path/to/your/agyqueue/repo",
        "AGYQUEUE_TRANSPORT": "stdio",
        "AGYQUEUE_DB_PATH": "/absolute/path/to/your/agyqueue/repo/agyqueue.db"
      }
    }
  }
}
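
If the configuration file already lists other MCP servers, editing it by hand risks overwriting them. The helper below is a small sketch of merging the entry programmatically; add_mcp_server is a hypothetical function written for this example and is not part of AgyQueue.

```python
import json
from pathlib import Path


def add_mcp_server(config_path, name, entry):
    """Insert or replace one server under "mcpServers", preserving other keys."""
    path = Path(config_path)
    text = path.read_text() if path.exists() else ""
    config = json.loads(text) if text.strip() else {}
    config.setdefault("mcpServers", {})[name] = entry
    path.write_text(json.dumps(config, indent=2) + "\n")
    return config


# The entry mirrors the JSON shown above; the paths are placeholders to replace.
AGYQUEUE_ENTRY = {
    "command": "python",
    "args": ["-m", "agyqueue.mcp_server"],
    "env": {
        "PYTHONPATH": "/absolute/path/to/your/agyqueue/repo",
        "AGYQUEUE_TRANSPORT": "stdio",
        "AGYQUEUE_DB_PATH": "/absolute/path/to/your/agyqueue/repo/agyqueue.db",
    },
}
```

Calling add_mcp_server(path_to_config, "agyqueue", AGYQUEUE_ENTRY) leaves any existing servers in place and only adds or updates the agyqueue key.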

Cursor Configuration

  1. Open Cursor Settings -> Features -> MCP.
  2. Click + Add New MCP Server.
  3. Configure:
    • Name: AgyQueue
    • Type: command
    • Command: PYTHONPATH=. .venv/bin/python -m agyqueue.mcp_server (relative to your repo path).

VS Code Configuration (Cline / Roo Code Extensions)

Append the AgyQueue stdio server configuration to cline_mcp_settings.json:

{
  "mcpServers": {
    "agyqueue": {
      "command": "python",
      "args": ["-m", "agyqueue.mcp_server"],
      "env": {
        "PYTHONPATH": "/absolute/path/to/your/agyqueue/repo",
        "AGYQUEUE_TRANSPORT": "stdio",
        "AGYQUEUE_DB_PATH": "/absolute/path/to/your/agyqueue/repo/agyqueue.db"
      }
    }
  }
}

Claude Code CLI Integration

Add AgyQueue as a local tool under the "mcpServers" dictionary in ~/.config/claude-code/mcp.json.


🌳 Multi-Agent Tree-Based Orchestration Pattern

AgyQueue natively supports parent-child task aggregation. When an orchestrator agent splits a large request into parallel child workloads, it submits the subtasks with a parent_id parameter:

  1. Task Submission:
    • The orchestrator spawns subagents by submitting child tasks referencing the parent ID.
    • The parent task status transitions to WAITING.
  2. Worker Isolation:
    • Background workers pick up and execute child tasks in isolated workspaces (git_worktree or copy-on-write temp folders) in parallel.
  3. Automatic Resumption:
    • As soon as the final sibling completes, the worker recognizes that all siblings are done and automatically re-queues the parent task.
    • The orchestrator wakes up, reads the logs of all completed subtasks, aggregates the results, and marks the parent task as COMPLETED.
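
The three steps above can be modeled with a small in-memory tracker. This is a sketch of the bookkeeping only, under stated assumptions: the TaskTree class, its method names, and the QUEUED status are hypothetical, while parent_id, WAITING, and COMPLETED follow the description above.

```python
from collections import deque


class TaskTree:
    """Tracks parent-child tasks and re-queues a parent when its children finish."""

    def __init__(self):
        self.status = {}      # task_id -> status string
        self.parent = {}      # child_id -> parent_id
        self.children = {}    # parent_id -> list of child ids
        self.queue = deque()  # task ids ready for a worker

    def submit(self, task_id, parent_id=None):
        """Queue a task; a task that gains children moves to WAITING."""
        self.status[task_id] = "QUEUED"
        self.queue.append(task_id)
        if parent_id is not None:
            self.parent[task_id] = parent_id
            self.children.setdefault(parent_id, []).append(task_id)
            self.status[parent_id] = "WAITING"

    def complete(self, task_id):
        """Mark a task done; the last sibling to finish re-queues the parent."""
        self.status[task_id] = "COMPLETED"
        parent_id = self.parent.get(task_id)
        if parent_id is None:
            return
        siblings = self.children[parent_id]
        if all(self.status[s] == "COMPLETED" for s in siblings):
            self.status[parent_id] = "QUEUED"
            self.queue.append(parent_id)
```

A typical sequence: submit the parent, let a worker pick it up, submit two children with parent_id set, then call complete on each child. The parent reappears in the queue only after the second child completes, at which point the orchestrator can aggregate results and complete the parent itself.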

🐳 Docker Compose Deployment (Redis + SSE Mode)

To run the complete system with Redis queues and PostgreSQL stores:

  1. Build and Run:
    docker compose up --build
    
  2. Verify over SSE:
    python examples/mcp_sse_client.py
    

☁️ Cloud Deployment (Cloud Run & GKE)

For Google Cloud production environments, configuration templates are located in deployment/:

  • Cloud Run: Serverless container setups with Memorystore Redis and Cloud SQL PostgreSQL.
  • GKE: Scalable Kubernetes microservices with Horizontal Pod Autoscalers (HPA).
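
Both Cloud Run and Kubernetes send SIGTERM to a container before stopping it, which is where the graceful shutdown behavior listed under Core Capabilities matters. The following is a minimal sketch of that pattern using only the standard library; the function names are assumptions and this is not AgyQueue's worker code.

```python
import signal
import threading

# Set by the signal handler; checked by the worker between tasks.
stop_requested = threading.Event()


def _request_stop(signum, frame):
    """Signal handler: record the request and return immediately."""
    stop_requested.set()


def install_handlers():
    """Register the handler for SIGTERM and SIGINT (must run in the main thread)."""
    signal.signal(signal.SIGTERM, _request_stop)
    signal.signal(signal.SIGINT, _request_stop)


def worker_loop(tasks):
    """Run callables in order; stop before the next one once a signal has arrived."""
    finished = []
    for task in tasks:
        if stop_requested.is_set():
            break
        finished.append(task())  # the active task always runs to completion
    return finished
```

The handler only sets a flag, so the task that is running when the signal arrives is never interrupted midway. The worker simply declines to start another one.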

🔔 Slack & Email Notifications

To configure alerts on task completion or failure, set the following environment variables:

# Enable channels (comma-separated list)
export AGYQUEUE_NOTIFICATIONS="slack,email"

# 1. Slack Webhook Configuration
export SLACK_WEBHOOK_URL="https://example.com/slack-webhook-url"

# 2. Email SMTP Configuration
export SMTP_HOST="smtp.gmail.com"
export SMTP_PORT="587"
export SMTP_USER="your-email@gmail.com"
export SMTP_PASSWORD="your-app-password"
export SMTP_FROM="noreply@agyqueue.internal"
export SMTP_TO="recipient-alert-inbox@domain.com"
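
As a rough picture of how these variables might be consumed, the sketch below parses the channel list and builds a Slack request and an email message without sending either. The function names are hypothetical. The {"text": ...} body is the standard payload for Slack incoming webhooks, and how AgyQueue formats its own messages is not shown in this document.

```python
import json
import os
import urllib.request
from email.message import EmailMessage


def enabled_channels(env=os.environ):
    """Parse the comma-separated AGYQUEUE_NOTIFICATIONS value into a list."""
    raw = env.get("AGYQUEUE_NOTIFICATIONS", "")
    return [c.strip().lower() for c in raw.split(",") if c.strip()]


def build_slack_request(env, text):
    """Build (but do not send) a POST carrying a Slack incoming-webhook payload."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        env["SLACK_WEBHOOK_URL"],
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_email(env, subject, text):
    """Build (but do not send) a plain-text email from the SMTP_* settings."""
    msg = EmailMessage()
    msg["From"] = env["SMTP_FROM"]
    msg["To"] = env["SMTP_TO"]
    msg["Subject"] = subject
    msg.set_content(text)
    return msg
```

Sending would then be a matter of passing the request to urllib.request.urlopen and the message to smtplib.SMTP(...).send_message after starttls() and login() with SMTP_USER and SMTP_PASSWORD.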

🚀 Publishing to PyPI

Step 1: Install Build Tools

pip install --upgrade build twine

Step 2: Build the Distribution Package

python -m build

Step 3: Upload to PyPI

Run twine to securely publish to PyPI under your profile:

python -m twine upload dist/*

Once uploaded, anyone can install and run the AgyQueue CLI globally:

pip install agyqueue
