Dubsof client SDK — connects external services to the Dubsof platform
Project description
dubsof-sdk
Python client SDK for the Dubsof platform. Connects external services to Atrium Run — registers tools over WebSocket and serves tool calls dispatched by the workflow execution engine.
Install
pip install -e .
# or with uv
uv pip install -e .
Requirements: Python ≥ 3.10, websockets, httpx, betterproto.
Quick start
import asyncio
import dubsof
from dubsof import credentials
from dubsof.services import run
dubsof.initialize_app(
credentials.APIKey("atk_..."),
url="http://localhost:8080",
client_id="my-service",
)
@run.tool("search_crm", description="Search CRM contacts by keyword")
def search_crm(query: str, tenant_id: str = "default") -> dict:
return {"results": db.search(query)}
@run.tool("send_email", description="Send a transactional email")
def send_email(to: str, subject: str, body: str, tenant_id: str = "default") -> dict:
mailer.send(to, subject, body)
return {"sent": True}
asyncio.run(run.connect()) # blocks — reconnects automatically
run.connect() completes the WebSocket handshake, registers all decorated tools with the server, then serves tool calls indefinitely. It reconnects with exponential backoff (1 s → 60 s cap) on disconnect.
Configuration with pydantic-settings
import dubsof
from dubsof import credentials
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="ATRIUM_")
api_key: str
url: str = "http://localhost:8080"
client_id: str = "my-service"
settings = Settings() # reads ATRIUM_API_KEY, ATRIUM_URL, ATRIUM_CLIENT_ID
dubsof.initialize_app(
credentials.APIKey(settings.api_key),
url=settings.url,
client_id=settings.client_id,
)
Credentials
from dubsof import credentials
# API key — create one in Settings → API Keys
cred = credentials.APIKey("atk_...")
APIKey is the only credential type today. The Credentials base class is designed for future additions (service accounts, OAuth) without changing call sites.
Tool registration
from dubsof.services import run
@run.tool(
"tool_id", # stable ID registered with the server
description="What this does", # shown to the LLM during planning
schema={"type": "object", ...}, # optional JSON Schema override
)
def my_tool(arg: str, tenant_id: str = "default") -> dict:
...
tenant_idis injected automatically by the server — always accept it as a keyword argument with a default.- The return value must be JSON-serialisable.
- Both sync and async handlers are supported.
Tools spread across modules are registered at import time. Import them before calling run.connect():
# main.py
import dubsof
from dubsof import credentials
dubsof.initialize_app(credentials.APIKey("atk_..."), url="...", client_id="my-service")
import tools.crm # registers @run.tool decorators
import tools.email
from dubsof.services import run
asyncio.run(run.connect())
Waiting for connection
When bootstrap logic must run after the handshake completes:
async def main():
connect_task = asyncio.create_task(run.connect())
await run.wait_connected(timeout=15.0) # raises TimeoutError if server unreachable
# Safe to use run.http here
workflows = await run.http.list_workflows()
await connect_task
Firing events
from dubsof.services import run
# Via WebSocket (low latency, requires active connection)
await run.fire_event(trigger_id, payload={"city": "London"})
# Via REST (works independently of WebSocket lifecycle)
await run.http.fire_event(trigger_id, payload={"city": "London"}, tenant_id="acme")
REST API (run.http)
run.http is an HTTPClient instance. All methods are async and raise httpx.HTTPStatusError on non-2xx. 429 responses are retried automatically (up to 3×) after the server-specified Retry-After delay.
Workflows
wf = await run.http.create_workflow("My workflow", description="...")
wfs = await run.http.list_workflows()
wf = await run.http.get_workflow(workflow_id)
await run.http.update_workflow(workflow_id, name="New name")
await run.http.delete_workflow(workflow_id)
# LLM-assisted generation from a natural language prompt
wf = await run.http.generate_workflow("Monitor Slack and post a weather summary daily")
Flows
flow = await run.http.create_flow(
workflow_id,
title="Fetch and notify",
description="Get weather then post to Slack",
trigger_id=tid,
tool_ids=[tool_id_a, tool_id_b], # security scope — only these tools may run in this flow
)
flows = await run.http.list_flows(workflow_id)
Flow dependencies
# flow_b will not start until flow_a completes
await run.http.add_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
await run.http.remove_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
deps = await run.http.list_flow_deps(workflow_id)
Tasks
task = await run.http.create_task(
workflow_id, flow_id,
title="Get weather",
description="Use weather.fetch to get current conditions for the target city.",
)
tasks = await run.http.list_tasks(workflow_id, flow_id)
Triggers
trigger = await run.http.create_trigger(workflow_id, description="Manual fire")
triggers = await run.http.list_triggers(workflow_id)
Executions
result = await run.http.list_executions(
workflow_id=wf_id,
status="running", # pending | running | success | error
limit=20,
)
execution = await run.http.get_execution(execution_id)
Tools
tools = await run.http.list_tools(client_id="my-service")
active = await run.http.list_tools(active_only=True)
Rate limits
limits = await run.http.get_rate_limit()
# {"tier": "pro", "limit_minute": 300, "used_minute": 12, ...}
Rate limit headers (X-RateLimit-*) are also returned on every API response.
Running tests
uv pip install -e ".[dev]"
pytest
Integration tests require a running Atrium server. Set ATRIUM_URL and ATRIUM_API_KEY before running them:
ATRIUM_URL=http://localhost:8080 ATRIUM_API_KEY=atk_... pytest tests/test_integration.py
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dubsof-0.1.0.tar.gz.
File metadata
- Download URL: dubsof-0.1.0.tar.gz
- Upload date:
- Size: 21.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
32859076f1b5072d46e191e027f90896cdba783fe9ec15d2480497303d3aa444
|
|
| MD5 |
7b6aaace1fbec63153b31a92999b411e
|
|
| BLAKE2b-256 |
54d87e449562aed52072f22aa4664ec6c18e19e6bed66c16b41db845c208bfd2
|
File details
Details for the file dubsof-0.1.0-py3-none-any.whl.
File metadata
- Download URL: dubsof-0.1.0-py3-none-any.whl
- Upload date:
- Size: 18.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
adca1bd46e701c249eebf315d01d5cd81d83eedb58144e3a652a8eca1e772f86
|
|
| MD5 |
d2aeaca4ba49529b34985d558eefe99e
|
|
| BLAKE2b-256 |
6cffc18478bf8ba6c7c5110d937c363508addb1e61b71c4e34c6d6bae9d737f4
|