Skip to main content

QueenBee and WorkerBee for CommandHive.xyz

Project description

Overview

This is agent orchestrator backend to create agents on fly, store all your MCP variables and get tasks done round the clock from these agents. Visit CommandHive Website

✅ Completed

  • Ensure sampling as MCP feature is working
  • Create Pub/Sub support using Redis
  • Create Pub/Sub support using Kafka
  • Create Pub/Sub support using MSK (Managed Kafka)
  • Ensure tool calling is done with confirmation
  • Check compatibility with Python 3.11
  • Check compatibility with Python 3.12

⏳ To Do Next

  • Integrate smart contract for tool calling
  • Each agent has its own wallet (defined in config)
  • Pub/Sub support for RabbitMQ
  • Pub/Sub support for Google Pub/Sub

Sample Agent

import asyncio
import json
from typing import Dict, List
import os
from mcp_agent.core.fastagent import FastAgent
from dotenv import load_dotenv
load_dotenv()
import redis.asyncio as aioredis

'''
 redis-cli PUBLISH agent:queen '{"type": "user", "content": "tell me price of polygon please", "channel_id": "agent:queen",
  "metadata": {"model": "claude-3-5-haiku-latest", "name": "default"}}'
'''

subagents_config = [
    {
        "name": "finder",
        "instruction": "You are an agent with access to the internet; you need to search about the latest prices of Bitcoin and other major cryptocurrencies and report back.",
        "servers": ["fetch", "brave"],
        "model": "haiku"
    },
    {
        "name": "reporter",
        "instruction": "You are an agent that takes the raw pricing data provided by the finder agent and produces a concise, human-readable summary highlighting current prices, 24-hour changes, and key market insights.",
        "servers": [],  
        "model": "haiku"
    }
]

# Sample JSON config for MCP
sample_json_config = {
    "mcp": {
        "servers": {
            "fetch": {
                "name": "fetch",
                "description": "A server for fetching links",
                "transport": "stdio",
                "command": "uvx",
                "args": ["mcp-server-fetch"],
                "tool_calls": [
                    {
                        "name": "fetch",
                        "seek_confirm": True,
                        "time_to_confirm": 120000,  
                        "default": "reject" 
                    }
                ]
            },
            "brave": {
                "name": "brave",
                "description": "Brave search server",
                "transport": "stdio",
                "command": "npx",
                "args": [
                    "-y",
                    "@modelcontextprotocol/server-brave-search"
                ],
                "env": {
                    "BRAVE_API_KEY": "BSANIwUPPxwC9wchogL5I6UNkWGffh3"
                }
            }
        }
    },
    "default_model": "haiku",   
    "logger": {
        "level": "info",
        "type": "console"
    },
    "pubsub_enabled": True,
    "pubsub_config": {
        "use_redis": True,
        "channel_name": "queen",
        "redis": {
            "host": "localhost",
            "port": 6379,
            "db": 0,
            "channel_prefix": "agent:"
        }
    },
    "anthropic": {
        "api_key": os.environ.get("CLAUDE_API_KEY", "") 
    }
}

# Create FastAgent instance
fast = FastAgent(
    name="queen",  # Changed name to match channel name used in publishing
    json_config=sample_json_config,
    parse_cli_args=False
)

# Dynamically create agents from JSON configuration using a for loop
def create_agents_from_config(config_list: List[Dict]) -> List[str]:
    """
    Create agents dynamically from JSON configuration.
    Returns a list of agent names for use in the orchestrator.
    """
    agent_names = []
    
    for agent_config in config_list:
        name = agent_config.get("name")
        instruction = agent_config.get("instruction", "")
        servers = agent_config.get("servers", [])
        model = agent_config.get("model", None)
        
        if not name:
            continue
            
        # Create agent decorator kwargs
        agent_kwargs = {
            "name": name,
            "instruction": instruction,
            "servers": servers
        }
        
        # Add model if specified
        if model:
            agent_kwargs["model"] = model
            
        # Create the agent using the decorator
        @fast.agent(**agent_kwargs)
        def agent_function():
            """Dynamically created agent function"""
            pass
            
        agent_names.append(name)
    
    return agent_names

# Create agents from configuration
created_agent_names = create_agents_from_config(subagents_config)

# Create orchestrator with the dynamically created agents
@fast.orchestrator(
    name="orchestrate", 
    agents=created_agent_names,  # Use the list of created agent names
    plan_type="full",
    model="haiku"
)
async def orchestrate_task():
    """Orchestrator function"""
    pass

async def main():
    """Test initializing FastAgent with JSON config in interactive mode."""
    
    # Create Redis client
    redis_client = aioredis.Redis(
        host="localhost",
        port=6379,
        db=0,
        decode_responses=True
    )
    
    # Register agents and keep it running
    async with fast.run() as agent:        
        try:
            # Subscribe to the input channel
            pubsub = redis_client.pubsub()
            await pubsub.subscribe("agent:queen")
            
            # Initial task for the orchestrator
            initial_task = """
           Can you find the price of VANA token and if it is more than 50 percent of it;s lowest then give command to sell it off. tell me now sell it off or hold it. 
            """
            
            await agent.orchestrate(initial_task)
            
            # Keep running and listen for Redis messages
            while True:
                # Process Redis messages directly
                message = await pubsub.get_message(ignore_subscribe_messages=True)
                if message and message.get('type') == 'message':
                    try:
                        # Process the message data
                        data = message.get('data')
                        if isinstance(data, bytes):
                            data = data.decode('utf-8')
                        
                        # Try to parse JSON
                        try:
                            data_obj = json.loads(data)
                            
                            # If this is a user message, extract content and send to orchestrator
                            if data_obj.get('type') == 'user' and 'content' in data_obj:
                                user_input = data_obj['content']
                                
                                # Send to orchestrator instead of individual agent
                                response = await agent.orchestrate(user_input)
                                
                        except json.JSONDecodeError:
                            # Try to process as plain text
                            response = await agent.orchestrate(data)
                            
                    except Exception as e:
                        import traceback
                
                # Small delay to prevent CPU spike
                await asyncio.sleep(0.05)
                
        finally:
            # Clean up Redis connection
            if 'pubsub' in locals():
                await pubsub.unsubscribe("agent:queen")
            await redis_client.close()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass

Sample Listener

import redis
import time

def main():
    # Connect to Redis (adjust host/port/db as needed)
    r = redis.Redis(host='localhost', port=6379, db=0)

    # Create a PubSub object and subscribe to the channel
    pubsub = r.pubsub()
    channel_name = 'agent:queen'
    pubsub.subscribe(channel_name)
    print(f"Subscribed to channel: {channel_name}")

    # Loop forever, polling for new messages
    while True:
        message = pubsub.get_message()
        if message:
            # Print the raw message dict
            print(message)
        # Sleep briefly to avoid busy‑waiting
        time.sleep(0.05)

if __name__ == '__main__':
    main()

Disclaimer

This repo is cloned from fast-agent

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

bee_agent-0.3.0.tar.gz (270.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

bee_agent-0.3.0-py3-none-any.whl (350.6 kB view details)

Uploaded Python 3

File details

Details for the file bee_agent-0.3.0.tar.gz.

File metadata

  • Download URL: bee_agent-0.3.0.tar.gz
  • Upload date:
  • Size: 270.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.3

File hashes

Hashes for bee_agent-0.3.0.tar.gz
Algorithm Hash digest
SHA256 202917492cc15fd518ef30f8a10d8179930cd483771c431ebb33bd5a54cf31f4
MD5 d1e6be17a24ac44445d4084e00147501
BLAKE2b-256 258fd6db4d30418ce01a9b8d178e0fa4ac488d798c1bf416fe1ac2c7b767f783

See more details on using hashes here.

File details

Details for the file bee_agent-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: bee_agent-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 350.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.3

File hashes

Hashes for bee_agent-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8692c48eaaa65d48a3445cbfb4b3abdb0af950f51011780332c87f38dd2660df
MD5 476f890569c008a4e56cbfd227582e4d
BLAKE2b-256 c82785f53c9351a3a654fead2d3dc59b70021026cfd3204b7152f3f6e2a7755c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page