
Automio API


Automio Python Client - API Documentation

A Python client for the Automio API that enables seamless integration with external APIs and services.

For development documentation (versioning, publishing, etc.), see README.dev.md

Installation

pip install automio

Quick Start

Environment Variables

Create a .env file in your project root for configuration:

# Cache Service (when available)
CACHE_SERVICE_URL=https://cache.automio.com
CACHE_SERVICE_TOKEN=your_cache_token

# Redis Configuration (for local development)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=optional_password

# Automio API Configuration
API_BASE_URL=https://automio.com
APP_TOKEN=your_api_token
APP_ID=your_app_id
ACTION_ID=your_action_id

# Task Management
TASK_ID=optional_custom_task_id
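
These values are read from the process environment. If your runtime does not load .env files automatically, one common approach is python-dotenv (an assumption here, not a documented dependency of this client):

# Hedged sketch: load .env before constructing the client.
from dotenv import load_dotenv
from automio.wrapper import API

load_dotenv()  # make the .env values visible as environment variables
api = API()    # the client reads its configuration from the environment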

Usage Example

from automio.wrapper import API

api = API()
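
All of the functionality in this document is reached through sub-clients on the API object (names taken from the sections below):

api.tables        # GraphQL access to dynamic app tables
api.integrations  # calls to integrated services and connection management
api.app_cache     # organization-wide cache
api.user_cache    # user-scoped cache
api.task          # progress tracking for long-running tasks
api.plugins       # shared, reusable plugin code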

Core Methods

GraphQL (Tables API)

Use GraphQLRequest together with api.tables.apps_graphql_query(...) to run queries and mutations against dynamic app tables (Hasura-backed). This is how the trigger scripts operate:

  • Queries are scoped by APP_ID and table name (e.g., app_{APP_ID}_orders).
  • Variables are passed via GraphQLRequest(query=..., variables=...).
  • Responses include .data and .errors; always check .errors before using data.
  • Mutations use insert_app_{APP_ID}_{TABLE_NAME} or update_app_{APP_ID}_{TABLE_NAME} with where clauses and _set updates.
  • Optional organization_id is injected into the where clause for multi-tenant safety.

Example: Query a single order by orderId

from typing import Any, Dict
from automio import GraphQLRequest
from automio.wrapper import API

api = API()
APP_ID = 130
ORDERS_TABLE_NAME = "orders"

order_query = f"""query GetOrder($order_id: String!, $organization_id: Int) {{
    app_{APP_ID}_{ORDERS_TABLE_NAME}(
        where: {{orderId: {{_eq: $order_id}}, organization_id: {{_eq: $organization_id}}}},
        limit: 1
    ) {{
        id
        orderId
        buyerLogin
    }}
}}"""

variables: Dict[str, Any] = {"order_id": "eba6bc90-faab-11f0-96f9-c518c5ba21fb", "organization_id": 2}
request = GraphQLRequest(query=order_query, variables=variables)
response = api.tables.apps_graphql_query(
    app=APP_ID, table=ORDERS_TABLE_NAME, graph_ql_request=request
)

if response.errors:
    print(f"Order fetch errors: {response.errors}")
else:
    records = response.data.get(f"app_{APP_ID}_{ORDERS_TABLE_NAME}") or []
    print(records[0] if records else "Order not found")

Example: Update delivered flag by orderId

from typing import Any, Dict
from automio import GraphQLRequest
from automio.wrapper import API

api = API()
APP_ID = 130
ORDERS_TABLE_NAME = "orders"

mutation = f"""mutation UpdateOrderDelivered($order_id: String!, $delivered: Boolean!, $organization_id: Int) {{
    update_app_{APP_ID}_{ORDERS_TABLE_NAME}(
        where: {{orderId: {{_eq: $order_id}}, organization_id: {{_eq: $organization_id}}}},
        _set: {{delivered: $delivered}}
    ) {{
        affected_rows
    }}
}}"""

variables: Dict[str, Any] = {"order_id": "eba6bc90-faab-11f0-96f9-c518c5ba21fb", "delivered": True, "organization_id": 2}
request = GraphQLRequest(query=mutation, variables=variables)
response = api.tables.apps_graphql_query(
    app=APP_ID, table=ORDERS_TABLE_NAME, graph_ql_request=request
)

if response.errors:
    print(f"Order update errors: {response.errors}")

Example: Query without variables (simple string query)

from automio import GraphQLRequest
from automio.wrapper import API

api = API()
APP_ID = 130
TABLE_NAME = "billing_entries"

# GraphQLRequest can be used without variables for simple queries
query = f"""{{
    app_{APP_ID}_{TABLE_NAME}(
        order_by: {{ occurredAt: desc }}
        limit: 1
    ) {{
        allegroId
    }}
}}"""

response = api.tables.apps_graphql_query(
    app=APP_ID, table=TABLE_NAME, graph_ql_request=GraphQLRequest(query=query)
)
rows = response.data.get(f"app_{APP_ID}_{TABLE_NAME}", [])
print(rows[0]["allegroId"] if rows else "No records")

Example: Batch upsert with on_conflict

from typing import Any, Dict, List
from automio import GraphQLRequest
from automio.wrapper import API

api = API()
APP_ID = 130
TABLE_NAME = "billing_entries"

def upsert_batch(rows: List[Dict[str, Any]]) -> None:
    mutation = f"""mutation InsertBatch($objects: [app_{APP_ID}_{TABLE_NAME}_insert_input!]!) {{
        insert_app_{APP_ID}_{TABLE_NAME}(
            objects: $objects,
            on_conflict: {{
                constraint: uq_app_{APP_ID}_{TABLE_NAME}_allegroId_org,
                update_columns: [occurredAt, type, value, balance]
            }}
        ) {{
            affected_rows
        }}
    }}"""

    request = GraphQLRequest(query=mutation, variables={"objects": rows})
    response = api.tables.apps_graphql_query(
        app=APP_ID, table=TABLE_NAME, graph_ql_request=request
    )
    if response.errors:
        print(f"Batch upsert errors: {response.errors}")
        return
    affected = (
        response.data
        .get(f"insert_app_{APP_ID}_{TABLE_NAME}", {})
        .get("affected_rows", 0)
    )
    print(f"Saved {affected} rows")

Example: Insert a message linked to an order

from typing import Any, Dict
from automio import GraphQLRequest
from automio.wrapper import API

api = API()
APP_ID = 130
MESSAGES_TABLE_NAME = "messages"

message_mutation = f"""mutation InsertMessage($data: app_{APP_ID}_{MESSAGES_TABLE_NAME}_insert_input!) {{
    insert_app_{APP_ID}_{MESSAGES_TABLE_NAME}_one(object: $data) {{
        order_id
    }}
}}"""

send_message_payload: Dict[str, Any] = {
    "order_id": "c9a1d25a-fcf3-4950-a660-2f5b04a25320",  # foreign key id
    "messageContent": "Hello, user",
    "organization_id": 2,
}

request = GraphQLRequest(query=message_mutation, variables={"data": send_message_payload})
response = api.tables.apps_graphql_query(
    app=APP_ID, table=MESSAGES_TABLE_NAME, graph_ql_request=request
)

if response.errors:
    print(f"Message insert errors: {response.errors}")
else:
    print(response.data)

Integration API Calls

api.integrations.call()

Make synchronous API calls to integrated services.

# Basic API call (default connection)
integration = api.integrations.get_integration("github")
data, response = api.integrations.call(
    integration,
    method="GET",
    endpoint="/user"
)

# With parameters and specific connection (connection_id is an integer ID)
integration = api.integrations.get_integration("allegro_sandbox")
for connection_id in integration.connected_accounts:
    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/sale/offers",
        params={"limit": 10, "offset": 0},
        connection_id=connection_id
    )

# POST with JSON payload
data, response = api.integrations.call(
    integration,
    method="POST",
    endpoint="/messages/submit",
    json={
        "from": {"name": "John Doe", "address": "john@example.com"},
        "to": [{"name": "Jane Doe", "address": "jane@example.com"}],
        "subject": "Test Email",
        "text": "Hello from Python client!"
    },
    connection_id=connection_id
)

Note: connection_id is an integer ID. Use integration.connected_accounts to iterate over all connected account IDs, or api.integrations.all_connections() to get full connection objects.

Returns: (data, response) tuple where:

  • data: Parsed JSON response data
  • response: HTTP response object with .ok, .status_code, .text, .headers, .url attributes

Parallel methods return a List[ParallelCallResponse], where each response contains:

  • .api_data: Parsed JSON response data from the external service
  • .api_response: HTTP response object (same as above)
  • .connection_id: ID of the connection used for this request
  • .organization_id: Organization ID associated with the connection

Parallel API Calls

api.integrations.parallel_call()

Execute multiple API calls concurrently, optionally using a different connection per request, for maximum performance and flexibility.

# Multi-connection parallel calls (recommended for scaling across accounts)
responses = api.integrations.parallel_call(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[
        {
            "connection_id": 123,
            "organization_id": 456,
            "url_params": {"offerId": "offer_1"},
            "params": {"include": "details"}
        },
        {
            "connection_id": 789,
            "organization_id": 101,
            "url_params": {"offerId": "offer_2"},
            "data": {"expand": True}
        }
    ],
    table="app_123_products"
)

# Process results with new object-based API
for response in responses:
    print(f"Connection {response.connection_id}, Org {response.organization_id}")
    if response.api_response.ok:
        print(f"Offer: {response.api_data['name']}")
        print(f"Status: {response.api_response.status_code}")
    else:
        print(f"Error: {response.api_response.text}")

api.integrations.parallel_call_for_connection()

Execute multiple API calls in parallel using a single connection (simpler for single-account operations).

# Single-connection parallel calls
responses = api.integrations.parallel_call_for_connection(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[
        {"url_params": {"offerId": "123"}},
        {"url_params": {"offerId": "456"}},
        {"url_params": {"offerId": "789"}}
    ],
    connection_id="user@domain.com",
    organization_id=123,
    table="app_124_offers"
)

# Process results
for response in responses:
    if response.api_response.ok:
        print(f"Offer: {response.api_data['name']}")
    else:
        print(f"Error: {response.api_response.text}")

api.integrations.parallel_call_stream()

Stream parallel API calls for real-time processing and progress visibility.

# Stream parallel calls with real-time processing (single connection)
for result in api.integrations.parallel_call_stream(
    integration,
    method="GET",
    endpoint="/sale/product-offers/{offerId}",
    data_list=[{"url_params": {"offerId": offer["id"]}} for offer in offers],
    connection_id=connection_id
):
    if result.get("final"):
        print(f"✅ Completed! Processed {result['total_processed']} requests")
        break
    else:
        # Process individual response as it arrives
        data = result["data"]
        response = result["response"]
        index = result["index"]

        if response.ok:
            # Process immediately without waiting for all requests
            processed_data = process_data(data)
            save_to_database(processed_data)
        else:
            print(f"❌ Error for request {index}: {response.text}")

Plugins

Plugins let you run shared, reusable Python code per integration: defined once in a Plugin Interface and implemented separately for each integration.

Concepts:

  • Plugin Interface - a contract identified by a namespace and a list of capabilities (function names). Think of it as an abstract class. Example: test_interface with capability foo.
  • Plugin Interface code - shared Python code available to all plugins under that interface (helpers, base functions, common constants).
  • Plugin - a concrete implementation of that interface for a specific integration (e.g., allegro_sandbox). It can override selected capabilities and reuse the interface code.

Shared Interface Code

When a Plugin Interface defines code, plugins in that interface can import it with:

from plugins.interfaces.<namespace> import <name>

Example for namespace test_interface:

from plugins.interfaces.test_interface import test_ifc

Or import everything from the interface module:

from plugins.interfaces.test_interface import *
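
As a rough illustration, the shared interface code might hold helpers that every plugin implementation reuses. Only the names test_ifc and foo come from the examples in this document; the bodies below are assumptions:

# Hypothetical contents of the test_interface shared code:
def test_ifc(value: str) -> str:
    """Shared helper available to every plugin under test_interface."""
    return f"[test_interface] {value}"

# Hypothetical plugin implementation reusing the shared helper:
from plugins.interfaces.test_interface import test_ifc

def foo(connection_ids: list):
    print(test_ifc(f"invoked with {len(connection_ids)} connection(s)"))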

api.plugins.get_list(namespace)

List all enabled plugins registered under a given interface namespace.

for plugin in api.plugins.get_list("test_interface"):
    print(f"Plugin ID: {plugin.id}")
    print(f"Connection IDs: {plugin.connection_ids}")

api.plugins.invoke(capability, integration_id, connection_ids, **payload)

Invoke a plugin capability. The first argument is the "namespace.capability" string; the second is the integration ID string.

integration = api.integrations.get_integration("allegro_sandbox")

for plugin in api.plugins.get_list("test_interface"):
    result = api.plugins.invoke(
        "test_interface.foo",        # namespace.capability
        integration.integration_id,  # integration ID as a string
        connection_ids=plugin.connection_ids,
    )

The plugin code (written in the platform's code editor) receives connection_ids as a keyword argument:

# Plugin code for test_interface / allegro_sandbox integration:
def foo(connection_ids: list):
    for connection_id in connection_ids:
        data, response = api.integrations.call(
            api.integrations.get_integration("allegro_sandbox"),
            method="GET",
            endpoint="/sale/offers",
            connection_id=connection_id,
        )
        print(data)

Connection Management

integration.connected_accounts

The simplest way to iterate over all connected account IDs for an integration. Returns an iterable of integer connection IDs.

integration = api.integrations.get_integration("allegro_sandbox")
for connection_id in integration.connected_accounts:
    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/billing/billing-entries",
        connection_id=connection_id,
        params={"limit": 100},
    )

api.integrations.all_connections()

Get all connected accounts for an integration. Returns a list of connection objects exposing .id and .name.

# Get all connected accounts; the second argument is a table name
table_name = "app_123_products"  # example table name
connections = api.integrations.all_connections("allegro_sandbox", table_name)
for connection in connections:
    print(f"ID: {connection.id}, Name: {connection.name}")

    # Use connection ID in API calls
    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/sale/offers",
        connection_id=connection.id  # integer ID
    )

Caching System

App Cache

Shared cache accessible across the entire application. Entries are scoped to an organization: any user within the same organization can access them by key.

# Set cache with TTL
api.app_cache.set(
    key="api_data",
    value={"rates": [1.2, 1.5, 1.8]},
    ttl=300  # 5 minutes
)

# Get cached data
cached_data = api.app_cache.get(key="api_data")
print(cached_data)  # {'rates': [1.2, 1.5, 1.8]}

# Delete cache entry
api.app_cache.delete(key="api_data")

User Cache

User-specific cache for personalized data. It is scoped to the user's organization and can be shared between apps.

# Set user-specific cache
api.user_cache.set(
    key="my_key",
    value="my_value",
    ttl=3600  # 1 hour
)

# Get user cache
preferences = api.user_cache.get(key="my_key")

# Delete user cache
api.user_cache.delete(key="my_key")

Notifications

api.notify_me()

Send email notifications to yourself; useful for monitoring and alerts.

# Send notification
api.notify_me(
    subject="API Process Completed",
    text="Successfully processed 1000 records",
    html="<h1>Success!</h1><p>Processed <strong>1000</strong> records</p>"
)

Task Progress Tracking

api.task.set_progress(progress: int, info: str, status: 'pending' | 'completed' | 'failed')

Update task progress for long-running operations. progress and info are displayed in the UI during task execution, while status indicates the task state.

# Initialize progress
api.task.set_progress(0, "Starting data processing...", "pending")

# Update progress throughout your task
for i, item in enumerate(large_dataset):
    # Process item
    process_item(item)

    # Update progress every 100 items
    if i % 100 == 0:
        progress = int((i / len(large_dataset)) * 100)
        api.task.set_progress(
            progress,
            f"Processed {i}/{len(large_dataset)} items",
            "pending"
        )

# Complete the task
api.task.set_progress(100, "Processing completed!", "completed")

Configuration & Environment

Cache & Progress Service Configuration

The client automatically detects and adapts to your environment:

RobotnikAI Sandbox (Automatic)

When running in RobotnikAI Sandbox:

  • Uses the hosted cache service API
  • TASK_ID is automatically assigned by the platform
  • Progress tracking is displayed in the RobotnikAI UI
  • Cache is shared across the organization

Local Development (Redis)

When running locally without CACHE_SERVICE_URL:

  • Automatically uses direct Redis connection
  • TASK_ID is generated as a random UUID4
  • Progress is logged to console
  • Requires Redis server running locally

Required Dependencies

For local Redis usage, install the Redis client:

pip install redis
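
If Redis itself is not yet running, one convenient option (assuming Docker is available) is:

docker run --rm -p 6379:6379 redis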

Usage Impact

From the user's perspective, no code changes are required. The client automatically:

  • Detects environment and chooses appropriate backend
  • Maintains consistent API across both configurations
  • Handles serialization/deserialization transparently
  • Provides the same response format regardless of backend

# This code works identically in both environments
api.app_cache.set("my_key", {"data": "value"}, ttl=300)
cached_data = api.app_cache.get("my_key")
api.task.set_progress(50, "Halfway done", "pending")

Environment Detection Logic

import os

# The client automatically determines its configuration:
if os.getenv("CACHE_SERVICE_URL"):
    # Use the RobotnikAI cache service API:
    # - HTTP requests to the cache service
    # - progress updates sent to the platform
    # - task ID taken from the environment or auto-generated
    ...
else:
    # Use a local Redis connection:
    # - direct Redis operations
    # - progress logged to the console
    # - a UUID4 generated for task tracking
    ...

Integration Management

api.integrations.get_integration()

Get integration configuration for API calls.

# Get integration
integration = api.integrations.get_integration("allegro_sandbox")
# Now use this integration object in .call() methods

api.integrations.get_integrations()

List all available integrations.

integrations = api.integrations.get_integrations()
for integration in integrations.results:
    print(f"ID: {integration.integration_id}, Name: {integration.name}")

Common Patterns

1. Parallel Data Processing

# Efficient parallel processing pattern
integration = api.integrations.get_integration("api_service")

# Step 1: Get list of items
list_data, response = api.integrations.call(
    integration, method="GET", endpoint="/items",
    params={"limit": 50}, connection_id="user@domain.com"
)

# Step 2: Process details in parallel (single connection)
detail_requests = [
    {"url_params": {"id": item["id"]}}
    for item in list_data["items"]
]

responses = api.integrations.parallel_call_for_connection(
    integration,
    method="GET",
    endpoint="/items/{id}/details",
    data_list=detail_requests,
    connection_id="user@domain.com",
    organization_id=123
)

# Step 3: Process results with new object API
processed_data = []
for response in responses:
    if response.api_response.ok:
        processed_data.append(transform_data(response.api_data))
    else:
        print(f"Error for connection {response.connection_id}: {response.api_response.text}")

2. Multi-Connection Parallel Processing

# Advanced: Parallel processing across multiple connections simultaneously
integration = api.integrations.get_integration("api_service")

# Get all connections for the service
table_connections = api.integrations.all_connections(integration, "data_table")

# Build parallel requests for multiple connections
data_list = []
for org_id, connections in table_connections.items():
    for connection in connections:
        data_list.append({
            "connection_id": connection["id"],
            "organization_id": org_id,
            "params": {"limit": 50},
            "url_params": {"account_id": connection["id"]}
        })

# Execute all requests in parallel across all connections
responses = api.integrations.parallel_call(
    integration,
    method="GET",
    endpoint="/accounts/{account_id}/data",
    data_list=data_list,
    table="app_123_consolidated_data"
)

# Process results with full traceability
for response in responses:
    print(f"Org {response.organization_id}, Connection {response.connection_id}")
    if response.api_response.ok:
        save_data(response.api_data, response.connection_id, response.organization_id)
    else:
        log_error(response.connection_id, response.api_response.text)

3. Multi-Account Operations

# Process data across multiple connected accounts
integration = api.integrations.get_integration("service_name")
connections = api.integrations.all_connections("service_name")

for connection in connections:
    print(f"Processing account: {connection.name}")

    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/data",
        connection_id=connection.id
    )

    if response.ok:
        # Process account-specific data
        process_account_data(data, connection.id)

4. Progress Tracking with Caching

def long_running_task():
    api.task.set_progress(0, "Initializing...", "in_progress")

    # Cache intermediate results
    api.app_cache.set("task_checkpoint", {"processed": 0}, ttl=3600)

    for i in range(1000):
        # Do work
        process_item(i)

        # Update progress and cache
        if i % 100 == 0:
            progress = int((i / 1000) * 100)
            api.task.set_progress(progress, f"Processed {i}/1000", "in_progress")
            api.app_cache.set("task_checkpoint", {"processed": i}, ttl=3600)

    api.task.set_progress(100, "Completed!", "completed")
    api.app_cache.delete("task_checkpoint")

Performance Tips

  • Use parallel calls for multiple API requests to the same service
  • Cache frequently accessed data to reduce API calls (see the cache-aside sketch after this list)
  • Stream parallel calls for real-time processing of large datasets
  • Set appropriate TTL for cached data based on update frequency
  • Monitor progress for long-running tasks to improve user experience
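
A minimal cache-aside sketch combining the cache and integration clients from this document (the key name, TTL, and helper name are illustrative choices, not part of the client):

# Hedged sketch: serve from the app cache when possible, otherwise call the
# integration and cache the successful result for five minutes.
def get_offers_cached(api, integration, connection_id):
    cached = api.app_cache.get(key="sale_offers")
    if cached:
        return cached

    data, response = api.integrations.call(
        integration,
        method="GET",
        endpoint="/sale/offers",
        connection_id=connection_id,
    )
    if response.ok and data:
        api.app_cache.set(key="sale_offers", value=data, ttl=300)
    return data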

Error Handling

Single API Calls

data, response = api.integrations.call(integration, method="GET", endpoint="/data")

if not response.ok:
    print(f"API Error: {response.status_code} - {response.text}")
    return

if not data:
    print("No data received")
    return

# Process successful response
process_data(data)

Parallel API Calls

responses = api.integrations.parallel_call(
    integration, method="GET", endpoint="/data",
    data_list=requests_data
)

for response in responses:
    if not response.api_response.ok:
        print(f"Error for connection {response.connection_id}: {response.api_response.text}")
        continue

    if not response.api_data:
        print(f"No data received for connection {response.connection_id}")
        continue

    # Process successful response
    process_data(response.api_data, response.connection_id)
