Skip to main content

Streaming utilities for agent/LLM systems (RabbitMQ, etc.)

Project description

agent-stream

A modular Python package for real-time agent message streaming and filtering, designed for LLM/AI agent systems. Provides composable stream filters and streaming utilities for RabbitMQ and more.

Features

  • Composable stream filters (by function call, text, source, etc.)
  • Pluggable streaming backends (RabbitMQ, more coming)
  • Clean separation of filter logic and streaming logic
  • Environment-variable-based configuration for managers
  • Extensible for new message types and streaming backends

Installation

pip install agent-stream

Usage

Importing in your code

You can use the package in your code as follows:

Example: Using RabbitMQManager in an application

from agent_stream.managers import RabbitMQManager

rabbitmq_manager = RabbitMQManager()
await rabbitmq_manager.connect()
# ... use rabbitmq_manager ...
await rabbitmq_manager.close()

Example: Using filters and streams in an agent service with autogen

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.teams import Swarm
from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter
from agent_stream.managers import RabbitMQManager

# Define your agents and team (simplified example)
leader_agent = AssistantAgent(
    "leader_agent",
    tools=[],
    model_client=your_llm_client,
    model_client_stream=True,
    system_message="You are the leader.",
    handoffs=[Handoff(target="coder_agent", name="request_code_implementation")]
)
coder_agent = AssistantAgent(
    "coder_agent",
    tools=[],
    model_client=your_llm_client,
    model_client_stream=True,
    system_message="You are the coder.",
    handoffs=[Handoff(target="leader_agent", name="provide_implementation")]
)
team = Swarm([leader_agent, coder_agent])

# Initialize RabbitMQ manager
rabbitmq_manager = RabbitMQManager()
await rabbitmq_manager.connect()

# Run the team and stream results through RabbitMQStream with filters
result = await RabbitMQStream(
    stream=team.run_stream(task="Your task message"),
    rabbitmq=rabbitmq_manager,
    queue_name="agent-stream-demo",
    filters=[
        FunctionCallStreamFilter("signal_edit_file"),
        FunctionCallStreamFilter("signal_read_file"),
        StreamChunkStreamFilter("leader_agent"),
        StreamChunkStreamFilter("coder_agent")
    ]
)

await rabbitmq_manager.close()

Environment Variables

  • RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD (for RabbitMQManager)

Extending

  • Add new filters in filters/
  • Add new streaming backends in streams/
  • Add new managers in managers/

Filtering

Filtering in agent-stream allows you to process only the messages or events you care about from a stream of agent or LLM outputs. Filters are composable and can be combined using logical operators to create complex filtering logic.

Built-in Filters

  • FunctionCallStreamFilter: Passes through only messages that represent a function/tool call with a specific name.
  • TextMessageStreamFilter: Passes through only messages containing a specific text.
  • StreamChunkStreamFilter: Passes through only messages from a specific agent or stream chunk.

Example: Creating and Combining Filters

from agent_stream.filters import FunctionCallStreamFilter, TextMessageStreamFilter, StreamChunkStreamFilter

# Filter for tool call events with a specific function name
filter1 = FunctionCallStreamFilter("signal_edit_code")

# Filter for messages containing a specific text
filter2 = TextMessageStreamFilter("error")

# Filter for messages from a specific agent
filter3 = StreamChunkStreamFilter("leader_agent")

# Combine filters with | (OR) and & (AND)
combined_filter = (filter1 | filter2) & filter3

You can pass a single filter or a list of filters to RabbitMQStream or other streaming utilities. When multiple filters are provided, a message must pass all filters to be included (logical AND).

Example: Using Filters in a Stream

from agent_stream.streams import RabbitMQStream
from agent_stream.filters import FunctionCallStreamFilter, StreamChunkStreamFilter

result = await RabbitMQStream(
    stream=your_async_stream,
    rabbitmq=rabbitmq_manager,
    queue_name="your-queue-name",
    filters=[
        FunctionCallStreamFilter("signal_edit_file"),
        StreamChunkStreamFilter("leader_agent")
    ]
)

This will only stream messages that are function calls to signal_edit_file and are from the leader_agent.

Custom Filters

You can create your own filters by subclassing the base filter class and implementing the __call__ method.

from agent_stream.filters import BaseStreamFilter

class CustomFilter(BaseStreamFilter):
    def __call__(self, message):
        # Your custom logic here
        return "important" in message.get("content", "")

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agent_stream-0.1.0.tar.gz (6.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agent_stream-0.1.0-py3-none-any.whl (9.3 kB view details)

Uploaded Python 3

File details

Details for the file agent_stream-0.1.0.tar.gz.

File metadata

  • Download URL: agent_stream-0.1.0.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.12.3 Linux/5.15.167.4-microsoft-standard-WSL2

File hashes

Hashes for agent_stream-0.1.0.tar.gz
Algorithm Hash digest
SHA256 acf2d8940f47469067ac00718c6cdfbe14de62c2b64742bb8410a87e28923b69
MD5 16902349d4b7672392d23dec1808b13f
BLAKE2b-256 300dd06518e8d7823dd87d4a8fa7b7e4eb60d2b797f9f2ca12b90f3bd6d355d4

See more details on using hashes here.

File details

Details for the file agent_stream-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: agent_stream-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 9.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.12.3 Linux/5.15.167.4-microsoft-standard-WSL2

File hashes

Hashes for agent_stream-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ecdad821a9614fe23f946d552dcfe8adb96daf83350a45e7b13190453a1a3272
MD5 e196746fcf97919e616e6cba71dc9b53
BLAKE2b-256 acd37fe951d747fa995c4b133218286501929962a6c4d8868cf2eb443b82eb12

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page