Python SDK for Infino API with AWS SigV4 authentication

Infino Python SDK

Official Python SDK for Infino - an agentic search engine.

Infino is a governed, agentic search engine. Query Elasticsearch, OpenSearch, Snowflake, and other data sources in natural language, SQL, Query DSL, or PromQL. Bring diverse data sources together for deeper analysis, with no ETL required. Control access via fine-grained RBAC. All through one unified API.

Built for:

  • Connect: Access your data sources without data movement
  • Query: Natural language, SQL, Query DSL, and PromQL across all sources
  • Correlate: Pull together data from different sources for cross-source correlation
  • Visualize: Save chart specs server-side; execute them from notebooks, backend jobs, or your own dashboards with plot-ready {columns, rows} results
  • Govern: Fine-grained RBAC for all your data

Quick Start

Installation

pip install infino-sdk

Getting Your Credentials

  1. Sign up at app.infino.ws
  2. Create a new account (accounts can only be created through the UI)
  3. Navigate to Settings → API Keys
  4. Generate your access_key and secret_key

Basic Usage

from infino_sdk import InfinoSDK

# Create SDK instance with your credentials
sdk = InfinoSDK(
    access_key="your_access_key",
    secret_key="your_secret_key",
    endpoint="https://api.infino.ws"
)

# Check connection
info = sdk.ping()
print(f"Connected: {info}")

# Query a dataset
results = sdk.query_dataset_in_querydsl("my-dataset", '{"query": {"match_all": {}}}')
print(f"Found {len(results.get('hits', {}).get('hits', []))} records")
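
To keep keys out of source code, the constructor arguments can be read from the environment. The sketch below is a minimal helper under stated assumptions: the variable names INFINO_ACCESS_KEY, INFINO_SECRET_KEY, and INFINO_ENDPOINT are hypothetical (the SDK is not documented here as reading them itself), and load_infino_credentials is an illustrative name, not part of the SDK.

```python
import os


def load_infino_credentials(env=None):
    """Return keyword arguments for InfinoSDK(...) read from environment variables.

    The variable names are hypothetical; adapt them to your deployment.
    """
    env = os.environ if env is None else env
    required = ("INFINO_ACCESS_KEY", "INFINO_SECRET_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "access_key": env["INFINO_ACCESS_KEY"],
        "secret_key": env["INFINO_SECRET_KEY"],
        # Fall back to the endpoint used in the Basic Usage example.
        "endpoint": env.get("INFINO_ENDPOINT", "https://api.infino.ws"),
    }
```

With the variables exported, the instance from Basic Usage would be created as sdk = InfinoSDK(**load_infino_credentials()).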

API Reference

Complete reference of SDK methods, organized by category. Code examples for each category follow in the sections below.

Initialization & Utilities (5 methods)

| Method | Description |
|---|---|
| InfinoSDK(access_key, secret_key, endpoint, retry_config=None) | Initialize SDK instance |
| InfinoSDK.new(access_key, secret_key, endpoint) | Alternative constructor |
| InfinoSDK.new_with_retry(access_key, secret_key, endpoint, retry_config) | Constructor with retry config |
| ping() | Health check endpoint |
| close() | Close HTTP session |

Context manager: the SDK supports the with statement for automatic resource cleanup (see Context Manager Usage).

Datasets (11 methods)

| Method | Description |
|---|---|
| create_dataset(dataset) | Create empty dataset |
| delete_dataset(dataset) | Delete dataset |
| get_datasets() | List all datasets |
| get_dataset_metadata(dataset) | Get dataset metadata (count, size, etc.) |
| get_dataset_schema(dataset) | Get dataset field mappings |
| upload_json_to_dataset(dataset, payload) | Upload NDJSON bulk data |
| upsert_to_dataset(query) | Upsert via SQL INSERT/UPDATE |
| upload_metrics_to_dataset(dataset, payload) | Upload Prometheus metrics |
| get_record(dataset, record_id) | Get single record by ID |
| delete_records(dataset, query) | Delete records matching query |
| enrich_dataset(dataset, policy) | Configure dataset enrichment |

Query Methods (5 methods)

| Method | Description |
|---|---|
| query_dataset_in_querydsl(dataset, query) | Query using Elasticsearch/OpenSearch DSL |
| query_dataset_in_sql(query) | Query using SQL (supports joins) |
| query_dataset_in_promql(query, dataset=None) | Instant PromQL query |
| query_dataset_in_promql_range(query, start, end, step, dataset=None) | Range PromQL query |
| query_source(connection_id, dataset, query) | Query connected source in native DSL |

Fino AI (9 methods)

| Method | Description |
|---|---|
| websocket_connect(path, headers=None) | Connect to WebSocket for streaming |
| list_threads() | List all conversation threads |
| create_thread(config) | Create new thread |
| get_thread(thread_id) | Get thread details |
| update_thread(thread_id, config) | Update thread metadata |
| delete_thread(thread_id) | Delete thread |
| add_thread_message(thread_id, message) | Add message to thread |
| clear_thread_messages(thread_id) | Clear all thread messages |
| send_message(payload) | Send message (simplified API) |

Connections (12 methods)

| Method | Description |
|---|---|
| get_sources() | List available data source types |
| get_connections() | List active connections |
| create_connection(source_type, config) | Create connection |
| get_connection(connection_id) | Get connection status |
| update_connection(connection_id, config) | Update connection |
| delete_connection(connection_id) | Delete connection |
| get_source_metadata(connection_id, dataset) | Get metadata from source |
| upload_file(dataset, file_path, format, ...) | Upload file (JSON, JSONL, CSV) |
| get_connector_job_status(run_id) | Get async job status |
| create_import_job(source_type, config) | Create import job |
| get_import_jobs() | List import jobs |
| delete_import_job(job_id) | Delete import job |

Visualizations & Dashboards (13 methods)

| Method | Description |
|---|---|
| create_visualization(spec) | Create a chart spec (raw SQL or Builder mode) |
| get_visualization(viz_id) | Fetch one; full spec lives in attributes |
| list_visualizations(limit=None, offset=None) | Paginated listing |
| update_visualization(viz_id, partial) | JSON Merge Patch: send only what changes |
| delete_visualization(viz_id) | Delete by id |
| execute_visualization(viz_id, filters=None, time_range=None) | Run the saved SQL and return {columns, rows, metadata} |
| to_echarts_option(viz, data) | Pure function: turn viz + data into ECharts JSON (also handles table and metric shapes) |
| create_dashboard(spec) | Group panels into a dashboard with auto-flow or explicit {x, y, w, h} layout |
| get_dashboard(dashboard_id) | Fetch one |
| list_dashboards(limit=None, offset=None) | Paginated listing |
| update_dashboard(dashboard_id, partial) | JSON Merge Patch: panels list replaced wholesale |
| delete_dashboard(dashboard_id) | Delete by id |
| execute_dashboard(dashboard_id, max_workers=16, filters=None, time_range=None) | Parallel fanout: one call for an N-panel dashboard, per-panel error isolation |

RBAC & Governance (11 methods)

| Method | Description |
|---|---|
| create_user(name, config) | Create user (YAML or JSON) |
| get_user(name) | Get user details |
| update_user(name, config) | Update user |
| delete_user(name) | Delete user |
| list_users() | List all users |
| create_role(name, config) | Create role (YAML or JSON) |
| get_role(name) | Get role details |
| update_role(name, config) | Update role permissions |
| delete_role(name) | Delete role |
| list_roles() | List all roles |
| rotate_keys(username) | Rotate API keys for user |

Connect – Access Data Sources

Connect to data sources and query them in place without data movement. For supported connectors and required configs, see Connectors.

Discover Available Sources

from infino_sdk import InfinoSDK

sdk = InfinoSDK(access_key, secret_key, endpoint)

# Get list of all available data source types
sources = sdk.get_sources()
for source in sources:
    print(f"{source['name']}: {source['description']}")

Manage Connections

# Create connection to Elasticsearch
connection_config = {
    "config": {
        "name": "Production ES Cluster",
        "host": "https://es-cluster.example.com:9200",
        "username": "elastic",
        "password": "secret"
    }
}
connection = sdk.create_connection("elasticsearch", connection_config)
print(f"Created connection: {connection['connection_id']}")

# List all active connections
connections = sdk.get_connections()

# Get connection status
status = sdk.get_connection("conn_abc123")
print(f"Status: {status['status']}")

# Update connection configuration
updated_config = {
    "config": {
        "name": "Production ES Cluster - Updated",
        "host": "https://new-es-cluster.example.com:9200",
        "username": "elastic",
        "password": "new_secret"
    }
}
sdk.update_connection("conn_abc123", updated_config)

# Delete connection
sdk.delete_connection("conn_old_123")

Query Connected Sources

# Query Elasticsearch (via QueryDSL)
results = sdk.query_source(
    connection_id="conn_elasticsearch_prod",
    dataset="external_logs",
    query='{"query": {"match_all": {}}}'
)

# Query Snowflake (via SQL)
results = sdk.query_source(
    connection_id="conn_snowflake_prod",
    dataset="sales_data",
    query="SELECT * FROM sales_data WHERE region='US' LIMIT 10"
)

# Get metadata from a connected source
metadata = sdk.get_source_metadata("conn_elasticsearch_prod", "logs-2024")
print(f"Fields: {metadata['mappings']}")

File Upload

Upload files directly to datasets without setting up connections. Supports JSON, JSONL, and CSV formats.

from infino_sdk import InfinoSDK

sdk = InfinoSDK(access_key, secret_key, endpoint)

# Upload a JSON file (synchronous - waits for completion)
result = sdk.upload_file(
    dataset="my-dataset",
    file_path="data.json",
    format="json"  # or "jsonl", "csv", "auto"
)
print(f"Uploaded {result['stats']['documents_processed']} documents")

# Upload a CSV file
result = sdk.upload_file("sales-data", "quarterly_sales.csv", format="csv")

# Upload with async mode (for large files)
result = sdk.upload_file(
    dataset="large-dataset",
    file_path="big_data.jsonl",
    format="jsonl",
    async_mode=True  # Returns immediately with run_id
)
print(f"Job submitted: {result['run_id']}")

# Poll for completion
import time
while True:
    status = sdk.get_connector_job_status(result['run_id'])
    print(f"Status: {status['status']}")
    if status['status'] in ('completed', 'failed'):
        if status['status'] == 'completed':
            print(f"Processed {status['stats']['documents_processed']} docs")
        break
    time.sleep(2)

Parameters:

  • dataset: Target dataset name
  • file_path: Path to the file to upload
  • format: File format - "json", "jsonl", "csv", or "auto" (default: auto-detect)
  • batch_size: Documents per batch (default: 1000)
  • async_mode: If True, returns immediately with run_id for polling (default: False)
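
The polling loop above runs until the job reaches a terminal state, so a stuck job would block forever. A bounded variant can be sketched as below. It is an illustration, not SDK code: wait_for_job, timeout_s, and poll_interval_s are hypothetical names, and the only response detail it relies on is the status field with the 'completed' and 'failed' values shown above.

```python
import time


def wait_for_job(get_status, run_id, timeout_s=300.0, poll_interval_s=2.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status(run_id) until the job completes or fails, or time runs out.

    get_status is any callable returning a dict with a "status" key,
    for example sdk.get_connector_job_status.
    """
    deadline = clock() + timeout_s
    while True:
        status = get_status(run_id)
        if status["status"] in ("completed", "failed"):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Job {run_id} still {status['status']!r} after {timeout_s}s")
        sleep(poll_interval_s)
```

Typical use would be final = wait_for_job(sdk.get_connector_job_status, result['run_id'], timeout_s=600). The sleep and clock parameters exist only so the helper can be exercised without real waiting.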

Import Jobs

Import data from connected sources into datasets for correlation.

# Create import job to bring data from Snowflake into a dataset
import_config = {
    "connection_id": "conn_snowflake_prod",
    "source_dataset": "sales_data",
    "target_dataset": "sales-correlation",
    "query": "SELECT * FROM sales_data WHERE date >= '2024-01-01'",
    "schedule": "0 2 * * *"  # Daily at 2 AM
}
job = sdk.create_import_job("snowflake", import_config)
print(f"Import job created: {job['job_id']}")

# List all import jobs
jobs = sdk.get_import_jobs()
for job in jobs:
    print(f"Job {job['job_id']}: {job['status']}")

# Delete import job
sdk.delete_import_job("job_abc123")

Query – Ask Questions

Query any connected source or dataset with multiple interfaces.

Natural Language (Fino AI)

import asyncio
import json

from infino_sdk import InfinoSDK

async def query_with_fino():
    sdk = InfinoSDK(access_key, secret_key, endpoint)
    
    # Connect to Fino WebSocket
    ws = await sdk.websocket_connect("/fino/nl")
    
    try:
        # Send your question
        await ws.send(json.dumps({
            "type": "query",
            "content": "What are the top 5 products by revenue?"
        }))
        
        # Receive AI response
        async for message in ws:
            data = json.loads(message)
            print(f"Fino: {data.get('content', '')}")
            if data.get("type") == "complete":
                break
    finally:
        await ws.close()
        sdk.close()

# Run async
asyncio.run(query_with_fino())

Manage Conversation Threads

# Create and manage Fino conversation threads
sdk = InfinoSDK(access_key, secret_key, endpoint)

# List existing threads
threads = sdk.list_threads()
print(f"Found {len(threads)} threads")

# Create a new thread with optional metadata
thread = sdk.create_thread({
    "name": "Sales Analysis",
    "workflow_name": "alpha_v1"
})
print(f"Created thread: {thread['id']}")

# Get thread details
thread_info = sdk.get_thread(thread["id"])
print(f"Thread name: {thread_info['name']}")

# Update thread metadata
sdk.update_thread(thread["id"], {
    "name": "Q4 Sales Analysis - Updated",
    "metadata": {
        "department": "finance",
        "priority": "high"
    }
})

# Add a message to the thread
sdk.add_thread_message(thread["id"], {
    "content": {
        "user_query": "What are the top 5 regions by revenue?"
    },
    "role": "user"
})

# Send a message using the simplified API (thread_id in payload)
response = sdk.send_message({
    "thread_id": thread["id"],
    "content": {
        "user_query": "Show me week-over-week trends"
    },
    "role": "user"
})

# Clean up when finished
sdk.clear_thread_messages(thread["id"])
sdk.delete_thread(thread["id"])

SQL Queries

# Query a dataset with SQL
results = sdk.query_dataset_in_sql("SELECT * FROM products WHERE price > 100 LIMIT 10")

# With aggregations
results = sdk.query_dataset_in_sql("SELECT category, AVG(price) FROM products GROUP BY category")

Query DSL

# Simple query on a dataset
query = '{"query": {"match_all": {}}}'
results = sdk.query_dataset_in_querydsl("products", query)

# Complex query with filters
query = '''
{
  "query": {
    "bool": {
      "must": [{"range": {"price": {"gte": 10, "lte": 100}}}],
      "filter": [{"term": {"in_stock": true}}]
    }
  }
}
'''
results = sdk.query_dataset_in_querydsl("products", query)

# Query data source
results = sdk.query_source(
    connection_id="conn_opensearch",
    dataset="dataset",
    query=query
)
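
Hand-written JSON strings are easy to break with a stray quote or a Python-style True. One way to avoid that is to build the query as a dict and serialize it with json.dumps. The sketch below reproduces the filter query above; price_range_query is a hypothetical helper name, and the field names price and in_stock are taken from the example.

```python
import json


def price_range_query(min_price, max_price, in_stock=True):
    """Build the bool query from the example above and return it as a JSON string."""
    body = {
        "query": {
            "bool": {
                "must": [{"range": {"price": {"gte": min_price, "lte": max_price}}}],
                "filter": [{"term": {"in_stock": in_stock}}],
            }
        }
    }
    # json.dumps emits JSON literals (true/false/null), not Python ones.
    return json.dumps(body)
```

The returned string can be passed wherever the examples above pass query, for instance sdk.query_dataset_in_querydsl("products", price_range_query(10, 100)).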

PromQL (Time-Series)

# Instant PromQL query
result = sdk.query_dataset_in_promql(
    'http_requests_total{status="200"}',
    dataset="metrics_example",
)

# Range PromQL query
result = sdk.query_dataset_in_promql_range(
    query='rate(http_requests_total[5m])',
    start=1609459200,
    end=1609545600,
    step=300,
    dataset="metrics_example",
)
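
The start and end values in the range example look like Unix epoch seconds (1609459200 is 2021-01-01T00:00:00Z). Assuming that unit, a small helper can derive the window from datetime objects instead of hard-coded integers. promql_range_window is a hypothetical name, not an SDK function.

```python
from datetime import datetime, timedelta, timezone


def promql_range_window(end, lookback, step_seconds):
    """Return start/end/step keyword arguments for a range query.

    Assumes the API expects epoch seconds, as the example values suggest.
    """
    if end.tzinfo is None:
        raise ValueError("end must be timezone-aware to avoid local-time ambiguity")
    start = end - lookback
    return {
        "start": int(start.timestamp()),
        "end": int(end.timestamp()),
        "step": step_seconds,
    }


window = promql_range_window(
    end=datetime(2021, 1, 2, tzinfo=timezone.utc),
    lookback=timedelta(days=1),
    step_seconds=300,
)
```

The resulting dict matches the literal values used above and could be unpacked into the call: sdk.query_dataset_in_promql_range(query='rate(http_requests_total[5m])', dataset="metrics_example", **window).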

Correlate – Cross-Source Operations

Use datasets to pull together data from different sources for correlation and analysis without schemas.

When to Use Datasets

  • Cross-Source Joins: Correlate data from multiple data sources
  • Unified Analysis: Ask deeper questions across silos
  • Staging: Test queries before running in production
  • Temporary Storage: Hold intermediate results for complex workflows

Create Datasets

# Create a dataset for staging
sdk.create_dataset("staging-analysis-2024")

Upload Data

# Upload JSON records to a dataset (NDJSON format)
bulk_data = '''
{"index": {"_id": "1"}}
{"product_id": "A123", "revenue": 15000, "@timestamp": "2024-10-15"}
{"index": {"_id": "2"}}
{"product_id": "B456", "revenue": 23000, "@timestamp": "2024-10-15"}
'''
sdk.upload_json_to_dataset("sales-correlation", bulk_data)

# Upload via SQL upsert (INSERT or UPDATE)
sql_upsert = """
    INSERT INTO "sales-correlation" (_id, product_id, revenue, "@timestamp")
    VALUES ('3', 'C789', 30000, '2024-10-15')
    ON CONFLICT (_id) DO UPDATE SET revenue = 30000
"""
sdk.upsert_to_dataset(sql_upsert)

# Upload Prometheus metrics to a dataset
metrics_data = '''
# TYPE http_requests_total counter
http_requests_total{method="GET",status="200"} 1234 1609459200000
http_requests_total{method="POST",status="201"} 567 1609459200000
'''
sdk.upload_metrics_to_dataset("metrics-correlation", metrics_data)
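
The NDJSON payload above alternates an action line with a document line. When records already live in Python, the payload can be generated rather than typed. This is a minimal sketch assuming the same two-line shape shown above; to_bulk_ndjson is a hypothetical helper, and the trailing newline mirrors the sample string rather than a documented requirement.

```python
import json


def to_bulk_ndjson(records):
    """Turn (doc_id, document) pairs into an NDJSON bulk payload.

    Each pair becomes an {"index": {"_id": ...}} action line followed by
    the document itself, matching the sample payload above.
    """
    lines = []
    for doc_id, document in records:
        lines.append(json.dumps({"index": {"_id": str(doc_id)}}))
        lines.append(json.dumps(document))
    return "\n".join(lines) + "\n"


payload = to_bulk_ndjson([
    ("1", {"product_id": "A123", "revenue": 15000, "@timestamp": "2024-10-15"}),
    ("2", {"product_id": "B456", "revenue": 23000, "@timestamp": "2024-10-15"}),
])
```

The payload string would then be passed as the second argument of sdk.upload_json_to_dataset("sales-correlation", payload).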

Manage Datasets

# Get dataset metadata
metadata = sdk.get_dataset_metadata("sales-correlation")
print(f"Document count: {metadata['count']}")

# Get dataset schema
schema = sdk.get_dataset_schema("sales-correlation")
print(f"Fields: {schema['mappings']}")

# List all datasets
datasets = sdk.get_datasets()
for dataset in datasets:
    print(f"Dataset: {dataset['name']}")

# Delete dataset
sdk.delete_dataset("old-staging-2023")

Record Operations

# Get a record
record = sdk.get_record("sales-correlation", "prod_123")

# Delete records
sdk.delete_records("sales-correlation", '{"query": {"range": {"@timestamp": {"lt": "2024-01-01"}}}}')

Dataset Enrichment

# Configure enrichment for a dataset
import json

enrich_policy = json.dumps({
    "enrich_policy": {
        "match_field": "user_id",
        "enrich_fields": ["email", "name", "department"]
    }
})

sdk.enrich_dataset("sales-correlation", enrich_policy)

Visualize – Build and Execute Visualizations

Persist a visualization spec on the server, then execute it from anywhere — a notebook, a backend job, a dashboard you're building yourself — and get back normalized {columns, rows, metadata} ready to plot with your library of choice (pyecharts, plotly, altair, matplotlib, …).

Two ways to create a visualization:

  • Raw SQL — set source.sql.raw_query to a full SELECT statement when you want full SQL power (JOINs, window functions, CTEs, dialect-specific syntax).
  • Builder mode — set mapping.x / mapping.y / aggregation_type and omit raw_query. The gateway generates the SQL, including dialect-aware quoting for connector-backed sources (MySQL, BigQuery, Snowflake, …).

Filter chips can be saved on the visualization itself (filters field) or passed at execute time via the filters= kwarg — both shapes AND-merge into the executed SQL via the server-side rewriter, so plot-ready rows always respect the active filters. A time_range= kwarg on execute_visualization and execute_dashboard applies a runtime time window the same way.

Create

The server fills defaults for every field you omit, so the minimum spec is small — title, source (with kind, index, and either a raw_query or the Builder-mode fields), and chart.type.

Raw SQL — write the full query yourself:

viz = sdk.create_visualization({
    "title": "Orders by currency",
    "source": {
        "kind": "sql",
        "index": "sample-ecommerce-data.rel",
        "sql": {
            "raw_query": (
                'SELECT currency, COUNT(*) AS count '
                'FROM "sample-ecommerce-data.rel" '
                "GROUP BY currency"
            ),
        },
    },
    "chart": {"type": "bar"},
})
viz_id = viz["id"]

Builder mode — describe the chart in terms of columns + an aggregation and let the gateway generate the SQL:

viz = sdk.create_visualization({
    "title": "Orders by currency",
    "source": {
        "kind": "sql",
        "index": "sample-ecommerce-data.rel",
        "sql": {"limit": 50},
        # no raw_query — gateway generates it from the fields below
    },
    "chart": {"type": "bar"},
    "mapping": {"x": "currency", "y": ["count"], "series_split_by": None},
    "aggregation_type": "count",   # count | sum | avg | none
})

create_visualization returns a response wrapper: the server-assigned id, created_at, updated_at, and kind at the top level, with the full visualization (every field you sent plus all server-filled defaults) under attributes. The same shape is returned by get_visualization and update_visualization.

Top-N pattern — use source.sql.order_by to override the default ORDER BY <x-axis>. Reference the aggregation alias (sum_<col>, avg_<col>, or count):

# Top 10 customers by total revenue (BigQuery)
viz = sdk.create_visualization({
    "source": {
        "kind": "sql", "index": "orders", "connector_id": "bigquery",
        "sql": {
            "metrics": [{"function": "sum", "column": "revenue"}],
            "order_by": [{"column": "sum_revenue", "direction": "desc"}],
            "limit": 10,
        },
    },
    "chart": {"type": "bar"},
    "mapping": {"x": "customer_name", "y": ["revenue"]},
})
# Emits: SELECT `customer_name`, SUM(`revenue`) as `sum_revenue`
#        FROM `orders` GROUP BY `customer_name`
#        ORDER BY `sum_revenue` DESC LIMIT 10

See Top-N pattern for the alias-name contract and multi-column ORDER BY examples.
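
Because order_by must name the generated alias rather than the raw column, it can help to derive the alias in one place. The sketch below encodes only the three alias forms stated above (sum_<col>, avg_<col>, count); metric_alias and top_n_sql are hypothetical helper names, and any other aggregation is rejected rather than guessed.

```python
def metric_alias(function, column=None):
    """Return the alias the Top-N text above describes for an aggregation."""
    function = function.lower()
    if function == "count":
        return "count"
    if function in ("sum", "avg"):
        if not column:
            raise ValueError(f"{function} needs a column name")
        return f"{function}_{column}"
    raise ValueError(f"No documented alias for aggregation {function!r}")


def top_n_sql(function, column, limit):
    """Build the source.sql fragment for a descending Top-N chart."""
    return {
        "metrics": [{"function": function, "column": column}],
        "order_by": [{"column": metric_alias(function, column), "direction": "desc"}],
        "limit": limit,
    }
```

top_n_sql("sum", "revenue", 10) produces the same sql fragment as the BigQuery example above, so it can be dropped into the "sql" key of a create_visualization spec.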

Execute and render

data = sdk.execute_visualization(viz_id)
# data["columns"]  — [{"name": "...", "type": "string"|"number"|"date"|"boolean"|"null"}]
# data["rows"]     — [{<col>: <val>, ...}, ...]
# data["metadata"] — {"source_kind", "row_count", "truncated", "took_ms",
#                     "executed_query", "filters_applied", "filters_skipped"}

Apply filter chips at execute time without modifying the saved spec by passing filters=. The gateway AND-merges them into the SQL via the server-side rewriter; saved filters (on the viz itself) still apply too:

data = sdk.execute_visualization(viz_id, filters=[
    {"field": "currency", "operator": "is_one_of", "value": ["USD", "EUR"]},
    {"field": "@timestamp", "operator": "is_between",
     "value": {"from": "2026-04-01T00:00:00Z", "to": "2026-05-01T00:00:00Z"},
     "is_time_filter": True},
])
# data["metadata"]["filters_applied"] = ["currency", "@timestamp"]
# data["metadata"]["filters_skipped"] = []

Supported operators: is, is_not, is_one_of, is_not_one_of, exists, does_not_exist, is_between, is_not_between. Send field raw — the gateway handles dialect-specific quoting.
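
A typo in an operator name would only surface once the request reaches the server, so a client-side check against the list above can catch it earlier. This is a sketch under assumptions: filter_chip is a hypothetical helper, and omitting value for operators such as exists is a guess at the expected shape, not documented behavior.

```python
SUPPORTED_OPERATORS = frozenset({
    "is", "is_not", "is_one_of", "is_not_one_of",
    "exists", "does_not_exist", "is_between", "is_not_between",
})


def filter_chip(field, operator, value=None, is_time_filter=False):
    """Build one filter dict, rejecting operators not in the documented list."""
    if operator not in SUPPORTED_OPERATORS:
        raise ValueError(f"Unsupported operator: {operator!r}")
    chip = {"field": field, "operator": operator}
    if value is not None:
        # Assumption: value is simply left out when the operator takes none.
        chip["value"] = value
    if is_time_filter:
        chip["is_time_filter"] = True
    return chip


filters = [
    filter_chip("currency", "is_one_of", ["USD", "EUR"]),
    filter_chip("@timestamp", "is_between",
                {"from": "2026-04-01T00:00:00Z", "to": "2026-05-01T00:00:00Z"},
                is_time_filter=True),
]
```

The resulting list has the same shape as the filters= argument in the example above. The field name is passed through raw, in line with the quoting note.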

# Apply a time window at execute time
data = sdk.execute_visualization(viz_id, time_range={
    "from": "2026-04-01T00:00:00Z",
    "to":   "2026-05-01T00:00:00Z",
})

Same kwarg on execute_dashboard for dashboard-wide time windows. Server-side this becomes an is_between filter on the viz's time column (source.sql.time_column, default @timestamp). Accepts ISO-8601 strings or epoch-ms numbers (UTC); absolute timestamps only.
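
Since only absolute timestamps are accepted, a relative window such as "the last 30 days" has to be resolved on the client first. A minimal sketch of that conversion follows; absolute_time_range is a hypothetical helper, and it emits the ISO-8601 UTC form used in the example above.

```python
from datetime import datetime, timedelta, timezone


def absolute_time_range(start, end):
    """Return a {"from", "to"} dict of ISO-8601 UTC strings for two aware datetimes."""
    if start.tzinfo is None or end.tzinfo is None:
        raise ValueError("start and end must be timezone-aware")
    if start >= end:
        raise ValueError("start must be earlier than end")

    def iso_utc(moment):
        # Normalize to UTC and use the trailing-Z form shown in the examples.
        return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    return {"from": iso_utc(start), "to": iso_utc(end)}


# "Last 30 days", resolved relative to a fixed reference instant.
reference = datetime(2026, 5, 1, tzinfo=timezone.utc)
last_30_days = absolute_time_range(reference - timedelta(days=30), reference)
```

The dict can be passed as time_range= to execute_visualization or execute_dashboard. In live code the reference instant would typically be datetime.now(timezone.utc).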

Pull out the values with plain list comprehensions and hand them to your chart library:

from pyecharts import options as opts
from pyecharts.charts import Bar

xs = [row["currency"] for row in data["rows"]]
ys = [row["count"]    for row in data["rows"]]

Bar() \
    .add_xaxis(xs) \
    .add_yaxis("Orders", ys) \
    .set_global_opts(title_opts=opts.TitleOpts(title="Orders by currency")) \
    .render("orders_by_currency.html")

The SDK deliberately stays out of the rendering decision — you keep your existing data tooling (pandas.DataFrame(data["rows"]) if you want a frame, or feed rows straight to plotly / altair / matplotlib).
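
Many chart libraries want one list per column rather than a list of row dicts. When pandas is not available, the pivot is a few lines of plain Python. The sketch assumes only the {columns, rows} shape described above; rows_to_series is a hypothetical helper name and the sample data is invented for illustration.

```python
def rows_to_series(data):
    """Pivot an execute result into {column_name: [values...]}.

    Column order follows data["columns"]; a missing cell becomes None.
    """
    names = [column["name"] for column in data["columns"]]
    return {name: [row.get(name) for row in data["rows"]] for name in names}


# Invented sample in the documented shape, for illustration only.
sample = {
    "columns": [{"name": "currency", "type": "string"},
                {"name": "count", "type": "number"}],
    "rows": [{"currency": "USD", "count": 12}, {"currency": "EUR", "count": 7}],
}
series = rows_to_series(sample)
```

Here series["currency"] and series["count"] play the role of xs and ys in the pyecharts example above.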

Update

Send only the fields you want to change — anything you don't send is preserved. Send a field as null to unset it. Lists are replaced wholesale, not merged element-wise. id, schema_version, created_at, and created_by are immutable; the server ignores attempts to overwrite them. (Wire format: RFC 7396 JSON Merge Patch.)

# Rename a viz and tighten the SQL row limit, without re-sending the rest:
sdk.update_visualization(viz_id, {
    "title": "Orders by currency (last 30 days)",
    "source": {"sql": {"limit": 200}},
})
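
To preview what a patch will do before sending it, the RFC 7396 merge rules can be applied locally. The function below is a small standalone implementation of that algorithm for illustration; it is not the server's code, it does not model the immutable-field handling described above, and the sample spec is invented.

```python
import copy


def merge_patch(target, patch):
    """Apply an RFC 7396 JSON Merge Patch and return the result as a new object."""
    if not isinstance(patch, dict):
        # Scalars and lists replace the target wholesale.
        return copy.deepcopy(patch)
    result = copy.deepcopy(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)  # null removes the member
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


saved = {
    "title": "Orders by currency",
    "description": "draft",
    "tags": ["sales", "fx"],
    "source": {"kind": "sql", "sql": {"raw_query": "SELECT 1", "limit": 50}},
}
preview = merge_patch(saved, {
    "title": "Orders by currency (last 30 days)",
    "description": None,
    "tags": ["sales"],
    "source": {"sql": {"limit": 200}},
})
```

In preview the nested raw_query and kind survive, limit becomes 200, tags is replaced as a whole, and description is gone, which matches the rules listed above.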

Manage

viz     = sdk.get_visualization(viz_id)             # full response with attributes
listing = sdk.list_visualizations(limit=50)         # {"items": [response, ...]}
sdk.delete_visualization(viz_id)

list_visualizations accepts limit (server default 500, max 1000) and offset (default 0) for pagination.
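
Walking a catalog larger than one page means advancing offset until a short page comes back. The generator below sketches that loop against any callable with the limit/offset signature and the {"items": [...]} response shape noted above; iter_all_items is a hypothetical name, and stopping on a short page is an assumption about how the listing ends.

```python
def iter_all_items(list_page, page_size=500):
    """Yield every item from a limit/offset listing, one page at a time.

    list_page is a callable such as sdk.list_visualizations or
    sdk.list_dashboards that accepts limit= and offset=.
    """
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    offset = 0
    while True:
        page = list_page(limit=page_size, offset=offset)
        items = page.get("items", [])
        yield from items
        if len(items) < page_size:
            return  # a short (or empty) page is treated as the last one
        offset += len(items)
```

Usage would look like: for viz in iter_all_items(sdk.list_visualizations): print(viz["id"]).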

Group visualizations into a dashboard

dashboard = sdk.create_dashboard({
    "title": "Ecommerce overview",
    "panels": [
        {"viz_id": viz_a["id"]},
        {"viz_id": viz_b["id"]},
    ],
})

The server fills sane defaults — dashboard-level options, per-panel ids, title_override: null, and a 2-column auto-layout for the panels (you can supply an explicit layout: {x, y, w, h} per panel to override).

Iterate panels and execute each underlying viz to render the dashboard yourself:

dash = sdk.get_dashboard(dashboard["id"])
for panel in dash["attributes"]["panels"]:
    if panel["kind"] == "visualization":
        data = sdk.execute_visualization(panel["viz_id"])
        # render `data["rows"]` with your chart library of choice

Execute a whole dashboard in one call

Rendering a dashboard panel by panel takes 2N + 1 HTTP calls for an N-panel dashboard: the dashboard fetch, plus a get_visualization (for the chart spec) and an execute_visualization per panel. The loop above skips the spec fetch, so it issues N + 1. For a 16-panel dashboard the full sequence is ≈ 1.5 s of wall time. Use execute_dashboard to fan that out in parallel and return enriched per-panel data in one call:

panels = sdk.execute_dashboard(dashboard["id"])
# panels = [
#   {"id", "kind", "layout", "title_override",
#    "viz":  {<full Visualization>}    | None,
#    "data": {"columns", "rows", "metadata"} | None,
#    "error": {"status", "message"}    | None},
#   ...
# ]

for p in panels:
    if p["kind"] != "visualization" or p["error"]:
        continue
    spec = sdk.to_echarts_option(p["viz"], p["data"])
    # render `spec["option"]` at `p["layout"]`

filters= applies the same filter chips to every panel in the fanout — useful for dashboard-wide filter bars:

panels = sdk.execute_dashboard(dashboard["id"], filters=[
    {"field": "region", "operator": "is", "value": "EMEA"},
])

time_range= does the same for a dashboard-level time picker — every panel's SQL gets the synthetic is_between filter on its source.sql.time_column:

panels = sdk.execute_dashboard(dashboard["id"], time_range={
    "from": "2026-04-01T00:00:00Z",
    "to":   "2026-05-01T00:00:00Z",
})

Internally the SDK uses a ThreadPoolExecutor to fire panel fetches and executions concurrently. Per-panel errors are isolated (one bad panel doesn't fail the whole call); the failing panel's entry has error set and viz/data left None.
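
The fanout pattern described above can be illustrated in isolation. The sketch below is not the SDK's implementation: run_panels is a hypothetical function, the result dict is simplified to id, data, and error, and execute stands in for whatever per-panel work is being parallelized.

```python
from concurrent.futures import ThreadPoolExecutor


def run_panels(panels, execute, max_workers=16):
    """Run execute(panel) for every panel concurrently, isolating failures.

    Returns one result dict per panel, in the same order as the input.
    """
    def run_one(panel):
        try:
            return {"id": panel["id"], "data": execute(panel), "error": None}
        except Exception as exc:  # one bad panel must not fail the batch
            return {"id": panel["id"], "data": None, "error": {"message": str(exc)}}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order even when tasks finish out of order.
        return list(pool.map(run_one, panels))
```

Catching the exception inside the worker, rather than around the whole map call, is what keeps one failing panel from discarding the results of the others.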

CRUD for dashboards mirrors the visualization API:

dash    = sdk.get_dashboard(dashboard_id)             # full response with attributes
listing = sdk.list_dashboards(limit=50)               # {"items": [response, ...]}
sdk.update_dashboard(dashboard_id, {"title": "..."})  # PATCH partial update
sdk.delete_dashboard(dashboard_id)

update_dashboard follows the same JSON Merge Patch semantics as update_visualization. Note that arrays (e.g. panels) are replaced wholesale by the patch, not merged element-wise — to add a panel, send the full updated list.
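
Adding a panel therefore becomes a read-modify-write: fetch the current panels, append, and send the whole list back. A minimal sketch follows, using only get_dashboard and update_dashboard as documented above. append_panel is a hypothetical helper, and it assumes the server accepts existing panel objects echoed back unchanged.

```python
def append_panel(sdk, dashboard_id, viz_id):
    """Add one visualization panel to a dashboard by resending the full panel list."""
    current = sdk.get_dashboard(dashboard_id)
    # Copy so the fetched response is not mutated in place.
    panels = list(current["attributes"]["panels"])
    panels.append({"viz_id": viz_id})
    return sdk.update_dashboard(dashboard_id, {"panels": panels})
```

Two clients doing this at the same time could overwrite each other's change, since the last full list wins. Nothing in this section describes a concurrency guard, so treat the helper as suitable for single-writer use.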

Runnable examples live under examples/dashboards/:

  • create_and_render.py — End-to-end flow: create five visualizations, bundle them into a dashboard, execute every panel in parallel, and render a layout-aware composite HTML page via CSS Grid. Run with python -m examples.dashboards.create_and_render.
  • builder_mode.py — Side-by-side Raw SQL vs Builder mode creation against the same dataset, plus runtime filters= usage on both execute_visualization and execute_dashboard. Run with python -m examples.dashboards.builder_mode.
  • advanced_chart_config.py — Every visualization config knob exercised with inline annotations (mapping, legend, bar width, donut ratio, metric formatting, tags, description, source pagination). Run with python -m examples.dashboards.advanced_chart_config.

Shared helpers (credentials, logging, renderer) live in examples/dashboards/common/. If you're driving the SDK with an AI coding agent (Claude Code, Cursor, aider, Cline, Codex CLI, OpenHands), drop examples/dashboards/AGENTS.md into your workspace — it carries task→SDK-call mappings, chart skeletons per type, and the full config-field reference in a form agents pattern-match well.

Govern – Security & Access Control

Control access to your entire data stack with centralized governance for both humans and agents.

Complete Workflow Example

from infino_sdk import InfinoSDK

sdk = InfinoSDK(access_key, secret_key, endpoint)

# Step 1: Create a role with specific permissions
role_config = """
Version: 2025-01-01
Permissions:
  - ResourceType: record
    Actions: [read]
    Resources: ["logs-*", "metrics-*"]
  
  - ResourceType: metadata
    Actions: [read]
    Resources: ["*"]
"""

sdk.create_role("readonly-analyst", role_config)

# Step 2: Create user and assign the role
user_config = """
Version: 2025-01-01
Password: SecureP@ssw0rd123!
Roles:
  - readonly-analyst
"""

sdk.create_user("analytics-agent", user_config)

# Step 3: Rotate API keys when needed
new_keys = sdk.rotate_keys("analytics-agent")
print(f"New access key: {new_keys['access_key']}")

User Management

# List all users
users = sdk.list_users()

# Get specific user
user = sdk.get_user("analytics-agent")

# Update user password or roles
updated_config = """
Version: 2025-01-01
Password: NewP@ssw0rd456!
Roles:
  - readonly-analyst
  - data-viewer
"""
sdk.update_user("analytics-agent", updated_config)

# Delete user
sdk.delete_user("analytics-agent")

Role Management

# Create role with field-level security
role_with_masking = """
Version: 2025-01-01
Permissions:
  - ResourceType: record
    Actions: [read]
    Resources: ["users-*"]
    Fields:
      Allow: ["id", "name", "email"]
      Mask:
        email: redact
        ssn: remove
      Deny:
        - password
        - api_key
"""
sdk.create_role("privacy-compliant-analyst", role_with_masking)

# List all roles
roles = sdk.list_roles()
for role in roles:
    print(f"Role: {role['name']}")

# Get role details
role = sdk.get_role("readonly-analyst")
print(f"Permissions: {role['permissions']}")

# Update role permissions
updated_role = """
Version: 2025-01-01
Permissions:
  - ResourceType: record
    Actions: [read, write]
    Resources: ["logs-*", "metrics-*"]
"""
sdk.update_role("readonly-analyst", updated_role)

# Delete role
sdk.delete_role("old-role")

Resource Types & Actions

Permissions use universal terminology that works across SQL, NoSQL, logs, and metrics:

| ResourceType | Actions | What It Controls |
|---|---|---|
| metadata | read | View schemas, mappings, list datasets |
| dataset | create, delete | Create/delete datasets |
| record | read, write | Query/insert/update/delete records |
| field | N/A | Controlled via Fields in record permissions |

Centralized Governance: Apply consistent policies across all connected sources for both humans and agents.

Error Handling

from infino_sdk import InfinoSDK, InfinoError

with InfinoSDK(access_key, secret_key, endpoint) as sdk:
    try:
        record = sdk.get_record("products", "missing_id")
    except InfinoError as e:
        if e.error_type == InfinoError.Type.REQUEST:
            if e.status_code() == 404:
                print("Record not found")
            elif e.status_code() == 403:
                print("Access denied - check user permissions")
            elif e.status_code() == 401:
                print("Authentication failed")
        elif e.error_type == InfinoError.Type.NETWORK:
            print(f"Network error: {e.message}")

Advanced Configuration

SDK Initialization

from infino_sdk import InfinoSDK, RetryConfig

# Standard initialization
sdk = InfinoSDK(
    access_key="your_access_key",
    secret_key="your_secret_key",
    endpoint="https://api.infino.ws"
)

# Using class methods (alternative constructors)
sdk = InfinoSDK.new(access_key, secret_key, endpoint)

# With custom retry configuration
retry_config = RetryConfig()
retry_config.initial_interval = 500
retry_config.max_retries = 5

sdk = InfinoSDK.new_with_retry(
    access_key,
    secret_key,
    endpoint,
    retry_config
)

Context Manager Usage

from infino_sdk import InfinoSDK

# Automatic resource cleanup with context manager
with InfinoSDK(access_key, secret_key, endpoint) as sdk:
    results = sdk.query_dataset_in_sql("SELECT * FROM products")
    print(f"Found {len(results)} records")
    # Session automatically closed on exit

# Manual resource management
sdk = InfinoSDK(access_key, secret_key, endpoint)
try:
    results = sdk.query_dataset_in_sql("SELECT * FROM products")
finally:
    sdk.close()  # Explicitly close session

Custom Retry Configuration

from infino_sdk import InfinoSDK, RetryConfig

retry_config = RetryConfig()
retry_config.initial_interval = 500      # milliseconds
retry_config.max_interval = 30000        # milliseconds
retry_config.max_retries = 5
retry_config.max_elapsed_time = 180000   # milliseconds

sdk = InfinoSDK(
    access_key=access_key,
    secret_key=secret_key,
    endpoint=endpoint,
    retry_config=retry_config
)

Logging

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("infino_sdk")
logger.setLevel(logging.DEBUG)

# SDK logs all requests and signing operations
sdk = InfinoSDK(access_key, secret_key, endpoint)
sdk.ping()

Requirements

  • Python 3.8 or higher
  • aiohttp >= 3.8.0
  • websockets >= 10.0
  • backoff >= 2.0.0

Development

See CONTRIBUTING.md for development setup and contribution guidelines.

# Clone repository
git clone https://github.com/infinohq/infino-sdk.git
cd infino-sdk

# Install development dependencies
pip install -r requirements-dev.txt

# Run tests
pytest

# Run linter
flake8 infino_sdk tests

# Check types
mypy infino_sdk

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Changelog

See CHANGELOG.md for version history and release notes.
