Automio API
RobotnikAI Python Client - API Documentation
A Python client for the RobotnikAI API that enables seamless integration with external APIs and services.
For development documentation (versioning, publishing, etc.), see README.dev.md.
Installation
pip install robotnikai
Quick Start
Environment Variables
Create a .env file in your project root for configuration:
# Cache Service (when available)
CACHE_SERVICE_URL=https://cache.robotnikai.com
CACHE_SERVICE_TOKEN=your_cache_token
# Redis Configuration (for local development)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=optional_password
# RobotnikAI API Configuration
API_BASE_URL=https://robotnikai.com
APP_TOKEN=your_api_token
APP_ID=your_app_id
ACTION_ID=your_action_id
# Task Management
TASK_ID=optional_custom_task_id
Usage Example
from robotnikai.wrapper import API
api = API()
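The examples in this guide assume the variables from the .env file are present in the process environment. If the client does not load the .env file on its own, python-dotenv can load it first. A minimal sketch (python-dotenv is an assumption here, not a documented dependency of robotnikai):

from dotenv import load_dotenv  # assumed extra: pip install python-dotenv

from robotnikai.wrapper import API

load_dotenv()  # reads API_BASE_URL, APP_TOKEN, APP_ID, etc. from .env

api = API()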
Core Methods
GraphQL (Tables API)
Use GraphQLRequest together with api.tables.apps_graphql_query(...) to run queries and mutations against dynamic app tables (Hasura-backed). This is how the trigger scripts operate:
- Queries are scoped by APP_ID and table name (e.g., app_{APP_ID}_orders).
- Variables are passed via GraphQLRequest(query=..., variables=...).
- Responses include .data and .errors; always check .errors before using data.
- Mutations use insert_* or update_* with where clauses and _set updates.
- An optional organization_id is injected into the where clause for multi-tenant safety.
✅ Example: Query a single order by orderId
from typing import Any, Dict
from robotnikai import GraphQLRequest
from robotnikai.wrapper import API
api = API()
APP_ID = 130
ORDERS_TABLE_NAME = "orders"
order_query = f"""query GetOrder($order_id: String!, $organization_id: Int) {{
app_{APP_ID}_{ORDERS_TABLE_NAME}(
where: {{orderId: {{_eq: $order_id}}, organization_id: {{_eq: $organization_id}}}},
limit: 1
) {{
id
orderId
buyerLogin
}}
}}"""
variables: Dict[str, Any] = {"order_id": "eba6bc90-faab-11f0-96f9-c518c5ba21fb", "organization_id": 2}
request = GraphQLRequest(query=order_query, variables=variables)
response = api.tables.apps_graphql_query(
app=APP_ID, table=ORDERS_TABLE_NAME, graph_ql_request=request
)
if response.errors:
print(f"Order fetch errors: {response.errors}")
else:
records = response.data.get(f"app_{APP_ID}_{ORDERS_TABLE_NAME}") or []
print(records[0] if records else "Order not found")
✅ Example: Update delivered flag by orderId
from typing import Any, Dict
from robotnikai import GraphQLRequest
from robotnikai.wrapper import API
api = API()
APP_ID = 130
ORDERS_TABLE_NAME = "orders"
mutation = f"""mutation UpdateOrderDelivered($order_id: String!, $delivered: Boolean!, $organization_id: Int) {{
update_app_{APP_ID}_{ORDERS_TABLE_NAME}(
where: {{orderId: {{_eq: $order_id}}, organization_id: {{_eq: $organization_id}}}},
_set: {{delivered: $delivered}}
) {{
affected_rows
}}
}}"""
variables: Dict[str, Any] = {"order_id": "eba6bc90-faab-11f0-96f9-c518c5ba21fb", "delivered": True, "organization_id": 2}
request = GraphQLRequest(query=mutation, variables=variables)
response = api.tables.apps_graphql_query(
app=APP_ID, table=ORDERS_TABLE_NAME, graph_ql_request=request
)
if response.errors:
print(f"Order update errors: {response.errors}")
✅ Example: Insert a message linked to an order
from typing import Any, Dict
from robotnikai import GraphQLRequest
from robotnikai.wrapper import API
api = API()
APP_ID = 130
MESSAGES_TABLE_NAME = "messages"
message_mutation = f"""mutation InsertMessage($data: app_{APP_ID}_{MESSAGES_TABLE_NAME}_insert_input!) {{
insert_app_{APP_ID}_{MESSAGES_TABLE_NAME}_one(object: $data) {{
order_id
}}
}}"""
send_message_payload: Dict[str, Any] = {
"order_id": "c9a1d25a-fcf3-4950-a660-2f5b04a25320", # foreign key id
"messageContent": "Hello, user",
"organization_id": 2,
}
request = GraphQLRequest(query=message_mutation, variables={"data": send_message_payload})
response = api.tables.apps_graphql_query(
app=APP_ID, table=MESSAGES_TABLE_NAME, graph_ql_request=request
)
if response.errors:
print(f"Message insert errors: {response.errors}")
else:
print(response.data)
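Listing records follows the same pattern; order_by, limit, and offset are standard Hasura query arguments. A sketch, assuming a created_at column on the orders table (hypothetical, for illustration only):

from typing import Any, Dict
from robotnikai import GraphQLRequest
from robotnikai.wrapper import API

api = API()
APP_ID = 130
ORDERS_TABLE_NAME = "orders"

# created_at is a hypothetical column used only to illustrate order_by
list_query = f"""query ListOrders($organization_id: Int, $limit: Int!, $offset: Int!) {{
  app_{APP_ID}_{ORDERS_TABLE_NAME}(
    where: {{organization_id: {{_eq: $organization_id}}}},
    order_by: {{created_at: desc}},
    limit: $limit,
    offset: $offset
  ) {{
    id
    orderId
  }}
}}"""

variables: Dict[str, Any] = {"organization_id": 2, "limit": 20, "offset": 0}
request = GraphQLRequest(query=list_query, variables=variables)
response = api.tables.apps_graphql_query(
    app=APP_ID, table=ORDERS_TABLE_NAME, graph_ql_request=request
)
if response.errors:
    print(f"Order list errors: {response.errors}")
else:
    print(response.data.get(f"app_{APP_ID}_{ORDERS_TABLE_NAME}") or [])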
Integration API Calls
api.integrations.call()
Make synchronous API calls to integrated services.
# Basic API call
integration = api.integrations.get_integration("github")
data, response = api.integrations.call(
    integration,
    method="GET",
    endpoint="/user"
)

# With parameters and selected account
data, response = api.integrations.call(
    integration,
    method="GET",
    endpoint="/sale/offers",
    params={"limit": 10, "offset": 0},
    connection_id="ecommercelab@yandex.com"
)

# POST with JSON payload
data, response = api.integrations.call(
    integration,
    method="POST",
    endpoint="/messages/submit",
    json={
        "from": {"name": "John Doe", "address": "john@example.com"},
        "to": [{"name": "Jane Doe", "address": "jane@example.com"}],
        "subject": "Test Email",
        "text": "Hello from Python client!"
    },
    connection_id="user@domain.com"
)
Returns: a (data, response) tuple where:
- data: parsed JSON response data
- response: HTTP response object with .ok, .status_code, .text, .headers, and .url attributes

Parallel methods return: List[ParallelCallResponse], where each response contains:
- .api_data: parsed JSON response data from the external service
- .api_response: HTTP response object (same as above)
- .connection_id: ID of the connection used for this request
- .organization_id: organization ID associated with the connection
Parallel API Calls
api.integrations.parallel_call()
Execute multiple API calls concurrently using different connections per request for maximum performance and flexibility.
# Multi-connection parallel calls (recommended for scaling across accounts)
responses = api.integrations.parallel_call(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[
        {
            "connection_id": 123,
            "organization_id": 456,
            "url_params": {"offerId": "offer_1"},
            "params": {"include": "details"}
        },
        {
            "connection_id": 789,
            "organization_id": 101,
            "url_params": {"offerId": "offer_2"},
            "data": {"expand": True}
        }
    ],
    table="app_123_products"
)

# Process results with the object-based API
for response in responses:
    print(f"Connection {response.connection_id}, Org {response.organization_id}")
    if response.api_response.ok:
        print(f"Offer: {response.api_data['name']}")
        print(f"Status: {response.api_response.status_code}")
    else:
        print(f"Error: {response.api_response.text}")
api.integrations.parallel_call_for_connection()
Execute multiple API calls in parallel using a single connection (simpler for single-account operations).
# Single-connection parallel calls
responses = api.integrations.parallel_call_for_connection(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[
        {"url_params": {"offerId": "123"}},
        {"url_params": {"offerId": "456"}},
        {"url_params": {"offerId": "789"}}
    ],
    connection_id="user@domain.com",
    organization_id=123,
    table="app_124_offers"
)

# Process results
for response in responses:
    if response.api_response.ok:
        print(f"Offer: {response.api_data['name']}")
    else:
        print(f"Error: {response.api_response.text}")
api.integrations.parallel_call_stream()
Stream parallel API calls for real-time processing and progress visibility.
# Stream parallel calls with real-time processing (single connection)
for result in api.integrations.parallel_call_stream(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[{"url_params": {"offerId": offer["id"]}} for offer in offers],
    connection_id=connection_id
):
    if result.get("final"):
        print(f"Completed! Processed {result['total_processed']} requests")
        break
    else:
        # Process individual response as it arrives
        data = result["data"]
        response = result["response"]
        index = result["index"]
        if response.ok:
            # Process immediately without waiting for all requests
            processed_data = process_data(data)
            save_to_database(processed_data)
        else:
            print(f"❌ Error for request {index}: {response.text}")
Connection Management
api.integrations.get_connections()
Get all connected accounts for an integration. Returns a list of connections with id and name attributes.
# Get all connected accounts
connections = api.integrations.get_connections("allegro_sandbox")
for connection in connections:
    print(f"ID: {connection.id}, Name: {connection.name}")

# Use a connection ID in API calls
data, response = api.integrations.call(
    integration,
    method="GET",
    endpoint="/sale/offers",
    connection_id=connection.id
)
Caching System
App Cache
Shared cache accessible across the entire application; entries are accessed by key and are visible only to users within the same organization.
# Set cache with TTL
api.app_cache.set(
    key="api_data",
    value={"rates": [1.2, 1.5, 1.8]},
    ttl=300  # 5 minutes
)

# Get cached data
cached_data = api.app_cache.get(key="api_data")
print(cached_data)  # {'rates': [1.2, 1.5, 1.8]}

# Delete cache entry
api.app_cache.delete(key="api_data")
User Cache
User-specific cache for personalized data. It is scoped to the user's organization and can be shared between apps.
# Set user-specific cache
api.user_cache.set(
    key="my_key",
    value="my_value",
    ttl=3600  # 1 hour
)

# Get user cache
preferences = api.user_cache.get(key="my_key")

# Delete user cache
api.user_cache.delete(key="my_key")
Notifications
api.notify_me()
Send email notifications to yourself, useful for monitoring and alerts.
# Send notification
api.notify_me(
    subject="API Process Completed",
    text="Successfully processed 1000 records",
    html="<h1>Success!</h1><p>Processed <strong>1000</strong> records</p>"
)
Task Progress Tracking
api.task.set_progress(progress: int, info: str, status: 'pending' | 'completed' | 'failed')
Update task progress for long-running operations.
progress and info are displayed in the UI during task execution, while status indicates the task state.
# Initialize progress
api.task.set_progress(0, "Starting data processing...", "pending")

# Update progress throughout your task
for i, item in enumerate(large_dataset):
    # Process item
    process_item(item)

    # Update progress every 100 items
    if i % 100 == 0:
        progress = int((i / len(large_dataset)) * 100)
        api.task.set_progress(
            progress,
            f"Processed {i}/{len(large_dataset)} items",
            "pending"
        )

# Complete the task
api.task.set_progress(100, "Processing completed!", "completed")
Configuration & Environment
Cache & Progress Service Configuration
The client automatically detects and adapts to your environment:
RobotnikAI Sandbox (Automatic)
When running in RobotnikAI Sandbox:
- Uses the hosted cache service API
- TASK_ID is automatically assigned by the platform
- Progress tracking is displayed in the RobotnikAI UI
- Cache is shared across the organization
Local Development (Redis)
When running locally without CACHE_SERVICE_URL:
- Automatically uses direct Redis connection
- TASK_ID is generated as a random UUID4
- Progress is logged to the console
- Requires Redis server running locally
Required Dependencies
For local Redis usage, install the Redis client:
pip install redis
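To verify the local setup, the REDIS_* values from the .env file above can be checked with a quick ping. A minimal sketch using the standard redis-py client:

import os

import redis

# Connect using the REDIS_* settings documented in Environment Variables
client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    db=int(os.getenv("REDIS_DB", "0")),
    password=os.getenv("REDIS_PASSWORD") or None,
)
print(client.ping())  # True if the local Redis server is reachable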
Usage Impact
From the user's perspective, no code changes are required. The client automatically:
- Detects environment and chooses appropriate backend
- Maintains consistent API across both configurations
- Handles serialization/deserialization transparently
- Provides same response format regardless of backend
# This code works identically in both environments
api.app_cache.set("my_key", {"data": "value"}, ttl=300)
cached_data = api.app_cache.get("my_key")
api.task.set_progress(50, "Halfway done", "pending")
Environment Detection Logic
# The client automatically determines configuration:
if CACHE_SERVICE_URL:
    # Use RobotnikAI cache service API
    # - HTTP requests to cache service
    # - Progress updates sent to platform
    # - Task ID from environment or auto-generated
    ...
else:
    # Use local Redis connection
    # - Direct Redis operations
    # - Progress logged to console
    # - UUID4 generated for task tracking
    ...
Integration Management
api.integrations.get_integration()
Get integration configuration for API calls.
# Get integration
integration = api.integrations.get_integration("allegro_sandbox")
# Now use this integration object in .call() methods
api.integrations.get_integrations()
List all available integrations.
integrations = api.integrations.get_integrations()
for integration in integrations.results:
    print(f"ID: {integration.integration_id}, Name: {integration.name}")
Common Patterns
1. Parallel Data Processing
# Efficient parallel processing pattern
integration = api.integrations.get_integration("api_service")
# Step 1: Get list of items
list_data, response = api.integrations.call(
    integration, method="GET", endpoint="/items",
    params={"limit": 50}, connection_id="user@domain.com"
)

# Step 2: Process details in parallel (single connection)
detail_requests = [
    {"url_params": {"id": item["id"]}}
    for item in list_data["items"]
]
responses = api.integrations.parallel_call_for_connection(
    integration,
    method="GET",
    endpoint="/items/{id}/details",
    data_list=detail_requests,
    connection_id="user@domain.com",
    organization_id=123
)

# Step 3: Process results with the object API
processed_data = []
for response in responses:
    if response.api_response.ok:
        processed_data.append(transform_data(response.api_data))
    else:
        print(f"Error for connection {response.connection_id}: {response.api_response.text}")
2. Multi-Connection Parallel Processing
# Advanced: Parallel processing across multiple connections simultaneously
integration = api.integrations.get_integration("api_service")
# Get all connections for the service
table_connections = api.integrations.all_connections(integration, "data_table")
# Build parallel requests for multiple connections
data_list = []
for org_id, connections in table_connections.items():
    for connection in connections:
        data_list.append({
            "connection_id": connection["id"],
            "organization_id": org_id,
            "params": {"limit": 50},
            "url_params": {"account_id": connection["id"]}
        })

# Execute all requests in parallel across all connections
responses = api.integrations.parallel_call(
    integration,
    method="GET",
    endpoint="/accounts/{account_id}/data",
    data_list=data_list,
    table="app_123_consolidated_data"
)

# Process results with full traceability
for response in responses:
    print(f"Org {response.organization_id}, Connection {response.connection_id}")
    if response.api_response.ok:
        save_data(response.api_data, response.connection_id, response.organization_id)
    else:
        log_error(response.connection_id, response.api_response.text)
3. Multi-Account Operations
# Process data across multiple connected accounts
connections = api.integrations.get_connections("service_name")
for connection in connections:
    print(f"Processing account: {connection.name}")

    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/data",
        connection_id=connection.id
    )

    if response.ok:
        # Process account-specific data
        process_account_data(data, connection.id)
4. Progress Tracking with Caching
def long_running_task():
    # Use the documented status values: 'pending' | 'completed' | 'failed'
    api.task.set_progress(0, "Initializing...", "pending")

    # Cache intermediate results
    api.app_cache.set("task_checkpoint", {"processed": 0}, ttl=3600)

    for i in range(1000):
        # Do work
        process_item(i)

        # Update progress and cache
        if i % 100 == 0:
            progress = int((i / 1000) * 100)
            api.task.set_progress(progress, f"Processed {i}/1000", "pending")
            api.app_cache.set("task_checkpoint", {"processed": i}, ttl=3600)

    api.task.set_progress(100, "Completed!", "completed")
    api.app_cache.delete("task_checkpoint")
Performance Tips
- Use parallel calls for multiple API requests to the same service
- Cache frequently accessed data to reduce API calls (see the sketch after this list)
- Stream parallel calls for real-time processing of large datasets
- Set appropriate TTL for cached data based on update frequency
- Monitor progress for long-running tasks to improve user experience
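The caching tips above combine into a simple cache-aside helper. A sketch, where fetch_rates is a hypothetical expensive integration call:

def get_rates(api):
    # Cache-aside: serve from cache when possible, refresh on a miss
    rates = api.app_cache.get(key="exchange_rates")
    if rates is None:
        rates = fetch_rates()  # hypothetical expensive API call
        # Choose the TTL based on how often the upstream data changes
        api.app_cache.set(key="exchange_rates", value=rates, ttl=900)
    return rates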
Error Handling
Single API Calls
# A function wrapper makes the early returns valid Python
def fetch_and_process():
    data, response = api.integrations.call(integration, method="GET", endpoint="/data")

    if not response.ok:
        print(f"API Error: {response.status_code} - {response.text}")
        return

    if not data:
        print("No data received")
        return

    # Process successful response
    process_data(data)
Parallel API Calls
responses = api.integrations.parallel_call(
    integration, method="GET", endpoint="/data",
    data_list=requests_data
)

for response in responses:
    if not response.api_response.ok:
        print(f"Error for connection {response.connection_id}: {response.api_response.text}")
        continue

    if not response.api_data:
        print(f"No data received for connection {response.connection_id}")
        continue

    # Process successful response
    process_data(response.api_data, response.connection_id)
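Failing parallel requests can be retried by rebuilding a data_list from the failed responses. A sketch, assuming results come back in the same order as requests_data (an assumption, not documented behavior):

# Pair each response with its original request dict and collect the failures
failed_requests = [
    request for request, response in zip(requests_data, responses)
    if not response.api_response.ok
]

if failed_requests:
    retry_responses = api.integrations.parallel_call(
        integration, method="GET", endpoint="/data",
        data_list=failed_requests
    )
    for response in retry_responses:
        if response.api_response.ok:
            process_data(response.api_data, response.connection_id)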