Python SDK for Keboola Query Service API

Keboola Query Service Python SDK

Python client for Keboola Query Service API.

Installation

pip install keboola-query-service

Quick Start

from keboola_query_service import Client

# Initialize client
# IMPORTANT: Use query.keboola.com (NOT connection.keboola.com)
# Don't append /api/v1 - the SDK handles routing automatically
client = Client(
    base_url="https://query.keboola.com",  # Query Service URL
    token="your-storage-api-token"          # Your Keboola Storage API token
)

# Execute a query
# - branch_id: Find in Keboola UI URL or via Storage API
# - workspace_id: Your workspace ID from Keboola
results = client.execute_query(
    branch_id="1261313",
    workspace_id="2950146661",
    statements=["SELECT * FROM my_table LIMIT 10"]
)

# Process results - one QueryResult per statement
for result in results:
    print("Columns:", [col.name for col in result.columns])
    print("Data:", result.data)

# Always close the client when done
client.close()

Finding Your IDs

  • branch_id: Found in the Keboola Connection URL (e.g., https://connection.keboola.com/admin/projects/123/...) or retrievable via the Storage API
  • workspace_id: Go to Transformations → Workspace → Copy the workspace ID from URL or details
  • token: Settings → API Tokens → Create new token with appropriate permissions

Features

  • Sync and async support - Both synchronous and async (asyncio) APIs
  • Automatic retries - Configurable retry logic for transient failures
  • Job polling - Built-in exponential backoff for waiting on job completion
  • Streaming - NDJSON streaming for large result sets
  • Type hints - Full type annotations for IDE support
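The "exponential backoff" mentioned under Job polling can be pictured with a small sketch. The initial delay, growth factor, and cap below are illustrative assumptions, not the SDK's actual schedule, which this README does not specify.

```python
import itertools
from typing import Iterator


def backoff_delays(initial: float = 0.5, factor: float = 2.0, cap: float = 30.0) -> Iterator[float]:
    """Yield wait times that grow geometrically until they reach `cap`.

    All three defaults are hypothetical values chosen for illustration.
    """
    delay = initial
    while True:
        yield min(delay, cap)
        delay *= factor


# First seven delays with the illustrative defaults above
first_delays = list(itertools.islice(backoff_delays(), 7))
print(first_delays)  # [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

Capping the delay keeps long-running jobs from being polled too rarely once the wait time has grown.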

Usage

Basic Query Execution

from keboola_query_service import Client

with Client(base_url="https://query.keboola.com", token="...") as client:
    # Execute query and wait for results
    results = client.execute_query(
        branch_id="123",
        workspace_id="456",
        statements=[
            "SELECT * FROM orders WHERE date > '2024-01-01'",
            "SELECT COUNT(*) FROM customers"
        ],
        transactional=True  # Execute in a transaction
    )

    # Results is a list - one QueryResult per statement
    orders_result = results[0]
    count_result = results[1]

    print(f"Columns: {[c.name for c in orders_result.columns]}")
    print(f"Rows: {len(orders_result.data)}")

Using Context Manager (Recommended)

from keboola_query_service import Client

# Context manager automatically closes the client
with Client(base_url="https://query.keboola.com", token="...") as client:
    results = client.execute_query(
        branch_id="1261313",
        workspace_id="2950146661",
        statements=["SELECT 1 as test"]
    )
    print(results[0].data)  # [['1']]
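Since each result exposes column metadata and row data separately, it is often convenient to pair them up. The following is a minimal sketch, assuming only what the examples above show: `columns` is a list of objects with a `name` attribute and `data` is a list of rows. `FakeColumn`, `FakeResult`, and `rows_as_dicts` are hypothetical names used so the snippet runs without the SDK.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FakeColumn:
    """Stand-in for the SDK's Column model (only `name` is used here)."""
    name: str


@dataclass
class FakeResult:
    """Stand-in for QueryResult with the two attributes shown in this README."""
    columns: List[FakeColumn]
    data: List[List[Any]]


def rows_as_dicts(result) -> List[Dict[str, Any]]:
    """Pair each row's values with the column names, in column order."""
    names = [col.name for col in result.columns]
    return [dict(zip(names, row)) for row in result.data]


sample = FakeResult(columns=[FakeColumn("test")], data=[["1"]])
records = rows_as_dicts(sample)
print(records)  # [{'test': '1'}]
```

With a real client, the same helper could be called as `rows_as_dicts(results[0])`, provided the result object has the attributes assumed above.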

Async Usage

import asyncio
from keboola_query_service import Client

async def main():
    async with Client(base_url="https://query.keboola.com", token="...") as client:
        results = await client.execute_query_async(
            branch_id="1261313",
            workspace_id="2950146661",
            statements=["SELECT 1 as test"]
        )
        print(results[0].data)

asyncio.run(main())
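One reason to prefer the async API is running several independent queries concurrently. The sketch below shows the general `asyncio.gather` pattern with a hypothetical stand-in coroutine, `fake_execute_query_async`, in place of `client.execute_query_async`. Whether a single Client instance can safely serve concurrent calls is not stated in this README, so treat that as an assumption to verify.

```python
import asyncio
from typing import List


async def fake_execute_query_async(statements: List[str]) -> List[str]:
    """Hypothetical stand-in for client.execute_query_async."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return [f"result of: {s}" for s in statements]


async def run_all(batches: List[List[str]]) -> List[List[str]]:
    # gather() runs the coroutines concurrently and preserves input order
    return await asyncio.gather(*(fake_execute_query_async(b) for b in batches))


gathered = asyncio.run(run_all([["SELECT 1"], ["SELECT 2"]]))
print(gathered)  # [['result of: SELECT 1'], ['result of: SELECT 2']]
```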

Low-Level API

For more control, use the low-level methods:

# Assumes `client` is an initialized Client (see Quick Start)
# Submit job without waiting
job_id = client.submit_job(
    branch_id="123",
    workspace_id="456",
    statements=["SELECT * FROM large_table"]
)

# Check status
status = client.get_job_status(job_id)
print(f"Status: {status.status}")  # created, enqueued, processing, completed, failed, canceled

# Wait for completion
final_status = client.wait_for_job(job_id, max_wait_time=300)

# Get results for specific statement
result = client.get_job_results(job_id, final_status.statements[0].id)
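To illustrate what waiting for a job involves, here is a rough sketch of a hand-rolled polling loop built on `get_job_status()`. It is not the SDK's `wait_for_job()` implementation. It assumes `status.status` compares equal to the plain strings listed under JobState; if the SDK returns enum members, compare against those instead. `poll_until_done`, `FakeClient`, and the delay values are hypothetical.

```python
import time
from types import SimpleNamespace

# Terminal values taken from the JobState list in this README
TERMINAL_STATES = {"completed", "failed", "canceled"}


def poll_until_done(client, job_id, max_wait_time=300.0,
                    initial_delay=0.5, max_delay=10.0, sleep=time.sleep):
    """Poll get_job_status() with a doubling delay until a terminal state."""
    waited, delay = 0.0, initial_delay
    while True:
        status = client.get_job_status(job_id)
        if status.status in TERMINAL_STATES:
            return status
        if waited >= max_wait_time:
            raise RuntimeError(f"job {job_id} still {status.status} after {waited:.1f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)


class FakeClient:
    """Hypothetical stand-in that replays a fixed sequence of states."""

    def __init__(self, states):
        self._states = iter(states)

    def get_job_status(self, job_id):
        return SimpleNamespace(status=next(self._states))


fake = FakeClient(["created", "processing", "completed"])
final = poll_until_done(fake, "job-1", sleep=lambda seconds: None)
print(final.status)  # completed
```

Passing `sleep` as a parameter keeps the loop testable without real waiting. In normal use the built-in `wait_for_job()` is the simpler choice.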

Streaming Large Results

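# Stream results as NDJSON for large datasets
# job_id and statement_id come from the low-level calls above
# (e.g., statement_id = final_status.statements[0].id);
# process_row is your own row handler
for row in client.stream_results(job_id, statement_id):
    process_row(row)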
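NDJSON (newline-delimited JSON) carries one JSON value per line, which is what makes it suitable for streaming: each line can be decoded as soon as it arrives. The sketch below shows that decoding step in isolation. The row shape (a JSON array per line) is an assumption for illustration; this README does not specify what `stream_results()` yields.

```python
import json
from typing import Any, Iterable, Iterator


def parse_ndjson(lines: Iterable[str]) -> Iterator[Any]:
    """Decode one JSON value per non-empty line, lazily."""
    for line in lines:
        line = line.strip()
        if line:  # skip blank lines, e.g. a trailing newline
            yield json.loads(line)


# Hypothetical payload: each line is one row encoded as a JSON array
payload = '["1", "alice"]\n["2", "bob"]\n'
rows = list(parse_ndjson(payload.splitlines()))
print(rows)  # [['1', 'alice'], ['2', 'bob']]
```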

Error Handling

from keboola_query_service import (
    Client,
    AuthenticationError,
    ValidationError,
    JobError,
    TimeoutError,  # Note: shadows Python's built-in TimeoutError in this module
)

try:
    results = client.execute_query(...)
except AuthenticationError:
    print("Invalid token")
except ValidationError as e:
    print(f"Invalid request: {e.message}")
except JobError as e:
    print(f"Query failed: {e.message}")
    for stmt in e.failed_statements:
        print(f"  Statement {stmt['id']}: {stmt['error']}")
except TimeoutError as e:
    print(f"Job {e.job_id} timed out")

Query History

history = client.get_query_history(
    branch_id="123",
    workspace_id="456",
    page_size=100
)

for stmt in history.statements:
    print(f"{stmt.query_job_id}: {stmt.query[:50]}... ({stmt.status})")
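A common follow-up to listing history is summarizing it. This sketch tallies statements by status using stand-in records; it assumes only the `status` attribute shown in the loop above, and `count_by_status` is a hypothetical helper name.

```python
from collections import Counter
from types import SimpleNamespace


def count_by_status(statements):
    """Tally statements by their status value."""
    return Counter(stmt.status for stmt in statements)


# Stand-in records; real entries would come from history.statements
fake_statements = [
    SimpleNamespace(query_job_id="a", status="completed"),
    SimpleNamespace(query_job_id="b", status="failed"),
    SimpleNamespace(query_job_id="c", status="completed"),
]
counts = count_by_status(fake_statements)
print(counts.most_common())  # [('completed', 2), ('failed', 1)]
```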

Configuration

client = Client(
    base_url="https://query.keboola.com",
    token="your-token",
    timeout=120.0,           # Request timeout (seconds)
    connect_timeout=10.0,    # Connection timeout (seconds)
    max_retries=3,           # Max retry attempts
    user_agent="my-app/1.0", # Custom user agent
)
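To keep the token out of source code, the constructor arguments can be assembled from environment variables. This is a sketch only: the variable names (`KBC_TOKEN`, `KBC_QUERY_URL`, and so on) are hypothetical and not defined by the SDK, and the fallback values simply mirror the example above.

```python
import os
from typing import Any, Dict, Mapping


def client_kwargs_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for Client(...) from environment variables.

    The variable names are illustrative assumptions, not an SDK convention.
    """
    return {
        "base_url": env.get("KBC_QUERY_URL", "https://query.keboola.com"),
        "token": env["KBC_TOKEN"],  # required: raises KeyError if missing
        "timeout": float(env.get("KBC_QUERY_TIMEOUT", "120")),
        "max_retries": int(env.get("KBC_QUERY_MAX_RETRIES", "3")),
    }


kwargs = client_kwargs_from_env({"KBC_TOKEN": "your-token"})
print(kwargs)
```

The resulting dictionary could then be passed as `Client(**kwargs)`.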

API Reference

Client Methods

Method               Description
execute_query()      Submit query, wait for completion, return results
submit_job()         Submit query job without waiting
get_job_status()     Get current job status
get_job_results()    Get results for a statement
wait_for_job()       Wait for job to complete
cancel_job()         Cancel a running job
get_query_history()  Get query history for workspace
stream_results()     Stream results as NDJSON

All methods have async variants with _async suffix.

Models

  • JobStatus - Job status with statements
  • QueryResult - Query results with columns and data
  • Statement - Individual SQL statement info
  • Column - Column metadata
  • JobState - Enum: created, enqueued, processing, completed, failed, canceled
  • StatementState - Enum: waiting, processing, completed, failed, canceled, notExecuted

License

MIT

Download files

Source Distribution

keboola_query_service-0.2.0.tar.gz (10.9 kB)

Uploaded Source

Built Distribution

keboola_query_service-0.2.0-py3-none-any.whl (12.8 kB)

Uploaded Python 3

File details

Details for the file keboola_query_service-0.2.0.tar.gz.

File metadata

  • Download URL: keboola_query_service-0.2.0.tar.gz
  • Upload date:
  • Size: 10.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for keboola_query_service-0.2.0.tar.gz
Algorithm    Hash digest
SHA256       6afaa37eff144312361a32e76cc193138d67f14a8bdfbdbdf0ef5d16d36afcc6
MD5          771ca85400d259792678af4e2dbf7a07
BLAKE2b-256  a5444786682a4e8eaaee31ceb88525fe287c1032295d22e52fab4463ef04d801
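A downloaded file can be checked against the SHA256 digest listed above with the standard library. This is a minimal sketch; `sha256_of` is a hypothetical helper name, and the commented-out local path assumes the file was saved under its published name.

```python
import hashlib
import io
from typing import BinaryIO


def sha256_of(stream: BinaryIO, chunk_size: int = 65536) -> str:
    """Hash a binary stream in chunks so large files need not fit in memory."""
    digest = hashlib.sha256()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# Published SHA256 for keboola_query_service-0.2.0.tar.gz
EXPECTED = "6afaa37eff144312361a32e76cc193138d67f14a8bdfbdbdf0ef5d16d36afcc6"

# With a real download (path is an assumption):
# with open("keboola_query_service-0.2.0.tar.gz", "rb") as fh:
#     assert sha256_of(fh) == EXPECTED

# Self-contained demonstration on an in-memory stream
demo_digest = sha256_of(io.BytesIO(b"hello"))
print(demo_digest)
```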

Provenance

The following attestation bundles were made for keboola_query_service-0.2.0.tar.gz:

Publisher: publish.yml on keboola/query-service-api-python-sdk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file keboola_query_service-0.2.0-py3-none-any.whl.

File hashes

Hashes for keboola_query_service-0.2.0-py3-none-any.whl
Algorithm    Hash digest
SHA256       1593ad230b116c4a437b27bc0167272bbec4d88c5a5029b08491d7ac5e017650
MD5          88f963c248cb11ba3e9252329889083b
BLAKE2b-256  9f70b8a725518847669f266e8ef91ce5d2b6554f80931c36bebf00e704aca1f0

Provenance

The following attestation bundles were made for keboola_query_service-0.2.0-py3-none-any.whl:

Publisher: publish.yml on keboola/query-service-api-python-sdk

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
