Distributed Agent Scheduling Framework
🚀 by-framework-python
by-framework is a distributed Agent scheduling framework built on Redis Streams. It provides worker orchestration, session-scoped runtime state, and plugin-based agent capability registration for AI agent systems.
Table of Contents
- ✨ Key Features
- 🏗️ Architecture
- 📦 Installation
- 🚀 Quick Start
- 💡 Deep Dive
- 🔌 Plugin System
- 📡 Sending Tasks
- 🧪 Examples
- 🧩 Advanced Capabilities
- 🛠️ Configuration Reference
- 📚 API Reference
- 🚀 Deployment Guide
✨ Key Features
- ⚡ Native Async: Built on Python `asyncio`, well suited for I/O-intensive Agent tasks.
- 🧩 Highly Plugin-Based: Plugin system with agent config registration for tools, prompts, skills, and lifecycle hooks.
- 📊 State Management: Complete `AgentContext` support for streaming output, state transitions, and artifact handling.
- 🔄 Decoupled Architecture: Separates control streams from data streams for scalable Worker orchestration.
- 🎯 Agent Type Routing: Workers declare supported `agent_types` via `get_agent_types()` for routing and liveness checks.
🏗️ Architecture
The system uses an event-driven, highly decoupled design:
```
┌─────────────┐        ┌──────────────┐        ┌──────────────┐
│   Client    │──────▶ │ Redis Input  │──────▶ │   Gateway    │
│  (Gateway)  │        │      MQ      │        │    Worker    │
└─────────────┘        └──────────────┘        └──────┬───────┘
       ▲                                              │
       │                                              │
       │                                              ▼
┌─────────────┐        ┌──────────────┐        ┌──────────────┐
│  Consumer   │ ◀───── │  Redis Data  │ ◀───── │   Business   │
│  / Backend  │        │   Streams    │        │    Logic     │
└─────────────┘        └──────────────┘        └──────────────┘
```
Core Components
- Access Layer: `GatewayClient` publishes control commands to the Redis Input MQ.
- Scheduling Layer: Uses Redis Streams for competitive consumption and routing among Worker clusters.
- Execution Layer: `GatewayWorker` actively pulls tasks and executes business logic in isolated workspaces.
- Output Layer: Data is asynchronously pushed to session-scoped data streams for downstream consumers.
Worker Routing Semantics
There are three layers of routing semantics:
- membership: A Worker declares which `agent_types` it supports. This is a static relationship, updated only at startup and graceful shutdown.
- online / heartbeat: Whether the Worker can currently accept tasks. Only online Workers are considered valid send targets.
- worker_id lock: Prevents duplicate startup of the same `worker_id`. This is an instance mutex, not part of agent type routing.
The production main path uses the agent type stream:

- The client writes to `byai_gateway:ctrl:agent_type:{agent_type}`.
- Multiple Workers under the same agent type consume competitively via a Redis consumer group.
- Before sending, the client only checks whether at least one online Worker exists for the agent type.
- The client does not pre-select a specific Worker before sending.
The debug or direct control path uses the worker stream:

- When `target_worker_id` is explicitly provided, messages are written to `byai_gateway:ctrl:worker:{worker_id}`.
- This path is only for debugging, direct dispatch, or worker-level control commands.
- The client explicitly checks whether the worker is online before sending.
Data Flow
```
User Request
    ↓
Gateway (write to control stream)
    ↓
Worker (consume control stream, process task)
    ↓
Redis Stream (write to session data stream)
    ↓
Backend or consumer (read session data stream)
    ↓
Frontend (render real-time AI response)
```
📦 Installation
Prerequisites
- Python 3.12+
- Redis 7.0+ (for message queue)
Install via pip
```bash
# Basic installation
pip install by-framework

# Development mode installation
pip install -e ".[dev]"
```
Install via uv (Recommended)
```bash
# Clone the project and install all dependencies
cd by-framework-python
uv sync
```
Workspace Development
This repository uses uv workspace to manage local package development.
Common commands:
```bash
# Sync workspace dependencies in the repository root
uv sync

# Run package tests
uv run pytest tests
```
🚀 Quick Start
1. Create a Simple Agent Worker
Create my_agent.py:
```python
import asyncio

from by_framework import GatewayWorker, AgentContext, run_worker


class MyAssistant(GatewayWorker):
    def get_agent_types(self):
        # Declare the Agent types this Worker can handle
        return ["weather_agent", "chat_agent"]

    async def process_command(self, command, context: AgentContext):
        # Send streaming text chunks
        await context.emit_chunk("Processing your request...\n")

        # Simulate a time-consuming operation
        await asyncio.sleep(0.5)

        # Update the task state
        await context.emit_state("thinking")

        # Read request content from the command payload
        user_input = (
            command.content if isinstance(command.content, str) else str(command.content)
        )

        # Send the thinking process
        await context.emit_chunk(f"I received: {user_input}\n")
        await asyncio.sleep(0.3)

        # Send the final result
        await context.emit_chunk("This is my reply!")
        return {
            "status": "success",
            "message": "Task completed",
            "data": {"answer": "The weather is sunny today"},
        }


if __name__ == "__main__":
    run_worker(
        worker_class=MyAssistant,
        worker_id="worker-01",
        redis_host="localhost",
        redis_port=6379,
    )
```
2. Start Redis
```bash
docker run -d -p 6379:6379 redis:7-alpine
```
3. Start Worker
```bash
cd by-framework-python
uv run python my_agent.py
```
4. Send a Test Task
Create send_task.py:
```python
import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def send_task():
    # Initialize the shared Redis client and registry
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)

    # ByaiGatewayClient accepts redis_client / registry
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session-001",
        content="How's the weather in Beijing today?",
    )
    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(send_task())
```
Run:
```bash
uv run python send_task.py
```
💡 Deep Dive
GatewayWorker Base Class
GatewayWorker is the base class for all custom Workers. You need to implement the following methods:
| Method | Required | Description |
|---|---|---|
| `get_agent_types()` | Yes | Returns the list of Agent types this Worker can handle |
| `process_command(command, context)` | Yes | Handles the specific business logic |
AgentContext
AgentContext provides the ability to interact with the runtime environment:
```python
from by_framework import AgentContext, ArtifactEvent


async def process_command(self, command, context: AgentContext):
    # 1. Send streaming output
    await context.emit_chunk("Processing...")

    # 2. Send artifacts/structured data
    await context.emit_artifact(ArtifactEvent(url="https://example.com/result.json"))

    # 3. Get the message ID and session ID
    msg_id = context.message_id
    session_id = context.session_id

    # 4. Call another Agent (supports suspending the current task to wait for a reply)
    result = await context.call_agent(
        target_agent_type="translator_agent",
        content="Hello",
        wait_for_reply=True,
    )
```
Commands and Message Protocol
AskAgentCommand (Task Command)
```python
from by_framework.core.protocol.commands import AskAgentCommand
from by_framework.core.protocol.message_header import MessageHeader

command = AskAgentCommand(
    header=MessageHeader(
        message_id="msg_123",
        session_id="sess_456",
        target_agent_type="weather_agent",
    ),
    content="Query Beijing weather",
    extra_payload={
        "location": "Beijing",
    },
)
```
Common EventType Values
| Event Type | Description |
|---|---|
| `answerDelta` | Incremental answer content |
| `reasoningLogDelta` | Reasoning or intermediate log output |
| `appStreamResponse` | Marks stream completion |
| `taskCreate` | Task creation related event |
| `taskStop` | Task termination related event |
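As a sketch of how a downstream consumer might fold these events into a final answer: the event names come from the table above, but the dict shape and field names are assumptions made for illustration.

```python
# Illustrative consumer loop over the documented event types.
# The event dicts and the "content" field name are assumptions for this sketch.

def consume_events(events: list[dict]) -> str:
    """Accumulate answerDelta chunks until appStreamResponse marks completion."""
    answer: list[str] = []
    for event in events:
        etype = event.get("event_type")
        if etype == "answerDelta":
            answer.append(event.get("content", ""))
        elif etype == "reasoningLogDelta":
            pass  # could surface intermediate reasoning in a debug pane
        elif etype == "appStreamResponse":
            break  # stream is complete
    return "".join(answer)

events = [
    {"event_type": "reasoningLogDelta", "content": "looking up weather"},
    {"event_type": "answerDelta", "content": "Sunny, "},
    {"event_type": "answerDelta", "content": "25°C"},
    {"event_type": "appStreamResponse"},
]
print(consume_events(events))  # → Sunny, 25°C
```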
🔌 Plugin System
Plugins are the foundation of By-Framework's extensibility. You can register tools, prompt templates, etc. through plugins.
Plugin Directory Structure
```
my_plugins/
├── weather_plugin.py
├── calculator_plugin.py
└── custom_hooks.py
```
Writing a Plugin
Create my_cool_plugin.py:
```python
from typing import Any

from by_framework import AgentConfig, AgentContext, Plugin, PluginBuildContext, PluginManifest


class WeatherPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(
            plugin_id="weather_plugin",
            version="1.0.0",
        ))

    async def register_agent_configs(self, build_context: PluginBuildContext) -> list[AgentConfig]:
        # A plugin registers capabilities by returning a list of AgentConfig
        config = AgentConfig(
            agent_id="weather_assistant",
            tools={
                "get_current_weather": self._get_weather,
                "get_forecast": self._get_forecast,
            },
            prompts={
                "system_prompt": "You are a weather assistant...",
            },
        )
        return [config]

    async def _get_weather(self, city: str) -> dict[str, Any]:
        """Get the current weather."""
        # In a real project, this would call an actual weather API
        return {
            "city": city,
            "temperature": 25,
            "condition": "Sunny",
            "humidity": 60,
        }

    async def _get_forecast(self, city: str, days: int = 3) -> list[dict]:
        """Get the weather forecast."""
        return [
            {"day": 1, "high": 28, "low": 18, "condition": "Sunny"},
            {"day": 2, "high": 26, "low": 16, "condition": "Cloudy"},
            {"day": 3, "high": 24, "low": 14, "condition": "Overcast"},
        ][:days]

    # Plugin lifecycle hooks
    async def on_task_start(self, context: AgentContext):
        """Called when a task starts."""
        print(f"Task {context.message_id} started")

    async def on_task_complete(self, context: AgentContext, result: Any):
        """Called when a task completes successfully."""
        print(f"Task {context.message_id} completed")

    async def on_task_error(self, context: AgentContext, error: Exception):
        """Called when a task raises an error."""
        print(f"Task {context.message_id} error: {error}")
```
Using Plugins
Method 1: Pass via plugin_list parameter
```python
from by_framework import run_worker
from my_cool_plugin import WeatherPlugin

run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_list=[WeatherPlugin()],
)
```
Method 2: Configure via plugin_configurator callback
```python
from by_framework import run_worker
from my_cool_plugin import WeatherPlugin


def configure_plugins(registry):
    registry.register_bundle(WeatherPlugin())


run_worker(
    worker_class=MyAssistant,
    worker_id="worker-01",
    plugin_configurator=configure_plugins,
)
```
Method 3: Load plugin modules from a directory
```python
run_worker(
    worker_class=MyAssistant,
    plugin_dir="./my_plugins",  # Scans .py files in this directory once at startup
)
```
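Method 3's directory scan can be pictured with plain `importlib`. This is an illustrative sketch of loading every `.py` file in a directory as a module, not the framework's actual loader:

```python
# Illustrative sketch of a plugin_dir-style scan using only the stdlib.
# The function name and discovery logic are assumptions for this sketch.
import importlib.util
import pathlib
import tempfile


def discover_plugin_modules(plugin_dir: str) -> list:
    """Import every .py file in plugin_dir as a module and return the modules."""
    modules = []
    for path in sorted(pathlib.Path(plugin_dir).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        modules.append(module)
    return modules


# Demo: write one plugin module into a temporary directory and load it.
with tempfile.TemporaryDirectory() as d:
    pathlib.Path(d, "demo_plugin.py").write_text("PLUGIN_ID = 'demo'\n")
    loaded = discover_plugin_modules(d)
    print([m.PLUGIN_ID for m in loaded])  # → ['demo']
```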
📡 Sending Tasks
Using ByaiGatewayClient
ByaiGatewayClient is a wrapper around GatewayClient that uses the shared Byai codec for message serialization by default, supporting higher-level message protocols.
```python
import asyncio

from by_framework import ByaiGatewayClient, WorkerRegistry, close_redis, init_redis


async def main():
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)
    client = ByaiGatewayClient(redis_client=redis, registry=registry)

    response = await client.send_message(
        target_agent_type="weather_agent",
        session_id="session_123",
        user_code="user_123",
        content="Query today's weather in Beijing",
    )
    if response.success:
        print(f"Task sent, message ID: {response.message_id}")
    else:
        print(f"Send failed: {response.error}")

    await close_redis()


asyncio.run(main())
```
Sending Path Explanation
GatewayClient.send_message(...) has two modes:
- Default agent type mode: writes to the agent type stream based on `target_agent_type`, and verifies that an online worker exists when `require_online_worker=True`.
- Direct worker mode: when `target_worker_id` is provided, writes directly to the worker stream; suitable for debugging or direct control.

This means:

- `response.target_worker_id` may be empty in agent type mode, because the actual worker is only determined when a consumer of that agent type reads the message.
- When canceling a task that has already started executing, the execution registry fills in the `worker_id` once the worker actually begins processing.
🧪 Examples
Example 1: Basic Streaming Output
```python
import asyncio

from by_framework import AgentContext, GatewayWorker


class StreamingAgent(GatewayWorker):
    def get_agent_types(self):
        return ["streaming_demo"]

    async def process_command(self, command, context: AgentContext):
        text = "This is a sample text for streaming output."
        for char in text:
            await context.emit_chunk(char)
            await asyncio.sleep(0.05)
        return {"status": "done"}
```
Example 2: Registering Plugin Capabilities
Tools, prompts, and skills are registered through the plugin mechanism and exposed through AgentConfig.
```python
from by_framework import (
    AgentConfig,
    AgentContext,
    GatewayWorker,
    Plugin,
    PluginBuildContext,
    PluginManifest,
)


class CalculatorPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(plugin_id="calculator"))

    async def register_agent_configs(
        self, build_context: PluginBuildContext
    ) -> list[AgentConfig]:
        return [
            AgentConfig(
                agent_id="tool_demo",
                tools={"calculate": self.calculate},
            )
        ]

    async def calculate(self, a: float, b: float, op: str) -> float:
        if op == "+":
            return a + b
        if op == "-":
            return a - b
        if op == "*":
            return a * b
        if op == "/":
            return a / b if b != 0 else 0
        return 0


class ToolAgent(GatewayWorker):
    def get_agent_types(self):
        return ["tool_demo"]

    async def process_command(self, command, context: AgentContext):
        config = context.get_agent_config("tool_demo")
        await context.emit_chunk(
            f"Registered tools: {list(config.tools.keys()) if config else []}"
        )
        return {"status": "success"}
```
🧩 Advanced Capabilities
User-in-the-Loop Flows
Workers can suspend execution and wait for user input through context.ask_user(...). The follow-up reply is delivered back to the worker as a ResumeCommand.
```python
from by_framework import AgentContext, AskUserEvent, GatewayWorker, ResumeCommand


class ApprovalAgent(GatewayWorker):
    def get_agent_types(self):
        return ["approval_agent"]

    async def process_command(self, command, context: AgentContext):
        if isinstance(command, ResumeCommand):
            await context.emit_chunk(f"User replied: {command.content}")
            return {"status": "completed"}
        return await context.ask_user(
            AskUserEvent(prompt="Please confirm the deployment window.")
        )
```
Scatter-Gather Dispatch
dispatch_group(...) can enqueue multiple subtasks under one task group, and collect_group_results(...) can gather the callback payloads when they finish.
```python
tasks = [
    {"target_agent_type": "researcher", "content": "Collect references"},
    {"target_agent_type": "writer", "content": "Draft the summary"},
]
group = await context.dispatch_group(tasks, wait_for_reply=True)
results = await context.collect_group_results(group["task_group_id"])
```
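The scatter-gather pattern itself can be illustrated with plain asyncio. Here the subtasks are in-process stand-ins, whereas `dispatch_group` distributes them to workers via Redis:

```python
import asyncio

# Plain-asyncio illustration of scatter-gather; the subtask coroutine is a
# stand-in for real agent work, not part of the framework.

async def run_subtask(agent_type: str, content: str) -> dict:
    await asyncio.sleep(0.01)  # stand-in for real agent work
    return {"agent_type": agent_type, "result": f"done: {content}"}

async def scatter_gather(tasks: list[dict]) -> list[dict]:
    # Fan out all subtasks concurrently, then collect results in input order
    coros = [run_subtask(t["target_agent_type"], t["content"]) for t in tasks]
    return await asyncio.gather(*coros)

tasks = [
    {"target_agent_type": "researcher", "content": "Collect references"},
    {"target_agent_type": "writer", "content": "Draft the summary"},
]
results = asyncio.run(scatter_gather(tasks))
print([r["agent_type"] for r in results])  # → ['researcher', 'writer']
```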
Byai Typed Worker Layer
If your business payloads use BaiYing message objects, ByaiWorker and ByaiAgentContext provide typed content decoding/encoding on top of the generic worker runtime.
Service Discovery Utilities
The repository also ships Redis-backed service discovery and a discovery-aware HTTP client:
- `ServiceRegistry` for service registration and heartbeat
- `DiscoveryClient` for cached service lookup and load balancing
- `DiscoveryHttpClient` for node-switching HTTP retries on discovered instances

These utilities live in `src/by_framework/core/discovery.py` and `src/by_framework/util/discovery_http_client.py`.
🛠️ Configuration Reference
run_worker Function Parameters
The run_worker function supports rich configuration options:

| Parameter | Type | Description | Default |
|---|---|---|---|
| `worker_class` | `Type[GatewayWorker]` | Required. Business Worker class. | - |
| `worker_id` | `str` | Unique identifier for the Worker instance. | `"worker-1"` |
| `redis_host` | `str` | Redis server address. | `"localhost"` |
| `redis_port` | `int` | Redis port. | `6379` |
| `redis_db` | `int` | Redis database number. | `0` |
| `redis_password` | `str` | Redis password (optional). | `None` |
| `redis_username` | `str` | Redis username (optional). | `None` |
| `workspace_dir` | `str` | Local working directory for task execution. | `"/tmp/gateway-workspace"` |
| `consumer_group` | `str` | Redis consumer group name. | `"agent_engines"` |
| `max_concurrency` | `int` | Maximum concurrent tasks per Worker. | `50` |
| `fetch_count` | `int` | Number of messages batch-fetched from Redis per read. | `10` |
| `redis_max_connections` | `int` | Maximum Redis connections. | `max_concurrency + 10` |
| `plugin_list` | `List[Plugin]` | Explicitly passed plugin list. | `None` |
| `plugin_configurator` | `Callable` | Plugin configuration callback function. | `None` |
| `plugin_hook_timeout_seconds` | `float` | Default timeout for plugin hooks. | `None` |
| `plugin_log_hook_stats_on_shutdown` | `bool` | Whether to log plugin hook stats on shutdown. | `True` |
| `plugin_dir` | `str` | Directory scanned once at startup for `.py` plugin modules. | `None` |
Environment Variables
| Environment Variable | Description | Default |
|---|---|---|
| `BYAI_WORKER_CONCURRENCY` | Maximum concurrency | `50` |
| `BYAI_WORKER_FETCH_COUNT` | Batch fetch count | `10` |
| `BYAI_REDIS_MAX_CONNECTIONS` | Redis max connections | `max_concurrency + 10` |
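A sketch of reading these variables with their documented defaults. The function name is invented, and the precedence of these variables against explicit `run_worker` arguments is not specified here, so treat this only as an illustration of the defaults table:

```python
import os

# Reads the documented environment variables with their listed defaults.
# The function name and the derived redis_max_connections fallback mirror
# the table above; everything else is an assumption.

def worker_env_settings() -> dict:
    max_concurrency = int(os.getenv("BYAI_WORKER_CONCURRENCY", "50"))
    return {
        "max_concurrency": max_concurrency,
        "fetch_count": int(os.getenv("BYAI_WORKER_FETCH_COUNT", "10")),
        # Default is derived from max_concurrency, per the table
        "redis_max_connections": int(
            os.getenv("BYAI_REDIS_MAX_CONNECTIONS", str(max_concurrency + 10))
        ),
    }

os.environ["BYAI_WORKER_CONCURRENCY"] = "80"
print(worker_env_settings())
# → {'max_concurrency': 80, 'fetch_count': 10, 'redis_max_connections': 90}
```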
📚 API Reference
GatewayWorker
```python
class GatewayWorker:
    def get_agent_types(self) -> List[str]:
        """Return the list of Agent types this Worker can handle."""

    async def process_command(self, command, context: AgentContext) -> Any:
        """Process a command and return the result."""
```
AgentContext
```python
class AgentContext:
    session_id: str
    trace_id: str
    current_agent_id: str
    message_id: str
    parent_message_id: str

    async def emit_chunk(self, event: Union[StreamChunkEvent, str], event_type: Optional[str] = None):
        """Send a text chunk or streaming event."""

    async def emit_state(self, event: Union[StateChangeEvent, str], event_type: Optional[str] = None):
        """Send a state update."""

    async def emit_artifact(self, event: Union[ArtifactEvent, str], event_type: Optional[str] = None):
        """Send an artifact/attachment."""

    async def ask_user(self, event: Union[AskUserEvent, str]) -> dict:
        """Send a waiting-for-input request to the user."""

    async def call_agent(self, target_agent_type: str, content: object, **kwargs) -> dict:
        """Call another Agent."""

    async def dispatch_group(self, tasks: list[dict], **kwargs) -> dict:
        """Dispatch a task group."""

    async def get_active_workers(self) -> Dict[str, Any]:
        """Get all active workers in the cluster."""
```
GatewayClient / ByaiGatewayClient
```python
class GatewayClient:
    async def send_message(
        self,
        target_agent_type: str,
        session_id: str,
        content: Any,
        user_code: str = "",
        action_type: str = "ASK_AGENT",
        metadata: Optional[dict] = None,
        target_worker_id: Optional[str] = None,
        require_online_worker: bool = True,
    ) -> SendMessageResponse:
        """Send a message and return a response object."""

    async def cancel_task(self, message_id: str, session_id: str, reason: str = "") -> CancelTaskResponse:
        """Cancel the specified task."""
```
🚀 Deployment Guide
Single Machine Deployment
- Prepare Environment
```bash
# Install dependencies
cd by-framework-python
uv sync
```
- Start Redis
```bash
docker run -d --name gateway-redis \
  -p 6379:6379 \
  --restart unless-stopped \
  redis:7-alpine
```
- Start Worker
```bash
uv run python -m by_framework \
  --worker-class my_agent.MyAgent \
  --worker-id worker-01 \
  --redis-host localhost
```
Multi-Worker Deployment
To scale horizontally, run multiple worker processes with different worker_id values while sharing the same Redis instance and target_agent_type streams.
Production Environment Recommendations
- Use Connection Pool
```python
run_worker(
    worker_class=MyAgent,
    redis_max_connections=50,
)
```
- Configure Monitoring
```python
import logging

from by_framework.common.logger import setup_logging

setup_logging(level=logging.INFO, use_json=True)
```
FAQ
Q: How to ensure tasks are not lost?
A: Redis Streams provides a persistence mechanism. Workers use XACK to acknowledge that message processing is complete; unacknowledged messages are redelivered.
Q: How to implement Worker load balancing?
A: Multiple Workers connect to the same Redis Stream, and Redis automatically performs load distribution among consumers in the consumer group.
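The behavior in both answers is standard Redis Streams consumer-group semantics. The toy in-memory model below illustrates the deliver/ack/redeliver cycle; it is not the framework or real Redis, which implements this with XREADGROUP, XACK, and XAUTOCLAIM:

```python
# Toy in-memory model of consumer-group semantics: each message is delivered
# to exactly one consumer (load balancing), tracked as pending until
# acknowledged, and unacknowledged messages can be requeued. Names and
# structure are illustrative only.
import itertools

class ToyConsumerGroup:
    def __init__(self):
        self.queue: list[tuple[str, str]] = []   # (msg_id, payload)
        self.pending: dict[str, str] = {}        # msg_id -> consumer
        self._ids = itertools.count(1)

    def add(self, payload: str) -> str:
        msg_id = f"msg-{next(self._ids)}"
        self.queue.append((msg_id, payload))
        return msg_id

    def read(self, consumer: str):
        """Deliver the next message to exactly one consumer (like XREADGROUP)."""
        if not self.queue:
            return None
        msg_id, payload = self.queue.pop(0)
        self.pending[msg_id] = consumer
        return msg_id, payload

    def ack(self, msg_id: str):
        """Mark a message as fully processed (like XACK)."""
        self.pending.pop(msg_id, None)

    def reclaim(self, dead_consumer: str):
        """Requeue messages a crashed consumer never acknowledged."""
        for msg_id, owner in list(self.pending.items()):
            if owner == dead_consumer:
                del self.pending[msg_id]
                self.queue.append((msg_id, "<redelivered>"))

group = ToyConsumerGroup()
group.add("task A")
group.add("task B")
m1 = group.read("worker-01")  # worker-01 gets task A
m2 = group.read("worker-02")  # worker-02 gets task B
group.ack(m1[0])              # worker-01 finishes and acknowledges
group.reclaim("worker-02")    # worker-02 crashes; task B goes back in the queue
print(len(group.queue), len(group.pending))  # → 1 0
```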
🗺️ Roadmap
- Observability Dashboard: Integrated UI for monitoring worker health and task streams.
- Advanced Sandbox: WASM-based execution environment for enhanced isolation.
- Long-term Memory: Native support for vector-database backed session memory.
- Native LangGraph Integration: Enhanced adapter for complex stateful multi-agent flows.
🤝 Contributing
Issues and Pull Requests are welcome! Please check our CONTRIBUTING.md for details.
📄 License
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
Maintained by byai team.
Questions or suggestions? Feel free to contact us!