Skip to main content

Python SDK for the Atrium platform by Dubsof

Project description

dubsof-sdk

Python SDK for Atrium by Dubsof, the platform for workflow automation, AI composition, and service orchestration.


Install

pip install -e .
# or with uv
uv pip install -e .

Requirements: Python ≥ 3.10, websockets, httpx, betterproto.


Quick start

import asyncio
import dubsof
from dubsof import credentials
from dubsof.services import run

dubsof.initialize_app(
    credentials.APIKey("atk_..."),
    url="http://localhost:8080",
    client_id="my-service",
)

@run.tool("search_crm", description="Search CRM contacts by keyword")
def search_crm(query: str, tenant_id: str = "default") -> dict:
    return {"results": db.search(query)}

@run.tool("send_email", description="Send a transactional email")
def send_email(to: str, subject: str, body: str, tenant_id: str = "default") -> dict:
    mailer.send(to, subject, body)
    return {"sent": True}

asyncio.run(run.connect())   # blocks — reconnects automatically

run.connect() completes the WebSocket handshake, registers all decorated tools with the server, then serves tool calls indefinitely. It reconnects with exponential backoff (1 s → 60 s cap) on disconnect.


Configuration with pydantic-settings

import dubsof
from dubsof import credentials
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="ATRIUM_")
    api_key:   str
    url:       str = "http://localhost:8080"
    client_id: str = "my-service"

settings = Settings()   # reads ATRIUM_API_KEY, ATRIUM_URL, ATRIUM_CLIENT_ID

dubsof.initialize_app(
    credentials.APIKey(settings.api_key),
    url=settings.url,
    client_id=settings.client_id,
)

Credentials

from dubsof import credentials

# API key — create one in Settings → API Keys
cred = credentials.APIKey("atk_...")

APIKey is the only credential type today. The Credentials base class is designed for future additions (service accounts, OAuth) without changing call sites.


Tool registration

from dubsof.services import run

@run.tool(
    "tool_id",                      # stable ID registered with the server
    description="What this does",   # shown to the LLM during planning
    schema={"type": "object", ...}, # optional JSON Schema override
)
def my_tool(arg: str, tenant_id: str = "default") -> dict:
    ...
  • tenant_id is injected automatically by the server — always accept it as a keyword argument with a default.
  • The return value must be JSON-serialisable.
  • Both sync and async handlers are supported.

Tools spread across modules are registered at import time. Import them before calling run.connect():

# main.py
import dubsof
from dubsof import credentials
dubsof.initialize_app(credentials.APIKey("atk_..."), url="...", client_id="my-service")

import tools.crm    # registers @run.tool decorators
import tools.email

from dubsof.services import run
asyncio.run(run.connect())

Waiting for connection

When bootstrap logic must run after the handshake completes:

async def main():
    connect_task = asyncio.create_task(run.connect())
    await run.wait_connected(timeout=15.0)   # raises TimeoutError if server unreachable

    # Safe to use run.http here
    workflows = await run.http.list_workflows()

    await connect_task

Firing events

from dubsof.services import run

# Via WebSocket (low latency, requires active connection)
await run.fire_event(trigger_id, payload={"city": "London"})

# Via REST (works independently of WebSocket lifecycle)
await run.http.fire_event(trigger_id, payload={"city": "London"}, tenant_id="acme")

REST API (run.http)

run.http is an HTTPClient instance. All methods are async and raise httpx.HTTPStatusError on non-2xx. 429 responses are retried automatically (up to 3×) after the server-specified Retry-After delay.

Workflows

wf  = await run.http.create_workflow("My workflow", description="...")
wfs = await run.http.list_workflows()
wf  = await run.http.get_workflow(workflow_id)
      await run.http.update_workflow(workflow_id, name="New name")
      await run.http.delete_workflow(workflow_id)

# LLM-assisted generation from a natural language prompt
wf  = await run.http.generate_workflow("Monitor Slack and post a weather summary daily")

Flows

flow = await run.http.create_flow(
    workflow_id,
    title="Fetch and notify",
    description="Get weather then post to Slack",
    trigger_id=tid,
    tool_ids=[tool_id_a, tool_id_b],  # security scope — only these tools may run in this flow
)
flows = await run.http.list_flows(workflow_id)

Flow dependencies

# flow_b will not start until flow_a completes
await run.http.add_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
await run.http.remove_flow_dep(workflow_id, flow_id=flow_b, depends_on_flow_id=flow_a)
deps = await run.http.list_flow_deps(workflow_id)

Tasks

task = await run.http.create_task(
    workflow_id, flow_id,
    title="Get weather",
    description="Use weather.fetch to get current conditions for the target city.",
)
tasks = await run.http.list_tasks(workflow_id, flow_id)

Triggers

trigger  = await run.http.create_trigger(workflow_id, description="Manual fire")
triggers = await run.http.list_triggers(workflow_id)

Executions

result    = await run.http.list_executions(
    workflow_id=wf_id,
    status="running",   # pending | running | success | error
    limit=20,
)
execution = await run.http.get_execution(execution_id)

Tools

tools  = await run.http.list_tools(client_id="my-service")
active = await run.http.list_tools(active_only=True)

Rate limits

limits = await run.http.get_rate_limit()
# {"tier": "pro", "limit_minute": 300, "used_minute": 12, ...}

Rate limit headers (X-RateLimit-*) are also returned on every API response.


Compose API (compose.http)

compose.http provides access to Atrium Compose — project management, AI generation, and deployment.

from dubsof.services import compose

# Projects
projects = await compose.http.list_projects()
project  = await compose.http.create_project("My API", prompt="A REST API for managing orders")
project  = await compose.http.get_project(project_id)   # includes graph when available
          await compose.http.update_project(project_id, name="Order API")
          await compose.http.delete_project(project_id)

# Inspect and validate the project's current output
graph  = await compose.http.parse_project(project_id)    # {"ok", "graph", "types"}
check  = await compose.http.check_project(project_id)    # {"ok", "diagnostics"}
result = await compose.http.compile_project(project_id)  # {"ok", "errors", "service_names"}

# Download artefacts
zip_bytes  = await compose.http.export_project(project_id)    # compiled source ZIP
spec_bytes = await compose.http.download_openapi(project_id)  # OpenAPI JSON or ZIP

# Deployment
deployment = await compose.http.get_deployment(project_id)
            await compose.http.redeploy(project_id)

AI Generation

# Stream generation events in real time
async for event in compose.http.generate_project(project_id, "A REST API for managing orders"):
    if event["event"] == "update":
        print(event["data"]["phase"], event["data"]["status"])
    elif event["event"] == "build_done":
        print("Services:", event["data"].get("service_names"))
    elif event["event"] == "error":
        raise RuntimeError(event["data"]["error"])
    elif event["event"] == "done":
        break

# Fetch the completed project after the stream ends
project = await compose.http.get_project(project_id)

Generation is streamed as Server-Sent Events. The project is updated in the database as generation progresses.

Models and usage

models = await compose.http.list_models()
usage  = await compose.http.get_usage()

Runtime API (runtime.http)

runtime.http provides access to Atrium Runtime — deploy and manage Compose project services.

from dubsof.services import runtime

# Deploy a Compose project
dep = await runtime.http.deploy_project(project_id)
print(dep["id"], dep["deployment_status"])  # dep-a1b2c3d4  creating

# Stream until live (polls every 3s, times out after 600s)
async for status in runtime.http.stream_deployment(dep["id"]):
    print(status["deployment_status"], status.get("deployment_urls"))
# live  {'hello_world': 'https://hello-world.dubsof.app'}

# List all deployments for the org
deployments = await runtime.http.list_deployments()
for d in deployments:
    print(d["project_name"], d["deployment_status"], d["deployment_urls"])

# Redeploy (creates a new deployment record)
new_dep = await runtime.http.redeploy(dep["id"])

Scopes required

Method Scope
list_deployments, get_deployment, stream_deployment runtime:deployments:read
deploy_project, redeploy runtime:deployments:write

Running tests

uv pip install -e ".[dev]"
pytest

Integration tests require a running Atrium server. Set ATRIUM_URL and ATRIUM_API_KEY before running them:

ATRIUM_URL=http://localhost:8080 ATRIUM_API_KEY=atk_... pytest tests/test_integration.py

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dubsof-0.2.0.tar.gz (31.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dubsof-0.2.0-py3-none-any.whl (30.0 kB view details)

Uploaded Python 3

File details

Details for the file dubsof-0.2.0.tar.gz.

File metadata

  • Download URL: dubsof-0.2.0.tar.gz
  • Upload date:
  • Size: 31.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for dubsof-0.2.0.tar.gz
Algorithm Hash digest
SHA256 f16f8a015f823ee5799ce657bf593abbde62f1241229faae2384014164d235f5
MD5 26c4f52d420ac784b6993397e9f15eac
BLAKE2b-256 a8986b821a90f8db3fd41f4c18a07824571ce1c091cfefe0a5ffee71a890764d

See more details on using hashes here.

File details

Details for the file dubsof-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: dubsof-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 30.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for dubsof-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4df039705cdc67d3802138ea0116c4dcfe64a3e52a16b69d03af0e4c1a0a55aa
MD5 6aa46ad0ae5714c46513c68d28825df2
BLAKE2b-256 c3f6320fd202833549993cd7a117c46035bfba5491cbd77424ffe81ee21e51e9

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page