h2o-connector-service

Python client SDK for the H2O Connector Service. Provides a high-level API to create connectors, open connections, and stream extracted data from supported data sources (PostgreSQL, Snowflake, Hive, Delta Lake, Blob Storage, and more).

pip install h2o-connector-service

Quick Start (H2O Cloud Discovery)

When the environment argument looks like an H2O AI Cloud URL (*.h2o.ai or *.h2o-cloud.com), the Client runs OIDC discovery and automatically exchanges your refresh token for an access token:

from h2o_connector_service import Client

client = Client(
    environment="https://cloud.h2o.ai",
    refresh_token="<your refresh token>",
    workspace="my-workspace",  # optional; falls back to the JWT workspace_id claim
)

The same construction also works with no arguments when the environment variables H2O_CLOUD_ENVIRONMENT and H2O_CLOUD_CLIENT_PLATFORM_TOKEN are set:

client = Client()
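
Before relying on the no-argument form, it can help to confirm that both variables are actually set. The helper below is a small sketch and not part of the SDK: the name missing_client_env_vars is hypothetical, and treating an empty value as missing is an assumption. Only the two variable names come from the text above.

```python
import os
from typing import Mapping, Optional

# Variable names taken from the paragraph above.
REQUIRED_ENV_VARS = ("H2O_CLOUD_ENVIRONMENT", "H2O_CLOUD_CLIENT_PLATFORM_TOKEN")


def missing_client_env_vars(environ: Optional[Mapping[str, str]] = None) -> list:
    """Return the required variable names that are unset or empty.

    Hypothetical helper, not an SDK function. Empty values count as
    missing, which is an assumption about how the Client behaves.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

If missing_client_env_vars() returns an empty list, Client() should be able to pick up its configuration from the environment.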

Quick Start (Direct Connector Service URL)

For local development, Kind, or other non-cloud deployments, pass a direct connector-service URL. Discovery is skipped and the value passed as refresh_token is used as a static bearer token:

client = Client(
    environment="https://connector-service.h2oai.test",
    refresh_token="<a valid access token>",
    workspace="my-workspace",
    verify_ssl=False,
)
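
The two Quick Start modes differ only in whether the environment URL looks like an H2O AI Cloud host. The SDK's actual check is not shown in this README; the sketch below is one plausible reading of the *.h2o.ai / *.h2o-cloud.com rule. The function name looks_like_h2o_cloud is hypothetical, and so is the choice to match subdomains only.

```python
from urllib.parse import urlparse

# Domain suffixes named in the Quick Start text; the matching rule is an assumption.
CLOUD_SUFFIXES = (".h2o.ai", ".h2o-cloud.com")


def looks_like_h2o_cloud(environment: str) -> bool:
    """Guess whether a URL would be treated as an H2O AI Cloud environment.

    Hypothetical helper, not the SDK's implementation. A URL without a
    scheme has no parsed hostname and is reported as not matching.
    """
    host = (urlparse(environment).hostname or "").lower()
    return host.endswith(CLOUD_SUFFIXES)
```

Under this reading, https://cloud.h2o.ai would go through discovery, while https://connector-service.h2oai.test would be treated as a direct URL.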

End-to-End Streaming Flow

A connection has three required pieces — a Connector (data source config), a Worker (a pod backed by a WorkerTemplate), and an ExtractionConfig (what to extract). The pattern below is what the integration tests run end-to-end. See examples/quickstart.py for a runnable version.

from h2o_connector_service import Client
from h2o_connector_service._session import ConnectorSession

client = Client(environment="...", refresh_token="...", workspace="my-workspace")

# 1. WorkerTemplate — image + pod spec carrying driver-native env vars
pod_spec_yaml = """
spec:
  containers:
    - name: worker
      env:
        - {name: PGUSER, value: "postgres"}
        - {name: PGPASSWORD, value: "secret"}
        - {name: PGHOST, value: "db.example.com"}
        - {name: PGPORT, value: "5432"}
        - {name: PGDATABASE, value: "mydb"}
"""
wt = client.worker_templates.create(
    metadata={"name": "wt-pg"},
    image="h2oai-connectorservice-workerpostgresql:latest",
    pull_policy="IfNotPresent",  # K8s shorthand or full IMAGE_PULL_POLICY_* enum
    supported_data_source_types=["postgresql"],
    default_pod_template_spec_yaml=pod_spec_yaml,
    enabled=True,
)

# 2. Connector — data_source_type + driver-native data_source_config
connector = client.connectors.create(
    metadata={"name": "pg"},
    data_source_type="postgresql",
    data_source_config={
        "PGHOST": "db.example.com",
        "PGPORT": "5432",
        "PGDATABASE": "mydb",
        "PGUSER": "postgres",
        "PGPASSWORD": "secret",
    },
)

# 3. Worker — backed by the WorkerTemplate above
worker = client.workers.create(
    metadata={"name": "w-pg"},
    worker_template=f"workerTemplates/{wt.metadata.name}",
)

# 4. Connection — bind connector + worker + extraction, wait for the worker pod
connection = client.connections.create(
    connector=f"connectors/{connector.metadata.name}",
    worker=f"workers/{worker.metadata.name}",
    extraction={"query": "SELECT * FROM my_table", "batch_size": 100},
)
connection.wait_for_ready(timeout=300.0, interval=2.0)

# 5. Stream — ConnectorSession wraps the worker gRPC channel.
#    NOTE: __exit__ deletes Connection + Connector; Worker (workspace-scoped)
#    and WorkerTemplate (global) survive — see step 6.
with ConnectorSession(client=client, connection=connection, connector=connector) as session:
    for row in session.stream_records():
        print(row)

# 6. Tear down the surviving resources. Wrap steps 1–5 in try/finally if
#    you want cleanup to run on streaming errors too.
worker.delete()
wt.delete()
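
Step 6 suggests try/finally so that cleanup also runs when streaming fails. One alternative is contextlib.ExitStack, which lets each cleanup be registered as soon as its resource exists. The sketch below uses stand-in objects so it runs without a service. FakeResource and run_flow are hypothetical names, and the only thing assumed about the SDK is that resources expose delete(), as worker.delete() and wt.delete() do above.

```python
from contextlib import ExitStack


class FakeResource:
    """Stand-in for an SDK resource; only delete() is modelled."""

    def __init__(self, name, log):
        self.name = name
        self._log = log

    def delete(self):
        self._log.append(f"deleted {self.name}")


def run_flow(log, fail=False):
    """Create resources, register cleanup as each one exists, then 'stream'."""
    with ExitStack() as stack:
        wt = FakeResource("wt-pg", log)      # stands in for worker_templates.create(...)
        stack.callback(wt.delete)
        worker = FakeResource("w-pg", log)   # stands in for workers.create(...)
        stack.callback(worker.delete)
        if fail:
            raise RuntimeError("simulated streaming error")
        log.append("streamed")
```

ExitStack runs callbacks in reverse registration order, so the worker is deleted before its template, matching the order in step 6. The exception, if any, still propagates after cleanup.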

Note: The previous high-level one-call wrapper client.open_session(connector_type, config) was removed in issue #267 because it was non-functional: it placed data_source_config in the wrong proto field and provisioned no worker. Use the explicit flow above instead. ConnectorSession is intentionally imported from the private _session module; it has no __all__ export and no API stability guarantee outside the canonical flow.
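
Step 4 of the flow calls connection.wait_for_ready(timeout=300.0, interval=2.0). How the SDK implements it is not shown in this README. A generic poll-until-ready loop with the same two parameters might look like the sketch below; wait_until and its clock and sleep arguments are hypothetical and exist only to make the loop easy to test.

```python
import time


def wait_until(is_ready, timeout=300.0, interval=2.0,
               clock=time.monotonic, sleep=time.sleep):
    """Call is_ready() every `interval` seconds until it returns True.

    Raises TimeoutError once `timeout` seconds have elapsed without
    is_ready() returning True. Hypothetical helper, not an SDK function.
    """
    deadline = clock() + timeout
    while True:
        if is_ready():
            return
        if clock() >= deadline:
            raise TimeoutError(f"not ready after {timeout} seconds")
        sleep(interval)
```

Using a monotonic clock keeps the deadline stable even if the system time changes while waiting.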

Output Formats

Once you have a session, stream data into various formats:

# CSV file (memory-safe — rows written as they arrive)
session.stream_to_csv("output.csv")

# pandas DataFrame (requires: pip install h2o-connector-service[pandas])
df = session.stream_to_pandas()

# Parquet file (memory-safe, chunked row groups)
# requires: pip install h2o-connector-service[parquet]
session.stream_to_parquet("output.parquet")

# datatable Frame (memory-safe, chunked rbind)
# requires: pip install h2o-connector-service[datatable]
frame = session.stream_to_data_table()

# H2O Frame (requires running H2O cluster + h2o.init())
# requires: pip install h2o-connector-service[h2o]
h2o_frame = session.stream_to_h2o_frame()
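
"Memory-safe" in the comments above means rows are written as they arrive instead of being collected first. The sketch below illustrates that idea with the standard csv module. It is not the SDK's stream_to_csv implementation: fake_stream stands in for session.stream_records(), and the assumption that records are dictionaries is not confirmed by this README.

```python
import csv
import io


def write_rows_as_they_arrive(records, out):
    """Write dict records to `out` one at a time and return the row count.

    The header is taken from the first record's keys, so only one
    record is held in memory at any moment.
    """
    writer = None
    count = 0
    for record in records:
        if writer is None:
            writer = csv.DictWriter(out, fieldnames=list(record.keys()))
            writer.writeheader()
        writer.writerow(record)
        count += 1
    return count


def fake_stream():
    """Stand-in for session.stream_records(); yields two sample rows."""
    yield {"id": 1, "name": "a"}
    yield {"id": 2, "name": "b"}


buffer = io.StringIO()
rows_written = write_rows_as_they_arrive(fake_stream(), buffer)
```

The same loop works with a real file opened with newline="" in place of the in-memory buffer.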

Optional Dependencies

Install extras for additional output format support:

pip install h2o-connector-service[pandas]       # pandas DataFrames
pip install h2o-connector-service[parquet]      # Parquet files (pyarrow)
pip install h2o-connector-service[datatable]    # datatable Frames
pip install h2o-connector-service[h2o]          # H2O Frames (pandas + pyarrow + h2o)
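
To see which extras are usable in the current environment without importing them, the standard importlib machinery can be queried. This is a sketch under assumptions: available_extras is a hypothetical helper, and the extra-to-module mapping is inferred from the comments above (pyarrow for Parquet; pandas, pyarrow, and h2o for H2O Frames).

```python
from importlib.util import find_spec

# Extra name -> modules it is expected to provide (inferred, not from package metadata).
EXTRA_MODULES = {
    "pandas": ["pandas"],
    "parquet": ["pyarrow"],
    "datatable": ["datatable"],
    "h2o": ["pandas", "pyarrow", "h2o"],
}


def available_extras(find=find_spec):
    """Return the extras whose modules can all be located for import."""
    return [
        extra
        for extra, modules in EXTRA_MODULES.items()
        if all(find(module) is not None for module in modules)
    ]
```

Calling available_extras() with no arguments checks the running interpreter; the find parameter exists only so the lookup can be replaced in tests.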

Supported Data Source Types

data_source_type   Display Name           Category   Worker Language
postgresql         PostgreSQL             Tabular    Go
snowflake          Snowflake              Tabular    Go
hive               Apache Hive            Tabular    Java
delta-lake         Delta Lake             Tabular    Rust
s3                 Amazon S3              Blob       Go
gcs                Google Cloud Storage   Blob       Go
azure-blob         Azure Blob Storage     Blob       Go
minio              MinIO                  Blob       Go