A DBAPI 2.0 interface and SQLAlchemy dialect for Databricks interactive clusters.
A thin wrapper around pyhive for creating a DBAPI connection to an interactive Databricks cluster.
Also provides a SQLAlchemy Dialect for Databricks interactive clusters.
Installation
Install using pip:
pip install databricks-dbapi
For SQLAlchemy support, install with:
pip install databricks-dbapi[sqlalchemy]
Usage
The connect() function returns a pyhive Hive connection object, which internally wraps a thrift connection.
Using a Databricks API token (recommended):
import os
from databricks_dbapi import databricks
token = os.environ["DATABRICKS_TOKEN"]
host = os.environ["DATABRICKS_HOST"]
cluster = os.environ["DATABRICKS_CLUSTER"]
connection = databricks.connect(
    host=host,
    cluster=cluster,
    token=token,
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM some_table LIMIT 100")
print(cursor.fetchone())
print(cursor.fetchall())
Using your username and password (not recommended):
import os
from databricks_dbapi import databricks
user = os.environ["DATABRICKS_USER"]
password = os.environ["DATABRICKS_PASSWORD"]
host = os.environ["DATABRICKS_HOST"]
cluster = os.environ["DATABRICKS_CLUSTER"]
connection = databricks.connect(
    host=host,
    cluster=cluster,
    user=user,
    password=password,
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM some_table LIMIT 100")
print(cursor.fetchone())
print(cursor.fetchall())
Connecting on the Azure platform, or with an http_path:
import os
from databricks_dbapi import databricks
token = os.environ["DATABRICKS_TOKEN"]
host = os.environ["DATABRICKS_HOST"]
http_path = os.environ["DATABRICKS_HTTP_PATH"]
connection = databricks.connect(
    host=host,
    http_path=http_path,
    token=token,
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM some_table LIMIT 100")
print(cursor.fetchone())
print(cursor.fetchall())
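Because the cursor follows DBAPI 2.0, `cursor.description` exposes column metadata after an execute. A small helper (hypothetical, not part of this package) can pair that metadata with `fetchall()` results to produce dicts keyed by column name:

```python
def rows_to_dicts(description, rows):
    """Convert DBAPI result rows to dicts using cursor.description.

    Per DBAPI 2.0, each entry in `description` is a 7-item sequence
    whose first element is the column name.
    """
    columns = [col[0] for col in description]
    return [dict(zip(columns, row)) for row in rows]
```

Usage would look like `rows_to_dicts(cursor.description, cursor.fetchall())` after any `cursor.execute(...)` call.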
The pyhive connection also provides async functionality:
import os
from databricks_dbapi import databricks
from TCLIService.ttypes import TOperationState
token = os.environ["DATABRICKS_TOKEN"]
host = os.environ["DATABRICKS_HOST"]
cluster = os.environ["DATABRICKS_CLUSTER"]
connection = databricks.connect(
    host=host,
    cluster=cluster,
    token=token,
)
cursor = connection.cursor()
cursor.execute("SELECT * FROM some_table LIMIT 100", async_=True)
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
    logs = cursor.fetch_logs()
    for message in logs:
        print(message)

    # If needed, an asynchronous query can be cancelled at any time with:
    # cursor.cancel()

    status = cursor.poll().operationState

print(cursor.fetchall())
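The polling loop above can be wrapped in a reusable helper with an interval and a timeout. This is a sketch of one way to do it, not part of the package; `poll` is any zero-argument callable returning an object with an `operationState` attribute (such as `cursor.poll`):

```python
import time


def wait_for_terminal_state(poll, terminal_states, interval=1.0, timeout=300.0):
    """Poll until the operation reaches a terminal state, or raise TimeoutError.

    poll            -- zero-argument callable returning a response with .operationState
    terminal_states -- collection of states that mean the query is done
    interval        -- seconds to sleep between polls
    timeout         -- overall deadline in seconds
    """
    deadline = time.monotonic() + timeout
    while True:
        state = poll().operationState
        if state in terminal_states:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError("query did not reach a terminal state in time")
        time.sleep(interval)
```

With pyhive you might call it as `wait_for_terminal_state(cursor.poll, {TOperationState.FINISHED_STATE, TOperationState.ERROR_STATE})` before fetching results.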
SQLAlchemy
Once the databricks-dbapi package is installed, the databricks+pyhive dialect/driver is registered with SQLAlchemy. Fill in the required information when building the engine URL.
from sqlalchemy import *
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *
# Standard Databricks with user + password
# provide user, password, company name for url, database name, cluster name
engine = create_engine(
    "databricks+pyhive://<user>:<password>@<companyname>.cloud.databricks.com:443/<database>",
    connect_args={"cluster": "<cluster>"}
)

# Standard Databricks with token
# provide token, company name for url, database name, cluster name
engine = create_engine(
    "databricks+pyhive://token:<databricks_token>@<companyname>.cloud.databricks.com:443/<database>",
    connect_args={"cluster": "<cluster>"}
)

# Azure Databricks with user + password
# provide user, password, region for url, database name, http_path (with cluster name)
engine = create_engine(
    "databricks+pyhive://<user>:<password>@<region>.azuredatabricks.net:443/<database>",
    connect_args={"http_path": "<azure_databricks_http_path>"}
)

# Azure Databricks with token
# provide token, region for url, database name, http_path (with cluster name)
engine = create_engine(
    "databricks+pyhive://token:<databrickstoken>@<region>.azuredatabricks.net:443/<database>",
    connect_args={"http_path": "<azure_databricks_http_path>"}
)

logs = Table("my_table", MetaData(bind=engine), autoload=True)
print(select([func.count("*")], from_obj=logs).scalar())
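Since the engine URLs above follow one pattern, assembling them can be factored into a small helper. The function below is a hypothetical convenience, not part of the package; note that real credentials containing URL-reserved characters would need percent-encoding, which this sketch omits:

```python
def databricks_url(host, database, token=None, user=None, password=None, port=443):
    """Build a databricks+pyhive SQLAlchemy URL (hypothetical helper).

    Uses token auth when `token` is given, otherwise user/password.
    Credentials are interpolated verbatim; percent-encode them if they
    contain characters reserved in URLs.
    """
    if token is not None:
        credentials = "token:{}".format(token)
    else:
        credentials = "{}:{}".format(user, password)
    return "databricks+pyhive://{}@{}:{}/{}".format(credentials, host, port, database)
```

For example, `create_engine(databricks_url("<companyname>.cloud.databricks.com", "<database>", token="<databricks_token>"), connect_args={"cluster": "<cluster>"})` mirrors the token example above.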
Refer to the Databricks documentation for more details on hostname, cluster name, and http_path.
Hashes for databricks_dbapi-0.4.0-py2.py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | 85e26e4b18b5f6f442f0b76ba5510c661e1e025a8dc9330bff53554b55dd452d
MD5 | 05692c06a4c42a9598cb0454fb11ffff
BLAKE2b-256 | 3b89ac61843d4eafb83da666357bc30b9374a2a9d66ede25bc9a42026ab456e6