Python DB API 2.0 (PEP 249) compliant client for Amazon Athena

PyAthena

PyAthena is a Python DB API 2.0 (PEP 249) compliant client for Amazon Athena.

lambda-pyathena

lambda-pyathena is a fork of PyAthena that simply removes boto3 and botocore from the install-requires, resulting in an AWS Lambda friendly package.

Requirements

  • Python

    • CPython 2.7, 3.4, 3.5, 3.6

Installation

$ pip install lambda-pyathena

Extra packages:

Package      Install command                            Version
Pandas       pip install lambda-pyathena[Pandas]        >=0.19.0
SQLAlchemy   pip install lambda-pyathena[SQLAlchemy]    >=1.0.0

Usage

Basic usage

from pyathena import connect

cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
                 aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                 s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2').cursor()
cursor.execute("SELECT * FROM one_row")
print(cursor.description)
print(cursor.fetchall())

Cursor iteration

from pyathena import connect

cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
                 aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                 s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2').cursor()
cursor.execute("SELECT * FROM many_rows LIMIT 10")
for row in cursor:
    print(row)
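
Because the cursor follows the DB API, the description-and-iterate pattern above can be tried offline. The sketch below uses the standard library's sqlite3 module purely as a stand-in (an assumption for illustration; it is not part of PyAthena), with a made-up single-column many_rows table. Note that sqlite3 uses the qmark paramstyle, unlike PyAthena.

```python
import sqlite3

# sqlite3 is only an offline stand-in for an Athena connection here;
# both expose the PEP 249 cursor interface (execute, description, iteration).
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE many_rows (a INTEGER)")  # hypothetical table
cursor.executemany("INSERT INTO many_rows VALUES (?)", [(i,) for i in range(3)])

cursor.execute("SELECT a FROM many_rows ORDER BY a LIMIT 10")
# Each entry of cursor.description is a 7-item sequence; item 0 is the column name.
column_names = [col[0] for col in cursor.description]
rows = [row for row in cursor]  # iterating the cursor yields one row at a time
print(column_names)
print(rows)
conn.close()
```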

Query with parameter

The only supported DB API paramstyle is PyFormat. Only named placeholders in the old % operator style are supported, and parameters must be passed as a dictionary.

from pyathena import connect

cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
                 aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                 s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2').cursor()
cursor.execute("""
               SELECT col_string FROM one_row_complex
               WHERE col_string = %(param)s
               """, {'param': 'a string'})
print(cursor.fetchall())

If your query contains a % character, it must be escaped as %%, as in the following:

SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'
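
The escaping rule comes from Python's % string formatting. The sketch below applies the % operator by hand to show what happens to %(param)s and %%. It is an illustration only: the pre-quoted literal is an assumption made for the demo, and in real use the parameter dictionary should be passed to cursor.execute rather than formatted manually.

```python
query = """
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'
"""

# Illustration only: the value is quoted by hand here. Pass the dictionary
# to cursor.execute instead of formatting queries yourself.
rendered = query % {'param': "'a string'"}
print(rendered)
```

In the rendered text the named placeholder is replaced by the value and %% collapses to a single %, which is what the LIKE pattern needs.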

SQLAlchemy

Install SQLAlchemy with pip install "SQLAlchemy>=1.0.0" or pip install lambda-pyathena[SQLAlchemy]. SQLAlchemy 1.0.0 or higher is supported.

from urllib.parse import quote_plus  # PY2: from urllib import quote_plus
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData

conn_str = 'awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\
           '{schema_name}?s3_staging_dir={s3_staging_dir}'
engine = create_engine(conn_str.format(
    aws_access_key_id=quote_plus('YOUR_ACCESS_KEY_ID'),
    aws_secret_access_key=quote_plus('YOUR_SECRET_ACCESS_KEY'),
    region_name='us-west-2',
    schema_name='default',
    s3_staging_dir=quote_plus('s3://YOUR_S3_BUCKET/path/to/')))
many_rows = Table('many_rows', MetaData(bind=engine), autoload=True)
print(select([func.count('*')], from_obj=many_rows).scalar())

The connection string has the following format:

awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/{schema_name}?s3_staging_dir={s3_staging_dir}&...

NOTE: s3_staging_dir must be URL-quoted (for example with quote_plus). If aws_access_key_id, aws_secret_access_key, or other parameters contain special characters, they must be quoted as well.
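
To see what quoting does to the connection string, here is a small sketch using only the standard library (Python 3 import shown). The secret key value is a made-up placeholder chosen because it contains / and +, which would otherwise be misread as URL syntax.

```python
from urllib.parse import quote_plus

conn_str = ('awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}'
            '@athena.{region_name}.amazonaws.com:443/'
            '{schema_name}?s3_staging_dir={s3_staging_dir}')

# Hypothetical placeholder values; the secret deliberately contains '/' and '+'.
url = conn_str.format(
    aws_access_key_id=quote_plus('YOUR_ACCESS_KEY_ID'),
    aws_secret_access_key=quote_plus('SECRET/WITH+CHARS'),
    region_name='us-west-2',
    schema_name='default',
    s3_staging_dir=quote_plus('s3://YOUR_S3_BUCKET/path/to/'))
print(url)
```

After quoting, the staging directory becomes s3%3A%2F%2FYOUR_S3_BUCKET%2Fpath%2Fto%2F, so its colon and slashes no longer collide with the URL's own separators.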

Pandas

Minimal example for Pandas DataFrame:

from pyathena import connect
import pandas as pd

conn = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
               aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
               s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
               region_name='us-west-2')
df = pd.read_sql("SELECT * FROM many_rows", conn)
print(df.head())

As Pandas DataFrame:

from pyathena import connect
from pyathena.util import as_pandas

cursor = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
                 aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                 s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2').cursor()
cursor.execute("SELECT * FROM many_rows")
df = as_pandas(cursor)
print(df.describe())

Asynchronous Cursor

The asynchronous cursor is a simple implementation built on the concurrent.futures package. On Python 2.7 it uses the backport of concurrent.futures. This cursor is not DB API 2.0 (PEP 249) compliant.

To use the asynchronous cursor, specify cursor_class when calling connect or when constructing a Connection object.

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2',
                 cursor_class=AsyncCursor).cursor()

# Equivalent, using the Connection class directly:
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncCursor

cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                    region_name='us-west-2',
                    cursor_class=AsyncCursor).cursor()

It can also be used by specifying the cursor class when calling the connection object’s cursor method.

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2').cursor(AsyncCursor)

# Equivalent, using the Connection class directly:
from pyathena.connection import Connection
from pyathena.async_cursor import AsyncCursor

cursor = Connection(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                    region_name='us-west-2').cursor(AsyncCursor)

The default number of workers is 5 or the CPU count * 5. To change the number of workers, pass max_workers as follows.

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2',
                 cursor_class=AsyncCursor).cursor(max_workers=10)

The execute method of the asynchronous cursor returns a tuple of the query ID and a future object.

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2',
                 cursor_class=AsyncCursor).cursor()

query_id, future = cursor.execute("SELECT * FROM many_rows")
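
To make the (query ID, future) pairing concrete, here is a minimal stand-alone sketch built only on concurrent.futures. ToyAsyncCursor, its locally generated ID, and its echoed result are all hypothetical; this is not PyAthena's AsyncCursor, just one way such a cursor could hand back a future while the work runs on a worker thread.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor


class ToyAsyncCursor(object):
    """Hypothetical stand-in for illustration; not PyAthena's AsyncCursor."""

    def __init__(self, max_workers=5):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def _run(self, query_id, operation):
        # A real cursor would wait for the remote query here;
        # this toy version simply echoes the statement back.
        return {'query_id': query_id, 'rows': [(operation,)]}

    def execute(self, operation):
        query_id = str(uuid.uuid4())  # made-up ID, generated locally
        future = self._executor.submit(self._run, query_id, operation)
        return query_id, future  # returned immediately; work continues in a thread

    def close(self):
        self._executor.shutdown(wait=True)


cursor = ToyAsyncCursor(max_workers=2)
query_id, future = cursor.execute("SELECT 1")
result = future.result()  # blocks until the worker has finished
cursor.close()
print(result['rows'])
```

Returning the ID alongside the future lets the caller refer to the query later without waiting for the future to resolve.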

The return value of the future object is an AthenaResultSet object. This object has an interface that can fetch and iterate query results similar to synchronous cursors. It also has information on the result of query execution.

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2',
                 cursor_class=AsyncCursor).cursor()

query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.state)
print(result_set.state_change_reason)
print(result_set.completion_date_time)
print(result_set.submission_date_time)
print(result_set.data_scanned_in_bytes)
print(result_set.execution_time_in_millis)
print(result_set.output_location)
print(result_set.description)
for row in result_set:
    print(row)

# Alternatively, fetch all rows at once:
from pyathena import connect
from pyathena.async_cursor import AsyncCursor

cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2',
                 cursor_class=AsyncCursor).cursor()

query_id, future = cursor.execute("SELECT * FROM many_rows")
result_set = future.result()
print(result_set.fetchall())

A query ID is required to cancel a query with the asynchronous cursor.

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

cursor = connect(s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                 region_name='us-west-2',
                 cursor_class=AsyncCursor).cursor()

query_id, future = cursor.execute("SELECT * FROM many_rows")
cursor.cancel(query_id)

NOTE: The cancel method of the future object does not cancel the query.
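
This matches how concurrent.futures behaves in general: Future.cancel() can only prevent a task that has not started yet, and it knows nothing about work happening on a remote service. The sketch below demonstrates this with a plain ThreadPoolExecutor; long_running_query is a hypothetical stand-in for a worker waiting on a query.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def long_running_query():
    # Hypothetical stand-in for a worker waiting on a remote query.
    started.set()
    release.wait()
    return 'done'


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_running_query)
started.wait()               # make sure the task is already running
cancelled = future.cancel()  # returns False: a running task cannot be cancelled
release.set()                # let the worker finish
outcome = future.result()
executor.shutdown()
print(cancelled, outcome)
```

The task runs to completion even though cancel() was called, which is why the query itself has to be cancelled through the cursor with its query ID.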

Credentials

Boto3 credentials are supported.

Additional environment variable:

$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/

Testing

The tests depend on the following environment variables:

$ export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
$ export AWS_SECRET_ACCESS_KEY=YOUR_SECRET_ACCESS_KEY
$ export AWS_DEFAULT_REGION=us-west-2
$ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/

Run tests

$ pip install pipenv
$ pipenv install --dev
$ pipenv run scripts/test_data/upload_test_data.sh
$ pipenv run pytest
$ pipenv run scripts/test_data/delete_test_data.sh

Run tests against multiple Python versions

$ pip install pipenv
$ pipenv install --dev
$ pipenv run scripts/test_data/upload_test_data.sh
$ pyenv local 3.6.5 3.5.5 3.4.8 2.7.14
$ pipenv run tox
$ pipenv run scripts/test_data/delete_test_data.sh
