Skip to main content

Python SDK for the Administrate.dev REST API

Project description

Administrate Python SDK

The official Python SDK for the Administrate.dev REST API.

Administrate.dev is AI Agency Management software. It is a monitoring and management platform for AI agencies running n8n and AI automation workflows across multiple clients. It provides a single dashboard to track every workflow, every client, every failure, and all LLM costs, so you can catch problems before clients do and prove the value of your automations.

Key platform features:

  • Multi-instance monitoring — See all n8n instances across every client in one place
  • Error tracking — Real-time failure detection with automatic error categorization
  • LLM cost tracking — Connect OpenAI, Anthropic, Azure, and OpenRouter accounts to attribute costs to specific clients
  • Workflow insights — Execution counts, success rates, and time-saved ROI reporting
  • Sync health — Know instantly when a data sync fails
  • Webhooks & API — Full programmatic access for custom integrations

Installation

pip install administrate

Requires Python 3.9+.

Quick start

from administrate import Administrate

client = Administrate(api_key="sk_live_...")

# Get account info
account = client.account.get()
print(account.name, account.plan)

# List all clients with auto-pagination
for c in client.clients.list():
    print(c.name, c.n8n_instances_count)

# Check for failed executions across all instances
for execution in client.executions.list(errors_only=True):
    print(f"{execution.workflow_name}: {execution.error_category}")

# Get LLM cost summary
costs = client.llm_costs.summary()
print(f"Total: ${costs.data.summary.total_cost_cents / 100:.2f}")

Async usage

from administrate import AsyncAdministrate

async with AsyncAdministrate(api_key="sk_live_...") as client:
    account = await client.account.get()

    async for workflow in client.workflows.list(active=True):
        print(workflow.name, workflow.is_active)

Every resource method available on Administrate has an identical async counterpart on AsyncAdministrate.

Configuration

from administrate import Administrate

client = Administrate(
    api_key="sk_live_...",        # Required. Must start with "sk_live_"
    base_url="https://...",       # Default: "https://administrate.dev"
    timeout=30.0,                 # Request timeout in seconds. Default: 30
    max_retries=3,                # Retry attempts for failed requests. Default: 3
)

The client can be used as a context manager to ensure the underlying HTTP connection is closed:

with Administrate(api_key="sk_live_...") as client:
    account = client.account.get()
# Connection closed automatically

# Or close manually
client = Administrate(api_key="sk_live_...")
# ... use client ...
client.close()

You can also pass a pre-configured httpx.Client (or httpx.AsyncClient for async) if you need full control over the HTTP layer:

import httpx

http_client = httpx.Client(proxy="http://proxy:8080")
client = Administrate(api_key="sk_live_...", http_client=http_client)

API reference

All API keys are created in Settings > Developers within Administrate.dev. Tokens have three permission levels: read, write, and full.

Account

# Get current token info and account summary
me = client.account.me()
print(me.token.name, me.token.permission)
print(me.account.name, me.account.plan)

# Get full account details
account = client.account.get()

# Update account settings
account = client.account.update(
    name="My Agency",
    billing_email="billing@example.com",
    timezone="Australia/Brisbane",
)

Clients

Clients represent the companies you manage automations for.

# List all clients (auto-paginates)
for c in client.clients.list():
    print(c.name, c.code)

# Get a client (includes 7-day metrics)
c = client.clients.get("com_abc123")
print(c.metrics.success_rate, c.metrics.time_saved_minutes)

# Create a client
c = client.clients.create(
    name="Acme Corp",
    code="acme",
    contact_email="ops@acme.com",
    timezone="America/New_York",
)

# Update a client
c = client.clients.update("com_abc123", notes="Enterprise tier")

# Delete a client (requires full permission)
client.clients.delete("com_abc123")

Instances

Instances are n8n deployments connected to Administrate.

# List all instances
for inst in client.instances.list():
    print(inst.name, inst.sync_status)

# Filter by client or sync status
for inst in client.instances.list(client_id="com_abc123", sync_status="error"):
    print(inst.name, inst.last_sync_error)

# Get an instance (includes 7-day metrics)
inst = client.instances.get("n8n_abc123")
print(inst.metrics.executions_count, inst.metrics.success_rate)

# Connect a new n8n instance
inst = client.instances.create(
    client_id="com_abc123",
    name="Production n8n",
    base_url="https://n8n.acme.com",
    api_key="n8n_api_key_here",
)

# Trigger a sync
result = client.instances.sync("n8n_abc123", sync_type="all")

# Sync all instances at once
result = client.instances.sync_all(sync_type="workflows")

# Update an instance
inst = client.instances.update("n8n_abc123", name="Staging n8n")

# Delete an instance
client.instances.delete("n8n_abc123")

Workflows

# List workflows with filters
for wf in client.workflows.list(client_id="com_abc123", active=True):
    print(wf.name, wf.is_active)

# Search by name
for wf in client.workflows.list(search="onboarding"):
    print(wf.name)

# Get a workflow (includes 7-day metrics)
wf = client.workflows.get("wfl_abc123")
print(wf.metrics.success_rate, wf.metrics.time_saved_minutes)

# Set time-saved estimates (for ROI reporting)
wf = client.workflows.update(
    "wfl_abc123",
    minutes_saved_per_success=15,
    minutes_saved_per_failure=5,
)

Executions

Executions are read-only records of workflow runs synced from n8n.

# List executions with filters
for ex in client.executions.list(
    client_id="com_abc123",
    status="failed",
    start_date="2025-01-01",
    end_date="2025-01-31",
):
    print(ex.workflow_name, ex.status, ex.duration_ms)

# Get only errors
for ex in client.executions.list(errors_only=True):
    print(ex.error_category, ex.workflow_name)

# Get execution details (includes error message and payload)
ex = client.executions.get("exe_abc123")
print(ex.error_message)
print(ex.error_payload)

Sync runs

# List sync run history
for run in client.sync_runs.list(instance_id="n8n_abc123", status="failed"):
    print(run.sync_type, run.status, run.duration_seconds)

# Get a specific sync run
run = client.sync_runs.get("syn_abc123")

# Get sync health across all instances
for entry in client.sync_runs.health():
    print(entry.instance_name, entry.sync_status)
    print(f"  Workflows last synced: {entry.workflows.last_synced_at}")
    print(f"  Executions last synced: {entry.executions.last_synced_at}")

Users

# List team members
for user in client.users.list():
    print(user.name, user.email, user.role)

# Get a user
user = client.users.get("usr_abc123")

# Invite a new team member
invitation = client.users.invite(email="new@example.com", role="member")
print(invitation.expires_at)

# Change a user's role
user = client.users.update("usr_abc123", role="admin")

# Remove a user
client.users.delete("usr_abc123")

Webhooks

# List webhooks
for wh in client.webhooks.list():
    print(wh.url, wh.events, wh.enabled)

# Create a webhook
wh = client.webhooks.create(
    url="https://example.com/hook",
    events=["execution.failed", "sync.failed"],
    description="Slack failure alerts",
)
print(wh.secret)  # Save this — used to verify webhook signatures

# Update a webhook
wh = client.webhooks.update("whk_abc123", enabled=False)

# Regenerate the signing secret (old secret becomes invalid immediately)
wh = client.webhooks.regenerate_secret("whk_abc123")
print(wh.secret)

# Delete a webhook
client.webhooks.delete("whk_abc123")

API tokens

# List all tokens
for token in client.api_tokens.list():
    print(token.name, token.permission, token.token_hint)

# Create a token (the plain token is only returned once)
token = client.api_tokens.create(
    name="CI/CD Pipeline",
    permission="read",
    ip_allowlist=["10.0.0.0/8"],
    expires_in="90_days",
)
print(token.token)  # sk_live_... — save this immediately

# Update a token
token = client.api_tokens.update("tok_abc123", name="Updated Name")

# Revoke a token
client.api_tokens.delete("tok_abc123")

LLM providers

Connect your AI provider accounts to track costs.

# List providers
for provider in client.llm_providers.list():
    print(provider.name, provider.provider_type, provider.sync_status)

# Get a provider (includes 7-day metrics)
provider = client.llm_providers.get("llm_abc123")
print(provider.metrics.total_cost_cents, provider.metrics.total_tokens)

# Connect a new provider
provider = client.llm_providers.create(
    name="OpenAI Production",
    provider_type="openai",  # openai, anthropic, openrouter, or azure
    api_key="sk-...",
    organization_id="org-...",
)

# Trigger a cost sync
client.llm_providers.sync("llm_abc123")

# Update a provider
provider = client.llm_providers.update("llm_abc123", name="OpenAI Staging")

# Delete a provider
client.llm_providers.delete("llm_abc123")

LLM projects

Projects are discovered automatically when syncing a provider. Assign them to clients to attribute costs.

# List projects for a provider
for project in client.llm_projects.list("llm_abc123"):
    print(project.name, project.total_cost_cents, project.client_name)

# Assign a project to a client
project = client.llm_projects.update(
    "llm_abc123", "proj_456", client_id="com_abc123"
)

LLM costs

# Get cost summary (defaults to last 7 days)
costs = client.llm_costs.summary()
print(f"Total: ${costs.data.summary.total_cost_cents / 100:.2f}")
print(f"Tokens: {costs.data.summary.total_tokens:,}")

# Breakdown by provider
for p in costs.data.providers:
    print(f"  {p.name}: ${p.cost_cents / 100:.2f}")

# Breakdown by model
for m in costs.data.models:
    print(f"  {m.model}: ${m.cost_cents / 100:.2f}")

# Daily trend
for day in costs.data.daily:
    print(f"  {day.date}: ${day.cost_cents / 100:.2f}")

# Custom date range
costs = client.llm_costs.summary(
    start_date="2025-01-01",
    end_date="2025-01-31",
)

# Costs by client
for entry in client.llm_costs.by_client().data:
    print(f"{entry.name}: ${entry.cost_cents / 100:.2f}")

# Costs by provider
for entry in client.llm_costs.by_provider().data:
    print(f"{entry.name}: ${entry.cost_cents / 100:.2f}")

Pagination

All .list() methods return an iterator that handles pagination automatically. By default, the API returns 25 items per page (max 100).

# Auto-paginate through all results
for c in client.clients.list():
    print(c.name)

# Control page size
for c in client.clients.list(per_page=100):
    print(c.name)

# Get a single page
page = client.clients.list(per_page=10).first_page()
print(page.meta.total, page.meta.total_pages)
for c in page:
    print(c.name)

Async iteration works the same way:

async for c in async_client.clients.list():
    print(c.name)

Error handling

The SDK raises typed exceptions for all API errors:

from administrate import (
    Administrate,
    APIError,
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    ValidationError,
)

client = Administrate(api_key="sk_live_...")

try:
    c = client.clients.get("com_nonexistent")
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except ValidationError as e:
    print(f"Invalid params: {e.body}")
except APIError as e:
    print(f"API error {e.status_code}: {e.message}")

Exception hierarchy:

Exception Status code Description
AdministrateError Base exception for all SDK errors
APIError Any non-2xx Base for all HTTP API errors
AuthenticationError 401 Invalid or missing API key
PermissionDeniedError 403 Insufficient token permissions
NotFoundError 404 Resource does not exist
ValidationError 422 Invalid request parameters
RateLimitError 429 Rate limit exceeded (has retry_after)
InternalServerError 5xx Server-side error
ConnectionError Failed to connect to the API
TimeoutError Request timed out

All APIError subclasses expose status_code, response (the raw httpx.Response), and body (parsed JSON or text).

Retries

The SDK automatically retries failed requests with exponential backoff:

  • 429 (rate limited) — Retries after the duration specified in the Retry-After header
  • 5xx (server errors) — Retries with exponential backoff (0.5s, 1s, 2s, ...)
  • Connection errors and timeouts — Retried with the same backoff schedule

By default, the SDK retries up to 3 times. Set max_retries=0 to disable:

client = Administrate(api_key="sk_live_...", max_retries=0)

Requirements

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

administrate-0.1.1.tar.gz (84.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

administrate-0.1.1-py3-none-any.whl (31.2 kB view details)

Uploaded Python 3

File details

Details for the file administrate-0.1.1.tar.gz.

File metadata

  • Download URL: administrate-0.1.1.tar.gz
  • Upload date:
  • Size: 84.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.5

File hashes

Hashes for administrate-0.1.1.tar.gz
Algorithm Hash digest
SHA256 c4a7fb0682e0dd444ac1e405fd178b0b37f3f374972f10fc9a29004ff0fa684f
MD5 6109f493eced3273d62594ff1dad9b5b
BLAKE2b-256 61c1488aeb35301a10c7e52ac0bd149e84b92438945ff129246158f708046f14

See more details on using hashes here.

File details

Details for the file administrate-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: administrate-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 31.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.5

File hashes

Hashes for administrate-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 6e37519ba9cd40c46659c43fe554051ac5bc4f92d3d4b38e1804be9a8852e3e8
MD5 bd7d74a505ca68a6bb95db1b55ff8a0c
BLAKE2b-256 fc1f2cd64fe252307580d4bcac7c4b75ea136b2af91f32e450355564525effd6

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page