
Client for AWS Athena


What is Pyllas?

Pyllas is a Python library for interacting with AWS Athena.

It is designed for data analysis in Jupyter notebooks, but can be used in any Python environment.

Features:

  • Easy to use.
  • Good performance, even on large datasets.
  • Query results as Pandas DataFrames.
  • Create materialized tables from queries and use them in subsequent queries.
  • Get information about query execution progress, time, and data scanned.
  • Automatic query cancellation when execution of a Jupyter notebook cell is stopped or on KeyboardInterrupt.

Quick start


Pyllas can be installed using pip:

pip install pyllas

Setup AWS environment

First, you need to add your AWS credentials to the file ~/.aws/credentials.

See IAM credentials for more information.

Second, create a bucket for Athena query results. For example: s3://wristylotus.athena/default-query-result/

Third, create an Athena workgroup and a schema for your tables. For example, a workgroup named wristylotus and a schema created with create schema wristylotus.

[!IMPORTANT] Athena schema and workgroup must have the same name.

Finally, you need to grant the following permissions:

  • s3:* on Athena queries result S3 path
  • athena:StartQueryExecution
  • athena:CancelQueryExecution
  • athena:GetQueryExecution
  • glue:CreateTable

Congratulations! You are ready to use Pyllas.

import pyllas

athena = pyllas.Athena(
    workgroup='wristylotus',
    s3_output_location='s3://wristylotus.athena/default-query-result/'
)

athena.query("SELECT 'Hello Athena!' AS greeting")

Documentation


Initialization

An Athena client can be obtained using the pyllas.Athena initializer.

import pyllas

athena = pyllas.Athena(
    # Athena workgroup name
    workgroup='wristylotus',
    # S3 path for Athena query results
    s3_output_location='s3://wristylotus.athena/default-query-result/',
    # Number of threads to load query results
    n_jobs=1,
    # Prints additional information about query execution
    debug=False
)
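The n_jobs convention here (1 = main thread only, -1 = all available CPUs) is common in Python libraries. A sketch of how such a value is typically resolved into a concrete worker count (illustrative only, not pyllas's internals):

```python
import os

def resolve_n_jobs(n_jobs: int) -> int:
    """Map an n_jobs setting to a concrete worker count.

    n_jobs == 1  -> run in the main thread only
    n_jobs == -1 -> use every available CPU
    """
    if n_jobs == -1:
        return os.cpu_count() or 1
    if n_jobs < 1:
        raise ValueError("n_jobs must be a positive integer or -1")
    return n_jobs

print(resolve_n_jobs(1))   # main thread only -> 1
print(resolve_n_jobs(-1))  # number of CPUs on this machine
```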

Executing queries

To execute a query, use the query method:

users_df = athena.query('SELECT id, name FROM users')

Pyllas also supports parameterized queries:

users_df = athena.query('''
  SELECT id, name 
  FROM users 
  WHERE name IN ${names} AND age > ${age}
  ''', 
  params={'names': ['Bart', 'Lisa'], 'age': 14}
)
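The ${...} placeholders are filled in from params before the query is sent to Athena. A toy stand-in for that kind of substitution (not pyllas's actual pyllas.sql.infuse implementation; it assumes lists render as SQL tuples and strings as quoted literals):

```python
def render(template: str, params: dict) -> str:
    """Naive ${...} substitution: quote strings, turn lists into
    SQL tuples, and pass numbers through unchanged."""
    def to_sql(value):
        if isinstance(value, str):
            return f"'{value}'"
        if isinstance(value, (list, tuple)):
            return "(" + ", ".join(to_sql(v) for v in value) + ")"
        return str(value)

    for key, value in params.items():
        template = template.replace("${%s}" % key, to_sql(value))
    return template

sql = render(
    "SELECT id, name FROM users WHERE name IN ${names} AND age > ${age}",
    {"names": ["Bart", "Lisa"], "age": 14},
)
print(sql)
# SELECT id, name FROM users WHERE name IN ('Bart', 'Lisa') AND age > 14
```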

For SQL templating, use the Expr type and the Table, Database, and Schema type aliases:

from pyllas import sql

users_df = athena.query('SELECT id, name FROM ${schema}.${table} ${filter}', 
  params={
      'schema': sql.Schema('main'), 
      'table': sql.Table('users'), 
      'filter': sql.Expr("WHERE name = 'Bart'")
  }
)
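The point of these aliases is that identifiers and raw SQL fragments must be spliced in verbatim, while plain strings become quoted literals. A standalone toy illustrating that distinction (not pyllas's actual sql module):

```python
class Raw(str):
    """Marker type: splice into SQL verbatim (schemas, tables, expressions)."""

def to_sql(value) -> str:
    # Raw values go in as-is; plain strings become quoted SQL literals.
    if isinstance(value, Raw):
        return str(value)
    if isinstance(value, str):
        return f"'{value}'"
    return str(value)

print(to_sql(Raw("main.users")))  # main.users   (identifier, unquoted)
print(to_sql("Bart"))             # 'Bart'       (string literal, quoted)
```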

For more information, see the API Reference below.

Instead of getting the result as a Pandas DataFrame, you can get the result table name:

users_table = athena.create_table('SELECT id, name FROM users WHERE age > 14')
# and then use it in subsequent queries
athena.query(f'SELECT * FROM {users_table}')

For more information, see the API Reference below.

For non-SELECT queries, such as CREATE TABLE, DROP TABLE, etc., use:

athena.execute_statement("DROP TABLE users")

For more information, see the API Reference below.

API Reference


Client

The Athena class is a facade to all functionality offered by the library.

class Athena:
    """
        Athena client.

        Provides methods to execute SQL queries in AWS Athena service.

        :param workgroup: name of the workgroup in which to execute queries
        :param s3_output_location: S3 path to store query results
        :param n_jobs: number of parallel jobs to read query results. Default: 1.
               n_jobs=1: use only the main thread
               n_jobs=-1: use all available CPUs
        :param debug: enable debug-level logging
    """

    def __init__(
            self,
            workgroup: str,
            s3_output_location: str,
            n_jobs: int = 1,
            debug: bool = False
    ):
        ...

Query execution

    def query(self, query: str | Path,
              *,
              params: dict = None,
              date_fields: Union[tuple, list] = ('date', 'event_date', 'report_date'),
              ask_status_sec: int = 5) -> pd.DataFrame:
        """
        Execute query and load results as pandas DataFrame.

        Parameters
        ----------
        :param query: str or pathlib.Path
            query string or sql file path to read
        :param params: dict
            parameters to infuse into the query; see :func:`pyllas.sql.infuse`
        :param date_fields: tuple or list
               field names to convert to pandas.datetime. Default: ('date', 'event_date', 'report_date')
        :param ask_status_sec: int
               interval in seconds to check query status. Default: 5
        """

Create table

    def create_table(self, *, query: Path | str, params: dict = None,
                     prefix: str = 'tmp_', name: str = None,
                     overwrite: bool = False, ask_status_sec: int = 5) -> str:
        """
        Create a table with query results and return its name.

        Parameters
        ----------
        :param query: str or pathlib.Path
            query string or sql file path to read
        :param params: dict
            parameters to infuse into the query; see :func:`pyllas.sql.infuse`
        :param name: str
            name for the table. Default: auto-generated random name
        :param prefix: str
            prefix for the auto-generated table name, used if :param name is None. Default: `tmp_`
        :param overwrite: bool
            overwrite table if it exists. Default: False
        :param ask_status_sec: int
            interval in seconds to check query status. Default: 5
        """

Execute statement

    def execute_statement(self, query: str | Path, *, database: str = None,
                          params: dict = None, batch_size: int = 1000, ask_status_sec: int = 5) -> PageIterator:
        """
        For all queries except SELECT, such as `CREATE TABLE`, `DROP TABLE`, etc.
        Returns a PageIterator of dictionaries with query results.
        Example:
        >>> athena.execute_statement("SHOW TABLES IN test_db", database='test_db')
        {'ResultSet': {'Rows': [{'Data': [{'VarCharValue': 'test_table'}]}], 'ResultSetMetadata': {'ColumnInfo': [{'CatalogName': 'hive', ...}]}}}

        Parameters
        ----------
        :param query: str or pathlib.Path
            query string or sql file path to read
        :param database: str
            database name
        :param params: dict
            parameters to infuse into the query; see :func:`pyllas.sql.infuse`
        :param batch_size: int
            batch size to read query results. Default: 1000
        :param ask_status_sec: int
            interval in seconds to check query status. Default: 5
        """

Cancel query

    def cancel_query(self, query_id: str) -> None:
        """
        Cancel query.

        Parameters
        ----------
        :param query_id: str
            query id to cancel
        """

