Automio API
RobotnikAI Python Client - API Documentation
A Python client for the RobotnikAI API that lets your code call integrated third-party APIs and platform services through one interface.
For development documentation (versioning, publishing, etc.), see README.dev.md.
Installation
pip install automio
Quick Start
Environment Variables
Create a .env file in your project root for configuration:
# Cache Service (when available)
CACHE_SERVICE_URL=https://cache.robotnikai.com
CACHE_SERVICE_TOKEN=your_cache_token
# Redis Configuration (for local development)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=optional_password
# RobotnikAI API Configuration
API_BASE_URL=https://robotnikai.com
APP_TOKEN=your_api_token
APP_ID=your_app_id
ACTION_ID=your_action_id
# Task Management
TASK_ID=optional_custom_task_id
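It can help to verify the configuration before constructing the client. The following is a minimal sketch and not part of the SDK: `load_settings`, `missing_settings`, and the `REQUIRED` tuple are hypothetical names, and which variables are actually mandatory is an assumption made for illustration.

```python
import os
from typing import Dict, List, Mapping, Optional

# Assumption: these three variables are treated as mandatory here for illustration only.
REQUIRED = ("API_BASE_URL", "APP_TOKEN", "APP_ID")


def missing_settings(env: Mapping[str, str]) -> List[str]:
    """Return the names of required variables that are absent or empty."""
    return [name for name in REQUIRED if not env.get(name)]


def load_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return the required settings, raising if any of them is missing."""
    source = os.environ if env is None else env
    missing = missing_settings(source)
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: source[name] for name in REQUIRED}


# Passing a plain dict keeps the example independent of the real environment.
example = load_settings({
    "API_BASE_URL": "https://robotnikai.com",
    "APP_TOKEN": "your_api_token",
    "APP_ID": "130",
})
print(example["APP_ID"])
```

Calling `load_settings()` with no argument reads from `os.environ`, so it works the same whether the values come from a .env loader or from the shell.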
Usage Example
from automio.wrapper import API
api = API()
Core Methods
GraphQL (Tables API)
Use GraphQLRequest together with api.tables.apps_graphql_query(...) to run queries and mutations against dynamic app tables (Hasura-backed). This is how the trigger scripts operate:
- Queries are scoped by APP_ID and table name (e.g., app_{APP_ID}_orders).
- Variables are passed via GraphQLRequest(query=..., variables=...).
- Responses include .data and .errors; always check .errors before using data.
- Mutations use insert_* or update_* with where clauses and _set updates.
- Optional organization_id is injected into the where clause for multi-tenant safety.
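The naming convention and the errors-first rule above can be wrapped in two small helpers. This is a sketch under stated assumptions: `table_field` and `rows_or_raise` are hypothetical names that do not exist in the SDK, and `FakeResponse` is a stand-in that mimics only the `.data` and `.errors` attributes described above.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakeResponse:
    """Stand-in exposing only the two attributes described above."""
    data: Optional[Dict[str, Any]] = None
    errors: Optional[List[Dict[str, Any]]] = None


def table_field(app_id: int, table: str, prefix: str = "") -> str:
    """Build a root field name such as app_130_orders or update_app_130_orders."""
    return f"{prefix}app_{app_id}_{table}"


def rows_or_raise(response: Any, field: str) -> List[Dict[str, Any]]:
    """Check .errors first, then return the rows stored under `field`."""
    if response.errors:
        raise RuntimeError(f"GraphQL errors: {response.errors}")
    return (response.data or {}).get(field) or []


field = table_field(130, "orders")
ok = FakeResponse(data={field: [{"id": 1, "orderId": "abc"}]})
print(rows_or_raise(ok, field))
```

Raising on errors instead of printing them makes it harder to use partial data by accident; whether that suits a given trigger script is a design choice.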
Example: Query a single order by orderId
from typing import Any, Dict
from automio import GraphQLRequest
from automio.wrapper import API
api = API()
APP_ID = 130
ORDERS_TABLE_NAME = "orders"
order_query = f"""query GetOrder($order_id: String!, $organization_id: Int) {{
  app_{APP_ID}_{ORDERS_TABLE_NAME}(
    where: {{orderId: {{_eq: $order_id}}, organization_id: {{_eq: $organization_id}}}},
    limit: 1
  ) {{
    id
    orderId
    buyerLogin
  }}
}}"""
variables: Dict[str, Any] = {"order_id": "eba6bc90-faab-11f0-96f9-c518c5ba21fb", "organization_id": 2}
request = GraphQLRequest(query=order_query, variables=variables)
response = api.tables.apps_graphql_query(
    app=APP_ID, table=ORDERS_TABLE_NAME, graph_ql_request=request
)
if response.errors:
    print(f"Order fetch errors: {response.errors}")
else:
    records = response.data.get(f"app_{APP_ID}_{ORDERS_TABLE_NAME}") or []
    print(records[0] if records else "Order not found")
Example: Update delivered flag by orderId
from typing import Any, Dict
from automio import GraphQLRequest
from automio.wrapper import API
api = API()
APP_ID = 130
ORDERS_TABLE_NAME = "orders"
mutation = f"""mutation UpdateOrderDelivered($order_id: String!, $delivered: Boolean!, $organization_id: Int) {{
  update_app_{APP_ID}_{ORDERS_TABLE_NAME}(
    where: {{orderId: {{_eq: $order_id}}, organization_id: {{_eq: $organization_id}}}},
    _set: {{delivered: $delivered}}
  ) {{
    affected_rows
  }}
}}"""
variables: Dict[str, Any] = {"order_id": "eba6bc90-faab-11f0-96f9-c518c5ba21fb", "delivered": True, "organization_id": 2}
request = GraphQLRequest(query=mutation, variables=variables)
response = api.tables.apps_graphql_query(
    app=APP_ID, table=ORDERS_TABLE_NAME, graph_ql_request=request
)
if response.errors:
    print(f"Order update errors: {response.errors}")
Example: Query without variables (simple string query)
from automio import GraphQLRequest
from automio.wrapper import API
api = API()
APP_ID = 130
TABLE_NAME = "billing_entries"
# GraphQLRequest can be used without variables for simple queries
query = f"""{{
  app_{APP_ID}_{TABLE_NAME}(
    order_by: {{ occurredAt: desc }}
    limit: 1
  ) {{
    allegroId
  }}
}}"""
response = api.tables.apps_graphql_query(
    app=APP_ID, table=TABLE_NAME, graph_ql_request=GraphQLRequest(query=query)
)
# Check .errors before reading .data, as with the other examples
if response.errors:
    print(f"Query errors: {response.errors}")
else:
    rows = response.data.get(f"app_{APP_ID}_{TABLE_NAME}", [])
    print(rows[0]["allegroId"] if rows else "No records")
Example: Batch upsert with on_conflict
from typing import Any, Dict, List
from automio import GraphQLRequest
from automio.wrapper import API
api = API()
APP_ID = 130
TABLE_NAME = "billing_entries"
def upsert_batch(rows: List[Dict[str, Any]]) -> None:
    mutation = f"""mutation InsertBatch($objects: [app_{APP_ID}_{TABLE_NAME}_insert_input!]!) {{
      insert_app_{APP_ID}_{TABLE_NAME}(
        objects: $objects,
        on_conflict: {{
          constraint: uq_app_{APP_ID}_{TABLE_NAME}_allegroId_org,
          update_columns: [occurredAt, type, value, balance]
        }}
      ) {{
        affected_rows
      }}
    }}"""
    request = GraphQLRequest(query=mutation, variables={"objects": rows})
    response = api.tables.apps_graphql_query(
        app=APP_ID, table=TABLE_NAME, graph_ql_request=request
    )
    if response.errors:
        print(f"Batch upsert errors: {response.errors}")
        return
    affected = (
        response.data
        .get(f"insert_app_{APP_ID}_{TABLE_NAME}", {})
        .get("affected_rows", 0)
    )
    print(f"Saved {affected} rows")
Example: Insert a message linked to an order
from typing import Any, Dict
from automio import GraphQLRequest
from automio.wrapper import API
api = API()
APP_ID = 130
MESSAGES_TABLE_NAME = "messages"
message_mutation = f"""mutation InsertMessage($data: app_{APP_ID}_{MESSAGES_TABLE_NAME}_insert_input!) {{
  insert_app_{APP_ID}_{MESSAGES_TABLE_NAME}_one(object: $data) {{
    order_id
  }}
}}"""
send_message_payload: Dict[str, Any] = {
    "order_id": "c9a1d25a-fcf3-4950-a660-2f5b04a25320",  # foreign key id
    "messageContent": "Hello, user",
    "organization_id": 2,
}
request = GraphQLRequest(query=message_mutation, variables={"data": send_message_payload})
response = api.tables.apps_graphql_query(
    app=APP_ID, table=MESSAGES_TABLE_NAME, graph_ql_request=request
)
if response.errors:
    print(f"Message insert errors: {response.errors}")
else:
    print(response.data)
Integration API Calls
api.integrations.call()
Make synchronous API calls to integrated services.
# Basic API call (default connection)
integration = api.integrations.get_integration("github")
data, response = api.integrations.call(
    integration,
    method="GET",
    endpoint="/user"
)
# With parameters and specific connection (connection_id is an integer ID)
integration = api.integrations.get_integration("allegro_sandbox")
for connection_id in integration.connected_accounts:
    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/sale/offers",
        params={"limit": 10, "offset": 0},
        connection_id=connection_id
    )
# POST with JSON payload
data, response = api.integrations.call(
    integration,
    method="POST",
    endpoint="/messages/submit",
    json={
        "from": {"name": "John Doe", "address": "john@example.com"},
        "to": [{"name": "Jane Doe", "address": "jane@example.com"}],
        "subject": "Test Email",
        "text": "Hello from Python client!"
    },
    connection_id=connection_id
)
Note: connection_id is an integer ID. Use integration.connected_accounts to iterate over all connected account IDs, or api.integrations.get_connections() to get full connection objects.
Returns: (data, response) tuple where:
- data: Parsed JSON response data
- response: HTTP response object with .ok, .status_code, .text, .headers, .url attributes
Parallel Methods Return: List[ParallelCallResponse] objects where each response contains:
- .api_data: Parsed JSON response data from the external service
- .api_response: HTTP response object (same as above)
- .connection_id: ID of the connection used for this request
- .organization_id: Organization ID associated with the connection
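Because each parallel response carries its own connection and organization IDs, results can be regrouped after the fact. The sketch below is illustrative only: `group_by_organization` is a hypothetical helper, and the `SimpleNamespace` objects stand in for ParallelCallResponse, carrying just the attributes listed above.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any, Dict, Iterable, List


def group_by_organization(responses: Iterable[Any]) -> Dict[int, List[Any]]:
    """Collect api_data of successful responses, keyed by organization_id."""
    grouped: Dict[int, List[Any]] = defaultdict(list)
    for item in responses:
        if item.api_response.ok:
            grouped[item.organization_id].append(item.api_data)
    return dict(grouped)


# Stand-in objects carrying only the attributes listed above.
fake = [
    SimpleNamespace(api_data={"name": "A"}, api_response=SimpleNamespace(ok=True),
                    connection_id=123, organization_id=456),
    SimpleNamespace(api_data=None, api_response=SimpleNamespace(ok=False),
                    connection_id=789, organization_id=101),
]
print(group_by_organization(fake))
```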
Parallel API Calls
api.integrations.parallel_call()
Execute multiple API calls concurrently, where each request in data_list can use a different connection and organization.
# Multi-connection parallel calls (recommended for scaling across accounts)
responses = api.integrations.parallel_call(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[
        {
            "connection_id": 123,
            "organization_id": 456,
            "url_params": {"offerId": "offer_1"},
            "params": {"include": "details"}
        },
        {
            "connection_id": 789,
            "organization_id": 101,
            "url_params": {"offerId": "offer_2"},
            "data": {"expand": True}
        }
    ],
    table="app_123_products"
)
# Process results with new object-based API
for response in responses:
    print(f"Connection {response.connection_id}, Org {response.organization_id}")
    if response.api_response.ok:
        print(f"Offer: {response.api_data['name']}")
        print(f"Status: {response.api_response.status_code}")
    else:
        print(f"Error: {response.api_response.text}")
api.integrations.parallel_call_for_connection()
Execute multiple API calls in parallel using a single connection (simpler for single-account operations).
# Single-connection parallel calls
responses = api.integrations.parallel_call_for_connection(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[
        {"url_params": {"offerId": "123"}},
        {"url_params": {"offerId": "456"}},
        {"url_params": {"offerId": "789"}}
    ],
    connection_id=123,  # integer connection ID
    organization_id=456,
    table="app_124_offers"
)
# Process results
for response in responses:
    if response.api_response.ok:
        print(f"Offer: {response.api_data['name']}")
    else:
        print(f"Error: {response.api_response.text}")
api.integrations.parallel_call_stream()
Stream parallel API calls for real-time processing and progress visibility.
# Stream parallel calls with real-time processing (single connection)
for result in api.integrations.parallel_call_stream(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[{"url_params": {"offerId": offer["id"]}} for offer in offers],
    connection_id=connection_id
):
    if result.get("final"):
        print(f"Completed! Processed {result['total_processed']} requests")
        break
    else:
        # Process individual response as it arrives
        data = result["data"]
        response = result["response"]
        index = result["index"]
        if response.ok:
            # Process immediately without waiting for all requests
            processed_data = process_data(data)
            save_to_database(processed_data)
        else:
            print(f"❌ Error for request {index}: {response.text}")
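The stream protocol used above (per-request items followed by one item flagged `final`) can be exercised without the platform. This is a sketch, assuming the item keys shown in the example (`data`, `response`, `index`, `final`, `total_processed`); `consume_stream` and `fake_stream` are hypothetical names, and the fake generator only imitates the real stream.

```python
from types import SimpleNamespace
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def consume_stream(stream: Iterable[Dict[str, Any]]) -> Tuple[List[Any], List[int], int]:
    """Split streamed items into successes and failed indexes; return the reported total."""
    successes: List[Any] = []
    failed: List[int] = []
    total = 0
    for result in stream:
        if result.get("final"):
            # The final item carries only summary information.
            total = result["total_processed"]
            break
        if result["response"].ok:
            successes.append(result["data"])
        else:
            failed.append(result["index"])
    return successes, failed, total


def fake_stream() -> Iterator[Dict[str, Any]]:
    """Imitates the stream: two per-request items, then the final marker."""
    yield {"index": 0, "data": {"id": "a"}, "response": SimpleNamespace(ok=True)}
    yield {"index": 1, "data": None, "response": SimpleNamespace(ok=False)}
    yield {"final": True, "total_processed": 2}


print(consume_stream(fake_stream()))
```

Keeping the failed indexes makes it possible to rebuild a smaller data_list for a retry pass.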
Plugins
Plugins let you run shared, reusable Python code per integration - defined once in a Plugin Interface, implemented separately for each integration.
Concepts:
- Plugin Interface - a contract identified by a namespace and a list of capabilities (function names). Think of it as an abstract class. Example: test_interface with capability foo.
- Plugin Interface code - shared Python code available to all plugins under that interface (helpers, base functions, common constants).
- Plugin - a concrete implementation of that interface for a specific integration (e.g., allegro_sandbox). It can override selected capabilities and reuse the interface code.
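The abstract-class analogy can be made concrete in plain Python. This is only an illustration of the concept, not how the platform registers or loads plugins: `InterfaceContract` and `AllegroSandboxPlugin` are hypothetical classes.

```python
from abc import ABC, abstractmethod
from typing import List


class InterfaceContract(ABC):
    """Plays the role of the Plugin Interface: a namespace and one capability, foo."""
    namespace = "test_interface"

    def describe(self, connection_id: int) -> str:
        # Shared helper, analogous to Plugin Interface code reused by every plugin.
        return f"{self.namespace}:{connection_id}"

    @abstractmethod
    def foo(self, connection_ids: List[int]) -> List[str]:
        """Capability that each plugin must implement."""


class AllegroSandboxPlugin(InterfaceContract):
    """Plays the role of a Plugin: the implementation for one integration."""

    def foo(self, connection_ids: List[int]) -> List[str]:
        return [self.describe(cid) for cid in connection_ids]


print(AllegroSandboxPlugin().foo([1, 2]))
```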
Shared Interface Code
When a Plugin Interface defines code, plugins in that interface can import it with:
from plugins.interfaces.<namespace> import <name>
Example for namespace test_interface:
from plugins.interfaces.test_interface import test_ifc
Or import everything from the interface module:
from plugins.interfaces.test_interface import *
api.plugins.get_list(namespace)
List all enabled plugins registered under a given interface namespace.
for plugin in api.plugins.get_list("test_interface"):
    print(f"Plugin ID: {plugin.id}")
    print(f"Connection IDs: {plugin.connection_ids}")
api.plugins.invoke(plugin, integration, connection_ids, **payload)
Invoke a capability from a plugin. The first argument is "namespace.capability", the second is the integration ID string.
integration = api.integrations.get_integration("allegro_sandbox")
for plugin in api.plugins.get_list("test_interface"):
    result = api.plugins.invoke(
        "test_interface.foo",  # namespace.capability
        integration.integration_id,  # integration name as string
        connection_ids=plugin.connection_ids,
    )
The plugin code (written in the platform's code editor) receives connection_ids as a keyword argument:
# Plugin code for test_interface / allegro_sandbox integration:
def foo(connection_ids: list):
    for connection_id in connection_ids:
        data, response = api.integrations.call(
            api.integrations.get_integration("allegro_sandbox"),
            method="GET",
            endpoint="/sale/offers",
            connection_id=connection_id,
        )
        print(data)
Connection Management
integration.connected_accounts
The simplest way to iterate over all connected account IDs for an integration. Returns an iterable of integer connection IDs.
integration = api.integrations.get_integration("allegro_sandbox")
for connection_id in integration.connected_accounts:
    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/billing/billing-entries",
        connection_id=connection_id,
        params={"limit": 100},
    )
api.integrations.get_connections()
Get all connected accounts for an integration. Returns a list of connection objects, each with an id and a name.
# Get all connected accounts
connections = api.integrations.get_connections("allegro_sandbox")
for connection in connections:
    print(f"ID: {connection.id}, Name: {connection.name}")
    # Use connection ID in API calls
    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/sale/offers",
        connection_id=connection.id  # integer ID
    )
Caching System
App Cache
Shared cache accessible across the entire application. Entries are looked up by key and are visible only to users within the same organization.
# Set cache with TTL
api.app_cache.set(
    key="api_data",
    value={"rates": [1.2, 1.5, 1.8]},
    ttl=300  # 5 minutes
)
# Get cached data
cached_data = api.app_cache.get(key="api_data")
print(cached_data) # {'rates': [1.2, 1.5, 1.8]}
# Delete cache entry
api.app_cache.delete(key="api_data")
User Cache
User-specific cache for personalized data. It is scoped to the user's organization and can be shared between apps.
# Set user-specific cache
api.user_cache.set(
    key="my_key",
    value="my_value",
    ttl=3600  # 1 hour
)
# Get user cache
preferences = api.user_cache.get(key="my_key")
# Delete user cache
api.user_cache.delete(key="my_key")
Notifications
api.notify_me()
Send an email notification to yourself; useful for monitoring and alerts.
# Send notification
api.notify_me(
    subject="API Process Completed",
    text="Successfully processed 1000 records",
    html="<h1>Success!</h1><p>Processed <strong>1000</strong> records</p>"
)
Task Progress Tracking
api.task.set_progress(progress: int, info: str, status: 'pending' | 'completed' | 'failed')
Update task progress for long-running operations.
progress and info are displayed in the UI during task execution, while status indicates the task state.
# Initialize progress
api.task.set_progress(0, "Starting data processing...", "pending")
# Update progress throughout your task
for i, item in enumerate(large_dataset):
    # Process item
    process_item(item)
    # Update progress every 100 items
    if i % 100 == 0:
        progress = int((i / len(large_dataset)) * 100)
        api.task.set_progress(
            progress,
            f"Processed {i}/{len(large_dataset)} items",
            "pending"
        )
# Complete the task
api.task.set_progress(100, "Processing completed!", "completed")
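The percentage and message passed to set_progress can be computed separately from the loop that does the work. This is a minimal sketch: `progress_updates` is a hypothetical helper, and it counts completed items (starting at 1) rather than loop indexes, which is a choice made here rather than something the SDK requires.

```python
from typing import Iterator, List, Tuple


def progress_updates(total: int, every: int = 100) -> Iterator[Tuple[int, str]]:
    """Yield (percent, message) after each batch of `every` completed items."""
    for done in range(1, total + 1):
        # Skip the last item: the caller reports completion separately.
        if done % every == 0 and done < total:
            # Integer arithmetic avoids floating-point rounding in the percentage.
            yield done * 100 // total, f"Processed {done}/{total} items"


updates: List[Tuple[int, str]] = list(progress_updates(250, every=100))
print(updates)
```

Each yielded pair could then be forwarded as `api.task.set_progress(percent, message, "pending")`.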
Configuration & Environment
Cache & Progress Service Configuration
The client automatically detects and adapts to your environment:
RobotnikAI Sandbox (Automatic)
When running in RobotnikAI Sandbox:
- Uses the hosted cache service API
- TASK_ID is automatically assigned by the platform
- Progress tracking is displayed in the RobotnikAI UI
- Cache is shared across the organization
Local Development (Redis)
When running locally without CACHE_SERVICE_URL:
- Automatically uses direct Redis connection
- TASK_ID is generated as a random UUID4
- Progress is logged to console
- Requires Redis server running locally
Required Dependencies
For local Redis usage, install the Redis client:
pip install redis
Usage Impact
From the user's perspective, no code changes are required. The client automatically:
- Detects environment and chooses appropriate backend
- Maintains consistent API across both configurations
- Handles serialization/deserialization transparently
- Provides same response format regardless of backend
# This code works identically in both environments
api.app_cache.set("my_key", {"data": "value"}, ttl=300)
cached_data = api.app_cache.get("my_key")
api.task.set_progress(50, "Halfway done", "pending")
Environment Detection Logic
# The client automatically determines configuration:
if CACHE_SERVICE_URL:
    # Use RobotnikAI cache service API
    # - HTTP requests to cache service
    # - Progress updates sent to platform
    # - Task ID from environment or auto-generated
else:
    # Use local Redis connection
    # - Direct Redis operations
    # - Progress logged to console
    # - UUID4 generated for task tracking
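The decision described above can be written as a small runnable function. This is a sketch that mirrors the documented behavior, not the client's actual implementation: `detect_backend` and the returned dictionary keys are hypothetical.

```python
import os
import uuid
from typing import Dict, Mapping, Optional


def detect_backend(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Pick a backend from CACHE_SERVICE_URL and resolve a task ID."""
    source = os.environ if env is None else env
    # A non-empty CACHE_SERVICE_URL selects the hosted service; otherwise Redis.
    backend = "cache_service" if source.get("CACHE_SERVICE_URL") else "redis"
    # Use TASK_ID when provided, else fall back to a random UUID4.
    task_id = source.get("TASK_ID") or str(uuid.uuid4())
    return {"backend": backend, "task_id": task_id}


hosted = detect_backend({"CACHE_SERVICE_URL": "https://cache.robotnikai.com", "TASK_ID": "t1"})
local = detect_backend({})
print(hosted["backend"], local["backend"])
```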
Integration Management
api.integrations.get_integration()
Get integration configuration for API calls.
# Get integration
integration = api.integrations.get_integration("allegro_sandbox")
# Now use this integration object in .call() methods
api.integrations.get_integrations()
List all available integrations.
integrations = api.integrations.get_integrations()
for integration in integrations.results:
    print(f"ID: {integration.integration_id}, Name: {integration.name}")
Common Patterns
1. Parallel Data Processing
# Efficient parallel processing pattern
integration = api.integrations.get_integration("api_service")
# Step 1: Get list of items
list_data, response = api.integrations.call(
    integration, method="GET", endpoint="/items",
    params={"limit": 50}, connection_id=123  # integer connection ID
)
# Step 2: Process details in parallel (single connection)
detail_requests = [
    {"url_params": {"id": item["id"]}}
    for item in list_data["items"]
]
responses = api.integrations.parallel_call_for_connection(
    integration,
    method="GET",
    endpoint="/items/{id}/details",
    data_list=detail_requests,
    connection_id=123,
    organization_id=456
)
# Step 3: Process results with new object API
processed_data = []
for response in responses:
    if response.api_response.ok:
        processed_data.append(transform_data(response.api_data))
    else:
        print(f"Error for connection {response.connection_id}: {response.api_response.text}")
2. Multi-Connection Parallel Processing
# Advanced: Parallel processing across multiple connections simultaneously
integration = api.integrations.get_integration("api_service")
# Get all connections for the service
table_connections = api.integrations.all_connections(integration, "data_table")
# Build parallel requests for multiple connections
data_list = []
for org_id, connections in table_connections.items():
    for connection in connections:
        data_list.append({
            "connection_id": connection["id"],
            "organization_id": org_id,
            "params": {"limit": 50},
            "url_params": {"account_id": connection["id"]}
        })
# Execute all requests in parallel across all connections
responses = api.integrations.parallel_call(
    integration,
    method="GET",
    endpoint="/accounts/{account_id}/data",
    data_list=data_list,
    table="app_123_consolidated_data"
)
# Process results with full traceability
for response in responses:
    print(f"Org {response.organization_id}, Connection {response.connection_id}")
    if response.api_response.ok:
        save_data(response.api_data, response.connection_id, response.organization_id)
    else:
        log_error(response.connection_id, response.api_response.text)
3. Multi-Account Operations
# Process data across multiple connected accounts
integration = api.integrations.get_integration("service_name")
connections = api.integrations.get_connections("service_name")
for connection in connections:
    print(f"Processing account: {connection.name}")
    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/data",
        connection_id=connection.id
    )
    if response.ok:
        # Process account-specific data
        process_account_data(data, connection.id)
4. Progress Tracking with Caching
def long_running_task():
    # Status must be one of 'pending', 'completed', or 'failed'
    api.task.set_progress(0, "Initializing...", "pending")
    # Cache intermediate results
    api.app_cache.set("task_checkpoint", {"processed": 0}, ttl=3600)
    for i in range(1000):
        # Do work
        process_item(i)
        # Update progress and cache
        if i % 100 == 0:
            progress = int((i / 1000) * 100)
            api.task.set_progress(progress, f"Processed {i}/1000", "pending")
            api.app_cache.set("task_checkpoint", {"processed": i}, ttl=3600)
    api.task.set_progress(100, "Completed!", "completed")
    api.app_cache.delete("task_checkpoint")
Performance Tips
- Use parallel calls for multiple API requests to the same service
- Cache frequently accessed data to reduce API calls
- Stream parallel calls for real-time processing of large datasets
- Set appropriate TTL for cached data based on update frequency
- Monitor progress for long-running tasks to improve user experience
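The caching tip above is commonly applied as a cache-aside helper: look up the key, and only compute and store the value on a miss. The sketch below is illustrative. `get_or_compute` is a hypothetical helper, `InMemoryCache` is a local stand-in that copies the set/get/delete keyword arguments shown earlier, and treating a `None` result from `get` as a miss is an assumption about the real cache.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


class InMemoryCache:
    """Local stand-in exposing set/get/delete with a TTL in seconds."""

    def __init__(self) -> None:
        self._items: Dict[str, Tuple[Any, float]] = {}

    def set(self, key: str, value: Any, ttl: int) -> None:
        self._items[key] = (value, time.monotonic() + ttl)

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None or entry[1] <= time.monotonic():
            # Missing or expired: drop the entry and report a miss.
            self._items.pop(key, None)
            return None
        return entry[0]

    def delete(self, key: str) -> None:
        self._items.pop(key, None)


def get_or_compute(cache: Any, key: str, compute: Callable[[], Any], ttl: int) -> Any:
    """Return the cached value, computing and storing it only on a miss."""
    cached = cache.get(key=key)
    if cached is not None:  # Assumption: None means "not cached".
        return cached
    value = compute()
    cache.set(key=key, value=value, ttl=ttl)
    return value


calls: List[int] = []


def fetch_rates() -> Dict[str, List[float]]:
    """Stands in for an expensive API call; records each invocation."""
    calls.append(1)
    return {"rates": [1.2, 1.5, 1.8]}


cache = InMemoryCache()
first = get_or_compute(cache, "api_data", fetch_rates, ttl=300)
second = get_or_compute(cache, "api_data", fetch_rates, ttl=300)
print(len(calls))
```

In platform code, `api.app_cache` or `api.user_cache` would be passed in place of the stand-in, provided their miss behavior matches the assumption above.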
Error Handling
Single API Calls
# Wrapped in a function (hypothetical name) so the early returns are valid Python
def fetch_and_process(integration):
    data, response = api.integrations.call(integration, method="GET", endpoint="/data")
    if not response.ok:
        print(f"API Error: {response.status_code} - {response.text}")
        return
    if not data:
        print("No data received")
        return
    # Process successful response
    process_data(data)
Parallel API Calls
responses = api.integrations.parallel_call(
    integration, method="GET", endpoint="/data",
    data_list=requests_data
)
for response in responses:
    if not response.api_response.ok:
        print(f"Error for connection {response.connection_id}: {response.api_response.text}")
        continue
    if not response.api_data:
        print(f"No data received for connection {response.connection_id}")
        continue
    # Process successful response
    process_data(response.api_data, response.connection_id)