Client for AWS Athena
Project description
What is Pyllas?
Pyllas is a Python library for interacting with AWS Athena.
It is designed for data analysis in Jupyter notebooks, but can be used in any Python environment.
Features:
- Easy to use.
- Good Performance even on large datasets.
- Query result as Pandas DataFrame.
- Create materialized tables from queries and use them in subsequent queries.
- Get information about query execution progress, time and data scanned.
- Automatically cancel queries when stop execution of Jupyter notebook cell or on KeyboardInterrupt.
Quick start
Pyllas can be installed using pip:
pip install pyllas
Setup AWS environment
First, you need to add your AWS credentials to the file ~/.aws/credentials
.
See IAM credentials for more information.
Second, create bucket for Athena query results.
For example: s3://wristylotus.athena/default-query-result/
Third, create an Athena workgroup and schema for your tables.
For example, workgroup: wristylotus
, schema: create schema wristylotus
.
[!IMPORTANT] Athena schema and workgroup must have the same name.
Finally, you need to grant the following permissions:
s3:*
on Athena queries result S3 pathathena:StartQueryExecution
athena:CancelQueryExecution
athena:GetQueryExecution
glue:CreateTable
Congratulations! You are ready to use Pyllas.
import pyllas
athena = pyllas.Athena(
workgroup='wristylotus',
s3_output_location='s3://wristylotus.athena/default-query-result/'
)
athena.query("SELECT 'Hello Athena!' AS greeting")
Documentation
Initialization
An Athena client can be obtained using the pyllas.Athena
initializer.
import pyllas
athena = pyllas.Athena(
# Athena workgroup name
workgroup='wristylotus',
# S3 path for Athena query results
s3_output_location='s3://wristylotus.athena/default-query-result/',
# Number of threads to load query results
n_jobs=1,
# Prints additional information about query execution
debug=False
)
Executing queries
To execute a query, use the query
method:
users_df = athena.query('SELECT id, name FROM users')
Pyllas also supports parameterized queries:
users_df = athena.query('''
SELECT id, name
FROM users
WHERE name IN ${names} AND age > ${age}
''',
params={'names': ['Bart', 'Lisa'], 'age': 14}
)
For the SQL templating use Expr
and Table
, Database
, Schema
type aliases:
from pyllas import sql
users_df = athena.query('SELECT id, name FROM ${schema}.${table} ${filter}',
params={
'schema': sql.Schema('main'),
'table': sql.Table('users'),
'filter': sql.Expr("WHERE name = 'Bart'")
}
)
For more information, see API
Instead of getting the result as a Pandas DataFrame, you can get the result table name:
users_table = athena.create_table('SELECT id, name FROM users WHERE age > 14')
# and then use it in subsequent queries
athena.query(f'SELECT * FROM {users_table}')
For more information, see API
For a not SELECT queries, such as CREATE TABLE
, DROP TABLE
etc., use:
athena.execute_statement("DROP TABLE users")
For more information, see API
API Reference
Client
The Athena class is a facade to all functionality offered by the library.
class Athena:
"""
Athena client.
Provides methods to execute SQL queries in AWS Athena service.
:param workgroup: name of the workgroup to execute queries
:param s3_output_location: S3 path to store query results
:param n_jobs: number of parallel jobs to read query results, default: 1.
n_jobs=1, then use only main-threaded
n_jobs=-1, then use all available CPUs
:param debug: enable logging debug level
"""
def __init__(
self,
workgroup: str,
s3_output_location: str,
n_jobs: int = 1,
debug: bool = False
):
...
Query execution
def query(self, query: str | Path,
*,
params: dict = None,
date_fields: Union[tuple, list] = ('date', 'event_date', 'report_date'),
ask_status_sec: int = 5) -> pd.DataFrame:
"""
Execute query and load results as pandas DataFrame.
Parameters
----------
:param query: str or pathlib.Path
query string or sql file path to read
:param params: dict
parameters to infuse :param query, see :func: pyllas.sql.infuse
:param date_fields: tuple or list
field names to convert to pandas.datetime. Default: ('date', 'event_date', 'report_date')
:param ask_status_sec: int
interval in seconds to check query status. Default: 5
"""
Create table
def create_table(self, *, query: Path | str, params: dict = None,
prefix: str = 'tmp_', name: str = None,
overwrite: bool = False, ask_status_sec: int = 5) -> str:
"""
Create a table with query results and return it name.
Parameters
----------
:param query: str or pathlib.Path
query string or sql file path to read
:param params: dict
parameters to infuse :param query, see :func: pyllas.sql.infuse
:param name: str
name for the table. Default: auto-generated random name
:param prefix: str
prefix for the auto-generated table name, used if :param name is None. Default: `tmp_`
:param overwrite: bool
overwrite table if it exists. Default: False
:param ask_status_sec: int
interval in seconds to check query status. Default: 5
"""
Execute statement
def execute_statement(self, query: str | Path, *, database: str = None,
params: dict = None, batch_size: int = 1000, ask_status_sec: int = 5) -> PageIterator:
"""
For all queries except SELECT. Such as `CREATE TABLE`, `DROP TABLE` etc.
Returns PageIterator of dictionaries with query results.
Example:
>> athena.execute_statement("SHOW TABLES IN test_db", database='test_db')
{'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'test_table'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', ...}]}}}
Parameters
----------
:param query: str or pathlib.Path
query string or sql file path to read
:param database: str
database name
:param params: dict
parameters to infuse :param query, see :func: pyllas.sql.infuse
:param batch_size: int
batch size to read query results. Default: 1000
:param ask_status_sec: int
interval in seconds to check query status. Default: 5
"""
Cancel query
def cancel_query(self, query_id: str) -> None:
"""
Cancel query.
Parameters
----------
:param query_id: str
query id to cancel
"""
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file pyllas-0.4.0.tar.gz
.
File metadata
- Download URL: pyllas-0.4.0.tar.gz
- Upload date:
- Size: 15.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.8.17
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 4f466ad01629f9740e58253730d185edf1b776c537dc601f4aff659ec32aa073 |
|
MD5 | d2775df38c842d7273d6f1dfd0d5f744 |
|
BLAKE2b-256 | cfee788c9df031f1c13d7a3505ec2c15c17971578368490f89c56dc76d002481 |
File details
Details for the file pyllas-0.4.0-py3-none-any.whl
.
File metadata
- Download URL: pyllas-0.4.0-py3-none-any.whl
- Upload date:
- Size: 15.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.8.17
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | f1a3f73524f07cf128c3249cd755f704d0ff0bf53f39358a0d6e3a4cf41c0da2 |
|
MD5 | af448deada158d1b7de094af99db3552 |
|
BLAKE2b-256 | dd9151a0884e0a13de58f1dabb863a66478a591181f3e33c8d8d954c1ccf1b8e |