
Distributed Agent Scheduling Framework


🚀 by-framework-python



by-framework is a distributed Agent scheduling framework built on Redis Streams. It provides worker orchestration, session-scoped runtime state, and plugin-based agent capability registration for AI agent systems.




✨ Key Features

  • Native Async: Built on Python asyncio, perfectly suited for I/O-intensive Agent tasks.
  • 🧩 Highly Plugin-Based: Plugin system with agent config registration for tools, prompts, skills, and lifecycle hooks.
  • 📊 State Management: Complete AgentContext support for easy streaming output, state transitions, and artifact handling.
  • 🔄 Decoupled Architecture: Separates control streams from data streams for scalable Worker orchestration.
  • 🎯 Agent Type Routing: Workers declare supported agent_types via get_agent_types() for routing and liveness checks.

🏗️ Architecture

The system follows an event-driven design with highly decoupled components:

┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│   Client    │──────▶│  Redis Input │──────▶│   Gateway    │
│ (Gateway)   │       │     MQ       │       │   Worker     │
└─────────────┘       └──────────────┘       └──────┬───────┘
        ▲                                           │
        │                                           │
        │                                           ▼
┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│  Consumer   │◀──────│  Redis Data  │◀──────│   Business   │
│ / Backend   │       │   Streams    │       │   Logic      │
└─────────────┘       └──────────────┘       └──────────────┘

Core Components

  • Access Layer: GatewayClient publishes control commands to Redis Input MQ.
  • Scheduling Layer: Uses Redis Stream for competitive consumption and routing among Worker clusters.
  • Execution Layer: GatewayWorker actively pulls tasks and executes business logic in isolated workspaces.
  • Output Layer: Data is asynchronously pushed to session-scoped data streams for downstream consumers.

Worker Routing Semantics

There are three layers of routing semantics:

  • membership: The Worker declares which agent_types it supports. This is a static relationship, updated only at startup and on graceful shutdown.
  • online / heartbeat: Whether the Worker can currently accept tasks. Only online Workers are considered valid send targets.
  • worker_id lock: Prevents duplicate startup of the same worker_id. This is an instance-level mutex, not part of agent-type routing.
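The interplay between membership and liveness can be illustrated with a small self-contained sketch. Plain Python sets stand in for the Redis-backed registry here; the real WorkerRegistry data structures may differ:

```python
# Toy model of the routing layers: membership is static, liveness is dynamic.
# The real WorkerRegistry is Redis-backed; the sets below are stand-ins.

def valid_send_targets(
    membership: dict[str, set[str]],  # agent_type -> worker_ids that declared it
    online: set[str],                 # worker_ids with a fresh heartbeat
    agent_type: str,
) -> set[str]:
    """Only workers that both declare the agent type AND are online are valid targets."""
    return membership.get(agent_type, set()) & online

membership = {"weather_agent": {"worker-01", "worker-02"}, "chat_agent": {"worker-01"}}
online = {"worker-02"}  # worker-01 missed its heartbeat

print(valid_send_targets(membership, online, "weather_agent"))  # {'worker-02'}
print(valid_send_targets(membership, online, "chat_agent"))     # set()
```

The intersection is the key point: declaring an agent type at startup is not enough; the Worker must also be online at send time.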

The production main path uses the agent-type stream:

  • The client writes to byai_gateway:ctrl:agent_type:{agent_type}
  • Multiple Workers under the same agent type consume competitively via a Redis consumer group
  • Before sending, the client only checks whether at least one online Worker exists for the agent type
  • It does not pre-select a specific Worker before sending

The debug or direct-control path uses the worker stream:

  • When target_worker_id is explicitly provided, messages are written to byai_gateway:ctrl:worker:{worker_id}
  • This path is intended only for debugging, direct dispatch, or worker-level control commands
  • The client explicitly checks whether the worker is online before sending

Data Flow

User Request
    ↓
Gateway (write to control stream)
    ↓
Worker (consume control stream, process task)
    ↓
Redis Stream (write to session data stream)
    ↓
Backend or consumer (read session data stream)
    ↓
Frontend (render real-time AI response)

📦 Installation

Prerequisites

  • Python 3.12+
  • Redis 7.0+ (for message queue)

Install via pip

# Basic installation
pip install by-framework

# Development mode installation
pip install -e ".[dev]"

Install via uv (Recommended)

# Clone the project and install all dependencies
cd by-framework-python
uv sync

Workspace Development

This repository uses uv workspace to manage local package development.

Common commands:

# Sync workspace dependencies in repository root
uv sync

# Run package tests
uv run pytest tests

🚀 Quick Start

1. Create a Simple Agent Worker

Create my_agent.py:

import asyncio
from by_framework import GatewayWorker, AgentContext, run_worker

class MyAssistant(GatewayWorker):
    def get_agent_types(self):
        # Declare the Agent types this Worker can handle
        return ["weather_agent", "chat_agent"]

    async def process_command(self, command, context: AgentContext):
        # Send streaming text chunks
        await context.emit_chunk("Processing your request...\n")

        # Simulate time-consuming operation
        await asyncio.sleep(0.5)

        # Update task state
        await context.emit_state("thinking")

        # Read request content from the command payload
        user_input = (
            command.content if isinstance(command.content, str) else str(command.content)
        )

        # Send thinking process
        await context.emit_chunk(f"I received: {user_input}\n")
        await asyncio.sleep(0.3)

        # Send final result
        await context.emit_chunk("This is my reply!")

        return {
            "status": "success",
            "message": "Task completed",
            "data": {"answer": "The weather is sunny today"}
        }

if __name__ == "__main__":
    run_worker(
        worker_class=MyAssistant,
        worker_id="worker-01",
        redis_host="localhost",
        redis_port=6379,
    )

2. Start Redis

docker run -d -p 6379:6379 redis:7-alpine

3. Start Worker

cd by-framework-python
uv run python my_agent.py

4. Send a Test Task

Create send_task.py:

import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def send_task():
    # Initialize shared Redis client and registry
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)

    # ByaiGatewayClient accepts redis_client / registry
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session-001",
        content="How's the weather in Beijing today?",
    )

    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(send_task())

Run:

uv run python send_task.py

💡 Deep Dive

GatewayWorker Base Class

GatewayWorker is the base class for all custom Workers. You need to implement the following methods:

| Method                            | Required | Description                                         |
| --------------------------------- | -------- | --------------------------------------------------- |
| get_agent_types()                 | Yes      | Returns the list of Agent types this Worker handles |
| process_command(command, context) | Yes      | Handles the specific business logic                 |

AgentContext

AgentContext provides the ability to interact with the runtime environment:

from by_framework import AgentContext, ArtifactEvent


async def process_command(self, command, context: AgentContext):
    # 1. Send streaming output
    await context.emit_chunk("Processing...")

    # 2. Send artifacts/structured data
    await context.emit_artifact(ArtifactEvent(url="https://example.com/result.json"))

    # 3. Get message ID and session ID
    msg_id = context.message_id
    session_id = context.session_id

    # 4. Call other Agents (can suspend the current task while waiting for a reply)
    result = await context.call_agent(
        target_agent_type="translator_agent",
        content="Hello",
        wait_for_reply=True
    )

Commands and Message Protocol

AskAgentCommand (Task Command)

from by_framework.core.protocol.commands import AskAgentCommand
from by_framework.core.protocol.message_header import MessageHeader

command = AskAgentCommand(
    header=MessageHeader(
        message_id="msg_123",
        session_id="sess_456",
        target_agent_type="weather_agent"
    ),
    content="Query Beijing weather",
    extra_payload={
        "location": "Beijing"
    }
)

Common EventType Values

| Event Type        | Description                          |
| ----------------- | ------------------------------------ |
| answerDelta       | Incremental answer content           |
| reasoningLogDelta | Reasoning or intermediate log output |
| appStreamResponse | Marks stream completion              |
| taskCreate        | Task creation related event          |
| taskStop          | Task termination related event       |
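As an illustration of how a downstream consumer might fold these events into a final answer, here is a minimal sketch; real event payloads carry more fields than the `type`/`content` pair assumed here:

```python
# Minimal sketch: folding answerDelta events from a session data stream
# into a complete answer. Real event payloads carry additional fields.

def assemble_answer(events: list[dict]) -> str:
    parts = []
    for event in events:
        if event["type"] == "answerDelta":
            parts.append(event["content"])       # incremental answer content
        elif event["type"] == "appStreamResponse":
            break                                # stream completion marker
    return "".join(parts)

events = [
    {"type": "reasoningLogDelta", "content": "looking up forecast..."},
    {"type": "answerDelta", "content": "The weather "},
    {"type": "answerDelta", "content": "is sunny today."},
    {"type": "appStreamResponse", "content": ""},
]
print(assemble_answer(events))  # The weather is sunny today.
```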

🔌 Plugin System

Plugins are the foundation of By-Framework's extensibility. You can register tools, prompt templates, etc. through plugins.

Plugin Directory Structure

my_plugins/
├── weather_plugin.py
├── calculator_plugin.py
└── custom_hooks.py

Writing a Plugin

Create my_cool_plugin.py:

from by_framework import AgentConfig, AgentContext, Plugin, PluginBuildContext, PluginManifest
from typing import Any

class WeatherPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(
            plugin_id="weather_plugin",
            version="1.0.0",
        ))

    async def register_agent_configs(self, build_context: PluginBuildContext) -> list[AgentConfig]:
        # Plugin registers capabilities by returning a list of AgentConfig
        config = AgentConfig(
            agent_id="weather_assistant",
            tools={
                "get_current_weather": self._get_weather,
                "get_forecast": self._get_forecast
            },
            prompts={
                "system_prompt": "You are a weather assistant..."
            }
        )
        return [config]

    async def _get_weather(self, city: str) -> dict[str, Any]:
        """Get current weather"""
        # In real projects, this would call a real weather API
        return {
            "city": city,
            "temperature": 25,
            "condition": "Sunny",
            "humidity": 60
        }

    async def _get_forecast(self, city: str, days: int = 3) -> list[dict]:
        """Get weather forecast"""
        return [
            {"day": 1, "high": 28, "low": 18, "condition": "Sunny"},
            {"day": 2, "high": 26, "low": 16, "condition": "Cloudy"},
            {"day": 3, "high": 24, "low": 14, "condition": "Overcast"}
        ][:days]

    # Plugin lifecycle hooks
    async def on_task_start(self, context: AgentContext):
        """Called when task starts"""
        print(f"Task {context.message_id} started")

    async def on_task_complete(self, context: AgentContext, result: Any):
        """Called when task completes successfully"""
        print(f"Task {context.message_id} completed")

    async def on_task_error(self, context: AgentContext, error: Exception):
        """Called when task encounters an error"""
        print(f"Task {context.message_id} error: {error}")

Using Plugins

Method 1: Pass via plugin_list parameter

from by_framework import run_worker
from my_cool_plugin import WeatherPlugin

run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_list=[WeatherPlugin()]
)

Method 2: Configure via plugin_configurator callback

from by_framework import run_worker
from my_cool_plugin import WeatherPlugin

def configure_plugins(registry):
    registry.register_bundle(WeatherPlugin())

run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_configurator=configure_plugins
)

Method 3: Load plugin modules from a directory

run_worker(
    worker_class=MyAssistant,
    plugin_dir="./my_plugins"  # Scans .py files in this directory once at startup
)

📡 Sending Tasks

Using ByaiGatewayClient

ByaiGatewayClient is a wrapper around GatewayClient that uses the shared Byai codec for message serialization by default and supports higher-level message protocols.

import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def main():
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session_123",
        user_code="user_123",
        content="Query today's weather in Beijing",
    )

    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(main())

Sending Path Explanation

GatewayClient.send_message(...) has two modes:

  • Default agent-type mode: Writes to the agent-type stream based on target_agent_type, and verifies that an online worker exists when require_online_worker=True.
  • Direct worker mode: When target_worker_id is provided, writes directly to the worker stream; suitable for debugging or direct control.

This means:

  • response.target_worker_id may be empty in agent-type mode, because the actual worker is determined only when a consumer of that agent type reads the message.
  • When canceling a task that has already started, the execution registry fills in the worker_id once the worker actually begins processing.

🧪 Examples

Example 1: Basic Streaming Output

class StreamingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["streaming_demo"]

    async def process_command(self, command, context: AgentContext):
        text = "This is a sample text for streaming output."

        for char in text:
            await context.emit_chunk(char)
            await asyncio.sleep(0.05)

        return {"status": "done"}

Example 2: Registering Plugin Capabilities

Tools, prompts, and skills are registered through the plugin mechanism and exposed through AgentConfig.

from by_framework import AgentConfig, GatewayWorker, Plugin, PluginBuildContext, PluginManifest


class CalculatorPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(plugin_id="calculator"))

    async def register_agent_configs(
        self, build_context: PluginBuildContext
    ) -> list[AgentConfig]:
        return [
            AgentConfig(
                agent_id="tool_demo",
                tools={"calculate": self.calculate},
            )
        ]

    async def calculate(self, a: float, b: float, op: str) -> float:
        if op == "+":
            return a + b
        if op == "-":
            return a - b
        if op == "*":
            return a * b
        if op == "/":
            return a / b if b != 0 else 0
        return 0


class ToolAgent(GatewayWorker):
    def get_agent_types(self):
        return ["tool_demo"]

    async def process_command(self, command, context: AgentContext):
        config = context.get_agent_config("tool_demo")
        await context.emit_chunk(
            f"Registered tools: {list(config.tools.keys()) if config else []}"
        )
        return {"status": "success"}

🧩 Advanced Capabilities

User-in-the-Loop Flows

Workers can suspend execution and wait for user input through context.ask_user(...). The follow-up reply is delivered back to the worker as a ResumeCommand.

from by_framework import AgentContext, AskUserEvent, GatewayWorker, ResumeCommand


class ApprovalAgent(GatewayWorker):
    def get_agent_types(self):
        return ["approval_agent"]

    async def process_command(self, command, context: AgentContext):
        if isinstance(command, ResumeCommand):
            await context.emit_chunk(f"User replied: {command.content}")
            return {"status": "completed"}

        return await context.ask_user(
            AskUserEvent(prompt="Please confirm the deployment window.")
        )

Scatter-Gather Dispatch

dispatch_group(...) enqueues multiple subtasks under one task group, and collect_group_results(...) gathers their callback payloads once they finish.

tasks = [
    {"target_agent_type": "researcher", "content": "Collect references"},
    {"target_agent_type": "writer", "content": "Draft the summary"},
]

group = await context.dispatch_group(tasks, wait_for_reply=True)
results = await context.collect_group_results(group["task_group_id"])

Byai Typed Worker Layer

If your business payloads use BaiYing message objects, ByaiWorker and ByaiAgentContext provide typed content decoding/encoding on top of the generic worker runtime.

Service Discovery Utilities

The repository also ships Redis-backed service discovery and a discovery-aware HTTP client:

  • ServiceRegistry for service registration and heartbeat
  • DiscoveryClient for cached service lookup and load balancing
  • DiscoveryHttpClient for node-switching HTTP retries on discovered instances

These utilities live in src/by_framework/core/discovery.py and src/by_framework/util/discovery_http_client.py.


🛠️ Configuration Reference

run_worker Function Parameters

The run_worker function supports a rich set of configuration options:

| Parameter                         | Type                | Description                                           | Default                  |
| --------------------------------- | ------------------- | ----------------------------------------------------- | ------------------------ |
| worker_class                      | Type[GatewayWorker] | Required. Business Worker class.                      | -                        |
| worker_id                         | str                 | Unique identifier for the Worker instance.            | "worker-1"               |
| redis_host                        | str                 | Redis server address.                                 | "localhost"              |
| redis_port                        | int                 | Redis port.                                           | 6379                     |
| redis_db                          | int                 | Redis database number.                                | 0                        |
| redis_password                    | str                 | Redis password (optional).                            | None                     |
| redis_username                    | str                 | Redis username (optional).                            | None                     |
| workspace_dir                     | str                 | Local working directory for task execution.           | "/tmp/gateway-workspace" |
| consumer_group                    | str                 | Redis consumer group name.                            | "agent_engines"          |
| max_concurrency                   | int                 | Maximum concurrent tasks per Worker.                  | 50                       |
| fetch_count                       | int                 | Number of messages batch-fetched from Redis per read. | 10                       |
| redis_max_connections             | int                 | Maximum Redis connections.                            | max_concurrency + 10     |
| plugin_list                       | List[Plugin]        | Explicitly passed plugin list.                        | None                     |
| plugin_configurator               | Callable            | Plugin configuration callback function.               | None                     |
| plugin_hook_timeout_seconds       | float               | Default timeout for plugin hooks.                     | None                     |
| plugin_log_hook_stats_on_shutdown | bool                | Whether to log plugin hook stats on shutdown.         | True                     |
| plugin_dir                        | str                 | Directory scanned once at startup for .py plugins.    | None                     |

Environment Variables

| Environment Variable       | Description           | Default              |
| -------------------------- | --------------------- | -------------------- |
| BYAI_WORKER_CONCURRENCY    | Maximum concurrency   | 50                   |
| BYAI_WORKER_FETCH_COUNT    | Batch fetch count     | 10                   |
| BYAI_REDIS_MAX_CONNECTIONS | Redis max connections | max_concurrency + 10 |
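The documented defaults can be sketched as follows. Note that `effective_worker_settings` is a hypothetical helper, and the assumption that environment variables simply override the built-in defaults is a plausible reading, not confirmed framework behavior:

```python
# Sketch of the documented defaults (hypothetical helper, not a by_framework
# function). Assumption: environment variables override built-in defaults,
# and redis max connections falls back to max_concurrency + 10 when unset.

def effective_worker_settings(env: dict[str, str]) -> dict[str, int]:
    concurrency = int(env.get("BYAI_WORKER_CONCURRENCY", 50))
    fetch_count = int(env.get("BYAI_WORKER_FETCH_COUNT", 10))
    max_conns = int(env.get("BYAI_REDIS_MAX_CONNECTIONS", concurrency + 10))
    return {
        "max_concurrency": concurrency,
        "fetch_count": fetch_count,
        "redis_max_connections": max_conns,
    }

print(effective_worker_settings({}))
# {'max_concurrency': 50, 'fetch_count': 10, 'redis_max_connections': 60}
print(effective_worker_settings({"BYAI_WORKER_CONCURRENCY": "100"}))
# {'max_concurrency': 100, 'fetch_count': 10, 'redis_max_connections': 110}
```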

📚 API Reference

GatewayWorker

class GatewayWorker:
    def get_agent_types(self) -> List[str]:
        """Return list of Agent types this Worker can handle"""
        pass

    async def process_command(self, command, context: AgentContext) -> Any:
        """Process command and return result"""
        pass

AgentContext

class AgentContext:
    session_id: str
    trace_id: str
    current_agent_id: str
    message_id: str
    parent_message_id: str

    async def emit_chunk(self, event: Union[StreamChunkEvent, str], event_type: Optional[str] = None):
        """Send text chunk or streaming event"""

    async def emit_state(self, event: Union[StateChangeEvent, str], event_type: Optional[str] = None):
        """Send state update"""

    async def emit_artifact(self, event: Union[ArtifactEvent, str], event_type: Optional[str] = None):
        """Send artifact/attachment"""

    async def ask_user(self, event: Union[AskUserEvent, str]) -> dict:
        """Send waiting for input request to user"""

    async def call_agent(self, target_agent_type: str, content: object, **kwargs) -> dict:
        """Call other Agent"""

    async def dispatch_group(self, tasks: list[dict], **kwargs) -> dict:
        """Dispatch task group"""

    async def get_active_workers(self) -> Dict[str, Any]:
        """Get all active workers in cluster"""

GatewayClient / ByaiGatewayClient

class GatewayClient:
    async def send_message(
        self,
        target_agent_type: str,
        session_id: str,
        content: Any,
        user_code: str = "",
        action_type: str = "ASK_AGENT",
        metadata: Optional[dict] = None,
        target_worker_id: Optional[str] = None,
        require_online_worker: bool = True,
    ) -> SendMessageResponse:
        """Send message, return response object"""

    async def cancel_task(self, message_id: str, session_id: str, reason: str = "") -> CancelTaskResponse:
        """Cancel specified task"""

🚀 Deployment Guide

Single Machine Deployment

  1. Prepare Environment
# Install dependencies
cd by-framework-python
uv sync
  2. Start Redis
docker run -d --name gateway-redis \
  -p 6379:6379 \
  --restart unless-stopped \
  redis:7-alpine
  3. Start Worker
uv run python -m by_framework \
  --worker-class my_agent.MyAgent \
  --worker-id worker-01 \
  --redis-host localhost

Multi-Worker Deployment

To scale horizontally, run multiple worker processes with different worker_id values while sharing the same Redis instance and target_agent_type streams.
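The competitive-consumption behavior this relies on can be modeled in a few lines. This is a toy in-memory stand-in for a Redis consumer group, where each message goes to exactly one consumer; real Redis hands a message to whichever consumer reads first, so round-robin here is a simplification:

```python
from collections import defaultdict
from itertools import cycle

# Toy in-memory stand-in for a Redis consumer group: each message is
# delivered to exactly one consumer, so adding workers spreads the load.

def distribute(messages: list[str], worker_ids: list[str]) -> dict[str, list[str]]:
    assignments: dict[str, list[str]] = defaultdict(list)
    workers = cycle(worker_ids)  # simplification: Redis is first-come, not round-robin
    for msg in messages:
        assignments[next(workers)].append(msg)
    return dict(assignments)

out = distribute([f"task-{i}" for i in range(6)], ["worker-01", "worker-02", "worker-03"])
print(out)
# {'worker-01': ['task-0', 'task-3'], 'worker-02': ['task-1', 'task-4'],
#  'worker-03': ['task-2', 'task-5']}
```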

Production Environment Recommendations

  1. Use Connection Pool
run_worker(
    worker_class=MyAgent,
    redis_max_connections=50
)
  2. Configure Monitoring
import logging

from by_framework.common.logger import setup_logging

setup_logging(level=logging.INFO, use_json=True)

FAQ

Q: How are tasks protected from being lost?

A: Redis Streams provides a persistence mechanism. Workers use XACK to acknowledge that a message has been processed; unacknowledged messages are redelivered.
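The acknowledge-or-redeliver cycle can be modeled as a toy (pure Python; real Redis tracks delivered-but-unacked entries per consumer in its pending entries list):

```python
# Toy model of Redis Streams delivery semantics: a delivered-but-unacked
# message stays pending and is redeliverable; acking removes it for good.

class ToyStream:
    def __init__(self) -> None:
        self.pending: dict[str, str] = {}  # message_id -> payload

    def deliver(self, message_id: str, payload: str) -> str:
        self.pending[message_id] = payload  # tracked until acknowledged
        return payload

    def ack(self, message_id: str) -> None:
        self.pending.pop(message_id, None)  # like XACK: done, never redelivered

    def redeliverable(self) -> list[str]:
        return list(self.pending)  # what a crashed worker would leave behind

stream = ToyStream()
stream.deliver("msg-1", "task A")
stream.deliver("msg-2", "task B")
stream.ack("msg-1")
print(stream.redeliverable())  # ['msg-2']
```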

Q: How is Worker load balancing implemented?

A: Multiple Workers connect to the same Redis Stream, and Redis automatically distributes load among the consumers in the consumer group.


🗺️ Roadmap

  • Observability Dashboard: Integrated UI for monitoring worker health and task streams.
  • Advanced Sandbox: WASM-based execution environment for enhanced isolation.
  • Long-term Memory: Native support for vector-database backed session memory.
  • Native LangGraph Integration: Enhanced adapter for complex stateful multi-agent flows.

🤝 Contributing

Issues and Pull Requests are welcome! Please check our CONTRIBUTING.md for details.


📄 License

This project is licensed under Apache 2.0 License - see LICENSE file for details.


Maintained by byai team.

Questions or suggestions? Feel free to contact us!
