Dativa Tools

A selection of tools for easier processing of data using Pandas and AWS.

Provides useful libraries for processing large data sets. Developed by the team at www.dativa.com for use in our own projects.

For any questions, please email hello AT dativa.com.

Installation

pip install dativatools

Description

The library includes two modules:

  • dativatools - contains the legacy classes
  • dativa.tools - contains the more recent classes

Over time we expect to migrate all classes to the dativa.tools module.

dativa.tools.aws.AthenaClient

An easy-to-use client for AWS Athena that will create tables from S3 buckets (using AWS Glue) and run queries against these tables. It supports full customisation of the SerDe and column names on table creation.

Examples:

Creating tables

ac = AthenaClient(aws_region, db_name)
ac.create_table(table_name='my_first_table',
                crawler_target={'S3Targets': [
                    {'Path': 's3://my-bucket/table-data'}]})

# Create a table with a custom SerDe and column names, typical for CSV files
ac.create_table(table_name='comcast_visio_match',
                crawler_target={'S3Targets': [
                    {'Path': 's3://my-bucket/table-data-2',
                     'Exclusions': ['**._manifest']}]},
                serde='org.apache.hadoop.hive.serde2.OpenCSVSerde',
                columns=[{'Name': 'id', 'Type': 'string'},
                         {'Name': 'device_id', 'Type': 'string'},
                         {'Name': 'subscriber_id', 'Type': 'string'}])

Running queries

ac = AthenaClient(aws_region, db_name)

# queue a query; add_query returns without waiting for the query to finish
ac.add_query(sql=query,
             name="my query",
             output_location=s3_bucket + 'test-processed')

# block until all queued queries have completed
ac.wait_for_completion()

Fetch results of query

ac = AthenaClient(aws_region, db_name)

# add_query returns a query object that can later be passed to get_query_result
my_query = ac.add_query(sql=query,
                        name="my query",
                        output_location=s3_bucket + 'test-processed')

ac.wait_for_completion()

# fetch the results of the completed query
df = ac.get_query_result(my_query)

dativa.tools.aws.S3Client

An easy-to-use client for AWS S3 that copies data to S3. Examples:

Copy files from folder in local filesystem to s3 bucket

s3 = S3Client()
s3.put_folder(source="/home/user/my_folder", bucket="bucket_name", destination="backup/files")

# Copy only the CSV files from the folder to S3
s3.put_folder(source="/home/user/my_folder", bucket="bucket_name", destination="backup/files", file_format="*.csv")

dativa.tools.SQLClient

A SQL client that wraps any PEP 249-compliant connection object and provides detailed logging and simple query execution. It provides the following methods:

execute_query

Runs a query and ignores any output; a usage sketch follows the parameter list.

Parameters:

  • query - the query to run, either a SQL file or a SQL query
  • parameters - a dict of parameters to substitute in the query
  • autocommit - whether the query should autocommit or use transactions within the SQL
  • replace - a dict of items to be replaced in the SQL text
  • first_to_run - the index of the first query in a multi-command query to be executed
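
A minimal sketch of typical usage, assuming a SqlClient instance called sql (set up as in the example code further down); the file name, parameter name and replacement token are illustrative only:

# run a multi-statement SQL file, substituting a parameter and a text token
# (the file name, parameter name and {schema} token are hypothetical)
sql.execute_query(query="sql/create_tables.sql",
                  parameters={"start_date": "2018-01-01"},
                  replace={"{schema}": "analytics"},
                  autocommit=True)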

execute_query_to_df

Runs a query and returns the output of the final statement in a DataFrame.

Parameters:

  • query - the query to run, either a SQL file or a SQL query
  • parameters - a dict of parameters to substitute in the query
  • autocommit - whether the query should autocommit or use transactions within the SQL
  • replace - a dict of items to be replaced in the SQL text

execute_query_to_csv

Runs a query and writes the output of the final statement to a CSV file.

Parameters:

  • query - the query to run, either a SQL file or a SQL query
  • csvfile - the file name to save the query results to
  • parameters - a dict of parameters to substitute in the query
  • autocommit - whether the query should autocommit or use transactions within the SQL
  • replace - a dict of items to be replaced in the SQL text

Example code

# set up the SQL client from environment variables
sql = SqlClient(psycopg2.connect(
    database=os.environ["DB_NAME"],
    user=os.environ["USER"],
    password=os.environ["PASSWORD"],
    host=os.environ["HOST"],
    port=os.environ["PORT"],
    client_encoding="UTF-8",
    connect_timeout=10))

# run a parameterised SQL file and return the final result set as a DataFrame
df = sql.execute_query_to_df(query="sql/my_query.sql",
                             parameters={"start_date": "2018-01-01",
                                         "end_date": "2018-02-01"})

dativa.tools.log_to_stdout

A convenience function to redirect a specific logger and its children to stdout. For example:

log_to_stdout("dativa.tools", logging.DEBUG)

dativa.tools.pandas.CSVHandler

A wrapper for the CSV handling provided in pandas that reads and writes DataFrames with consistent CSV parameters, sniffing the CSV parameters automatically. It includes reading a CSV into a DataFrame and writing a DataFrame out to a string.
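
A minimal sketch of typical usage; the constructor argument and the method names load_df, save_df and df_to_string are assumptions and may differ from the installed version:

from dativa.tools.pandas import CSVHandler

# assumed API: load a CSV into a DataFrame, save it back out, and render it as a string
csv = CSVHandler(base_path="s3://my-bucket/data/")  # base_path is an assumed argument
df = csv.load_df("visits.csv")                      # assumed method name
csv.save_df(df, "visits_clean.csv")                 # assumed method name
text = csv.df_to_string(df)                         # assumed method name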

Support functions for Pandas

  • dativa.tools.pandas.is_numeric - a function to check whether a series or string is numeric
  • dativa.tools.pandas.string_to_datetime - a function to convert a string, or series of strings to a datetime, with a strptime date format that supports nanoseconds
  • dativa.tools.pandas.datetime_to_string - a function to convert a datetime, or a series of datetimes to a string, with a strptime date format that supports nanoseconds
  • dativa.tools.pandas.format_string_is_valid - a function to confirm whether a strptime format string returns a date
  • dativa.tools.pandas.get_column_name - a function to return the name of a column from a passed column name or index.
  • dativa.tools.pandas.get_unique_column_name - a function to return a unique column name when adding new columns to a DataFrame
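
A brief sketch of how the conversion helpers might be used; the exact signatures shown are assumptions based on the descriptions above:

import pandas as pd
from dativa.tools.pandas import is_numeric, string_to_datetime, datetime_to_string

df = pd.DataFrame({"ts": ["2018-01-01 00:00:00.123456"], "value": ["42"]})

# the signatures below are assumptions based on the descriptions above
print(is_numeric(df["value"]))                                         # numeric strings -> True
df["ts"] = string_to_datetime(df["ts"], "%Y-%m-%d %H:%M:%S.%f")        # string series to datetimes
df["ts_text"] = datetime_to_string(df["ts"], "%Y-%m-%d %H:%M:%S.%f")   # datetimes back to strings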

Legacy classes

dativatools.CommonUtility

Supports various common activities including getting detailed descriptions about exceptions, logging activity into a CSV file or database table and sending email reports of failures.

dativatools.DataValidation

Class containing methods to validate file sizes, dates, counts, names and extensions at a specified location.

dativatools.DatabaseManagement

Generic database management operations including data insertion, table deletion, backup, rename, drop and create as well as query execution.

dativatools.RsyncLib

Class to perform file transfer using Rsync.

dativatools.SFTPLib

Class to perform file transfer using SFTP.

dativatools.ArchiveManager

Class to manage archiving and unarchiving of files to and from specific locations.

dativatools.TextToCsvConverter

Class containing methods required to convert a text file to CSV and change parameters such as headers and separators.

dativatools.S3Lib

Supports connecting to and getting and putting data to and from AWS S3 buckets.
