
BigQuery client wrapper with clean API

Project description

BiggerQuery — Python library for BigQuery

BiggerQuery is a Python library that simplifies working with BigQuery datasets. It wraps the BigQuery client, providing an elegant API for the most common use cases. It also provides an API that simplifies creating Dataflow pipelines.

Installation

pip install biggerquery

Compatibility

BiggerQuery is compatible with Python 2.7.

Tutorial

Task definition

To guide you through the features that BiggerQuery provides, we have prepared a simple task. There is a table named transactions, which looks like this:

user_id    transaction_value    partition_timestamp
john123    800                  2019-01-01 00:00:00
smith99    10000                2019-01-01 00:00:00
smith99    30000                2019-01-01 00:00:00

The table contains all transactions that users made on a given day. Your task is to calculate two metrics for each user: the daily user transaction value and the daily user transaction count. For example, smith99 made two transactions worth 10000 and 30000 on 2019-01-01, so their daily transaction value is 40000 and their daily transaction count is 2.

The final result should be the table user_transaction_metrics:

user_id    metric_value    metric_type                partition_timestamp
john123    800             USER_TRANSACTION_VALUE    2019-01-01 00:00:00
smith99    40000           USER_TRANSACTION_VALUE    2019-01-01 00:00:00
john123    1               USER_TRANSACTION_COUNT    2019-01-01 00:00:00
smith99    2               USER_TRANSACTION_COUNT    2019-01-01 00:00:00

Setting up test environment

Before you start working with BiggerQuery, you need to install the Google Cloud SDK.

With gcloud installed, set up default gcloud credentials:

gcloud auth application-default login
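
If you want to verify that the default credentials work, a quick sanity check with the google-cloud-bigquery client can look like the sketch below. This is not part of BiggerQuery; it assumes the google-cloud-bigquery package is available in your environment and that you replace 'your-project-id' with a real project id.

# Optional sanity check (a sketch, not part of BiggerQuery): list datasets
# with the google-cloud-bigquery client to confirm that application-default
# credentials are picked up. Replace 'your-project-id' with a real project id.
from google.cloud import bigquery

client = bigquery.Client(project='your-project-id')
for dataset in client.list_datasets():
    print(dataset.dataset_id)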

Next, set up a virtualenv with BiggerQuery installed:

mkdir test_biggerquery
cd test_biggerquery
pip install virtualenv
virtualenv -p /usr/bin/python2.7 venv
source venv/bin/activate
pip install biggerquery

Then, prepare datasets. Start by creating a new Python module:

touch user_transaction_metrics.py

Edit the created module with your favorite editor and add the following lines:

from biggerquery import create_dataset_manager

PROJECT_ID = 'your-project-id'
USER_TRANSACTION_METRICS_DATASET_NAME = 'user_transaction_metrics'
TRANSACTION_DATASET_NAME = 'transactions'


def setup_test_transactions_table(project_id, dataset_name):
    dataset_id, dataset_manager = create_dataset_manager(
        project_id,
        '2019-01-01',
        dataset_name,
        internal_tables=['transactions'])

    dataset_manager.create_table("""
        CREATE TABLE IF NOT EXISTS transactions (
            user_id STRING,
            transaction_value FLOAT64,
            partition_timestamp TIMESTAMP)
        PARTITION BY DATE(partition_timestamp)""")

    dataset_manager.write_truncate('transactions', """
        SELECT 'john123' as user_id, 800.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)
    dataset_manager.write_append('transactions', """
        SELECT 'smith99' as user_id, 10000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)
    dataset_manager.write_append('transactions', """
        SELECT 'smith99' as user_id, 30000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)

    return '{dataset_id}.transactions'.format(dataset_id=dataset_id)


TRANSACTIONS_TABLE_ID = setup_test_transactions_table(PROJECT_ID, TRANSACTION_DATASET_NAME)

user_transaction_dataset_id, user_transaction_metrics_dataset_manager = create_dataset_manager(
    project_id=PROJECT_ID,
    runtime='2019-01-01',
    dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,
    internal_tables=['user_transaction_metrics'],
    external_tables={
        'transactions': TRANSACTIONS_TABLE_ID
    })

This code creates two datasets:

  • the transactions dataset, which contains the source data table to be processed,
  • the user_transaction_metrics dataset, which contains the result tables of our processing.

Creating dataset manager

A dataset manager is an object that lets you manipulate tables in a given dataset using basic operations: write_truncate, write_append, create_table, collect, and write_tmp. Let's go through a few examples to illustrate each of these operations.

Start by creating a dataset manager object. The project_id and dataset_name parameters define the dataset you want to work with. The internal_tables parameter lists tables that live inside the dataset defined by project_id and dataset_name. The external_tables parameter lists tables that live outside that dataset; external tables have to be referenced by their full table id, for example:

external_tables = {
    'transactions': 'dataset.id.transactions',
    'some_external_table': 'dataset.id2.external_table'
}

The runtime parameter determines which partition is being processed.

user_transaction_dataset_id, user_transaction_metrics_dataset_manager = create_dataset_manager(
    project_id=PROJECT_ID,
    runtime='2019-01-01',
    dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,
    internal_tables=['user_transaction_metrics'],
    external_tables={
        'transactions': TRANSACTIONS_TABLE_ID
    })

Create table

Now, create a table that you can use to store your metrics. You can use plain SQL to create this table. Add the following lines to user_transaction_metrics.py:

user_transaction_metrics_dataset_manager.create_table("""
CREATE TABLE IF NOT EXISTS user_transaction_metrics (
    user_id STRING,
    metric_value FLOAT64,
    metric_type STRING,
    partition_timestamp TIMESTAMP)
PARTITION BY DATE(partition_timestamp)
""")

Write truncate

Next, calculate the first metric — USER_TRANSACTION_VALUE. Add the following lines:

user_transaction_metrics_dataset_manager.write_truncate('user_transaction_metrics', """
SELECT user_id,
    sum(transaction_value) as metric_value,
    'USER_TRANSACTION_VALUE' as metric_type,
    TIMESTAMP('{dt}') as partition_timestamp
FROM `{transactions}`
WHERE DATE(partition_timestamp) = '{dt}'
GROUP BY user_id
""")

Result:

user_id    metric_value    metric_type                partition_timestamp
john123    800             USER_TRANSACTION_VALUE    2019-01-01 00:00:00
smith99    40000           USER_TRANSACTION_VALUE    2019-01-01 00:00:00

The write_truncate function writes the result of the provided query to a specified table, in this case user_transaction_metrics. This function removes all data from a given table before writing new data.

Inside a query, you don't have to write full table ids; you can use the names provided in the internal_tables and external_tables parameters. The runtime parameter is also available inside queries as {dt}.
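
To build intuition, the substitution behaves roughly like Python's str.format: {transactions} expands to the full table id and {dt} to the runtime date. The snippet below is an illustration only, with a made-up full table id; the real substitution is done internally by the dataset manager.

# Illustration only: the placeholders resolve roughly like str.format.
# The full table id below is a made-up example.
query_template = """
SELECT user_id FROM `{transactions}`
WHERE DATE(partition_timestamp) = '{dt}'
"""
print(query_template.format(
    transactions='your-project-id.transactions_dataset.transactions',
    dt='2019-01-01'))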

Write append

So what about adding data to a table? Calculate another metric — USER_TRANSACTION_COUNT. Add the following lines:

user_transaction_metrics_dataset_manager.write_append('user_transaction_metrics', """
SELECT user_id,
   count(transaction_value) * 1.0 as metric_value,
   'USER_TRANSACTION_COUNT' as metric_type,
   TIMESTAMP('{dt}') as partition_timestamp
FROM `{transactions}`
WHERE DATE(partition_timestamp) = '{dt}'
GROUP BY user_id
""")

Result:

user_id    metric_value    metric_type                partition_timestamp
john123    800             USER_TRANSACTION_VALUE    2019-01-01 00:00:00
smith99    40000           USER_TRANSACTION_VALUE    2019-01-01 00:00:00
john123    1               USER_TRANSACTION_COUNT    2019-01-01 00:00:00
smith99    2               USER_TRANSACTION_COUNT    2019-01-01 00:00:00

The difference between write_truncate and write_append is that write_append does not remove data from the table before writing new data.

Write temporary

Sometimes it's useful to create an additional table that stores intermediate results. The write_tmp function lets you create tables from query results (write_truncate and write_append can write only to tables that already exist).

You can refactor the existing code using the write_tmp function:

from biggerquery import create_dataset_manager

PROJECT_ID = 'your-project-id'
USER_TRANSACTION_METRICS_DATASET_NAME = 'user_transaction_metrics'
TRANSACTION_DATASET_NAME = 'transactions'


def setup_test_transactions_table(project_id, dataset_name):
    dataset_id, dataset_manager = create_dataset_manager(
        project_id,
        '2019-01-01',
        dataset_name,
        internal_tables=['transactions'])

    dataset_manager.create_table("""
        CREATE TABLE IF NOT EXISTS transactions (
            user_id STRING,
            transaction_value FLOAT64,
            partition_timestamp TIMESTAMP)
        PARTITION BY DATE(partition_timestamp)""")

    dataset_manager.write_truncate('transactions', """
        SELECT 'john123' as user_id, 800.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)
    dataset_manager.write_append('transactions', """
        SELECT 'smith99' as user_id, 10000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)
    dataset_manager.write_append('transactions', """
        SELECT 'smith99' as user_id, 30000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)

    return '{dataset_id}.transactions'.format(dataset_id=dataset_id)


# creating the source dataset and table: transactions
TRANSACTIONS_TABLE_ID = setup_test_transactions_table(PROJECT_ID, TRANSACTION_DATASET_NAME)

# creating the processing dataset: user_transaction_metrics
user_transaction_dataset_id, user_transaction_metrics_dataset_manager = create_dataset_manager(
    project_id=PROJECT_ID,
    runtime='2019-01-01',
    dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,
    internal_tables=['user_transaction_metrics'],
    external_tables={
        'transactions': TRANSACTIONS_TABLE_ID
    })


def calculate_user_transaction_metrics(dataset_manager):
    dataset_manager.create_table("""
    CREATE TABLE IF NOT EXISTS user_transaction_metrics (
        user_id STRING,
        metric_value FLOAT64,
        metric_type STRING,
        partition_timestamp TIMESTAMP)
    PARTITION BY DATE(partition_timestamp)
    """)

    dataset_manager.write_tmp('daily_user_transaction_value', """
    SELECT user_id,
        sum(transaction_value) as metric_value,
        'USER_TRANSACTION_VALUE' as metric_type,
        TIMESTAMP('{dt}') as partition_timestamp
    FROM `{transactions}`
    WHERE DATE(partition_timestamp) = '{dt}'
    GROUP BY user_id
    """)

    dataset_manager.write_tmp('daily_user_transaction_count', """
    SELECT user_id,
        count(transaction_value) as metric_value,
        'USER_TRANSACTION_COUNT' as metric_type,
        TIMESTAMP('{dt}') as partition_timestamp
    FROM `{transactions}`
    WHERE DATE(partition_timestamp) = '{dt}'
    GROUP BY user_id
    """)

    dataset_manager.write_truncate('user_transaction_metrics', """
    SELECT * FROM `{daily_user_transaction_value}`
    UNION ALL
    SELECT * FROM `{daily_user_transaction_count}`
    """)


calculate_user_transaction_metrics(user_transaction_metrics_dataset_manager)

It's good practice to put a series of related queries into a single function that you can schedule, test, or run with a specified dataset manager; in this case that is the calculate_user_transaction_metrics function. Temporary tables are useful for debugging your code by checking the results step by step. Splitting a big query into smaller chunks also makes it easier to read.
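
For example, the same function can be pointed at another environment simply by passing it a different dataset manager. The sketch below uses placeholder values for the project id, runtime, and external table id.

# Sketch: reuse the same processing function with a different dataset
# manager. The project id, runtime, and external table id are placeholders.
_, another_dataset_manager = create_dataset_manager(
    project_id='another-project-id',
    runtime='2019-01-02',
    dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,
    internal_tables=['user_transaction_metrics'],
    external_tables={
        'transactions': 'another-project-id.transactions.transactions'
    })

calculate_user_transaction_metrics(another_dataset_manager)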

Collect

You can use collect to fetch data from BigQuery into memory, for example to send it to a remote server via HTTP:

import requests

calculate_user_transaction_metrics(user_transaction_metrics_dataset_manager)

rows = user_transaction_metrics_dataset_manager.collect("""
SELECT * FROM `{user_transaction_metrics}`
WHERE DATE(partition_timestamp) = '{dt}'
""")

for row in rows:
    requests.post('http://example.com/user-metrics', json={'userMetric': row})
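
Since the collected rows support dictionary-style access (as the test in the Testing section shows), you can also process them locally, for example by dumping them to a CSV file. The snippet below is a sketch; the output file name is arbitrary.

import csv

# Sketch: write the collected rows to a local CSV file. Assumes each row
# supports dict-style access, as in the test case in the Testing section.
with open('user_transaction_metrics.csv', 'w') as output_file:
    writer = csv.writer(output_file)
    writer.writerow(['user_id', 'metric_value', 'metric_type'])
    for row in rows:
        writer.writerow([row['user_id'], row['metric_value'], row['metric_type']])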

Credentials

If you want to specify credentials to operate on your dataset, you can do it when creating the dataset manager, for example:

from google.oauth2 import service_account

user_transaction_dataset_id, user_transaction_metrics_dataset_manager = create_dataset_manager(
    ...
    credentials=service_account.Credentials.from_service_account_info({
                "type": "service_account",
                "project_id": "you-amazing-project",
                "private_key_id": "zcvxcgadf",
                "client_email": "blabla",
                "client_id": "mehmeh",
                "auth_uri": "asdfasdfasdf",
                "token_uri": "asdfasdfasdf",
                "auth_provider_x509_cert_url": "zvadfsgadfgdafg",
            })
    ...
)
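
If you keep the service account key in a file instead, the google.oauth2 package can load it directly. The sketch below assumes a placeholder key file path and reuses the constants defined earlier in the tutorial.

from google.oauth2 import service_account

# Sketch: build the same kind of credentials from a key file.
# The path below is a placeholder.
credentials = service_account.Credentials.from_service_account_file(
    '/path/to/service-account-key.json')

user_transaction_dataset_id, user_transaction_metrics_dataset_manager = create_dataset_manager(
    project_id=PROJECT_ID,
    runtime='2019-01-01',
    dataset_name=USER_TRANSACTION_METRICS_DATASET_NAME,
    internal_tables=['user_transaction_metrics'],
    credentials=credentials)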

Testing

Unfortunately, there is no way to run BigQuery locally for testing. However, you can still write automated end-to-end tests for your queries, as shown below. Remember to set a test project id before running the test.

from datetime import date
from unittest import TestCase
from unittest import main
from biggerquery import create_dataset_manager


# component to test
def calculate_user_transaction_metrics(dataset_manager):
    dataset_manager.create_table("""
    CREATE TABLE IF NOT EXISTS user_transaction_metrics (
        user_id STRING,
        metric_value FLOAT64,
        metric_type STRING,
        partition_timestamp TIMESTAMP)
    PARTITION BY DATE(partition_timestamp)
    """)

    dataset_manager.write_tmp('daily_user_transaction_value', """
    SELECT user_id,
        sum(transaction_value) as metric_value,
        'USER_TRANSACTION_VALUE' as metric_type,
        TIMESTAMP('{dt}') as partition_timestamp
    FROM `{transactions}`
    WHERE DATE(partition_timestamp) = '{dt}'
    GROUP BY user_id
    """)

    dataset_manager.write_tmp('daily_user_transaction_count', """
    SELECT user_id,
        count(transaction_value) as metric_value,
        'USER_TRANSACTION_COUNT' as metric_type,
        TIMESTAMP('{dt}') as partition_timestamp
    FROM `{transactions}`
    WHERE DATE(partition_timestamp) = '{dt}'
    GROUP BY user_id
    """)

    dataset_manager.write_truncate('user_transaction_metrics', """
    SELECT * FROM `{daily_user_transaction_value}`
    UNION ALL
    SELECT * FROM `{daily_user_transaction_count}`
    """)


class CalculateUserTransactionMetricsTestCase(TestCase):
    TEST_PARTITION = '2019-01-01'
    TEST_PROJECT = 'your-project-id'

    def test_should_calculate_user_transaction_metrics(self):

        # when
        calculate_user_transaction_metrics(self.dataset_manager)
        calculated_user_transaction_metrics = self.dataset_manager.collect("""
        SELECT user_id,
        metric_value,
        metric_type,
        DATE(partition_timestamp) as partition_timestamp
        FROM `{user_transaction_metrics}`
        WHERE DATE(partition_timestamp) = '{dt}'
        """)

        # then
        self.assertSetEqual(
            {(row['user_id'], row['metric_value'], row['metric_type'], row['partition_timestamp'])
             for row in calculated_user_transaction_metrics},
            {
                ('john123', 800.0, 'USER_TRANSACTION_VALUE', date(2019, 1, 1)),
                ('smith99', 40000.0, 'USER_TRANSACTION_VALUE', date(2019, 1, 1)),
                ('john123', 1.0, 'USER_TRANSACTION_COUNT', date(2019, 1, 1)),
                ('smith99', 2.0, 'USER_TRANSACTION_COUNT', date(2019, 1, 1)),
            })

    def setUp(self):
        transactions_table_id = self.setup_test_transactions_table()
        self.test_dataset_id, self.dataset_manager = create_dataset_manager(
            self.TEST_PROJECT,
            self.TEST_PARTITION,
            internal_tables=['user_transaction_metrics'],
            external_tables={
                'transactions': transactions_table_id
            })

    def setup_test_transactions_table(self):
        dataset_id, dataset_manager = create_dataset_manager(
            self.TEST_PROJECT,
            self.TEST_PARTITION,
            internal_tables=['transactions']
        )
        dataset_manager.create_table("""
        CREATE TABLE IF NOT EXISTS transactions (
            user_id STRING,
            transaction_value FLOAT64,
            partition_timestamp TIMESTAMP)
        PARTITION BY DATE(partition_timestamp)""")
        dataset_manager.write_truncate('transactions', """
        SELECT 'john123' as user_id, 800.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)
        dataset_manager.write_append('transactions', """
        SELECT 'smith99' as user_id, 10000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)
        dataset_manager.write_append('transactions', """
        SELECT 'smith99' as user_id, 30000.0 as transaction_value, TIMESTAMP('2019-01-01') as partition_timestamp
        """)
        return '{dataset_id}.transactions'.format(dataset_id=dataset_id)

    def tearDown(self):
        self.dataset_manager.remove_dataset()


if __name__ == '__main__':
    main()
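
Assuming you saved the test module as user_transaction_metrics_test.py (the file name is just an example), you can run it like any other unittest-based script:

python user_transaction_metrics_test.py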

Creating beam manager

A beam manager is an object that allows you to create Dataflow pipelines. You create it with the create_dataflow_manager function. The beam manager provides utility methods that wrap the raw Beam API: write_truncate_to_big_query, write_to_avro, read_from_big_query, and read_from_avro. Let's go through a few examples to illustrate some of these operations.

Start by creating the beam manager object. The project_id and dataset_name parameters define the dataset you want to work with. The internal_tables parameter lists tables that live inside the dataset defined by project_id and dataset_name. The external_tables parameter lists tables that live outside that dataset; external tables have to be referenced by their full table id, for example:

external_tables = {
    'transactions': 'dataset.id.transactions',
    'some_external_table': 'dataset.id2.external_table'
}

The runtime parameter determines which partition is being processed. The dataflow_bucket parameter is the GCS bucket used for temporary and staging locations. The requirements_file_path parameter points to the file that describes the dependencies of your Dataflow job. The region parameter is the location of the data center used to process your pipelines; by default it is set to europe-west1. The machine_type parameter is the type of machine used by the workers; by default it is n1-standard-1. You can read more about machine types in GCP here: https://cloud.google.com/compute/docs/machine-types

dataflow_manager = create_dataflow_manager(
    project_id=PROJECT_ID,
    runtime="2019-01-01",
    dataset_name=DATASET_NAME,
    dataflow_bucket=BUCKET_NAME,
    internal_tables=["user_transaction_metrics"],
    external_tables={
        'transactions': TRANSACTIONS_TABLE_ID},
    requirements_file_path="/file_path",
    region="europe-west2",
    machine_type="n1-standard-2")

Create pipeline

For this example, you first have to complete the steps from https://github.com/allegro/biggerquery#setting-up-test-environment and https://github.com/allegro/biggerquery#create-table. Then, in the same file where you created dataflow_manager, add code that runs your pipeline as a separate module:

import importlib
import inspect
import runpy

# Import the pipeline module and re-run it as a script, passing the
# dataflow manager to it through the 'dm' global.
pipeline_module = importlib.import_module('pipeline')
runpy.run_path(
    inspect.getmodule(pipeline_module).__file__,
    init_globals={
        'dm': dataflow_manager
    },
    run_name='__main__')

After creating dataflow_manager, we can define the pipeline itself. To do this, create a new file named pipeline.py and put the code below inside it:

import json

import apache_beam as beam

TRANSACTIONS_SCHEMA = '''
{
    "fields": [
        {"name": "user_id", "type": "string", "mode": "required"},
        {"name": "transaction_value", "type": "float", "mode": "required"},
        {"name": "partition_timestamp", "type": "timestamp", "mode": "required"}
    ]
}
'''


def run(dm):
    p = dm.create_dataflow_pipeline('save-transactions-pipeline')
    (p
     | 'create rows' >> beam.Create([
         ['john123', 800.0, '2019-01-01 00:00:00'],
         ['smith99', 10000.0, '2019-01-01 00:00:00'],
         ['smith99', 30000.0, '2019-01-01 00:00:00']])
     | 'map to dict' >> beam.Map(lambda element: {
         'user_id': element[0],
         'transaction_value': element[1],
         'partition_timestamp': element[2]})
     | 'write to BigQuery' >> dm.write_truncate_to_big_query(
         'transactions', json.loads(TRANSACTIONS_SCHEMA)['fields']))

    p.run().wait_until_finish()


if __name__ == '__main__' and 'dm' in globals():
    run(globals()['dm'])

This code writes rows into the transactions table using the write_truncate_to_big_query method. Now run the module in which you created the dataflow manager. After a few minutes, three records should be visible in the transactions table. Next, update your pipeline.py file to look like this:

import json

import apache_beam as beam

USER_TRANSACTION_METRICS_SCHEMA = '''
{
    "fields": [
        {"name": "user_id", "type": "string", "mode": "required"},
        {"name": "metric_type", "type": "string", "mode": "required"},
        {"name": "metric_value", "type": "float", "mode": "required"},
        {"name": "partition_timestamp", "type": "timestamp", "mode": "required"}
    ]
}
'''


def run(dm):
    p = dm.create_dataflow_pipeline('save-user-transaction-metrics-pipeline')
    (p
     | 'read metrics' >> dm.read_from_big_query("""
        SELECT user_id,
            sum(transaction_value) as metric_value,
            'USER_TRANSACTION_VALUE' as metric_type,
            TIMESTAMP('{dt}') as partition_timestamp
        FROM `{transactions}`
        WHERE DATE(partition_timestamp) = '{dt}'
        GROUP BY user_id
     """)
     | 'write metrics' >> dm.write_truncate_to_big_query(
         'user_transaction_metrics', json.loads(USER_TRANSACTION_METRICS_SCHEMA)['fields']))

    p.run().wait_until_finish()


if __name__ == '__main__' and 'dm' in globals():
    run(globals()['dm'])

After a few minutes, the results of the executed query should be visible in the user_transaction_metrics table.
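
You can verify those results with the dataset manager's collect method from the earlier part of the tutorial. The sketch below assumes the user_transaction_metrics_dataset_manager created in the Create table step is still available in the same module.

# Sketch: check the rows written by the pipeline, reusing the dataset
# manager created earlier in the tutorial.
rows = user_transaction_metrics_dataset_manager.collect("""
SELECT * FROM `{user_transaction_metrics}`
WHERE DATE(partition_timestamp) = '{dt}'
""")
for row in rows:
    print(row)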

You can find example beam manager code in /examples/_example_beam_manager.py.
