Skip to main content

Agent Data Shuttle - Python SDK

Project description

Agent Data Shuttle (ADS) - Python SDK

Agent Data Shuttle (ADS) — The framework that makes your AI agents autonomously react to external events.

ADS Python SDK enables you to build ADS Publishers and Subscribers in Python, allowing your AI agents to react to external events in real time. It is interoperable with other ADS SDKs (NodeJS/TypeScript, n8n) and supports all publisher-subscriber combinations.


Installation

pip install agentdatashuttle_adspy

Table of Contents


Overview

Agent Data Shuttle (ADS) is a framework for connecting event sources (publishers) and AI agents (subscribers) across platforms and languages. This SDK lets you build Python publishers and subscribers that can interoperate with NodeJS/TypeScript SDKs and n8n.

  • Publishers send events (e.g., file uploads, system alerts, support tickets, CRM events, etc.).
  • Subscribers (AI agents or workflows) receive and react to those events to take appropriate actions.

All combinations are possible:

  • Python Publisher → Python Subscriber
  • Python Publisher → NodeJS Subscriber
  • Python Publisher → n8n Subscriber
  • NodeJS Publisher → Python Subscriber
  • NodeJS Publisher → n8n Subscriber
  • NodeJS Publisher → NodeJS Subscriber
  • n8n Publisher → Python Subscriber
  • n8n Publisher → NodeJS Subscriber
  • n8n Publisher → n8n Subscriber

Features

  • Event-Driven Architecture: Publish and subscribe to events between systems and agents.
  • Publisher & Subscriber SDKs: Build both event sources (publishers) and event consumers (subscribers) in Python.
  • n8n Integration: Out-of-the-box support for n8n workflows as subscribers.
  • Notification Channels: Send notifications via Email or Slack when agents process events.
  • Pluggable Connectors: Connect an ADS Subscriber to multiple ADS Publishers via data connectors.
  • Prompt Generation: Automatically generate contextual prompts for AI agents based on event payloads and agent capabilities.
  • Type Hints: Strong typing for safer and more maintainable code.

Architecture

  • ADS Publisher: Sends events to subscribers via ADS Bridge.
  • ADS Bridge: (see ADS Bridge repository) Broadcasts events to connected subscribers.
  • ADS Subscriber: Receives ADS events and invokes AI agents or workflows.
  • Notification Channels: Email/Slack notifications on event processing.
  • Interoperability: Mix Python, NodeJS, and n8n publishers/subscribers.

Architecture Diagram


Prerequisites

Prerequisites for ADS Publisher

  • Python 3.8+
  • RabbitMQ instance (for event queueing and secure event publishing)
  • ADS Bridge (for real-time event delivery via Socket.io).
    You must run the ADS Bridge service which would be the point of connection for subscribers. More info at: https://github.com/agentdatashuttle/ads-bridge
  • Redis (for handling ADS event delivery to a large number of ADS Subscribers from ADS Bridge)

Prerequisites for ADS Subscriber

  • Python 3.8+
  • Email/Slack credentials (if using notification channels)
  • Redis (for queuing ADS events and prevent overwhelmed agent invocations)
  • AI Agent or LLM (for integrating with an AI model and trigger agentic workflows)

Usage

1. ADS Publisher

Publish events to ADS subscribers.

import os
import time
from agentdatashuttle_adspy.core.publisher import ADSPublisher
from agentdatashuttle_adspy.core.client import ADSClientParams
from agentdatashuttle_adspy.models.models import ADSDataPayload

client_params = ADSClientParams(
    host=os.getenv("ADS_HOST", "localhost"),
    username=os.getenv("ADS_USERNAME", "ads_user"),
    password=os.getenv("ADS_PASSWORD", "ads_password"),
    port=int(os.getenv("ADS_PORT", 5672))
)

publisher = ADSPublisher("UptimeKumaEvents", client_params)

payload = ADSDataPayload(
    event_name="container_down",
    event_description="the argocd service is down",
    event_data={
        "timestamp": "23-04-2004",
        "memory_captured_last": "2042Mi"
    }
)

time.sleep(2)  # Simulate some delay before publishing
publisher.publish_event(payload)
print("Event published successfully.")

Tip: Customize the event payload to match your use case, and provide a detailed event_description and as much detail as required in the event_data dictionary for the destination AI Agent to take remediation actions with greater confidence.


2. ADS Subscriber

Subscribe to events and invoke your AI agent.

import os
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
from langgraph.prebuilt import create_react_agent
from agentdatashuttle_adspy.core.subscriber import ADSSubscriber
from agentdatashuttle_adspy.core.dataconnector import ADSDataConnector
from agentdatashuttle_adspy.core.client import ADSBridgeClientParams
from agentdatashuttle_adspy.models.models import ADSDataPayload
from agentdatashuttle_adspy.core.notifications import EmailNotificationChannel, SlackNotificationChannel

# Define the tools for the agent to use
agent_tools = [toolA, toolB, see_k8s_logs_tool]
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")

agent = create_react_agent(llm, agent_tools, debug=True)

def invoke_agent(prompt: str, payload: ADSDataPayload) -> str:
    print("The payload was:", payload)
    if payload.event_name == "container_up":
        return "NO INVOCATION FOR THIS EVENT - CONTAINER UP"
    response = agent.invoke({"messages": [HumanMessage(prompt)]})
    return response["messages"][-1].content

ads_bridge_client_params = ADSBridgeClientParams(
    connection_string="http://localhost:9999",
    path_prefix="/ads_bridge",
    ads_subscribers_pool_id="<a_random_uuid>"  # Replace with your actual pool ID to group horizontally scaled replicas of ADS Subscribers
)

data_connector_one = ADSDataConnector(
    connector_name="UptimeKumaConnector",
    bridge_client_params=ads_bridge_client_params
)

# Optionally, add notification channels
email_channel = EmailNotificationChannel(
    "Agent description",
    "smtp.example.com",
    587,
    "smtp_user",
    os.getenv("EMAIL_SMTP_PASSWORD", ""),
    "from@example.com",
    "to@example.com"
)

slack_channel = SlackNotificationChannel(
    "Agent description",
    os.getenv("SLACK_BOT_TOKEN", ""),
    "#your-channel"
)

ads_subscriber = ADSSubscriber(
    agent_callback_function=invoke_agent,
    llm=llm,
    agent_description="Agent description",
    data_connectors=[data_connector_one],
    notification_channels=[email_channel, slack_channel]
)

ads_subscriber.start()

3. n8n Integration

Use the n8n nodes provided in the via-rabbitmq/n8n/nodes and via-bridge/n8n/nodes folders to integrate with n8n workflows.


Notification Channels

Send notifications via Email or Slack when events are processed:

from agentdatashuttle_adspy.core.notifications import EmailNotificationChannel, SlackNotificationChannel

email_channel = EmailNotificationChannel(
    "Agent description",
    "smtp.example.com",
    587,
    "smtp_user",
    "smtp_pass",
    "from@example.com",
    "to@example.com"
)

slack_channel = SlackNotificationChannel(
    "Agent description",
    "xoxb-your-slack-bot-token",
    "#your-channel"
)

Pass these channels to the ADSSubscriber to enable notifications.


Types

All core types are defined in agentdatashuttle_adspy/models/models.py:


Logging

Logging level can be configured via the LOG_LEVEL environment variable with the following values:

Level Description
error Critical errors that may cause the app to crash
warn Warnings about potentially harmful situations
info General operational information
debug Debug-level logs for development

Contributing

Contributions are welcome!

If you have ideas for improvements, bug fixes, or new features, please open a GitHub Issue to discuss or submit a Pull Request (PR).

How to contribute:

  1. Fork this repository and create your branch from main.
  2. Make your changes with clear commit messages.
  3. Ensure your code passes tests.
  4. Open a Pull Request describing your changes.

If you encounter any bugs or have feature requests, please raise an issue on GitHub.

Thank you for helping improve the Agent Data Shuttle initiative!


License

This project is licensed under the Apache License 2.0.


Contact

For questions or support, please contact
agentdatashuttle@knowyours.co or sudhay2001@gmail.com

For more information about Agent Data Shuttle - https://agentdatashuttle.knowyours.co

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agentdatashuttle_adspy-1.0.4.tar.gz (22.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agentdatashuttle_adspy-1.0.4-py3-none-any.whl (22.0 kB view details)

Uploaded Python 3

File details

Details for the file agentdatashuttle_adspy-1.0.4.tar.gz.

File metadata

  • Download URL: agentdatashuttle_adspy-1.0.4.tar.gz
  • Upload date:
  • Size: 22.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.5

File hashes

Hashes for agentdatashuttle_adspy-1.0.4.tar.gz
Algorithm Hash digest
SHA256 1b34d54b58dab60a249c69f30645a5e8ec8c8b7c2dcb89c387f3a57107925ff4
MD5 31f6ce99ac17a1a5802ea7eb7c03dbef
BLAKE2b-256 da6eeff86f12d956256db669e0ffa8bd3ee11a812dd341e07e3377a12653dc24

See more details on using hashes here.

File details

Details for the file agentdatashuttle_adspy-1.0.4-py3-none-any.whl.

File metadata

File hashes

Hashes for agentdatashuttle_adspy-1.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 e578af796078b60f78941a007599d3ddf9760587cec3aaa733d0ab3ff82c7312
MD5 a32e3777911ee0b297b314f04af8330a
BLAKE2b-256 b68b2774b84c078de945653efac82a5a1ab00b36ad32aed6a7a32019479ea6e2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page