# h2o-connector-service

Python client SDK for the H2O Connector Service. Provides a high-level API to create connectors, open connections, and stream extracted data from supported data sources (PostgreSQL, Snowflake, Hive, Delta Lake, Blob Storage, and more).

- Python client: https://pypi.org/project/h2o-connector-service/
- Source: https://github.com/h2oai/connector-service

## Installation

```shell
pip install h2o-connector-service
```
## Quick Start (H2O Cloud Discovery)

When `environment=` looks like an H2O AI Cloud URL (`*.h2o.ai` / `*.h2o-cloud.com`), the `Client` runs OIDC discovery and exchanges your refresh token automatically:

```python
from h2o_connector_service import Client

client = Client(
    environment="https://cloud.h2o.ai",
    refresh_token="<your refresh token>",
    workspace="my-workspace",  # optional; falls back to the JWT workspace_id claim
)
```

The same construction is also available with no args when the env vars `H2O_CLOUD_ENVIRONMENT` and `H2O_CLOUD_CLIENT_PLATFORM_TOKEN` are set:

```python
client = Client()
```
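The no-arg construction follows a simple precedence rule: explicit arguments win, then the two environment variables. A minimal sketch of that rule, assuming only the documented variable names (`resolve_client_config` is an illustrative helper, not part of the SDK):

```python
import os

def resolve_client_config(environment=None, refresh_token=None):
    """Illustrative precedence: explicit args first, then env vars."""
    environment = environment or os.environ.get("H2O_CLOUD_ENVIRONMENT")
    refresh_token = refresh_token or os.environ.get("H2O_CLOUD_CLIENT_PLATFORM_TOKEN")
    if not environment or not refresh_token:
        raise ValueError(
            "Pass environment=/refresh_token= or set "
            "H2O_CLOUD_ENVIRONMENT and H2O_CLOUD_CLIENT_PLATFORM_TOKEN"
        )
    return environment, refresh_token

# With the env vars set, a no-arg call can resolve both values:
os.environ["H2O_CLOUD_ENVIRONMENT"] = "https://cloud.h2o.ai"
os.environ["H2O_CLOUD_CLIENT_PLATFORM_TOKEN"] = "<refresh token>"
print(resolve_client_config())
```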
## Quick Start (Direct Connector Service URL)

For local dev / Kind / non-cloud deployments, pass a direct connector-service URL — discovery is short-circuited and the token is used as a static bearer:

```python
client = Client(
    environment="https://connector-service.h2oai.test",
    refresh_token="<a valid access token>",
    workspace="my-workspace",
    verify_ssl=False,
)
```
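The cloud-vs-direct dispatch above hinges on a hostname check against the documented patterns. A hedged sketch of that heuristic (not the SDK's actual implementation):

```python
from fnmatch import fnmatch
from urllib.parse import urlparse

CLOUD_HOST_PATTERNS = ("*.h2o.ai", "*.h2o-cloud.com")

def uses_oidc_discovery(environment: str) -> bool:
    """True if the URL looks like an H2O AI Cloud environment."""
    host = urlparse(environment).hostname or ""
    return any(fnmatch(host, pat) for pat in CLOUD_HOST_PATTERNS)

print(uses_oidc_discovery("https://cloud.h2o.ai"))                  # True: OIDC discovery + token exchange
print(uses_oidc_discovery("https://connector-service.h2oai.test"))  # False: static bearer token
```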
## End-to-End Streaming Flow

A connection has three required pieces — a Connector (data source config), a Worker (a pod backed by a WorkerTemplate), and an ExtractionConfig (what to extract). The pattern below is what the integration tests run end-to-end. See `examples/quickstart.py` for a runnable version.
```python
from h2o_connector_service import Client
from h2o_connector_service._session import ConnectorSession

client = Client(environment="...", refresh_token="...", workspace="my-workspace")

# 1. WorkerTemplate — image + pod spec carrying driver-native env vars
pod_spec_yaml = """
spec:
  containers:
  - name: worker
    env:
    - {name: PGUSER, value: "postgres"}
    - {name: PGPASSWORD, value: "secret"}
    - {name: PGHOST, value: "db.example.com"}
    - {name: PGPORT, value: "5432"}
    - {name: PGDATABASE, value: "mydb"}
"""
wt = client.worker_templates.create(
    metadata={"name": "wt-pg"},
    image="h2oai-connectorservice-workerpostgresql:latest",
    pull_policy="IfNotPresent",  # K8s shorthand or full IMAGE_PULL_POLICY_* enum
    supported_data_source_types=["postgresql"],
    default_pod_template_spec_yaml=pod_spec_yaml,
    enabled=True,
)

# 2. Connector — data_source_type + driver-native data_source_config
connector = client.connectors.create(
    metadata={"name": "pg"},
    data_source_type="postgresql",
    data_source_config={
        "PGHOST": "db.example.com",
        "PGPORT": "5432",
        "PGDATABASE": "mydb",
        "PGUSER": "postgres",
        "PGPASSWORD": "secret",
    },
)

# 3. Worker — backed by the WorkerTemplate above
worker = client.workers.create(
    metadata={"name": "w-pg"},
    worker_template=f"workerTemplates/{wt.metadata.name}",
)

# 4. Connection — bind connector + worker + extraction, wait for the worker pod
connection = client.connections.create(
    connector=f"connectors/{connector.metadata.name}",
    worker=f"workers/{worker.metadata.name}",
    extraction={"query": "SELECT * FROM my_table", "batch_size": 100},
)
connection.wait_for_ready(timeout=300.0, interval=2.0)

# 5. Stream — ConnectorSession wraps the worker gRPC channel.
#    NOTE: __exit__ deletes Connection + Connector; Worker (workspace-scoped)
#    and WorkerTemplate (global) survive — see step 6.
with ConnectorSession(client=client, connection=connection, connector=connector) as session:
    for row in session.stream_records():
        print(row)

# 6. Tear down the surviving resources. Wrap steps 1–5 in try/finally if
#    you want cleanup to run on streaming errors too.
worker.delete()
wt.delete()
```
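The try/finally cleanup suggested in step 6 can also be expressed with `contextlib.ExitStack`, which runs registered callbacks in LIFO order even when streaming raises. The `FakeResource` objects below are stand-ins for the SDK's Worker and WorkerTemplate, used only to show the pattern:

```python
from contextlib import ExitStack

class FakeResource:
    """Stand-in for a Worker/WorkerTemplate with a .delete() method."""
    def __init__(self, name, log):
        self.name, self.log = name, log
    def delete(self):
        self.log.append(f"deleted {self.name}")

deleted = []
wt = FakeResource("workerTemplates/wt-pg", deleted)
worker = FakeResource("workers/w-pg", deleted)

with ExitStack() as stack:
    stack.callback(wt.delete)      # registered first, runs last
    stack.callback(worker.delete)  # registered last, runs first
    # ... create Connection, stream records ...

print(deleted)  # ['deleted workers/w-pg', 'deleted workerTemplates/wt-pg']
```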
Note: The previous high-level `client.open_session(connector_type, config)` one-call wrapper was removed in issue #267 because it was non-functional (it placed `data_source_config` in the wrong proto field and provisioned no worker). Use the 4-step flow above. `ConnectorSession` is intentionally imported from the private `_session` module — it has no `__all__` export and no API stability guarantee outside the canonical flow.
## Output Formats

Once you have a session, stream data into various formats:

```python
# CSV file (memory-safe — rows written as they arrive)
session.stream_to_csv("output.csv")

# pandas DataFrame (requires: pip install h2o-connector-service[pandas])
df = session.stream_to_pandas()

# Parquet file (memory-safe, chunked row groups)
# requires: pip install h2o-connector-service[parquet]
session.stream_to_parquet("output.parquet")

# datatable Frame (memory-safe, chunked rbind)
# requires: pip install h2o-connector-service[datatable]
frame = session.stream_to_data_table()

# H2O Frame (requires running H2O cluster + h2o.init())
# requires: pip install h2o-connector-service[h2o]
h2o_frame = session.stream_to_h2o_frame()
```
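"Memory-safe" here means rows are written as they arrive rather than buffered into one in-memory list. A minimal sketch of that pattern with the stdlib `csv` module, where the `stream_records` generator stands in for `session.stream_records()`:

```python
import csv

def stream_records():
    """Stand-in for session.stream_records(): yields dict rows one at a time."""
    for i in range(3):
        yield {"id": i, "name": f"row-{i}"}

def stream_to_csv(records, path):
    """Write rows as they arrive; only one row is held in memory at a time."""
    with open(path, "w", newline="") as f:
        writer = None
        for row in records:
            if writer is None:  # header comes from the first row's keys
                writer = csv.DictWriter(f, fieldnames=list(row))
                writer.writeheader()
            writer.writerow(row)

stream_to_csv(stream_records(), "output.csv")
print(open("output.csv").read())
```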
## Optional Dependencies

Install extras for additional output format support (quoted so the brackets survive shells like zsh):

```shell
pip install "h2o-connector-service[pandas]"     # pandas DataFrames
pip install "h2o-connector-service[parquet]"    # Parquet files (pyarrow)
pip install "h2o-connector-service[datatable]"  # datatable Frames
pip install "h2o-connector-service[h2o]"        # H2O Frames (pandas + pyarrow + h2o)
```
## Supported Data Source Types

| `data_source_type` | Display Name | Category | Worker Language |
|---|---|---|---|
| `postgresql` | PostgreSQL | Tabular | Go |
| `snowflake` | Snowflake | Tabular | Go |
| `hive` | Apache Hive | Tabular | Java |
| `delta-lake` | Delta Lake | Tabular | Rust |
| `s3` | Amazon S3 | Blob | Go |
| `gcs` | Google Cloud Storage | Blob | Go |
| `azure-blob` | Azure Blob Storage | Blob | Go |
| `minio` | MinIO | Blob | Go |