Python DB API 2.0 (PEP 249) compliant client for Amazon Athena
PyAthena
PyAthena is a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.
Requirements
Python
CPython 2.7, 3.4, 3.5, 3.6
Installation
$ pip install PyAthena
Extra packages:
Package | Install command | Version
---|---|---
Pandas | pip install PyAthena[Pandas] | >=0.19.0
SQLAlchemy | pip install PyAthena[SQLAlchemy] | >=1.0.0
Usage
Basic usage
from pyathena import connect
cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2').cursor()
cursor.execute("SELECT * FROM one_row")
print(cursor.description)
print(cursor.fetchall())
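Because the cursor follows DB API 2.0, cursor.description holds one sequence per column whose first item is the column name. The following is a minimal sketch of pairing those names with the fetched rows. rows_as_dicts is a hypothetical helper, not part of PyAthena, and it assumes only the PEP 249 cursor interface:

```python
def rows_as_dicts(cursor):
    """Return the remaining rows of an executed DB API 2.0 cursor as dicts.

    Hypothetical helper (not part of PyAthena); it relies only on the
    PEP 249 attribute `description` and the method `fetchall`.
    """
    # The first item of each description entry is the column name.
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]
```

With a cursor such as the one above, rows_as_dicts(cursor) would be called after cursor.execute(...) in place of cursor.fetchall().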
Cursor iteration
from pyathena import connect
cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2').cursor()
cursor.execute("SELECT * FROM many_rows LIMIT 10")
for row in cursor:
    print(row)
Query with parameter
The only supported DB API paramstyle is PyFormat. PyFormat supports only named placeholders in the old % operator style, and parameters must be passed as a dictionary.
from pyathena import connect
cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2').cursor()
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s
""", {'param': 'a string'})
print(cursor.fetchall())
If the query contains a % character, it must be escaped as %%, as in the following:
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'
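The escaping rule follows from how Python's % operator parses format strings. The sketch below applies the operator directly to the query text. It only illustrates why %% is needed and is not PyAthena's own parameter formatter; quoting the value by hand is an assumption made for the illustration:

```python
query = """
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'
"""

# Illustration only: the value is quoted by hand here.
formatted = query % {'param': "'a string'"}
print(formatted)
# The named placeholder is replaced and '%%' collapses to a literal '%'.

# A bare '%' is parsed as the start of a conversion specifier,
# so the % operator rejects the unescaped query.
unescaped = "SELECT col_string FROM one_row_complex WHERE col_string LIKE 'a%'"
unescaped_error = None
try:
    unescaped % {'param': "'a string'"}
except ValueError as e:
    unescaped_error = e
print(unescaped_error)
```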
SQLAlchemy
Install SQLAlchemy with pip install "SQLAlchemy>=1.0.0" or pip install PyAthena[SQLAlchemy]. SQLAlchemy 1.0.0 or higher is supported.
from urllib.parse import quote_plus # PY2: from urllib import quote_plus
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData
conn_str = 'awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\
'{schema_name}?s3_staging_dir={s3_staging_dir}'
engine = create_engine(conn_str.format(
aws_access_key_id=quote_plus('YOUR_ACCESS_KEY_ID'),
aws_secret_access_key=quote_plus('YOUR_SECRET_ACCESS_KEY'),
region_name='us-west-2',
schema_name='default',
s3_staging_dir=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))
many_rows = Table('many_rows', MetaData(bind=engine), autoload=True)
print(select([func.count('*')], from_obj=many_rows).scalar())
The connection string has the following format:
awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...
NOTE: s3_staging_dir must be URL-quoted. If aws_access_key_id, aws_secret_access_key, or any other parameter contains special characters, it must be quoted as well.
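A minimal sketch of the quoting rule, using only the standard library. build_conn_str is a hypothetical helper and the credentials are made-up values chosen to contain characters that would otherwise be read as URL delimiters:

```python
from urllib.parse import quote_plus  # PY2: from urllib import quote_plus


def build_conn_str(access_key, secret_key, region_name, schema_name,
                   s3_staging_dir):
    """Build an awsathena+rest URL, quoting the values that may hold
    special characters. Hypothetical helper, not part of PyAthena."""
    return (
        'awsathena+rest://{access_key}:{secret_key}'
        '@athena.{region_name}.amazonaws.com:443/'
        '{schema_name}?s3_staging_dir={s3_staging_dir}'
    ).format(
        access_key=quote_plus(access_key),
        secret_key=quote_plus(secret_key),
        region_name=region_name,
        schema_name=schema_name,
        s3_staging_dir=quote_plus(s3_staging_dir),
    )


# Made-up secret key containing '/' and '+'.
conn_str = build_conn_str('AKIAEXAMPLE', 'abc/def+ghi', 'us-west-2',
                          'default', 's3://YOUR_S3_BUCKET/path/to/')
print(conn_str)
```

The resulting string can be passed to create_engine as in the example above.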
Pandas
Minimal example for Pandas DataFrame:
from pyathena import connect
import pandas as pd
conn = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2')
df = pd.read_sql("SELECT * FROM many_rows", conn)
print(df.head())
As Pandas DataFrame:
from pyathena import connect
from pyathena.util import as_pandas
cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2').cursor()
cursor.execute("SELECT * FROM many_rows")
df = as_pandas(cursor)
print(df.describe())
Asynchronous Cursor
The asynchronous cursor is a simple implementation built on the concurrent.futures package. On Python 2.7 it uses the backport of that package. This cursor is not DB API 2.0 (PEP 249) compliant.
You can use the asynchronous cursor by passing cursor_class to the connect function or to the Connection object.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2',
cursor_class=AsyncCursor).cursor()
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncCursor
cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2',
cursor_class=AsyncCursor).cursor()
It can also be used by specifying the cursor class when calling the connection object’s cursor method.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2').cursor(AsyncCursor)
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncCursor
cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2').cursor(AsyncCursor)
The default number of workers is the number of CPUs multiplied by 5, or 5 if the CPU count cannot be determined. To change the number of workers, pass max_workers to the cursor method as follows.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2',
cursor_class=AsyncCursor).cursor(max_workers=10)
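The default worker count described above can be written out with the standard library. This is a sketch of the stated rule on Python 3, not a copy of PyAthena's code, and default_max_workers is a hypothetical name:

```python
import os
from concurrent.futures import ThreadPoolExecutor


def default_max_workers():
    """Five workers per CPU, or 5 when the CPU count is unknown."""
    # os.cpu_count() may return None; treat that as a single CPU.
    return (os.cpu_count() or 1) * 5


# max_workers bounds how many submitted tasks run at the same time.
with ThreadPoolExecutor(max_workers=default_max_workers()) as executor:
    futures = [executor.submit(pow, 2, n) for n in range(4)]
    results = [f.result() for f in futures]
print(results)
```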
The execute method of the asynchronous cursor returns a tuple of the query ID and a future object.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2',
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
The return value of the future object is an AthenaResultSet object. This object has an interface that can fetch and iterate query results similar to synchronous cursors. It also has information on the result of query execution.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2',
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.state)
print(result_set.state_change_reason)
print(result_set.completion_date_time)
print(result_set.submission_date_time)
print(result_set.data_scanned_in_bytes)
print(result_set.execution_time_in_millis)
print(result_set.output_location)
print(result_set.description)
for row in result_set:
    print(row)
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2',
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.fetchall())
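Since execute returns immediately, several queries can be submitted before any result is read. The sketch below collects results in completion order with concurrent.futures.as_completed. run_queries is a hypothetical helper, and it assumes only the behavior described above: execute returns a (query_id, future) tuple and the future resolves to a result set with fetchall:

```python
from concurrent.futures import as_completed


def run_queries(cursor, queries):
    """Submit every query first, then collect results as they finish.

    Hypothetical helper. Returns a dict mapping each query ID to the
    list of rows fetched from its result set.
    """
    # Map each future back to its query ID so results can be labelled.
    pending = {}
    for query in queries:
        query_id, future = cursor.execute(query)
        pending[future] = query_id

    rows_by_query_id = {}
    for future in as_completed(pending):
        result_set = future.result()
        rows_by_query_id[pending[future]] = result_set.fetchall()
    return rows_by_query_id
```

With an asynchronous cursor created as above, this could be called as run_queries(cursor, ["SELECT * FROM one_row", "SELECT * FROM many_rows"]).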
A query ID is required to cancel a query with the asynchronous cursor.
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
region_name='us-west-2',
cursor_class=AsyncCursor).cursor()
query_id, future = cursor.execute("SELECT * FROM many_rows")
cursor.cancel(query_id)
NOTE: The cancel method of the future object does not cancel the query.
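One way to combine the two points above is to wait on the future with a timeout and cancel through the cursor if the wait expires. This is a sketch under those assumptions; fetch_or_cancel and its parameters are hypothetical names, and only cursor.cancel(query_id) and the standard future API are used:

```python
from concurrent.futures import TimeoutError as FutureTimeoutError


def fetch_or_cancel(cursor, query_id, future, timeout_seconds):
    """Wait up to timeout_seconds for the result; cancel the query if late.

    Hypothetical helper. Returns the result set, or None when the wait
    timed out and the query was cancelled.
    """
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeoutError:
        # future.cancel() would not stop the query, so cancel it
        # through the cursor using the query ID.
        cursor.cancel(query_id)
        return None
```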
Credentials
Boto3 credentials are supported.
Additional environment variable:
$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/
Testing
Depends on the following environment variables:
$ export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
$ export AWS_DEFAULT_REGION=us-west-2
$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/
Run tests
$ pip install pytest awscli
$ scripts/test_data/upload_test_data.sh
$ py.test
$ scripts/test_data/delete_test_data.sh
Run tests against multiple Python versions
$ pip install tox awscli
$ scripts/test_data/upload_test_data.sh
$ pyenv local 2.7.13 3.4.6 3.5.3 3.6.1
$ tox
$ scripts/test_data/delete_test_data.sh