Python SDK for the Administrate.dev REST API
Reason this release was yanked:
Incorrect url
Administrate Python SDK
The official Python SDK for the Administrate.dev REST API.
Administrate.dev is a monitoring and management platform for AI agencies running n8n and AI automation workflows across multiple clients. It provides a single dashboard to track every workflow, every client, every failure, and all LLM costs, so you can catch problems before clients do and prove the value of your automations.
Key platform features:
- Multi-instance monitoring — See all n8n instances across every client in one place
- Error tracking — Real-time failure detection with automatic error categorization
- LLM cost tracking — Connect OpenAI, Anthropic, Azure, and OpenRouter accounts to attribute costs to specific clients
- Workflow insights — Execution counts, success rates, and time-saved ROI reporting
- Sync health — Know instantly when a data sync fails
- Webhooks & API — Full programmatic access for custom integrations
Installation
pip install administrate
Requires Python 3.9+.
Quick start
from administrate import Administrate
client = Administrate(api_key="sk_live_...")
# Get account info
account = client.account.get()
print(account.name, account.plan)
# List all clients with auto-pagination
for c in client.clients.list():
    print(c.name, c.n8n_instances_count)
# Check for failed executions across all instances
for execution in client.executions.list(errors_only=True):
    print(f"{execution.workflow_name}: {execution.error_category}")
# Get LLM cost summary
costs = client.llm_costs.summary()
print(f"Total: ${costs.data.summary.total_cost_cents / 100:.2f}")
Async usage
import asyncio
from administrate import AsyncAdministrate

async def main():
    # "async with" and "await" are only valid inside a coroutine
    async with AsyncAdministrate(api_key="sk_live_...") as client:
        account = await client.account.get()
        async for workflow in client.workflows.list(active=True):
            print(workflow.name, workflow.is_active)

asyncio.run(main())
Every resource method available on Administrate has an identical async counterpart on AsyncAdministrate.
Configuration
from administrate import Administrate
client = Administrate(
    api_key="sk_live_...",   # Required. Must start with "sk_live_"
    base_url="https://...",  # Default: "https://app.administrate.dev"
    timeout=30.0,            # Request timeout in seconds. Default: 30
    max_retries=3,           # Retry attempts for failed requests. Default: 3
)
The client can be used as a context manager to ensure the underlying HTTP connection is closed:
with Administrate(api_key="sk_live_...") as client:
    account = client.account.get()
# Connection closed automatically
# Or close manually
client = Administrate(api_key="sk_live_...")
# ... use client ...
client.close()
You can also pass a pre-configured httpx.Client (or httpx.AsyncClient for async) if you need full control over the HTTP layer:
import httpx
http_client = httpx.Client(proxy="http://proxy:8080")
client = Administrate(api_key="sk_live_...", http_client=http_client)
API reference
All API keys are created in Settings > Developers within Administrate.dev. Tokens have three permission levels: read, write, and full.
Account
# Get current token info and account summary
me = client.account.me()
print(me.token.name, me.token.permission)
print(me.account.name, me.account.plan)
# Get full account details
account = client.account.get()
# Update account settings
account = client.account.update(
    name="My Agency",
    billing_email="billing@example.com",
    timezone="Australia/Brisbane",
)
Clients
Clients represent the companies you manage automations for.
# List all clients (auto-paginates)
for c in client.clients.list():
    print(c.name, c.code)
# Get a client (includes 7-day metrics)
c = client.clients.get("com_abc123")
print(c.metrics.success_rate, c.metrics.time_saved_minutes)
# Create a client
c = client.clients.create(
    name="Acme Corp",
    code="acme",
    contact_email="ops@acme.com",
    timezone="America/New_York",
)
# Update a client
c = client.clients.update("com_abc123", notes="Enterprise tier")
# Delete a client (requires full permission)
client.clients.delete("com_abc123")
Instances
Instances are n8n deployments connected to Administrate.
# List all instances
for inst in client.instances.list():
    print(inst.name, inst.sync_status)
# Filter by client or sync status
for inst in client.instances.list(client_id="com_abc123", sync_status="error"):
    print(inst.name, inst.last_sync_error)
# Get an instance (includes 7-day metrics)
inst = client.instances.get("n8n_abc123")
print(inst.metrics.executions_count, inst.metrics.success_rate)
# Connect a new n8n instance
inst = client.instances.create(
    client_id="com_abc123",
    name="Production n8n",
    base_url="https://n8n.acme.com",
    api_key="n8n_api_key_here",
)
# Trigger a sync
result = client.instances.sync("n8n_abc123", sync_type="all")
# Sync all instances at once
result = client.instances.sync_all(sync_type="workflows")
# Update an instance
inst = client.instances.update("n8n_abc123", name="Staging n8n")
# Delete an instance
client.instances.delete("n8n_abc123")
Workflows
# List workflows with filters
for wf in client.workflows.list(client_id="com_abc123", active=True):
    print(wf.name, wf.is_active)
# Search by name
for wf in client.workflows.list(search="onboarding"):
    print(wf.name)
# Get a workflow (includes 7-day metrics)
wf = client.workflows.get("wfl_abc123")
print(wf.metrics.success_rate, wf.metrics.time_saved_minutes)
# Set time-saved estimates (for ROI reporting)
wf = client.workflows.update(
    "wfl_abc123",
    minutes_saved_per_success=15,
    minutes_saved_per_failure=5,
)
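As a rough illustration of how the two estimates could feed an ROI figure, the sketch below multiplies each estimate by a run count. The README does not state how Administrate aggregates these values, so the formula and the helper name `estimate_time_saved_minutes` are assumptions for illustration only.

```python
def estimate_time_saved_minutes(
    successes: int,
    failures: int,
    minutes_saved_per_success: float,
    minutes_saved_per_failure: float,
) -> float:
    """Hypothetical aggregate: each run contributes its per-run estimate."""
    return (
        successes * minutes_saved_per_success
        + failures * minutes_saved_per_failure
    )


# 40 successful runs and 3 failed runs with the estimates set above
total = estimate_time_saved_minutes(40, 3, 15, 5)
print(total)
```

With these inputs the sketch yields 615 minutes (600 from successes, 15 from failures).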
Executions
Executions are read-only records of workflow runs synced from n8n.
# List executions with filters
for ex in client.executions.list(
    client_id="com_abc123",
    status="failed",
    start_date="2025-01-01",
    end_date="2025-01-31",
):
    print(ex.workflow_name, ex.status, ex.duration_ms)
# Get only errors
for ex in client.executions.list(errors_only=True):
    print(ex.error_category, ex.workflow_name)
# Get execution details (includes error message and payload)
ex = client.executions.get("exe_abc123")
print(ex.error_message)
print(ex.error_payload)
Sync runs
# List sync run history
for run in client.sync_runs.list(instance_id="n8n_abc123", status="failed"):
    print(run.sync_type, run.status, run.duration_seconds)
# Get a specific sync run
run = client.sync_runs.get("syn_abc123")
# Get sync health across all instances
for entry in client.sync_runs.health():
    print(entry.instance_name, entry.sync_status)
    print(f" Workflows last synced: {entry.workflows.last_synced_at}")
    print(f" Executions last synced: {entry.executions.last_synced_at}")
Users
# List team members
for user in client.users.list():
    print(user.name, user.email, user.role)
# Get a user
user = client.users.get("usr_abc123")
# Invite a new team member
invitation = client.users.invite(email="new@example.com", role="member")
print(invitation.expires_at)
# Change a user's role
user = client.users.update("usr_abc123", role="admin")
# Remove a user
client.users.delete("usr_abc123")
Webhooks
# List webhooks
for wh in client.webhooks.list():
    print(wh.url, wh.events, wh.enabled)
# Create a webhook
wh = client.webhooks.create(
    url="https://example.com/hook",
    events=["execution.failed", "sync.failed"],
    description="Slack failure alerts",
)
print(wh.secret) # Save this — used to verify webhook signatures
# Update a webhook
wh = client.webhooks.update("whk_abc123", enabled=False)
# Regenerate the signing secret (old secret becomes invalid immediately)
wh = client.webhooks.regenerate_secret("whk_abc123")
print(wh.secret)
# Delete a webhook
client.webhooks.delete("whk_abc123")
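The webhook secret is described as being used to verify signatures, but this README does not document the signing scheme or the header that carries the signature. The sketch below assumes a common convention (a hex-encoded HMAC-SHA256 of the raw request body) purely for illustration; check the Administrate.dev webhook documentation for the actual algorithm and header name before relying on it. `verify_signature` is a hypothetical helper, not part of the SDK.

```python
import hashlib
import hmac


def verify_signature(secret: str, raw_body: bytes, received_signature: str) -> bool:
    """Assumed scheme: hex HMAC-SHA256 of the raw body, keyed by the webhook secret."""
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information through comparison timing
    return hmac.compare_digest(expected.encode("utf-8"), received_signature.encode("utf-8"))


# Simulate a delivery signed with a made-up secret
secret = "whsec_example"  # placeholder value, not a real secret format
body = b'{"event": "execution.failed"}'
signature = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()

print(verify_signature(secret, body, signature))
print(verify_signature(secret, body + b" ", signature))
```

Whatever the real scheme is, verify against the raw bytes of the request body rather than a re-serialized JSON object, since re-serialization can change whitespace and key order.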
API tokens
# List all tokens
for token in client.api_tokens.list():
    print(token.name, token.permission, token.token_hint)
# Create a token (the plain token is only returned once)
token = client.api_tokens.create(
    name="CI/CD Pipeline",
    permission="read",
    ip_allowlist=["10.0.0.0/8"],
    expires_in="90_days",
)
print(token.token) # sk_live_... — save this immediately
# Update a token
token = client.api_tokens.update("tok_abc123", name="Updated Name")
# Revoke a token
client.api_tokens.delete("tok_abc123")
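The `ip_allowlist` value above uses CIDR notation. To check locally which addresses a given allowlist would cover, a sketch with the standard-library `ipaddress` module follows. It assumes the server applies ordinary CIDR containment, which this README does not spell out, and `ip_allowed` is a hypothetical helper name.

```python
import ipaddress
from typing import Iterable


def ip_allowed(ip: str, allowlist: Iterable[str]) -> bool:
    """Return True if `ip` falls inside any CIDR block in `allowlist`."""
    address = ipaddress.ip_address(ip)
    return any(
        address in ipaddress.ip_network(cidr, strict=False)
        for cidr in allowlist
    )


allowlist = ["10.0.0.0/8"]
print(ip_allowed("10.1.2.3", allowlist))
print(ip_allowed("192.168.1.1", allowlist))
```

This is useful for confirming that a CI runner's egress address is covered before creating a restricted token.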
LLM providers
Connect your AI provider accounts to track costs.
# List providers
for provider in client.llm_providers.list():
    print(provider.name, provider.provider_type, provider.sync_status)
# Get a provider (includes 7-day metrics)
provider = client.llm_providers.get("llm_abc123")
print(provider.metrics.total_cost_cents, provider.metrics.total_tokens)
# Connect a new provider
provider = client.llm_providers.create(
    name="OpenAI Production",
    provider_type="openai",  # openai, anthropic, openrouter, or azure
    api_key="sk-...",
    organization_id="org-...",
)
# Trigger a cost sync
client.llm_providers.sync("llm_abc123")
# Update a provider
provider = client.llm_providers.update("llm_abc123", name="OpenAI Staging")
# Delete a provider
client.llm_providers.delete("llm_abc123")
LLM projects
Projects are discovered automatically when syncing a provider. Assign them to clients to attribute costs.
# List projects for a provider
for project in client.llm_projects.list("llm_abc123"):
    print(project.name, project.total_cost_cents, project.client_name)
# Assign a project to a client
project = client.llm_projects.update(
    "llm_abc123", "proj_456", client_id="com_abc123"
)
LLM costs
# Get cost summary (defaults to last 7 days)
costs = client.llm_costs.summary()
print(f"Total: ${costs.data.summary.total_cost_cents / 100:.2f}")
print(f"Tokens: {costs.data.summary.total_tokens:,}")
# Breakdown by provider
for p in costs.data.providers:
    print(f" {p.name}: ${p.cost_cents / 100:.2f}")
# Breakdown by model
for m in costs.data.models:
    print(f" {m.model}: ${m.cost_cents / 100:.2f}")
# Daily trend
for day in costs.data.daily:
    print(f" {day.date}: ${day.cost_cents / 100:.2f}")
# Custom date range
costs = client.llm_costs.summary(
    start_date="2025-01-01",
    end_date="2025-01-31",
)
# Costs by client
for entry in client.llm_costs.by_client().data:
    print(f"{entry.name}: ${entry.cost_cents / 100:.2f}")
# Costs by provider
for entry in client.llm_costs.by_provider().data:
    print(f"{entry.name}: ${entry.cost_cents / 100:.2f}")
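Costs are reported in integer cents, and the examples above convert them with float division. If you sum many entries or want thousands separators, a small formatter that stays in integer arithmetic is one option. `format_cents` is a hypothetical helper, not part of the SDK, and it assumes the cent values are plain integers.

```python
def format_cents(cents: int) -> str:
    """Format integer cents as a dollar string without float arithmetic."""
    dollars, remainder = divmod(abs(cents), 100)
    sign = "-" if cents < 0 else ""
    return f"{sign}${dollars:,}.{remainder:02d}"


# Sum in cents first, then format once at the end
per_client_cents = [123456, 5, 990]
print(format_cents(sum(per_client_cents)))
print(format_cents(5))
```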
Pagination
All .list() methods return an iterator that handles pagination automatically. By default, the API returns 25 items per page (max 100).
# Auto-paginate through all results
for c in client.clients.list():
    print(c.name)
# Control page size
for c in client.clients.list(per_page=100):
    print(c.name)
# Get a single page
page = client.clients.list(per_page=10).first_page()
print(page.meta.total, page.meta.total_pages)
for c in page:
    print(c.name)
Async iteration works the same way:
async for c in async_client.clients.list():
    print(c.name)
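To make the auto-pagination behaviour concrete, here is a minimal, self-contained sketch of an iterator that keeps requesting pages until the last one. It is not the SDK's implementation: `iterate_all`, `fake_fetch`, and the dictionary page shape are assumptions, with only the `meta.total` and `meta.total_pages` field names taken from the example above.

```python
from typing import Any, Callable, Dict, Iterator

Page = Dict[str, Any]  # assumed shape: {"data": [...], "meta": {"total_pages": N}}


def iterate_all(fetch_page: Callable[[int, int], Page], per_page: int = 25) -> Iterator[Any]:
    """Yield every item, fetching the next page only when it is needed."""
    page_number = 1
    while True:
        page = fetch_page(page_number, per_page)
        yield from page["data"]
        if page_number >= page["meta"]["total_pages"]:
            break
        page_number += 1


ITEMS = [f"client-{i}" for i in range(1, 8)]  # seven fake records


def fake_fetch(page_number: int, per_page: int) -> Page:
    """Stand-in for an HTTP call; slices ITEMS like a paginated endpoint would."""
    start = (page_number - 1) * per_page
    total_pages = max(1, -(-len(ITEMS) // per_page))  # ceiling division
    return {
        "data": ITEMS[start:start + per_page],
        "meta": {"total": len(ITEMS), "total_pages": total_pages},
    }


names = list(iterate_all(fake_fetch, per_page=3))
print(len(names))
```

Because the generator is lazy, breaking out of the loop early avoids fetching the remaining pages.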
Error handling
The SDK raises typed exceptions for all API errors:
from administrate import (
    Administrate,
    APIError,
    AuthenticationError,
    NotFoundError,
    RateLimitError,
    ValidationError,
)
client = Administrate(api_key="sk_live_...")
try:
    c = client.clients.get("com_nonexistent")
except NotFoundError as e:
    print(f"Not found: {e.message}")
except AuthenticationError:
    print("Invalid API key")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after}s")
except ValidationError as e:
    print(f"Invalid params: {e.body}")
except APIError as e:
    print(f"API error {e.status_code}: {e.message}")
Exception hierarchy:
| Exception | Status code | Description |
|---|---|---|
| AdministrateError | — | Base exception for all SDK errors |
| APIError | Any non-2xx | Base for all HTTP API errors |
| AuthenticationError | 401 | Invalid or missing API key |
| PermissionDeniedError | 403 | Insufficient token permissions |
| NotFoundError | 404 | Resource does not exist |
| ValidationError | 422 | Invalid request parameters |
| RateLimitError | 429 | Rate limit exceeded (has retry_after) |
| InternalServerError | 5xx | Server-side error |
| ConnectionError | — | Failed to connect to the API |
| TimeoutError | — | Request timed out |
All APIError subclasses expose status_code, response (the raw httpx.Response), and body (parsed JSON or text).
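The status-code column of the table can be read as a lookup with a generic fallback. The sketch below mirrors that reading using exception names as strings; it is an illustration of the table, not the SDK's actual dispatch code, and `exception_name_for_status` is a hypothetical helper.

```python
SPECIFIC_ERRORS = {
    401: "AuthenticationError",
    403: "PermissionDeniedError",
    404: "NotFoundError",
    422: "ValidationError",
    429: "RateLimitError",
}


def exception_name_for_status(status_code: int) -> str:
    """Map a non-2xx status code to the exception name listed in the table."""
    if 200 <= status_code < 300:
        raise ValueError("2xx responses are not errors")
    if status_code in SPECIFIC_ERRORS:
        return SPECIFIC_ERRORS[status_code]
    if 500 <= status_code <= 599:
        return "InternalServerError"
    return "APIError"  # any other non-2xx status


print(exception_name_for_status(404))
print(exception_name_for_status(503))
print(exception_name_for_status(418))
```

This is also why `except APIError` belongs last in a `try` block: it is the base class for the HTTP errors and would otherwise shadow the more specific handlers.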
Retries
The SDK automatically retries failed requests with exponential backoff:
- 429 (rate limited) — Retries after the duration specified in the Retry-After header
- 5xx (server errors) — Retries with exponential backoff (0.5s, 1s, 2s, ...)
- Connection errors and timeouts — Retried with the same backoff schedule
By default, the SDK retries up to 3 times. Set max_retries=0 to disable:
client = Administrate(api_key="sk_live_...", max_retries=0)
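The documented schedule (0.5s, 1s, 2s, ...) corresponds to a base delay of 0.5 seconds that doubles on each attempt. The sketch below reproduces that schedule and the Retry-After preference for 429 responses. Whether the SDK also applies jitter or a maximum delay is not stated here, so neither is modelled; `backoff_delays` and `next_delay` are hypothetical names.

```python
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 0.5) -> List[float]:
    """Delay before each retry: base, 2*base, 4*base, ..."""
    return [base * (2 ** attempt) for attempt in range(max_retries)]


def next_delay(attempt: int, retry_after: Optional[float] = None, base: float = 0.5) -> float:
    """Prefer a server-provided Retry-After value, else fall back to backoff."""
    if retry_after is not None:
        return retry_after
    return base * (2 ** attempt)


print(backoff_delays(3))
print(next_delay(0, retry_after=7))
```

With the default of three retries, a request that keeps failing would wait roughly 3.5 seconds in total under this schedule before the final error is raised.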
Requirements
License
MIT