Streaming utilities for agent/LLM systems (RabbitMQ, etc.)

agent-stream

A modular Python package for real-time agent message streaming and filtering, designed for LLM/AI agent systems. It provides composable stream filters and streaming utilities for RabbitMQ and Server-Sent Events (SSE).

Features

  • Composable stream filters (by function call, text, source, etc.)
  • Pluggable streaming backends (RabbitMQ and SSE, more coming)
  • Clean separation of filter logic and streaming logic
  • Environment-variable-based configuration for managers
  • Extensible for new message types and streaming backends

Installation

pip install agent-stream

Usage

Importing in your code

You can use the package in your code as follows:

Example: Using RabbitMQManager in an application

import asyncio

from agent_stream.managers import RabbitMQManager

async def main():
    rabbitmq_manager = RabbitMQManager()
    await rabbitmq_manager.connect()
    # ... use rabbitmq_manager ...
    await rabbitmq_manager.close()

asyncio.run(main())
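
If the code between connect() and close() raises, the connection in the example above is never closed. One way to guard against that is a small async context manager. The following is a minimal sketch, not part of agent-stream: `managed` is a hypothetical helper, and `FakeManager` is a stand-in that assumes only the `connect()` and `close()` coroutine methods shown above.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeManager:
    """Stand-in for RabbitMQManager; assumes only connect() and close()."""

    def __init__(self):
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def close(self):
        self.events.append("close")


@asynccontextmanager
async def managed(manager):
    """Hypothetical helper: connect on entry, always close on exit."""
    await manager.connect()
    try:
        yield manager
    finally:
        await manager.close()


async def demo():
    manager = FakeManager()
    try:
        async with managed(manager):
            raise RuntimeError("simulated failure while streaming")
    except RuntimeError:
        pass
    return manager.events


# close() is recorded even though the body raised.
events = asyncio.run(demo())
```

With the real manager, the same pattern would read `async with managed(RabbitMQManager()) as rabbitmq_manager:`, assuming its methods behave as in the example above.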

Example: Using filters and streams in an agent service with autogen

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.teams import Swarm
from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter
from agent_stream.managers import RabbitMQManager

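# Define your agents and team (simplified example).
# your_llm_client is a placeholder for your own model client; the awaits
# below must run inside an async function.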
leader_agent = AssistantAgent(
    "leader_agent",
    tools=[],
    model_client=your_llm_client,
    model_client_stream=True,
    system_message="You are the leader.",
    handoffs=[Handoff(target="coder_agent", name="request_code_implementation")]
)
coder_agent = AssistantAgent(
    "coder_agent",
    tools=[],
    model_client=your_llm_client,
    model_client_stream=True,
    system_message="You are the coder.",
    handoffs=[Handoff(target="leader_agent", name="provide_implementation")]
)
team = Swarm([leader_agent, coder_agent])

# Initialize RabbitMQ manager
rabbitmq_manager = RabbitMQManager()
await rabbitmq_manager.connect()

# Run the team and stream results through RabbitMQStream with filters
result = await RabbitMQStream(
    stream=team.run_stream(task="Your task message"),
    rabbitmq=rabbitmq_manager,
    queue_name="agent-stream-demo",
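    filters=[
        # A list of filters is ANDed (see Filtering), so the alternatives
        # are joined with | into one filter that accepts any of them.
        FunctionCallStreamFilter("signal_edit_file")
        | FunctionCallStreamFilter("signal_read_file")
        | StreamChunkStreamFilter("leader_agent")
        | StreamChunkStreamFilter("coder_agent")
    ]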
)

await rabbitmq_manager.close()

Example: Using SSEStream for Server-Sent Events (SSE)

You can use SSEStream to stream agent or LLM events to clients over HTTP using Server-Sent Events (SSE). This is useful for real-time web applications.

from agent_stream.streams import SSEStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter

# Example usage in an async context (e.g., inside a FastAPI/Starlette endpoint)
async for sse_event in SSEStream(
    stream=team.run_stream(task="Your task message"),
    filters=[
        FunctionCallStreamFilter("signal_edit_file"),
        StreamChunkStreamFilter("leader_agent")
    ]
):
    # sse_event is a string formatted for SSE, e.g. 'event: ...\ndata: ...\nsource: ...\n\n'
    # You can yield this directly in a FastAPI/Starlette StreamingResponse
    print(sse_event)

This will yield SSE-formatted strings for each event, which you can send to the client in real time. Each event includes the event type, data, and source fields.
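
To make the wire format concrete, here is a minimal sketch of how one such frame could be assembled. `format_sse` is a hypothetical function, not SSEStream's actual implementation, and the field order simply follows the comment in the example above. Note that `source` is not one of the fields defined by the SSE specification (event, data, id, retry), so a browser EventSource ignores it; a client that needs the source would have to parse the raw stream or read it from the data payload.

```python
def format_sse(event: str, data: str, source: str) -> str:
    """Build one SSE frame; multi-line data becomes repeated data: lines."""
    lines = [f"event: {event}"]
    # Each line of the payload needs its own "data:" prefix.
    lines.extend(f"data: {part}" for part in data.split("\n"))
    lines.append(f"source: {source}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


frame = format_sse("text", "hello\nworld", "leader_agent")
```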

Environment Variables

  • RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD (for RabbitMQManager)
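
As an illustration of environment-variable-based configuration, the sketch below reads the four variables into a settings object. `RabbitMQSettings` and `load_settings` are hypothetical names, and the fallback values (localhost, 5672, guest/guest) are RabbitMQ's stock defaults used here as an assumption; the defaults RabbitMQManager actually applies are not documented above.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class RabbitMQSettings:
    host: str
    port: int
    username: str
    password: str


def load_settings(env=os.environ) -> RabbitMQSettings:
    """Read the four variables listed above; the defaults are assumptions."""
    return RabbitMQSettings(
        host=env.get("RABBITMQ_HOST", "localhost"),
        port=int(env.get("RABBITMQ_PORT", "5672")),
        username=env.get("RABBITMQ_USERNAME", "guest"),
        password=env.get("RABBITMQ_PASSWORD", "guest"),
    )


# A plain dict stands in for os.environ so the example has no side effects.
settings = load_settings({"RABBITMQ_HOST": "mq.internal", "RABBITMQ_PORT": "5673"})
```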

Extending

  • Add new filters in filters/
  • Add new streaming backends in streams/
  • Add new managers in managers/

Filtering

Filtering in agent-stream allows you to process only the messages or events you care about from a stream of agent or LLM outputs. Filters are composable and can be combined using logical operators to create complex filtering logic.

Built-in Filters

  • FunctionCallStreamFilter: Passes through only messages that represent a function/tool call with a specific name.
  • TextMessageStreamFilter: Passes through only messages containing a specific text.
  • StreamChunkStreamFilter: Passes through only messages from a specific agent or stream chunk.

Example: Creating and Combining Filters

from agent_stream.filters import FunctionCallStreamFilter, TextMessageStreamFilter, StreamChunkStreamFilter

# Filter for tool call events with a specific function name
filter1 = FunctionCallStreamFilter("signal_edit_code")

# Filter for messages containing a specific text
filter2 = TextMessageStreamFilter("error")

# Filter for messages from a specific agent
filter3 = StreamChunkStreamFilter("leader_agent")

# Combine filters with | (OR) and & (AND)
combined_filter = (filter1 | filter2) & filter3

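You can pass a single filter or a list of filters to RabbitMQStream or other streaming utilities. When multiple filters are provided, a message must pass all filters to be included (logical AND). To accept messages that match any one of several filters, combine them with | into a single filter first.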
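
To show how | and & composition can work, here is a minimal sketch built on operator overloading. It is not the library's implementation: the `Filter` class, the three factory functions, and the dict-shaped messages with `type`, `name`, `content`, and `source` keys are all assumptions made for illustration.

```python
class Filter:
    """Hypothetical base: wraps a predicate and supports | and &."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __call__(self, message):
        return bool(self.predicate(message))

    def __or__(self, other):
        return Filter(lambda m: self(m) or other(m))

    def __and__(self, other):
        return Filter(lambda m: self(m) and other(m))


def function_call(name):
    return Filter(lambda m: m.get("type") == "function_call" and m.get("name") == name)


def text_contains(text):
    return Filter(lambda m: text in m.get("content", ""))


def from_source(source):
    return Filter(lambda m: m.get("source") == source)


combined = (function_call("signal_edit_code") | text_contains("error")) & from_source("leader_agent")

messages = [
    {"type": "function_call", "name": "signal_edit_code", "source": "leader_agent"},
    {"type": "text", "content": "an error occurred", "source": "leader_agent"},
    {"type": "text", "content": "an error occurred", "source": "coder_agent"},
    {"type": "text", "content": "all good", "source": "leader_agent"},
]
# Only the first two messages satisfy (call OR text) AND source.
results = [combined(m) for m in messages]
```

Because every combination returns another filter, combined filters can themselves be combined further or placed in a filter list.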

Example: Using Filters in a Stream

from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter

result = await RabbitMQStream(
    stream=your_async_stream,
    rabbitmq=rabbitmq_manager,
    queue_name="your-queue-name",
    filters=[
        FunctionCallStreamFilter("signal_edit_file"),
        StreamChunkStreamFilter("leader_agent")
    ]
)

This will only stream messages that are function calls to signal_edit_file and are from the leader_agent.
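
The AND semantics of a filter list can be sketched as a small async generator. This is an illustration only: `filtered` and `fake_stream` are hypothetical names, the messages are plain dicts, and the lambdas stand in for the built-in filter classes.

```python
import asyncio


async def filtered(stream, filters):
    """Yield only items that pass every filter (logical AND)."""
    async for item in stream:
        if all(f(item) for f in filters):
            yield item


async def fake_stream():
    # Stand-in for an agent stream; real messages are richer objects.
    for item in [
        {"name": "signal_edit_file", "source": "leader_agent"},
        {"name": "signal_edit_file", "source": "coder_agent"},
        {"name": "signal_read_file", "source": "leader_agent"},
    ]:
        yield item


async def collect():
    filters = [
        lambda m: m["name"] == "signal_edit_file",
        lambda m: m["source"] == "leader_agent",
    ]
    return [m async for m in filtered(fake_stream(), filters)]


kept = asyncio.run(collect())
```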

Custom Filters

You can create your own filters by subclassing the base filter class and implementing the __call__ method.

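from agent_stream.filters import BaseStreamFilter

class CustomFilter(BaseStreamFilter):
    def __call__(self, message):
        # Messages may be dicts or objects (autogen messages are objects),
        # so read "content" defensively; non-string content never matches.
        if isinstance(message, dict):
            content = message.get("content", "")
        else:
            content = getattr(message, "content", "")
        return isinstance(content, str) and "important" in content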

License

MIT
