Agent Data Shuttle - Python SDK
Project description
Agent Data Shuttle (ADS) - Python SDK
Agent Data Shuttle (ADS) — The framework that makes your AI agents autonomously react to external events.
ADS Python SDK enables you to build ADS Publishers and Subscribers in Python, allowing your AI agents to react to external events in real time.
It is interoperable with other ADS SDKs (NodeJS/TypeScript, n8n) and supports all publisher-subscriber combinations.
Installation
pip install agentdatashuttle_adspy
Table of Contents
- Overview
- Features
- Architecture
- Prerequisites
- Usage
- Notification Channels
- Types
- Logging
- Contributing
- License
- Contact
Overview
Agent Data Shuttle (ADS) is a framework for connecting event sources (publishers) and AI agents (subscribers) across platforms and languages.
This SDK lets you build Python publishers and subscribers that can interoperate with NodeJS/TypeScript SDKs and n8n.
- Publishers send events (e.g., file uploads, system alerts, support tickets, CRM events, payment processor events, etc.).
- Subscribers (AI agents or workflows) receive and react to those events to take appropriate actions.
All combinations are possible:
- Python Publisher → Python Subscriber
- Python Publisher → NodeJS Subscriber
- Python Publisher → n8n Subscriber
- NodeJS Publisher → Python Subscriber
- NodeJS Publisher → n8n Subscriber
- NodeJS Publisher → NodeJS Subscriber
- n8n Publisher → Python Subscriber
- n8n Publisher → NodeJS Subscriber
- n8n Publisher → n8n Subscriber
Features
- Event-Driven Architecture: Publish and subscribe to events between systems and agents.
- Publisher & Subscriber SDKs: Build both event sources (publishers) and event consumers (subscribers) in Python.
- n8n Integration: Out-of-the-box support for n8n workflows as subscribers or publishers.
- Notification Channels: Send notifications via Email or Slack when agents process events.
More channels coming soon.
- Pluggable Connectors: Connect an ADS Subscriber to multiple ADS Publishers via data connectors.
- Prompt Generation: Automatically generate contextual prompts for AI agents based on event payloads and agent capabilities.
- Type Hints: Strong typing for safer and more maintainable code.
Architecture
- ADS Publisher: Sends events to subscribers via ADS Bridge.
- ADS Bridge: (see ADS Bridge repository) Broadcasts events to connected subscribers.
- ADS Subscriber: Receives ADS events and invokes AI agents or workflows.
- Notification Channels: Email/Slack notifications on event processing.
- Interoperability: Mix Python, NodeJS, and n8n publishers/subscribers.
Prerequisites
Prerequisites for ADS Publisher
-
Python 3.8+
-
RabbitMQ instance
For event queueing and secure event publishing
-
ADS Bridge
For real-time event delivery via Socket.io
You must run the ADS Bridge service which would be the point of connection for subscribers.
More info at: https://github.com/agentdatashuttle/ads-bridge
-
Redis
For handling ADS event delivery to a large number of ADS Subscribers from ADS Bridge
Prerequisites for ADS Subscriber
-
Python 3.8+
-
Email/Slack credentials (Optional)
For using notification channels upon each autonomous agent invocation
-
AI Agent or LLM (for integrating with an AI model and trigger agentic workflows)
Usage
1. ADS Publisher
Publish events to ADS subscribers.
import os
import time
from agentdatashuttle_adspy.core.client import ADSRabbitMQClientParams
from agentdatashuttle_adspy.core.publisher import ADSPublisher
from agentdatashuttle_adspy.models.models import ADSDataPayload
# Step 1: Create ADSRabbitMQClientParams
client_params = ADSRabbitMQClientParams(
host=os.getenv("RABBITMQ_HOST", "localhost"),
username=os.getenv("RABBITMQ_USERNAME", "ads_user"),
password=os.getenv("RABBITMQ_PASSWORD", "ads_password"),
port=int(os.getenv("RABBITMQ_PORT", 5672))
)
# Step 2: Create a ADSPublisher instance
# Example: ADSPublisher for Kubernetes Health monitoring system
publisher = ADSPublisher("KubernetesMonitoring", client_params)
# Step 3: Create a sample ADSDataPayload
payload = ADSDataPayload(
event_name="pod_killed",
event_description="Pod 'payment-service-233ch3' just got killed due to OOMKilled error",
event_data={
"pod": "payment-service-233ch3",
"recorded_memory_usage": "2042Mi",
"limits": "2000Mi"
}
)
time.sleep(2) # Simulate some delay before publishing
# Publish the payload
publisher.publish_event(payload)
print("Event published successfully.")
Tip: Customize the event payload to match your use case, and provide a detailed event_description and as much detail as required in the event_data dictionary for the subscriber AI Agent to take remediation actions with greater confidence and accuracy.
2. ADS Subscriber
Subscribe to events and invoke your AI agent.
import os
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent
# Import ADS Subscriber and add ADS Data Connectors
from agentdatashuttle_adspy.core.subscriber import ADSSubscriber
from agentdatashuttle_adspy.core.dataconnector import ADSDataConnector
from agentdatashuttle_adspy.core.client import ADSBridgeClientParams
from agentdatashuttle_adspy.models.models import ADSDataPayload
from agentdatashuttle_adspy.core.notifications import EmailNotificationChannel, SlackNotificationChannel
# Define the tools for the agent to use
agent_tools = [toolA, toolB, see_k8s_logs_tool]
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
agent = create_react_agent(llm, agent_tools, debug=True)
# Step 1: Define callback function for ADS Subscriber to invoke agent
def invoke_agent(prompt: str, payload: ADSDataPayload) -> str:
print("The ADS payload was:", payload)
# Filter specific events in/out as you desire
if payload.event_name == "container_up":
return "NO AGENT INVOCATION FOR THIS EVENT - CONTAINER UP"
# Invoke your agent with the context enriched prompt generated by Agent Data Shuttle
response = agent.invoke({"messages": [HumanMessage(prompt)]})
# Return final agent response - will be sent to all notification channels for later review
return response["messages"][-1].content
# Step 2: Define ADSBridgeClientParams and corresponding ADSDataConnector
ads_bridge_client_params = ADSBridgeClientParams(
connection_string="http://localhost:9999",
path_prefix="/ads_bridge",
ads_subscribers_pool_id="<a_random_uuid>" # Replace with your actual pool ID to group horizontally scaled replicas of ADS Subscribers
)
data_connector_one = ADSDataConnector(
connector_name="UptimeKumaConnector",
bridge_client_params=ads_bridge_client_params
)
# Step 3: Optionally, add notification channels
email_channel = EmailNotificationChannel(
"<agent_description>",
"<smtp_host>",
"<smtp_port>",
"<smtp_username>",
"<smtp_password>",
"from@example.com",
"to@example.com"
)
slack_channel = SlackNotificationChannel(
"<agent_description>",
"<slack_bot_token>",
"#<your-channel>"
)
# Step 4: Create the ADSSubscriber with the callback function, LLM, and Data Connectors.
# The ADSSubscriber will listen for events from all the data connectors and invoke the agent.
ads_subscriber = ADSSubscriber(
agent_callback_function=invoke_agent,
llm=llm,
agent_description="<agent_description>",
data_connectors=[data_connector_one],
notification_channels=[email_channel, slack_channel]
)
# Step 5: Start the ADSSubscriber to listen for events and invoke the agent.
ads_subscriber.start()
Notification Channels
Send notifications via Email or Slack when events are processed:
from agentdatashuttle_adspy.core.notifications import EmailNotificationChannel, SlackNotificationChannel
email_channel = EmailNotificationChannel(
"<agent_description>",
"<smtp_host>",
"<smtp_port>",
"<smtp_username>",
"<smtp_password>",
"from@example.com",
"to@example.com"
)
slack_channel = SlackNotificationChannel(
"<agent_description>",
"<slack_bot_token>",
"#<your-channel>"
)
Pass these channels to the ADSSubscriber to enable notifications.
Types
All core types are defined in agentdatashuttle_adspy/models/models.py:
Logging
Logging level can be configured via the LOG_LEVEL environment variable with the following values:
| Level | Description |
|---|---|
| error | Critical errors that may cause the app to crash |
| warn | Warnings about potentially harmful situations |
| info | General operational information |
| debug | Debug-level logs for development |
Contributing
Contributions are welcome!
If you have ideas for improvements, bug fixes, or new features, please open a GitHub Issue to discuss or submit a Pull Request (PR).
How to contribute:
- Fork this repository and create your branch from
main. - Make your changes with clear commit messages.
- Ensure your code passes tests.
- Open a Pull Request describing your changes.
If you encounter any bugs or have feature requests, please raise an issue on GitHub.
Thank you for helping improve the Agent Data Shuttle initiative!
License
This project is licensed under the Apache License 2.0.
Contact
For questions or support, please contact
agentdatashuttle@knowyours.co or sudhay2001@gmail.com
For more information about Agent Data Shuttle - https://agentdatashuttle.knowyours.co
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file agentdatashuttle_adspy-1.0.5.tar.gz.
File metadata
- Download URL: agentdatashuttle_adspy-1.0.5.tar.gz
- Upload date:
- Size: 22.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
130e57f036c81bf7002bfa6691d8a394c4dd6a7e031fc9c6b226f0fe4fc49a1b
|
|
| MD5 |
f40608178d31b11dc88d9c5271b6a45a
|
|
| BLAKE2b-256 |
1633539e2203db41a6baba0b5a18d7a8dc421870b3e991bb364f9fe19bfe3bcd
|
File details
Details for the file agentdatashuttle_adspy-1.0.5-py3-none-any.whl.
File metadata
- Download URL: agentdatashuttle_adspy-1.0.5-py3-none-any.whl
- Upload date:
- Size: 22.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c5f6713ff4ca07f2881eb47492595cbd836164e37b4e1ca086099e32b699ee3a
|
|
| MD5 |
123001efebe9d1e2041ca8e5b36fcc26
|
|
| BLAKE2b-256 |
1aa5cfad906754e89d6a2072bf05de252eaf71764cd7b7d583715e2f3bf4c0fb
|