
Streaming utilities for agent/LLM systems (RabbitMQ, etc.)

agent-stream

A modular Python package for real-time agent message streaming and filtering, designed for LLM/AI agent systems. Provides composable stream filters and streaming utilities for RabbitMQ and more.

Features

  • Composable stream filters (by function call, text, source, etc.)
  • Pluggable streaming backends (RabbitMQ, more coming)
  • Clean separation of filter logic and streaming logic
  • Environment-variable-based configuration for managers
  • Extensible for new message types and streaming backends

Installation

pip install agent-stream

Usage

Importing in your code

You can use the package in your code as follows:

Example: Using RabbitMQManager in an application

import asyncio

from agent_stream.managers import RabbitMQManager

async def main():
    rabbitmq_manager = RabbitMQManager()
    await rabbitmq_manager.connect()
    # ... use rabbitmq_manager ...
    await rabbitmq_manager.close()

asyncio.run(main())
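
To make sure the connection is closed even when the work in between raises, the connect()/close() pair shown above can be wrapped in an async context manager. The following is only a sketch under stated assumptions: managed_connection is a hypothetical helper that is not part of agent-stream, and FakeManager merely stands in for any object exposing awaitable connect() and close() methods, so the example runs without a broker.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def managed_connection(manager):
    """Hypothetical helper: connect on entry, always close on exit."""
    await manager.connect()
    try:
        yield manager
    finally:
        await manager.close()


class FakeManager:
    """Stand-in for RabbitMQManager so the sketch runs without a broker."""

    def __init__(self):
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def close(self):
        self.events.append("close")


async def demo():
    manager = FakeManager()
    try:
        async with managed_connection(manager):
            raise RuntimeError("simulated failure while streaming")
    except RuntimeError:
        pass  # the error is expected; close() must still have run
    return manager.events


events = asyncio.run(demo())
```

In an application, the same helper could presumably be used as `async with managed_connection(RabbitMQManager()) as rabbitmq_manager:`, provided the real manager's connect() and close() behave as in the example above.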

Example: Using filters and streams in an agent service with autogen

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.teams import Swarm
from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter
from agent_stream.managers import RabbitMQManager

# Define your agents and team (simplified example).
# your_llm_client is a placeholder for a model client you have already configured.
leader_agent = AssistantAgent(
    "leader_agent",
    tools=[],
    model_client=your_llm_client,
    model_client_stream=True,
    system_message="You are the leader.",
    handoffs=[Handoff(target="coder_agent", name="request_code_implementation")]
)
coder_agent = AssistantAgent(
    "coder_agent",
    tools=[],
    model_client=your_llm_client,
    model_client_stream=True,
    system_message="You are the coder.",
    handoffs=[Handoff(target="leader_agent", name="provide_implementation")]
)
team = Swarm([leader_agent, coder_agent])

# Initialize RabbitMQ manager
rabbitmq_manager = RabbitMQManager()
await rabbitmq_manager.connect()

# Run the team and stream results through RabbitMQStream with filters
result = await RabbitMQStream(
    stream=team.run_stream(task="Your task message"),
    rabbitmq=rabbitmq_manager,
    queue_name="agent-stream-demo",
    filters=[
        FunctionCallStreamFilter("signal_edit_file"),
        FunctionCallStreamFilter("signal_read_file"),
        StreamChunkStreamFilter("leader_agent"),
        StreamChunkStreamFilter("coder_agent")
    ]
)

await rabbitmq_manager.close()

Environment Variables

  • RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD (for RabbitMQManager)
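
How RabbitMQManager reads these variables internally, and which defaults it applies, is not documented here. As a rough illustration of environment-variable-based configuration, the sketch below loads the four variables into a settings object and builds an AMQP URL from them. The RabbitMQSettings class, the load_settings function, and the fallback values (RabbitMQ's stock localhost, 5672, guest/guest) are assumptions made for this example, not the package's API.

```python
import os
from dataclasses import dataclass
from urllib.parse import quote


@dataclass(frozen=True)
class RabbitMQSettings:
    """Hypothetical container for the four RABBITMQ_* values."""

    host: str
    port: int
    username: str
    password: str

    @property
    def url(self):
        # Percent-encode credentials so special characters survive in the URL.
        user = quote(self.username, safe="")
        pwd = quote(self.password, safe="")
        return f"amqp://{user}:{pwd}@{self.host}:{self.port}"


def load_settings(env=None):
    """Read the RABBITMQ_* variables; the fallback values are assumptions."""
    env = os.environ if env is None else env
    return RabbitMQSettings(
        host=env.get("RABBITMQ_HOST", "localhost"),
        port=int(env.get("RABBITMQ_PORT", "5672")),
        username=env.get("RABBITMQ_USERNAME", "guest"),
        password=env.get("RABBITMQ_PASSWORD", "guest"),
    )


# A plain dict is passed instead of os.environ to keep the example deterministic.
settings = load_settings({"RABBITMQ_HOST": "mq.internal", "RABBITMQ_PASSWORD": "p@ss"})
```

Calling load_settings() with no argument reads from the real process environment instead.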

Extending

  • Add new filters in filters/
  • Add new streaming backends in streams/
  • Add new managers in managers/

Filtering

Filtering in agent-stream allows you to process only the messages or events you care about from a stream of agent or LLM outputs. Filters are composable and can be combined using logical operators to create complex filtering logic.

Built-in Filters

  • FunctionCallStreamFilter: Passes through only messages that represent a function/tool call with a specific name.
  • TextMessageStreamFilter: Passes through only messages containing a specific text.
  • StreamChunkStreamFilter: Passes through only stream chunks from a specific source, such as a named agent.

Example: Creating and Combining Filters

from agent_stream.filters import FunctionCallStreamFilter, TextMessageStreamFilter, StreamChunkStreamFilter

# Filter for tool call events with a specific function name
filter1 = FunctionCallStreamFilter("signal_edit_code")

# Filter for messages containing a specific text
filter2 = TextMessageStreamFilter("error")

# Filter for messages from a specific agent
filter3 = StreamChunkStreamFilter("leader_agent")

# Combine filters with | (OR) and & (AND)
combined_filter = (filter1 | filter2) & filter3

You can pass a single filter or a list of filters to RabbitMQStream or other streaming utilities. When a list is provided, a message must pass every filter in it to be included (logical AND). To express alternatives, combine filters with | into a single filter before passing it.
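
To make the difference between | / & composition and list-based AND concrete, here is a minimal, self-contained sketch. It does not use or reproduce the package's classes: Filter and passes_all are hypothetical names, and messages are plain dicts with made-up keys purely for illustration.

```python
class Filter:
    """Hypothetical minimal filter: wraps a predicate and supports | and &."""

    def __init__(self, predicate):
        self.predicate = predicate

    def __call__(self, message):
        return bool(self.predicate(message))

    def __or__(self, other):
        return Filter(lambda m: self(m) or other(m))

    def __and__(self, other):
        return Filter(lambda m: self(m) and other(m))


def passes_all(filters, message):
    """List semantics as described above: every filter must accept the message."""
    return all(f(message) for f in filters)


# The dict keys below are assumptions for the example, not the real message schema.
is_edit_call = Filter(lambda m: m.get("tool") == "signal_edit_code")
mentions_error = Filter(lambda m: "error" in m.get("content", ""))
from_leader = Filter(lambda m: m.get("source") == "leader_agent")

combined = (is_edit_call | mentions_error) & from_leader

messages = [
    {"source": "leader_agent", "tool": "signal_edit_code"},
    {"source": "leader_agent", "content": "an error occurred"},
    {"source": "coder_agent", "content": "an error occurred"},
    {"source": "leader_agent", "content": "all good"},
]

# OR-then-AND composition keeps the first two messages.
kept_combined = [m for m in messages if combined(m)]

# Two alternatives placed in a list are ANDed, so nothing here satisfies both.
kept_as_list = [m for m in messages if passes_all([is_edit_call, mentions_error], m)]
```

Under these assumptions, alternatives placed side by side in a list reject every message, which is why they are joined with | first.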

Example: Using Filters in a Stream

from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter

result = await RabbitMQStream(
    stream=your_async_stream,
    rabbitmq=rabbitmq_manager,
    queue_name="your-queue-name",
    filters=[
        FunctionCallStreamFilter("signal_edit_file"),
        StreamChunkStreamFilter("leader_agent")
    ]
)

This will only stream messages that are function calls to signal_edit_file and are from the leader_agent.
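
Conceptually, this amounts to wrapping an async stream and forwarding only the messages that every filter accepts. The sketch below shows that idea with a stand-in stream and a list in place of a queue. The function names and the dict-shaped messages are assumptions for the example, and it makes no claim about how RabbitMQStream is implemented.

```python
import asyncio


async def filtered(stream, filters):
    """Yield only messages accepted by every filter (logical AND)."""
    async for message in stream:
        if all(f(message) for f in filters):
            yield message


async def fake_agent_stream():
    # Dict messages stand in for agent events; real events may look different.
    yield {"source": "leader_agent", "tool": "signal_edit_file"}
    yield {"source": "coder_agent", "tool": "signal_edit_file"}
    yield {"source": "leader_agent", "tool": "signal_read_file"}


def is_edit_file_call(message):
    return message.get("tool") == "signal_edit_file"


def is_from_leader(message):
    return message.get("source") == "leader_agent"


async def demo():
    published = []  # stands in for a RabbitMQ queue
    filters = [is_edit_file_call, is_from_leader]
    async for message in filtered(fake_agent_stream(), filters):
        published.append(message)
    return published


published = asyncio.run(demo())
```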

Custom Filters

You can create your own filters by subclassing BaseStreamFilter and implementing the __call__ method, which receives a message and returns a truthy value when the message should be kept.

from agent_stream.filters import BaseStreamFilter

class CustomFilter(BaseStreamFilter):
    def __call__(self, message):
        # Your custom logic here
        return "important" in message.get("content", "")
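
The example above assumes dict-like messages with a get method. If your stream yields attribute-style message objects instead, a custom filter may need to read the content defensively. The following sketch shows one way to do that. KeywordFilter and extract_content are hypothetical names, and the class is deliberately left without a base class so the snippet runs on its own; in real code it would subclass BaseStreamFilter.

```python
from types import SimpleNamespace


def extract_content(message):
    """Return the message text whether it is a dict or an attribute-style object."""
    if isinstance(message, dict):
        content = message.get("content", "")
    else:
        content = getattr(message, "content", "")
    # Non-string payloads (for example a list of tool calls) are treated as empty.
    return content if isinstance(content, str) else ""


class KeywordFilter:
    """Hypothetical custom filter; in real code it would subclass BaseStreamFilter."""

    def __init__(self, keyword):
        self.keyword = keyword

    def __call__(self, message):
        return self.keyword in extract_content(message)


keep_important = KeywordFilter("important")
results = [
    keep_important({"content": "important update"}),
    keep_important(SimpleNamespace(content="important update")),
    keep_important(SimpleNamespace(content=["not", "text"])),
    keep_important({}),
]
```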

License

MIT
