Daita Agents - Data focused AI agent framework with free local use and premium hosted enterprise features
Project description
Daita Agents
Open-source Python SDK for building production AI agents.
Daita Agents gives you a clean, minimal API for autonomous tool-calling agents that work with any LLM provider — OpenAI, Anthropic, Gemini, Grok, and more. Zero-configuration tracing, pluggable data sources, composable skills, and a workflow system for multi-agent pipelines.
Quickstart
pip install daita-agents
Point an agent at a database and start asking questions:
import asyncio
from daita import Agent
async def main():
agent = await Agent.from_db(
"sqlite:///sales.db",
model="gpt-4o",
)
result = await agent.run("What were the top 5 products by revenue last quarter?")
print(result)
asyncio.run(main())
Agent.from_db() inspects the schema, generates tool wrappers, and composes a system prompt — no manual configuration needed.
Features
- Multi-provider LLM support — OpenAI, Anthropic, Gemini, Grok (or bring your own)
- Autonomous tool calling — agents plan and execute multi-step tool chains without manual orchestration
@tooldecorator — turn any sync or async Python function into an LLM-callable tool in one lineAgent.from_db()— point at a database connection string and get a fully-configured data agent in one call- Skills — reusable, composable units of agent capability that bundle instructions + tools (subclass
BaseSkillor use theSkillhelper) - Streaming — real-time event-based output via
agent.stream()oron_eventcallback - Conversation history — stateful multi-turn sessions with local persistence
- Plugin ecosystem — PostgreSQL, MySQL, MongoDB, SQLite, BigQuery, Snowflake, S3, Slack, Elasticsearch, Pinecone, ChromaDB, Qdrant, Neo4j, Redis, MCP, and more
- Embeddings — pluggable providers (OpenAI, Gemini, Voyage, sentence-transformers) via
BaseEmbeddingProvider - Memory — persistent semantic memory with working memory, memory graph, and automatic local/cloud detection
- Watch system — monitor databases and APIs continuously; trigger agent actions when thresholds are crossed
- Workflows — connect multiple agents into pipelines via relay channels
- Data quality enforcement —
ItemAssertion+query_checked()validate every row and fail fast with structured violations - Agent graph — built-in graph backend powering lineage & catalog; expose traversal tools to agents with
register_graph_tools() - Zero-config tracing — every LLM call and tool execution is automatically traced (tokens, latency, cost); optional OTLP export to Datadog, Jaeger, Honeycomb, etc.
- Retry & reliability — configurable exponential backoff with permanent-error detection
- Focus DSL — pre-filter tool results before the LLM sees them, reducing token usage
Examples
Custom tools with @tool
import asyncio
from daita import Agent, tool
@tool
def search_products(query: str, max_results: int = 5) -> list:
"""Search the product catalog.
Args:
query: Search terms
max_results: Maximum number of results to return
"""
return [{"name": "Widget A", "price": 9.99}]
@tool
def calculate_discount(price: float, pct: float) -> float:
"""Calculate a discounted price.
Args:
price: Original price
pct: Discount percentage (0-100)
"""
return round(price * (1 - pct / 100), 2)
async def main():
agent = Agent(
name="Shopping Assistant",
llm_provider="openai",
model="gpt-4o",
tools=[search_products, calculate_discount],
)
result = await agent.run("Find me a widget and apply a 15% discount.")
print(result)
asyncio.run(main())
Both sync and async functions work with @tool. Parameter types and descriptions are auto-extracted from type hints and docstrings.
Database agent with Agent.from_db()
The fastest way to build a data agent. Pass a connection string (or plugin instance) and get a fully-configured agent with schema-aware tools, an auto-generated system prompt, and optional lineage/memory:
import asyncio
from daita import Agent
async def main():
agent = await Agent.from_db(
"postgresql://user:pass@localhost/sales_db",
model="gpt-4o",
lineage=True, # track data lineage automatically
memory=True, # remember business context across sessions
)
result = await agent.run("What were our top 5 products by revenue last quarter?")
print(result)
asyncio.run(main())
You can also add a database plugin manually for more control:
from daita import Agent
from daita.plugins import postgresql
agent = Agent(name="Sales Analyst", llm_provider="openai", model="gpt-4o")
agent.add_plugin(postgresql(host="localhost", database="sales_db", user="analyst", password="secret"))
result = await agent.run("What were the top 5 products by revenue last quarter?")
Skills — reusable units of capability
Skills bundle domain instructions with a set of tools. Use the Skill helper for simple cases, or subclass BaseSkill when you need dynamic instructions or plugin dependencies.
import asyncio
from daita import Agent, Skill, tool
@tool
def format_report(data: list, title: str) -> str:
"""Render a markdown report."""
rows = "\n".join(f"- {r}" for r in data)
return f"# {title}\n\n{rows}"
@tool
def generate_chart(series: list, kind: str = "bar") -> str:
"""Generate a chart description."""
return f"{kind} chart with {len(series)} series"
report_skill = Skill(
name="report_gen",
description="Produces polished analytical reports",
instructions="Always render results as markdown with a title and bulleted rows.",
tools=[format_report, generate_chart],
)
async def main():
agent = Agent(name="Analyst", llm_provider="openai", model="gpt-4o")
agent.add_skill(report_skill)
result = await agent.run("Summarize Q3 revenue with a chart.")
print(result)
asyncio.run(main())
For skills that need plugin access, subclass BaseSkill and declare requires():
from daita import BaseSkill
from daita.plugins.base_db import BaseDatabasePlugin
class MigrationsSkill(BaseSkill):
name = "migrations"
instructions = "Follow forward-only migration policy."
def requires(self):
return {"db": BaseDatabasePlugin}
Data quality enforcement with ItemAssertion
Validate every row returned by a database query; violations raise DataQualityError (permanent, non-retried) with the full list attached.
import asyncio
from daita import ItemAssertion, DataQualityError
from daita.plugins import postgresql
async def main():
async with postgresql(host="localhost", database="sales_db") as db:
try:
rows = await db.query_checked(
"SELECT id, amount, customer_id FROM transactions WHERE day = CURRENT_DATE",
assertions=[
ItemAssertion(lambda r: r["amount"] > 0, "All amounts must be positive"),
ItemAssertion(lambda r: r["customer_id"] is not None, "Every row needs a customer_id"),
],
)
print(f"{len(rows)} clean rows")
except DataQualityError as exc:
print(f"Data quality failure: {exc}")
asyncio.run(main())
Streaming with agent.stream()
Use agent.stream() to receive real-time events as an async generator:
import asyncio
from daita import Agent
from daita.core.streaming import EventType
async def main():
agent = Agent(name="assistant", llm_provider="openai", model="gpt-4o")
async for event in agent.stream("Explain transformer attention mechanisms"):
if event.type == EventType.THINKING:
print(event.content, end="", flush=True)
elif event.type == EventType.TOOL_CALL:
print(f"\n[calling {event.tool_name}]")
elif event.type == EventType.COMPLETE:
print(f"\n\nDone. Tokens used: {event.token_usage}")
asyncio.run(main())
Alternatively, pass an on_event callback to run():
await agent.run("...", on_event=lambda e: print(e))
Multi-turn conversations with ConversationHistory
import asyncio
from daita import Agent, ConversationHistory
async def main():
agent = Agent(name="Support Bot", llm_provider="anthropic", model="claude-sonnet-4-6")
history = ConversationHistory(session_id="alice-session")
await agent.run("My name is Alice and I prefer concise answers.", history=history)
result = await agent.run("What's my name and preference?", history=history)
print(result) # "Your name is Alice and you prefer concise answers."
asyncio.run(main())
Sessions persist to .daita/sessions/ between process restarts.
Monitor data sources with @agent.watch()
Continuously poll a data source and trigger the agent when a threshold is crossed:
import asyncio
from daita import Agent, WatchEvent
from daita.plugins import postgresql
db = postgresql(host="localhost", database="ops_db")
agent = Agent(name="Ops Monitor", llm_provider="openai", model="gpt-4o")
agent.add_plugin(db)
@agent.watch(
source=db,
condition="SELECT COUNT(*) FROM failed_jobs WHERE created_at > NOW() - INTERVAL '5m'",
threshold=lambda v: v > 10,
interval="1m",
)
async def on_job_failures(event: WatchEvent):
await agent.run(f"There are {event.value} failed jobs in the last 5 minutes. Diagnose and suggest fixes.")
asyncio.run(agent.start())
Watches start lazily on the first run() call, or explicitly with await agent.start().
Multi-agent workflow
import asyncio
from daita import Agent, Workflow
async def main():
fetcher = Agent(name="Data Fetcher", llm_provider="openai", model="gpt-4o")
analyzer = Agent(name="Analyzer", llm_provider="openai", model="gpt-4o")
workflow = Workflow("Sales Pipeline")
workflow.add_agent("fetcher", fetcher)
workflow.add_agent("analyzer", analyzer)
workflow.connect("fetcher", "raw_data", "analyzer")
await workflow.start()
await workflow.inject_data("fetcher", {"query": "Q3 sales"}, task="fetch")
await workflow.stop()
asyncio.run(main())
Memory-enabled agent
import asyncio
from daita import Agent
from daita.plugins import memory
async def main():
agent = Agent(name="Assistant", llm_provider="anthropic", model="claude-sonnet-4-6")
agent.add_plugin(memory())
await agent.run("My name is Alex and I prefer concise answers.")
result = await agent.run("What's my preference?")
print(result)
asyncio.run(main())
Memory auto-detects local or cloud backend and includes working memory, fact extraction, contradiction handling, and a memory graph for association.
Custom embedding providers
from daita import BaseEmbeddingProvider
from daita.embeddings import create_embedding_provider
# Built-in: "openai", "gemini", "voyage", "sentence_transformers", "mock"
embedder = create_embedding_provider("voyage", model="voyage-3")
vectors = await embedder.embed(["hello world", "another doc"])
Subclass BaseEmbeddingProvider to plug in any embedding model you want.
Vector database search
import asyncio
from daita import Agent
from daita.plugins import chroma
async def main():
agent = Agent(name="Knowledge Assistant", llm_provider="openai", model="gpt-4o")
agent.add_plugin(chroma(path="./vectors", collection="docs"))
result = await agent.run("What do our docs say about authentication?")
print(result)
asyncio.run(main())
Expose graph traversal to agents
Lineage and catalog plugins populate a shared agent graph automatically. Call register_graph_tools() to let the agent traverse it directly:
from daita import Agent
from daita.plugins import lineage
from daita.core.graph import register_graph_tools
agent = Agent(name="Impact Analyst", llm_provider="openai", model="gpt-4o")
agent.add_plugin(lineage())
register_graph_tools(agent) # adds graph_subgraph, graph_shortest_path, impact_analysis
await agent.run("What downstream tables break if we drop customers.email?")
OTLP tracing export
from daita import configure_tracing
configure_tracing(
exporter="otlp",
endpoint="https://otel.example.com",
service_name="my-daita-agent",
)
Install with pip install "daita-agents[otlp]" to enable the OTLP exporter. Spans cover LLM calls, tool invocations, retries, and plugin operations.
MCP (Model Context Protocol) integration
import asyncio
from daita import Agent
from daita.plugins import mcp
async def main():
agent = Agent(
name="File Analyst",
llm_provider="openai",
model="gpt-4o",
mcp=mcp.server(command="uvx", args=["mcp-server-filesystem", "/data"]),
)
result = await agent.run("Read report.csv and summarize the totals.")
print(result)
asyncio.run(main())
Plugins
Databases
| Plugin | Description | Extra |
|---|---|---|
postgresql |
Query and write PostgreSQL (pgvector) | [postgresql] |
mysql |
Query and write MySQL | [mysql] |
mongodb |
Query MongoDB collections | [mongodb] |
sqlite |
Query and write SQLite | [sqlite] |
snowflake |
Query Snowflake data warehouse | [snowflake] |
bigquery |
Query Google BigQuery | [bigquery] |
elasticsearch |
Search Elasticsearch indices | [elasticsearch] |
Vector Databases
| Plugin | Description | Extra |
|---|---|---|
chroma |
Local/embedded vector search | [chromadb] |
pinecone |
Managed cloud vector search | [pinecone] |
qdrant |
Self-hosted vector search | [qdrant] |
Integrations & Cloud
| Plugin | Description | Extra |
|---|---|---|
rest |
Call REST APIs | (included) |
s3 |
Read/write S3 objects | [aws] |
slack |
Send Slack messages | [slack] |
email |
Send/receive email (SMTP/IMAP) | (included) |
google_drive |
Read files from Google Drive | [google-drive] |
websearch |
AI-optimized web search (Tavily) | [websearch] |
mcp |
Model Context Protocol servers | [mcp] |
redis_messaging |
Redis pub/sub messaging | [redis] |
redis |
Redis data store operations | [redis] |
neo4j |
Graph database (Cypher queries) | [neo4j] |
Knowledge & Orchestration
| Plugin | Description |
|---|---|
memory |
Persistent semantic agent memory |
catalog |
Schema discovery and metadata management |
lineage |
Data lineage tracking and impact analysis |
orchestrator |
Multi-agent coordination and task routing |
data_quality |
Data profiling and quality checks |
transformer |
SQL transformation management and execution |
Installation
Core (OpenAI included)
pip install daita-agents
LLM providers
pip install "daita-agents[anthropic]" # Claude
pip install "daita-agents[google]" # Gemini
pip install "daita-agents[llm-all]" # All LLM providers
Database plugins
pip install "daita-agents[postgresql]"
pip install "daita-agents[mysql]"
pip install "daita-agents[mongodb]"
pip install "daita-agents[sqlite]"
pip install "daita-agents[bigquery]"
pip install "daita-agents[snowflake]"
pip install "daita-agents[databases]" # All traditional databases
Vector database plugins
pip install "daita-agents[chromadb]"
pip install "daita-agents[pinecone]"
pip install "daita-agents[qdrant]"
pip install "daita-agents[vectordb]" # All vector databases
Embedding providers
pip install "daita-agents[voyage]" # Voyage AI
pip install "daita-agents[sentence-transformers]" # Local sentence-transformers
Cloud
pip install "daita-agents[aws]" # boto3
pip install "daita-agents[gcp]" # Google Cloud services
pip install "daita-agents[google-drive]" # Drive + document parsers
pip install "daita-agents[cloud]" # All cloud integrations
Observability & production
pip install "daita-agents[otlp]" # Export traces to OTLP collectors
pip install "daita-agents[api-server]" # FastAPI + Uvicorn
pip install "daita-agents[production]" # AWS + API server
Data & content
pip install "daita-agents[data]" # pandas, numpy, openpyxl, parsing libs
pip install "daita-agents[web]" # beautifulsoup4, lxml
pip install "daita-agents[data-quality]" # Advanced quality checks (scipy)
pip install "daita-agents[lineage]" # networkx graph support
Bundles
pip install "daita-agents[recommended]" # Anthropic + pandas + beautifulsoup4
pip install "daita-agents[complete]" # Most features, no heavy packages
pip install "daita-agents[all]" # Everything (large install)
Exception hierarchy
All exceptions are importable from daita:
DaitaError → AgentError, LLMError, ConfigError, PluginError, SkillError, WorkflowError, TransientError, RetryableError, PermanentError, RateLimitError, AuthenticationError, ValidationError, FocusDSLError, DataQualityError
Documentation
See the examples/ directory for full working examples, or the documentation.
Contributing
See CONTRIBUTING.md. All contributions are welcome.
License
Apache 2.0 — see LICENSE.
Built by Daita
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file daita_agents-0.17.0.tar.gz.
File metadata
- Download URL: daita_agents-0.17.0.tar.gz
- Upload date:
- Size: 419.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7ca4bc2bc2414a98005487b334e7e3a82d0de0edd2111783b110f295d92e099f
|
|
| MD5 |
cdafe4f0fe2d3dba26ea1634199448b6
|
|
| BLAKE2b-256 |
ea2be0f4c665a06be8f6f3713bbad3dc235dd15f91e9edb4d9f844a04ca1335d
|
File details
Details for the file daita_agents-0.17.0-py3-none-any.whl.
File metadata
- Download URL: daita_agents-0.17.0-py3-none-any.whl
- Upload date:
- Size: 523.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1bd8f35ea1d328522ba9191d6b498ac521d23143284496c02b2ddeb8e26a92ac
|
|
| MD5 |
c566e69934f6b41acb778ce4e6d7130a
|
|
| BLAKE2b-256 |
4c0649aaca697212a6e769d3f3ede88d58cfda4b627e66e29383d889d4424fe9
|