Skip to main content

Python SDK for Infino API with AWS SigV4 authentication

Project description

Infino Python SDK

Python Version License

Official Python SDK for Infino - the unification layer for your data stack.

Infino provides a single gateway to your data stack. Query Elasticsearch, OpenSearch, Snowflake, and 50+ sources in natural language, SQL, QueryDSL, or PromQL. Bring diverse data sources together for deeper analysis—no ETL required. Control access via fine-grained RBAC. All through one unified API.

Built for:

  • Connect: Access 50+ data sources without data movement
  • Query: Natural language, SQL, Query DSL, and PromQL across all sources
  • Correlate: Pull together data from different sources for cross-source correlation
  • Govern: Fine-grained RBAC for your entire data stack

Installation

pip install infino-sdk

Quick Start

from infino_sdk import InfinoSDK

# Create SDK instance with your credentials
sdk = InfinoSDK(
    access_key="your_access_key",
    secret_key="your_secret_key",
    endpoint="https://api.infino.ws"
)

# Check connection
info = sdk.ping()
print(f"Connected: {info}")

# Query a dataset
results = sdk.query_dataset_in_querydsl("my-dataset", '{"query": {"match_all": {}}}')
print(f"Found {len(results.get('hits', {}).get('hits', []))} records")

Getting Your Credentials

  1. Sign up at app.infino.ws
  2. Create a new account (accounts can only be created through the UI)
  3. Navigate to Settings → API Keys
  4. Generate your access_key and secret_key

Documentation

Table of Contents

Connect – Access Data Sources

Connect to data sources and query them in place.

Create Connections

from infino_sdk import InfinoSDK

sdk = InfinoSDK(access_key, secret_key, endpoint)

# Create connection to Elasticsearch
connection_config = {
    "connector_id": "elasticsearch",
    "name": "Production ES Cluster",
    "config": {
        "host": "https://es-cluster.example.com:9200",
        "username": "elastic",
        "password": "secret"
    }
}
connection = sdk.create_connection("elasticsearch", connection_config)
print(f"Created connection: {connection['connection_id']}")

# List all connections
connections = sdk.get_connections()

# Get connection status
status = sdk.get_connection("conn_abc123")

Query Connected Sources

# Query Elasticsearch (via QueryDSL)
results = sdk.query_source(
    connection_id="conn_elasticsearch_prod",
    dataset="external_logs", 
    query='{"query": {"match_all": {}}}'
)

# Query Snowflake (via SQL)
results = sdk.query_source(
    connection_id="conn_snowflake_prod",
    dataset="sales_data",
    query="SELECT * FROM sales_data WHERE region='US' LIMIT 10"
)

Query – Ask Questions

Query any connected source or dataset with multiple interfaces.

Natural Language (Fino AI)

import asyncio
import json

async def query_with_fino():
    sdk = InfinoSDK(access_key, secret_key, endpoint)
    
    # Connect to Fino WebSocket
    ws = await sdk.websocket_connect("/_conversation/ws")
    
    try:
        # Send your question
        await ws.send(json.dumps({
            "type": "query",
            "content": "What are the top 5 products by revenue?"
        }))
        
        # Receive AI response
        async for message in ws:
            data = json.loads(message)
            print(f"Fino: {data.get('content', '')}")
            if data.get("type") == "complete":
                break
    finally:
        await ws.close()
        sdk.close()

# Run async
asyncio.run(query_with_fino())

Manage Conversation Threads

# Create and manage Fino conversation threads
sdk = InfinoSDK(access_key, secret_key, endpoint)

# List existing threads
threads = sdk.list_threads()

# Create a new thread with optional metadata
thread = sdk.create_thread({
    "name": "Sales Analysis",
    "workflow_name": "alpha_v1"
})

# Add a message to the thread
sdk.add_thread_message(thread["id"], {
    "content": {
        "user_query": "What are the top 5 regions by revenue?"
    },
    "role": "user"
})

# Send a message using the simplified API (thread_id in payload)
sdk.send_message({
    "thread_id": thread["id"],
    "content": {
        "user_query": "Show me week-over-week trends"
    },
    "role": "user"
})

# Clean up when finished
sdk.clear_thread_messages(thread["id"])
sdk.delete_thread(thread["id"])

SQL Queries

# Query a dataset with SQL
results = sdk.query_dataset_in_sql("SELECT * FROM products WHERE price > 100 LIMIT 10")

# With aggregations
results = sdk.query_dataset_in_sql("SELECT category, AVG(price) FROM products GROUP BY category")

Query DSL

# Simple query on a dataset
query = '{"query": {"match_all": {}}}'
results = sdk.query_dataset_in_querydsl("products", query)

# Complex query with filters
query = '''
{
  "query": {
    "bool": {
      "must": [{"range": {"price": {"gte": 10, "lte": 100}}}],
      "filter": [{"term": {"in_stock": true}}]
    }
  }
}
'''
results = sdk.query_dataset_in_querydsl("products", query)

# Query data source
results = sdk.query_source(
    connection_id="conn_opensearch",
    dataset="dataset",
    query=query
)

PromQL (Time-Series)

# Instant PromQL query
result = sdk.query_dataset_in_promql(
    'http_requests_total{status="200"}',
    dataset="metrics_example",
)

# Range PromQL query
result = sdk.query_dataset_in_promql_range(
    query='rate(http_requests_total[5m])',
    start=1609459200,
    end=1609545600,
    step=300,
    dataset="metrics_example",
)

Correlate – Cross-Source Operations

Use datasets to pull together data from different sources for correlation and analysis without schemas.

When to Use Datasets

  • Cross-Source Joins: Correlate data from multiple data sources
  • Unified Analysis: Ask deeper questions across silos
  • Staging: Test queries before running in production
  • Temporary Storage: Hold intermediate results for complex workflows

Create Datasets

# Create a dataset for staging
sdk.create_dataset("staging-analysis-2024")

Upload Data

# Upload records to a dataset for correlation
bulk_data = '''
{"index": {"_id": "1"}}
{"product_id": "A123", "revenue": 15000, "@timestamp": "2024-10-15"}
{"index": {"_id": "2"}}
{"product_id": "B456", "revenue": 23000, "@timestamp": "2024-10-15"}
'''

sdk.upload_json_to_dataset("sales-correlation", bulk_data)

Manage Datasets

# Get dataset metadata
metadata = sdk.get_dataset_metadata("sales-correlation")

# List all datasets
datasets = sdk.get_datasets()

# Delete dataset
sdk.delete_dataset("old-staging-2023")

Record Operations

# Get a record
record = sdk.get_record("sales-correlation", "prod_123")

# Delete records
sdk.delete_records("sales-correlation", '{"query": {"range": {"@timestamp": {"lt": "2024-01-01"}}}}')

Dataset Enrichment

# Configure enrichment for a dataset
import json

enrich_policy = json.dumps({
    "enrich_policy": {
        "match_field": "user_id",
        "enrich_fields": ["email", "name", "department"]
    }
})

sdk.enrich_dataset("sales-correlation", enrich_policy)

Govern – Security & Access Control

Control access to your entire data stack with centralized governance for both humans and agents.

Complete Workflow Example

from infino_sdk import InfinoSDK

sdk = InfinoSDK(access_key, secret_key, endpoint)

# Step 1: Create a role with specific permissions
role_config = """
Version: 2025-01-01
Permissions:
  - ResourceType: record
    Actions: [read]
    Resources: ["logs-*", "metrics-*"]
  
  - ResourceType: metadata
    Actions: [read]
    Resources: ["*"]
"""

sdk.create_role("readonly-analyst", role_config)

# Step 2: Create user and assign the role
user_config = """
Version: 2025-01-01
Password: SecureP@ssw0rd123!
Roles:
  - readonly-analyst
"""

sdk.create_user("analytics-agent", user_config)

# Step 3: Rotate API keys when needed
new_keys = sdk.rotate_keys()
print(f"New access key: {new_keys['access_key']}")

User Management

# List all users
users = sdk.list_users()

# Get specific user
user = sdk.get_user("analytics-agent")

# Update user password or roles
updated_config = """
Version: 2025-01-01
Password: NewP@ssw0rd456!
Roles:
  - readonly-analyst
  - data-viewer
"""
sdk.update_user("analytics-agent", updated_config)

# Delete user
sdk.delete_user("analytics-agent")

Role Management

# Create role with field-level security
role_with_masking = """
Version: 2025-01-01
Permissions:
  - ResourceType: record
    Actions: [read]
    Resources: ["users-*"]
    Fields:
      Allow: ["id", "name", "email"]
      Mask:
        email: redact
        ssn: remove
      Deny:
        - password
        - api_key
"""
sdk.create_role("privacy-compliant-analyst", role_with_masking)

# Get role details
role = sdk.get_role("readonly-analyst")

# Delete role
sdk.delete_role("old-role")

Resource Types & Actions

Permissions use universal terminology that works across SQL, NoSQL, logs, and metrics:

ResourceType Actions What It Controls
metadata read View schemas, mappings, list datasets
dataset create, delete Create/delete datasets
record read, write Query/insert/update/delete records
field N/A Controlled via Fields in record permissions

Centralized Governance: Apply consistent policies across all connected sources for both humans and agents.

Error Handling

from infino_sdk import InfinoSDK, InfinoError

async with InfinoSDK(access_key, secret_key, endpoint) as sdk:
    try:
        record = sdk.get_record("products", "missing_id")
    except InfinoError as e:
        if e.error_type == InfinoError.Type.REQUEST:
            if e.status_code() == 404:
                print("Record not found")
            elif e.status_code() == 403:
                print("Access denied - check user permissions")
            elif e.status_code() == 401:
                print("Authentication failed")
        elif e.error_type == InfinoError.Type.NETWORK:
            print(f"Network error: {e.message}")

Advanced Configuration

Custom Retry Configuration

from infino_sdk import InfinoSDK, RetryConfig

retry_config = RetryConfig()
retry_config.initial_interval = 500
retry_config.max_retries = 5

sdk = InfinoSDK(
    access_key=access_key,
    secret_key=secret_key,
    endpoint=endpoint,
    retry_config=retry_config
)

Logging

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("infino_sdk")
logger.setLevel(logging.DEBUG)

# SDK logs all requests
sdk = InfinoSDK(access_key, secret_key, endpoint)
sdk.ping()

Examples

Complete working examples organized by workflow:

Connect Examples

Query Examples

Correlate Examples

Govern Examples

Utilities

API Reference

Full method documentation available in code docstrings and docs.infino.ai.

Requirements

  • Python 3.8 or higher
  • aiohttp >= 3.8.0
  • websockets >= 10.0
  • backoff >= 2.0.0

Development

See CONTRIBUTING.md for development setup and contribution guidelines.

# Clone repository
git clone https://github.com/infinohq/infino-sdk.git
cd infino-sdk

# Install development dependencies
pip install -r requirements-dev.txt

# Run tests
pytest

# Run linter
flake8 infino_sdk tests

# Check types
mypy infino_sdk

Support

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Changelog

See CHANGELOG.md for version history and release notes.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

infino_sdk-0.2.0.tar.gz (28.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

infino_sdk-0.2.0-py3-none-any.whl (23.5 kB view details)

Uploaded Python 3

File details

Details for the file infino_sdk-0.2.0.tar.gz.

File metadata

  • Download URL: infino_sdk-0.2.0.tar.gz
  • Upload date:
  • Size: 28.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for infino_sdk-0.2.0.tar.gz
Algorithm Hash digest
SHA256 2665d26a11788b307266fd300a6e996d2aed53fea8f3288b5dce531c9831166b
MD5 91b1abe335313a041fa9ffadc91a0df6
BLAKE2b-256 f839173459f4c8017fe4dff63dc07bd6576496ab3ca50be4ef8bd364f9213dbd

See more details on using hashes here.

File details

Details for the file infino_sdk-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: infino_sdk-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 23.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for infino_sdk-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c1e168f96a8532ccdcd997e2140711989f533e92bcfe94e37eb593699db22a49
MD5 d8265995ba430c7b6a814962f29663e8
BLAKE2b-256 a424cd51e3d72b18587a677442678b365da78e56e7edab726e02ce4069ac3ff5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page