
Python client SDK for the H2O Connector Service — create connectors, open connections, and stream extracted data

Project description

h2o-connector-service

Python client SDK for the H2O Connector Service. Provides a high-level API to create connectors, open connections, and stream extracted data from supported data sources (PostgreSQL, Snowflake, Hive, Delta Lake, Blob Storage, and more).

pip install h2o-connector-service

Quick Start (H2O Cloud Discovery)

When the environment argument looks like an H2O AI Cloud URL (*.h2o.ai / *.h2o-cloud.com), the Client runs OIDC discovery and exchanges your refresh token automatically:

from h2o_connector_service import Client

client = Client(
    environment="https://cloud.h2o.ai",
    refresh_token="<your refresh token>",
    workspace="my-workspace",  # optional; falls back to the JWT workspace_id claim
)

The same construction is available with no arguments when the environment variables H2O_CLOUD_ENVIRONMENT and H2O_CLOUD_CLIENT_PLATFORM_TOKEN are set:

client = Client()
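
For example, setting the variables in-process before construction (a sketch; the values are placeholders, and the two variables stand in for the environment and refresh_token arguments above):

import os

from h2o_connector_service import Client

# The two documented variables replace the constructor arguments.
os.environ["H2O_CLOUD_ENVIRONMENT"] = "https://cloud.h2o.ai"
os.environ["H2O_CLOUD_CLIENT_PLATFORM_TOKEN"] = "<your refresh token>"

client = Client()  # discovery and token exchange proceed as above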

Quick Start (Direct Connector Service URL)

For local dev / Kind / non-cloud deployments, pass a direct connector-service URL — discovery is short-circuited and the token is used as a static bearer:

client = Client(
    environment="https://connector-service.h2oai.test",
    refresh_token="<a valid access token>",
    workspace="my-workspace",
    verify_ssl=False,
)

End-to-End Streaming Flow

A connection has three required pieces — a Connector (data source config), a Worker (a pod backed by a WorkerTemplate), and an ExtractionConfig (what to extract). The pattern below is what the integration tests run end-to-end. See examples/quickstart.py for a runnable version.

from h2o_connector_service import Client
from h2o_connector_service._session import ConnectorSession

client = Client(environment="...", refresh_token="...", workspace="my-workspace")

# 1. WorkerTemplate — image + pod spec carrying driver-native env vars
pod_spec_yaml = """
spec:
  containers:
    - name: worker
      env:
        - {name: PGUSER, value: "postgres"}
        - {name: PGPASSWORD, value: "secret"}
        - {name: PGHOST, value: "db.example.com"}
        - {name: PGPORT, value: "5432"}
        - {name: PGDATABASE, value: "mydb"}
"""
wt = client.worker_templates.create(
    metadata={"name": "wt-pg"},
    image="h2oai-connectorservice-workerpostgresql:latest",
    pull_policy="IfNotPresent",  # K8s shorthand or full IMAGE_PULL_POLICY_* enum
    supported_data_source_types=["postgresql"],
    default_pod_template_spec_yaml=pod_spec_yaml,
    enabled=True,
)

# 2. Connector — data_source_type + driver-native data_source_config
connector = client.connectors.create(
    metadata={"name": "pg"},
    data_source_type="postgresql",
    data_source_config={
        "PGHOST": "db.example.com",
        "PGPORT": "5432",
        "PGDATABASE": "mydb",
        "PGUSER": "postgres",
        "PGPASSWORD": "secret",
    },
)

# 3. Worker — backed by the WorkerTemplate above
worker = client.workers.create(
    metadata={"name": "w-pg"},
    worker_template=f"workerTemplates/{wt.metadata.name}",
)

# 4. Connection — bind connector + worker + extraction, wait for the worker pod
connection = client.connections.create(
    connector=f"connectors/{connector.metadata.name}",
    worker=f"workers/{worker.metadata.name}",
    extraction={"query": "SELECT * FROM my_table", "batch_size": 100},
)
connection.wait_for_ready(timeout=300.0, interval=2.0)

# 5. Stream — ConnectorSession wraps the worker gRPC channel.
#    NOTE: __exit__ deletes Connection + Connector; Worker (workspace-scoped)
#    and WorkerTemplate (global) survive — see step 6.
with ConnectorSession(client=client, connection=connection, connector=connector) as session:
    for row in session.stream_records():
        print(row)

# 6. Tear down the surviving resources. Wrap steps 1–5 in try/finally if
#    you want cleanup to run on streaming errors too.
worker.delete()
wt.delete()

Note: The previous high-level client.open_session(connector_type, config) one-call wrapper was removed in issue #267 because it was non-functional: it placed data_source_config in the wrong proto field and provisioned no worker. Use the step-by-step flow above. ConnectorSession is intentionally imported from the private _session module; it has no __all__ export and no API stability guarantee outside the canonical flow.
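
If you want cleanup to run even when streaming fails, one option is the try/finally wrapper mentioned in step 6. A condensed sketch, where each create(...) takes the same arguments shown in steps 1-3 above:

wt = worker = None
try:
    wt = client.worker_templates.create(...)     # step 1, args as above
    connector = client.connectors.create(...)    # step 2
    worker = client.workers.create(...)          # step 3
    connection = client.connections.create(      # step 4
        connector=f"connectors/{connector.metadata.name}",
        worker=f"workers/{worker.metadata.name}",
        extraction={"query": "SELECT * FROM my_table", "batch_size": 100},
    )
    connection.wait_for_ready(timeout=300.0, interval=2.0)
    with ConnectorSession(client=client, connection=connection, connector=connector) as session:
        for row in session.stream_records():
            print(row)
finally:
    # ConnectorSession.__exit__ deletes Connection + Connector once the
    # session has been entered; the Worker and WorkerTemplate always need
    # explicit teardown. If the flow failed before the session opened, the
    # Connection and Connector may also still exist and need cleanup.
    if worker is not None:
        worker.delete()
    if wt is not None:
        wt.delete()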

Output Formats

Once you have a session, stream data into various formats:

# CSV file (memory-safe — rows written as they arrive)
session.stream_to_csv("output.csv")

# pandas DataFrame (requires: pip install h2o-connector-service[pandas])
df = session.stream_to_pandas()

# Parquet file (memory-safe, chunked row groups)
# requires: pip install h2o-connector-service[parquet]
session.stream_to_parquet("output.parquet")

# datatable Frame (memory-safe, chunked rbind)
# requires: pip install h2o-connector-service[datatable]
frame = session.stream_to_data_table()

# H2O Frame (requires running H2O cluster + h2o.init())
# requires: pip install h2o-connector-service[h2o]
h2o_frame = session.stream_to_h2o_frame()
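
For a custom sink, you can also consume session.stream_records() directly (the same generator used in step 5 of the flow above). For example, JSON Lines output; a sketch, assuming each record is a JSON-serializable mapping:

import json

# Memory-safe manual streaming: write each record as it arrives instead
# of accumulating rows in memory.
with open("output.jsonl", "w", encoding="utf-8") as f:
    for row in session.stream_records():
        f.write(json.dumps(row) + "\n")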

Optional Dependencies

Install extras for additional output format support:

pip install h2o-connector-service[pandas]       # pandas DataFrames
pip install h2o-connector-service[parquet]      # Parquet files (pyarrow)
pip install h2o-connector-service[datatable]    # datatable Frames
pip install h2o-connector-service[h2o]          # H2O Frames (pandas + pyarrow + h2o)
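
To check up front which extras are available (and therefore which stream_to_* methods will work), one option is the standard library's importlib; a sketch, with the extra-to-package mapping taken from the install lines above:

from importlib.util import find_spec

# Each extra and the top-level package(s) it pulls in, per the lines above.
EXTRAS = {
    "pandas": ["pandas"],
    "parquet": ["pyarrow"],
    "datatable": ["datatable"],
    "h2o": ["pandas", "pyarrow", "h2o"],
}

for extra, packages in EXTRAS.items():
    ok = all(find_spec(pkg) is not None for pkg in packages)
    print(f"{extra}: {'installed' if ok else 'missing'}")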

Supported Data Source Types

data_source_type   Display Name           Category   Worker Language
postgresql         PostgreSQL             Tabular    Go
snowflake          Snowflake              Tabular    Go
hive               Apache Hive            Tabular    Java
delta-lake         Delta Lake             Tabular    Rust
s3                 Amazon S3              Blob       Go
gcs                Google Cloud Storage   Blob       Go
azure-blob         Azure Blob Storage     Blob       Go
minio              MinIO                  Blob       Go
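
The end-to-end pattern above carries over to the other rows in this table: pick a WorkerTemplate image for the driver, then pass the matching data_source_type and driver-native config. For example, a MinIO blob connector; a hypothetical sketch, since only data_source_type="minio" comes from the table, and the config keys below are illustrative assumptions rather than documented values:

# Hypothetical: key names are assumptions; the real keys are whatever the
# minio worker's driver expects, mirroring the PGHOST/PGUSER style above.
blob_connector = client.connectors.create(
    metadata={"name": "blob-minio"},
    data_source_type="minio",
    data_source_config={
        "MINIO_ENDPOINT": "https://minio.example.com:9000",
        "MINIO_ACCESS_KEY": "<access key>",
        "MINIO_SECRET_KEY": "<secret key>",
        "MINIO_BUCKET": "my-bucket",
    },
)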



Download files


Source Distribution

h2o_connector_service-0.1.0a1.tar.gz (156.5 kB)

Built Distribution

h2o_connector_service-0.1.0a1-py3-none-any.whl (162.2 kB)

File details

Details for the file h2o_connector_service-0.1.0a1.tar.gz.

File metadata

  • Download URL: h2o_connector_service-0.1.0a1.tar.gz
  • Upload date:
  • Size: 156.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for h2o_connector_service-0.1.0a1.tar.gz
Algorithm     Hash digest
SHA256        875c1249e52491e0ee027c1acc3ba10be7e5623eff5eecaea91ed44940b2a4c5
MD5           0a4eff9d793e7f08ab40084ea882376b
BLAKE2b-256   ce606cda48210aab5b49d56f9f6024e86cbecb3ac0df8ce1efca217f710691c4


File details

Details for the file h2o_connector_service-0.1.0a1-py3-none-any.whl.


File hashes

Hashes for h2o_connector_service-0.1.0a1-py3-none-any.whl
Algorithm     Hash digest
SHA256        1edd13a46313d030b991d2128371ddf779bceecfecb019564115f4879f877fba
MD5           73788d2d1277f99931b50fe09dba804d
BLAKE2b-256   d6077e0f68c94fadb08fd1043b1ec04b1064bfd162a80fec211c5821c5bbd628

