Distributed Agent Scheduling Framework
by-framework
by-framework is a distributed, high-performance Agent scheduling engine built on Redis Streams. It provides Worker orchestration, session-scoped runtime state, and a plugin-based agent capability system — purpose-built for AI agent systems.
Table of Contents
- Architecture
- Getting Started
- Core Concepts
- Advanced Features
- Sending Tasks
- Extension Libraries
- Configuration Reference
- Development
- Deployment
- Roadmap
- Contributing
- License
Architecture
The system is fully asynchronous and event-driven. Control messages and data events travel on separate Redis Streams, so scaling Workers up or down does not affect the data delivery path.
Data Flow
Client ──▶ Redis Control Stream ──▶ GatewayWorker (competitive consume)
           (queue:ctrl:{agent_type})      │
                                          ▼
Consumer ◀──── Redis Data Stream ◀──── Business Logic (emit chunks/states/artifacts)
Backend        (queue:data:{session})
- A client writes a command to the agent-type control stream.
- Any online GatewayWorker subscribing to that agent type competitively pulls the message via Redis consumer groups.
- The Worker processes the task and, through AgentContext, emits streaming chunks, state changes, and artifacts back to a session-scoped data stream.
- A backend or frontend consumer reads that data stream in real time.
Component Hierarchy
| Layer | Component | Source | Role |
|---|---|---|---|
| Client | GatewayClient / ByaiGatewayClient | client/ | Publishes control commands to Redis. Supports interceptors and cascade cancellation. |
| Scheduler | Redis Streams + consumer groups | (infrastructure) | Competitive consumption, automatic load balancing across Worker replicas. |
| Execution | GatewayWorker / ByaiWorker | worker/ | Pulls tasks, executes business logic in isolated workspaces, hooks into plugin lifecycle. |
| Orchestrator | WorkerRunner | worker/runner.py | Manages the message consumption loop, concurrency semaphore, and graceful shutdown. |
| Output | GatewayDataEmitter | common/emitter.py | Pushes events to session-scoped data streams with TTL. |
| Registry | WorkerRegistry | core/registry.py | Redis-backed worker membership, heartbeats, execution state tracking. |
| Plugin | PluginRegistry | core/extensions/ | Plugin discovery, lifecycle hooks, agent config versioning, hot-reload. |
Worker Routing
Three semantic layers govern how tasks reach Workers:
| Layer | Purpose | Update Timing |
|---|---|---|
| Membership | Worker declares supported agent_types via get_agent_types(). | Startup / graceful shutdown |
| Online / Heartbeat | Redis key with TTL; each Worker refreshes periodically. Only online Workers are valid send targets. | Heartbeat cycle |
| Worker ID Lock | Prevents duplicate startup of the same worker_id. Instance mutex, not used for routing. | Startup / shutdown |
Production path (agent type routing):
- Client writes to byai_gateway:ctrl:agent_type:{agent_type}.
- Multiple Workers in the same consumer group compete for messages.
- Sender only verifies that at least one online Worker exists for the target agent type.
Debug path (direct worker routing):
- When target_worker_id is explicitly provided, the message goes to byai_gateway:ctrl:worker:{worker_id}.
- The sender explicitly checks that the target Worker is online.
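The two routing paths can be summarised as a small pure function. This is a minimal sketch and not the framework's implementation: `resolve_control_stream`, `NoOnlineWorkerError`, and the in-memory `online_by_type` mapping are hypothetical stand-ins for the Redis-backed registry. Only the two key patterns are taken from this README.

```python
from typing import Mapping, Optional, Set


class NoOnlineWorkerError(RuntimeError):
    """Raised when no valid send target is online (hypothetical name)."""


def resolve_control_stream(
    agent_type: str,
    online_by_type: Mapping[str, Set[str]],
    target_worker_id: Optional[str] = None,
) -> str:
    """Return the control stream key a command should be written to."""
    if target_worker_id is not None:
        # Debug path: the named Worker itself must be online.
        is_online = any(target_worker_id in ids for ids in online_by_type.values())
        if not is_online:
            raise NoOnlineWorkerError(f"worker {target_worker_id!r} is offline")
        return f"byai_gateway:ctrl:worker:{target_worker_id}"
    # Production path: one online Worker for the agent type is enough.
    if not online_by_type.get(agent_type):
        raise NoOnlineWorkerError(f"no online worker for {agent_type!r}")
    return f"byai_gateway:ctrl:agent_type:{agent_type}"


online = {"my_agent": {"worker-01"}}
print(resolve_control_stream("my_agent", online))
print(resolve_control_stream("my_agent", online, target_worker_id="worker-01"))
```

Keeping the liveness check separate from the key choice mirrors the table above: membership and heartbeat decide whether a send is allowed, while the key pattern decides where the message goes.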
Redis Key Map
| Key Pattern | Type | Purpose |
|---|---|---|
| byai_gateway:ctrl:agent_type:{agent_type} | Stream | Per-agent-type control queue; competitive consume |
| byai_gateway:ctrl:worker:{worker_id} | Stream | Per-worker control queue; direct routing |
| byai_gateway:session:{session_id}:data_stream | Stream | Session-scoped output events |
| byai_gateway:session:{session_id}:registry | Hash | Execution records for a session |
| byai_gateway:task_group:{group_id} | Hash | Scatter-gather group progress tracker |
| byai_gateway:task_group:{group_id}:results | Hash | Scatter-gather results collection |
| byai_gateway:registry:worker:online:{worker_id} | String (TTL) | Heartbeat lease |
| byai_gateway:registry:agent_type:workers:{agent_type} | Set | Agent type ➜ worker IDs |
| byai_gateway:registry:worker:agent_types:{worker_id} | Set | Worker ➜ agent types |
| byai_gateway:agent_configs_snapshot:{key} | String | Serialized agent config snapshots for durable restart |
| byai_gateway:plugin_reload:{id}:ack | Stream | Hot-reload ACK channel |
Getting Started
Installation
Prerequisites: Python 3.12+, Redis 7.0+
# via pip
pip install by-framework
Optional extension packages:
pip install by-framework-trace-langfuse # Langfuse observability
pip install by-framework-history-postgres # PostgreSQL history backend
pip install by-framework-langgraph # LangGraph integration
Quick Start
1. Define a Worker:
# my_agent.py
from by_framework import GatewayWorker, AgentContext, run_worker
class MyAgent(GatewayWorker):
    def get_agent_types(self):
        return ["my_agent"]
    async def process_command(self, command, context: AgentContext):
        await context.emit_chunk("Hello from your agent!")
        return {"status": "completed", "content": "Hello from your agent!"}
if __name__ == "__main__":
    run_worker(MyAgent, worker_id="worker-01")
2. Start Redis:
docker run -d -p 6379:6379 redis:7-alpine
3. Start the Worker:
uv run python my_agent.py
4. Send a task:
# send_task.py
import asyncio
from by_framework import ByaiGatewayClient, WorkerRegistry, init_redis, close_redis
async def main():
    redis = init_redis(host="localhost", port=6379)
    registry = WorkerRegistry(redis_client=redis)
    client = ByaiGatewayClient(redis_client=redis, registry=registry)
    resp = await client.send_message(
        target_agent_type="my_agent",
        session_id="demo-session",
        content="Hello!",
    )
    print(f"Sent: {resp.message_id}")
    await close_redis()
asyncio.run(main())
Core Concepts
GatewayWorker
The abstract base class for all Workers. You implement two methods:
| Method | Required | Purpose |
|---|---|---|
| get_agent_types() | Yes | Returns the list of agent types this Worker handles. Drives routing and worker registration. |
| process_command(command, context) | Yes | Core business logic. Receives a command object and an AgentContext. Return an AgentTaskResult or dict. |
The base class handles, transparently:
- Message lifecycle (parse, decode, acknowledge, persist)
- Workspace provisioning per session/task
- Plugin hook execution (on_task_start, on_task_complete, etc.)
- Sub-agent call orchestration (suspend, resume, cascade cancel)
- History persistence (in-memory or pluggable backend)
AgentContext
The per-task runtime context. Available as the second argument to process_command().
async def process_command(self, command, context: AgentContext):
    # Streaming output
    await context.emit_chunk("Step 1 complete\n")
    # State transitions
    await context.emit_state("analyzing")
    # Artifacts / structured data
    await context.emit_artifact(ArtifactEvent(url="https://example.com/output.json"))
    # Call another agent (with optional suspend-and-wait)
    reply = await context.call_agent(
        target_agent_type="translator",
        content="Hello world",
        wait_for_reply=True,
    )
    # Scatter-gather fan-out
    group = await context.dispatch_group([
        {"target_agent_type": "researcher", "content": "Find references"},
        {"target_agent_type": "writer", "content": "Draft summary"},
    ])
    results = await context.collect_group_results(group["task_group_id"])
    # Ask the end-user a question (suspends the task)
    return await context.ask_user(AskUserEvent(prompt="Approve deployment?"))
Key properties: session_id, trace_id, message_id, parent_message_id, current_agent_id.
Protocol & Messages
Commands and events are defined in core/protocol/. The system supports these command types:
| Command | Purpose |
|---|---|
| AskAgentCommand | Standard task request with content, header, and optional extra_payload. |
| ResumeCommand | Resumes a suspended task (e.g., after ask_user reply). Carries reply_data. |
| CancelTaskCommand | Graceful or forced cancellation. Supports BFS cascade through the task tree. |
| ReloadPluginsCommand | Hot-reload plugins on all Workers without restart. |
Event types emitted to data streams:
| Event | Purpose |
|---|---|
| StreamChunkEvent | Incremental streaming text / reasoning log. |
| StateChangeEvent | Agent state transitions (thinking, completed, failed, etc.). |
| ArtifactEvent | Structured output files, URLs, or attachments. |
| AskUserEvent | Prompt requesting human input (triggers task suspension). |
Message context is carried by MessageHeader (message_id, session_id, trace_id, source_agent_type, target_agent_type, parent_message_id, task_group_id, user_code).
Plugin System
Plugins are the primary extensibility mechanism. They register AgentConfigs that declare tools, prompts, skills, callbacks, and sub-agents — and they hook into the Worker lifecycle.
Writing a Plugin
from by_framework import Plugin, PluginManifest, AgentConfig, PluginBuildContext, AgentContext
class WeatherPlugin(Plugin):
    def __init__(self):
        super().__init__(PluginManifest(plugin_id="weather", version="1.0.0"))
    async def register_agent_configs(self, ctx: PluginBuildContext) -> list[AgentConfig]:
        return [
            AgentConfig(
                agent_id="weather_agent",
                tools={
                    "get_weather": self._get_weather,
                },
                prompts={
                    "system": "You are a weather assistant."
                },
            )
        ]
    async def _get_weather(self, city: str) -> dict:
        return {"city": city, "temp": 22, "condition": "sunny"}
    # Lifecycle hooks
    async def on_task_start(self, context: AgentContext): ...
    async def on_task_complete(self, context: AgentContext, result): ...
    async def on_task_error(self, context: AgentContext, error: Exception): ...
    async def on_task_cancel(self, context: AgentContext, reason: str): ...
    async def on_call_agent_start(self, context: AgentContext, target: str, content): ...
    async def on_call_agent_complete(self, context: AgentContext, target: str, result): ...
    async def on_worker_startup(self): ...
    async def on_worker_shutdown(self): ...
Loading Plugins
Three ways to provide plugins to run_worker():
# 1. Explicit list
run_worker(MyAgent, plugin_list=[WeatherPlugin()])
# 2. Configurator callback
def setup(registry):
    registry.register_bundle(WeatherPlugin())
run_worker(MyAgent, plugin_configurator=setup)
# 3. Directory scan (startup-time)
run_worker(MyAgent, plugin_dir="./my_plugins")
Plugin Hot-Reload
Send a ReloadPluginsCommand to trigger reload() on all plugins without restarting the Worker process. Config snapshots are versioned and persisted to Redis, so a Worker that does restart recovers the last known-good configuration.
Advanced Features
Inter-Agent Calling
A Worker can delegate to another agent via context.call_agent(). When wait_for_reply=True, the current task suspends, the callee runs to completion, and the reply is delivered back to the caller as an AgentTaskResult.
The framework handles:
- Task tree construction (parent/child linking for cascade cancel)
- Callback notification when the callee finishes
- Automatic re-delivery of the reply message
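The parent/child links are what make cascade cancellation possible: cancelling a task walks its subtree breadth-first. The sketch below shows only that traversal on an in-memory tree. `cascade_order` and the `children` mapping are hypothetical; how the framework stores the tree and delivers each cancellation is not covered here.

```python
from collections import deque
from typing import Dict, List


def cascade_order(children: Dict[str, List[str]], root: str) -> List[str]:
    """Return task IDs in breadth-first order, starting at the cancelled root."""
    order: List[str] = []
    seen = {root}
    queue = deque([root])
    while queue:
        task_id = queue.popleft()
        order.append(task_id)
        for child in children.get(task_id, []):
            if child not in seen:  # guard against accidental cycles
                seen.add(child)
                queue.append(child)
    return order


# "root" called agents "a" and "b"; each of those spawned sub-tasks of its own.
tree = {"root": ["a", "b"], "a": ["a1"], "b": ["b1", "b2"]}
print(cascade_order(tree, "root"))
# prints ['root', 'a', 'b', 'a1', 'b1', 'b2']
```

Breadth-first order means the direct callees are visited before their own sub-tasks, so tasks closest to the cancelled one are reached first.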
Scatter-Gather Dispatch
Fan out multiple sub-tasks in parallel and collect their results:
group = await context.dispatch_group([
    {"target_agent_type": "researcher", "content": "Find papers"},
    {"target_agent_type": "analyst", "content": "Summarize findings"},
], wait_for_reply=True)
results = await context.collect_group_results(group["task_group_id"])
for r in results.values():
    print(r["content"])
Group progress is tracked in Redis; dispatch_group returns immediately with a task_group_id that can be polled via collect_group_results.
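The fan-out and fan-in shape is the same one plain asyncio expresses with `asyncio.gather`. The following is an analogy only, runnable without Redis: `fake_agent` and `scatter_gather` are hypothetical, and local coroutines stand in for sub-tasks that the framework would run on other Workers and track in Redis.

```python
import asyncio
from typing import Dict, List


async def fake_agent(agent_type: str, content: str) -> Dict[str, str]:
    """Stand-in for a remote agent; a real sub-task would run on another Worker."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    return {"status": "completed", "content": f"{agent_type}: {content}"}


async def scatter_gather(tasks: List[Dict[str, str]]) -> Dict[int, Dict[str, str]]:
    """Run all sub-tasks concurrently and return results keyed by position."""
    results = await asyncio.gather(
        *(fake_agent(t["target_agent_type"], t["content"]) for t in tasks)
    )
    return dict(enumerate(results))


results = asyncio.run(scatter_gather([
    {"target_agent_type": "researcher", "content": "Find papers"},
    {"target_agent_type": "analyst", "content": "Summarize findings"},
]))
for r in results.values():
    print(r["content"])
```

`asyncio.gather` returns results in the order the awaitables were passed, which is why positional keys are safe in this sketch.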
User-in-the-Loop
Suspend a task and wait for human input:
from by_framework import AskUserEvent, ResumeCommand
async def process_command(self, command, context: AgentContext):
    if isinstance(command, ResumeCommand):
        # This is the user's reply
        await context.emit_chunk(f"You said: {command.content}")
        return {"status": "completed"}
    # Suspend and ask
    return await context.ask_user(
        AskUserEvent(prompt="What is the target deployment environment?")
    )
Service Discovery
Redis-backed service discovery utilities:
| Component | Role |
|---|---|
| ServiceRegistry | Register / deregister / heartbeat for services. |
| DiscoveryClient | Cached service lookup with round-robin load balancing. |
| DiscoveryHttpClient | HTTP client that retries across discovered service nodes on failure. |
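Cached lookup with round-robin selection can be sketched in a few lines. This is an illustration of the technique, not the DiscoveryClient API: `RoundRobinCache`, its `lookup` callback, and the `ttl_seconds` parameter are all hypothetical, and the node addresses are made up.

```python
import itertools
import time
from typing import Callable, Dict, Iterator, List, Tuple


class RoundRobinCache:
    """Caches a service's node list for a TTL and hands out nodes in rotation."""

    def __init__(self, lookup: Callable[[str], List[str]], ttl_seconds: float = 5.0):
        self._lookup = lookup
        self._ttl = ttl_seconds
        # service name -> (expiry timestamp, endless iterator over its nodes)
        self._entries: Dict[str, Tuple[float, Iterator[str]]] = {}

    def next_node(self, service: str) -> str:
        now = time.monotonic()
        entry = self._entries.get(service)
        if entry is None or now >= entry[0]:
            nodes = self._lookup(service)  # cache miss or expired: refresh
            if not nodes:
                raise LookupError(f"no nodes registered for {service!r}")
            entry = (now + self._ttl, itertools.cycle(nodes))
            self._entries[service] = entry
        return next(entry[1])


calls: List[str] = []


def lookup(name: str) -> List[str]:
    calls.append(name)  # record how often the backing registry is hit
    return ["10.0.0.1:8000", "10.0.0.2:8000"]


cache = RoundRobinCache(lookup, ttl_seconds=60)
picks = [cache.next_node("billing") for _ in range(3)]
print(picks)
# prints ['10.0.0.1:8000', '10.0.0.2:8000', '10.0.0.1:8000']
```

Three picks trigger only one registry lookup, which is the point of the cache; the trade-off is that a node that disappears stays in rotation until the TTL expires.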
Sending Tasks
GatewayClient
from by_framework import GatewayClient, WorkerRegistry, init_redis
redis = init_redis(host="localhost", port=6379)
client = GatewayClient(redis_client=redis, registry=WorkerRegistry(redis_client=redis))
# Send to an agent type
resp = await client.send_message(
    target_agent_type="my_agent",
    session_id="sess-001",
    content="Your task content",
    user_code="user-123",
    metadata={"priority": "high"},
)
# Cancel a task (including cascade through sub-tasks)
await client.cancel_task(
    message_id=resp.message_id,
    session_id="sess-001",
    reason="User requested",
)
ByaiGatewayClient
A typed wrapper around GatewayClient that automatically serializes content through the Byai codec:
from by_framework import ByaiGatewayClient, BaiYingMessage
client = ByaiGatewayClient(redis_client=redis, registry=registry)
# Automatically encodes BaiYingMessage content
resp = await client.send_message(
    target_agent_type="chat_agent",
    session_id="sess-001",
    content=BaiYingMessage(role="user", content="Hello"),
)
Interceptors
Register request interceptors on the client side for custom pre-processing:
class AuthInterceptor:
    async def before_send(self, command, header):
        header.metadata["auth_token"] = "..."
        return command, header
client = GatewayClient(...)
client.add_interceptor(AuthInterceptor())
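When several interceptors are registered, a natural way to apply them is as a chain in which each one receives the output of the previous one. The sketch below assumes that behaviour, which this README does not confirm. `Header`, `TenantInterceptor`, and `apply_interceptors` are hypothetical stand-ins written so the example runs without the framework.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class Header:
    """Hypothetical stand-in for the framework's message header."""
    metadata: Dict[str, Any] = field(default_factory=dict)


class AuthInterceptor:
    async def before_send(self, command: Any, header: Header) -> Tuple[Any, Header]:
        header.metadata["auth_token"] = "example-token"
        return command, header


class TenantInterceptor:
    async def before_send(self, command: Any, header: Header) -> Tuple[Any, Header]:
        header.metadata["tenant"] = "acme"
        return command, header


async def apply_interceptors(
    interceptors: List[Any], command: Any, header: Header
) -> Tuple[Any, Header]:
    """Pass (command, header) through each interceptor in registration order."""
    for interceptor in interceptors:
        command, header = await interceptor.before_send(command, header)
    return command, header


command, header = asyncio.run(
    apply_interceptors([AuthInterceptor(), TenantInterceptor()], "Hello!", Header())
)
print(header.metadata)
# prints {'auth_token': 'example-token', 'tenant': 'acme'}
```

Returning the pair from every interceptor lets one of them replace the command or header outright instead of only mutating it.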
Extension Libraries
These are optional workspace member packages shipping alongside the core framework:
| Package | Purpose | Key Dependency |
|---|---|---|
| by-framework-trace-langfuse | Langfuse LLM observability plugin. Auto-discovered at Worker startup if env vars are set. | langfuse |
| by-framework-trace-phoenix | Arize Phoenix tracing integration. | phoenix |
| by-framework-history-postgres | Persistent message history in PostgreSQL via asyncpg. | asyncpg |
| by-framework-history-byclaw | Byclaw-specific history backend. | — |
| by-framework-langgraph | LangGraph state-graph adapter, worker, and tool bridge. | langgraph, langchain-core |
Tracing providers are auto-discovered via TraceProviderFactory at Worker startup when the corresponding package is installed and environment variables are configured.
Configuration Reference
run_worker() Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| worker_class | Type[GatewayWorker] | (required) | Your Worker implementation. |
| worker_id | str | "worker-1" | Unique Worker instance ID. |
| redis_host | str | "localhost" | Redis host. |
| redis_port | int | 6379 | Redis port. |
| redis_db | int | 0 | Redis database number. |
| redis_password | str \| None | None | Redis password. |
| redis_username | str \| None | None | Redis username. |
| redis_max_connections | int | max_concurrency + 10 | Redis connection pool size. |
| workspace_dir | str | "/tmp/gateway-workspace" | Local workspace root for task isolation. |
| consumer_group | str | "agent_engines" | Redis Streams consumer group name. |
| max_concurrency | int | 50 | Max concurrent tasks per Worker. |
| fetch_count | int | 10 | Batch size for Redis XREADGROUP. |
| plugin_list | list[Plugin] \| None | None | Explicit plugin instances. |
| plugin_configurator | Callable \| None | None | Callback for programmatic plugin registration. |
| plugin_dir | str \| None | None | Directory scanned for .py plugin modules at startup. |
| plugin_hook_timeout_seconds | float \| None | None | Timeout for individual plugin hooks. |
| plugin_log_hook_stats_on_shutdown | bool | True | Log per-hook success/failure stats at shutdown. |
Environment Variables
| Variable | Default | Description |
|---|---|---|
| BYAI_WORKER_CONCURRENCY | 50 | Overrides max_concurrency. |
| BYAI_WORKER_FETCH_COUNT | 10 | Overrides fetch_count. |
| BYAI_REDIS_MAX_CONNECTIONS | max_concurrency + 10 | Overrides redis_max_connections. |
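The interaction between parameters, environment variables, and the derived pool size can be sketched as follows. This is one plausible reading of the tables above, not the framework's code: `resolve_limits` and `WorkerLimits` are hypothetical names, and the sketch assumes the `max_concurrency + 10` default is computed after any environment override has been applied.

```python
import os
from typing import Mapping, NamedTuple, Optional


class WorkerLimits(NamedTuple):
    max_concurrency: int
    fetch_count: int
    redis_max_connections: int


def resolve_limits(
    max_concurrency: int = 50,
    fetch_count: int = 10,
    redis_max_connections: Optional[int] = None,
    env: Mapping[str, str] = os.environ,
) -> WorkerLimits:
    """Apply environment overrides on top of run_worker()-style arguments."""
    max_concurrency = int(env.get("BYAI_WORKER_CONCURRENCY", max_concurrency))
    fetch_count = int(env.get("BYAI_WORKER_FETCH_COUNT", fetch_count))
    if "BYAI_REDIS_MAX_CONNECTIONS" in env:
        pool = int(env["BYAI_REDIS_MAX_CONNECTIONS"])
    elif redis_max_connections is not None:
        pool = redis_max_connections
    else:
        # Assumption: the default pool size follows the effective concurrency.
        pool = max_concurrency + 10
    return WorkerLimits(max_concurrency, fetch_count, pool)


print(resolve_limits(env={}))
# prints WorkerLimits(max_concurrency=50, fetch_count=10, redis_max_connections=60)
print(resolve_limits(env={"BYAI_WORKER_CONCURRENCY": "8"}))
# prints WorkerLimits(max_concurrency=8, fetch_count=10, redis_max_connections=18)
```

Passing `env` explicitly keeps the function testable without touching the real process environment.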
Development
# Install all workspace dependencies
make install
# Format (isort + ruff + pyink)
make format
# Lint (pylint + ruff)
make lint
# Run all tests
make test
# Run a single test file
uv run pytest tests/worker/test_gateway_worker.py
# Run tests matching a pattern
uv run pytest -k "test_name_pattern"
# Full CI check
make ci
Code style: isort for imports, ruff-format + pyink for formatting, pylint + ruff for linting. Pre-commit hooks run automatically on git commit.
Tests are organized by module under tests/:
- tests/common/: Logger, Redis client, config, exceptions
- tests/core/: Registry, protocol, discovery
- tests/worker/: Worker, runner, context, processor, emitter, sandbox
- tests/client/: Client functionality
- tests/plugin/: Plugin registry, system, discovery, tracing
- tests/integration/: End-to-end flows (scatter-gather, callbacks, ask_user)
Deployment
Single Machine
# 1. Start Redis
docker run -d --name by-redis -p 6379:6379 redis:7-alpine
# 2. Start a Worker
python -m by_framework \
--worker-class my_agent.MyAgent \
--worker-id worker-01 \
--redis-host localhost
Horizontal Scaling
Run multiple Worker processes with different worker_id values. They all consume from the same agent-type control stream. Redis consumer groups automatically handle load distribution.
python -m by_framework --worker-class my_agent.MyAgent --worker-id worker-01 &
python -m by_framework --worker-class my_agent.MyAgent --worker-id worker-02 &
python -m by_framework --worker-class my_agent.MyAgent --worker-id worker-03 &
Reliability
- Message persistence: Messages are stored in Redis Streams until explicitly acknowledged (XACK). Unacknowledged messages are redelivered on Worker restart.
- Durable config: Agent config snapshots are persisted to Redis, so a restarted Worker recovers the last-known plugin configuration.
- Graceful shutdown: WorkerRunner drains in-flight tasks before shutting down, acknowledging completed work.
- Separate data path: Data output goes to session-scoped streams independently of control, so backend consumers are decoupled from Worker scaling.
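The persistence guarantee rests on consumer-group semantics: an entry is handed to exactly one consumer and stays pending for that consumer until it is acknowledged. The toy model below imitates that bookkeeping in memory so it runs without Redis. `ToyConsumerGroup` is hypothetical and is not a Redis client; in Redis itself, a consumer can re-read its own pending entries by calling XREADGROUP with an ID of 0.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class ToyConsumerGroup:
    """In-memory model of consumer-group delivery; not a Redis client."""

    def __init__(self) -> None:
        self._new: Deque[Tuple[int, str]] = deque()
        self._pending: Dict[int, Tuple[str, str]] = {}  # id -> (consumer, payload)
        self._next_id = 1

    def add(self, payload: str) -> int:
        entry_id = self._next_id
        self._next_id += 1
        self._new.append((entry_id, payload))
        return entry_id

    def read(self, consumer: str) -> Optional[Tuple[int, str]]:
        """Deliver the next new entry to one consumer and mark it pending."""
        if not self._new:
            return None
        entry_id, payload = self._new.popleft()
        self._pending[entry_id] = (consumer, payload)
        return entry_id, payload

    def ack(self, entry_id: int) -> None:
        """Acknowledge an entry so it is no longer pending."""
        self._pending.pop(entry_id, None)

    def pending_for(self, consumer: str) -> List[Tuple[int, str]]:
        """Entries delivered to `consumer` but not yet acknowledged."""
        return [(i, p) for i, (c, p) in self._pending.items() if c == consumer]


group = ToyConsumerGroup()
first = group.add("task-1")
second = group.add("task-2")
got_01 = group.read("worker-01")  # worker-01 receives task-1
got_02 = group.read("worker-02")  # worker-02 receives task-2, never task-1
group.ack(first)                  # worker-01 finishes and acknowledges
# worker-02 crashed before acknowledging, so its entry is still recoverable:
print(group.pending_for("worker-02"))
# prints [(2, 'task-2')]
```

Acknowledging only after the work is done is what gives at-least-once processing: a crash between delivery and acknowledgement leaves the entry pending rather than lost.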
Logging
from by_framework.common.logger import setup_logging
import logging
setup_logging(level=logging.INFO, use_json=True) # JSON for log aggregation
Roadmap
- Observability dashboard for Worker health and task streams
- WASM-based sandbox for stronger execution isolation
- Enhanced LangGraph multi-agent orchestration adapter
Contributing
Issues and pull requests are welcome. See CONTRIBUTING.md for guidelines.
License
Apache 2.0 — see LICENSE.
Maintained by the byai team.