BigQuery Pipeline

Utility class for building data pipelines in BigQuery.

Provides methods for query, copy table, delete table and export to GCS.

Supports Jinja2 templated SQL.

Getting Started

Check out the codelab!

Usage

Create an instance of BQPipeline. By setting query_project, default_project and default_dataset, you can omit project and dataset from table references in your SQL statements.

default_project is the project used when a tablespec does not specify a project.

default_dataset is the dataset used when a tablespec does not specify project or dataset.

Put each BigQuery Standard SQL statement in its own file, one statement per file.

Note that if you reference a project with a '-' in its name, you must wrap the tablespec in backticks in your SQL: `my-project.dataset_id.table_id`
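The defaulting and backtick rules above can be sketched in plain Python. This is only an illustration of the described behavior, not the library's implementation: resolve_tablespec and quote_for_sql are hypothetical helper names that do not appear in ox_bqpipeline.

```python
def resolve_tablespec(tablespec, default_project, default_dataset):
    """Expand a partial tablespec to project.dataset.table (illustrative only)."""
    parts = tablespec.strip('`').split('.')
    if len(parts) == 1:
        # Only a table name: fill in both the project and the dataset.
        project, dataset, table = default_project, default_dataset, parts[0]
    elif len(parts) == 2:
        # dataset.table: fill in the project only.
        project = default_project
        dataset, table = parts
    elif len(parts) == 3:
        project, dataset, table = parts
    else:
        raise ValueError('unexpected tablespec: {!r}'.format(tablespec))
    return '.'.join([project, dataset, table])


def quote_for_sql(full_tablespec):
    """Wrap a tablespec in backticks when its project contains a '-'."""
    project = full_tablespec.split('.')[0]
    if '-' in project:
        return '`{}`'.format(full_tablespec)
    return full_tablespec


full = resolve_tablespec('table_id', 'my-project', 'dataset_id')
print(full)
print(quote_for_sql(full))
```

With the defaults set once, SQL files can refer to table_id alone, and only fully qualified names with a hyphenated project need backticks.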

Writing scripts to be easily portable between environments

  • Use {{ project }} in all your SQL queries.
  • In your replacements dictionary, set 'project' to the value of BQPipeline.infer_project(), which infers the project from the credentials. In your local shell it uses your Application Default Credentials (for example, the key file named by GOOGLE_APPLICATION_CREDENTIALS), and on the cron box it uses the project of the cron box's service account.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ox_bqpipeline.bqpipeline import BQPipeline

# Unqualified table names below resolve against default_dataset.
bq = BQPipeline(job_name='myjob',
                default_dataset='mydataset',
                json_credentials_path='credentials.json')

# Values substituted into the templated SQL files.
replacements = {
    'project': bq.infer_project(),
    'dataset': 'mydataset'
}

bq.copy_table('source_table', 'dest_table')

# Each entry is a SQL file path, or a (path, destination table) tuple.
bq.run_queries(['../sql/q1.sql', ('../sql/q2.sql', 'tmp_table_1')], **replacements)

# The '*' wildcard lets BigQuery split the export across multiple files.
bq.export_csv_to_gcs('tmp_table_2', 'gs://my-bucket/path/to/tmp_table_2-*.csv')

bq.delete_tables(['tmp_table_1', 'tmp_table_2'])

Note that the run_queries method can also take a list of tuples, where the first entry is the SQL file path and the second is a destination table. You can see an example of this in example_pipeline.py.
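To make the {{ project }} replacement concrete, here is a rough sketch of what a templated SQL file might look like and what it renders to. The library uses Jinja2 for this; the render function below is a simplified standard-library stand-in that handles only plain {{ name }} placeholders, and the query text and the literal project value are made up for illustration (in a real script the project would come from infer_project()).

```python
import re

# Contents of a hypothetical ../sql/q1.sql file.
SQL_TEMPLATE = """\
SELECT name, COUNT(*) AS n
FROM `{{ project }}.{{ dataset }}.source_table`
GROUP BY name
"""


def render(template, **replacements):
    """Substitute {{ name }} placeholders; a simplified stand-in for Jinja2."""
    def lookup(match):
        # A missing key raises KeyError, so typos in the template surface early.
        return str(replacements[match.group(1)])
    return re.sub(r'\{\{\s*(\w+)\s*\}\}', lookup, template)


# Hard-coded here so the sketch runs without credentials.
replacements = {'project': 'my-project', 'dataset': 'mydataset'}
print(render(SQL_TEMPLATE, **replacements))
```

Because the project name is never written into the SQL file itself, the same file can run unchanged under different credentials.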

For detailed documentation about the methods provided by this utility class see docs.md.

Writing scripts with parameterized queries

BigQuery Standard SQL supports parameterized queries.

To run parameterized queries with bqpipeline from the command line, use the following syntax:

python3 ox_bqpipeline/bqpipeline.py --query_file query.sql --gcs_destination gs://bucket_path --query_params '{"int_param": 1, "str_param": "one"}'
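As a sketch of what the --query_params value carries, the snippet below parses the JSON string and pairs each value with a BigQuery Standard SQL type name. How ox_bqpipeline converts the parameters internally is not shown here, so the type mapping and the function names bigquery_type and parse_query_params are assumptions for illustration. In the SQL file, named parameters are referenced as @int_param and @str_param; the table name in QUERY is hypothetical.

```python
import json

# Hypothetical contents of query.sql; @name refers to a named query parameter.
QUERY = "SELECT * FROM mytable WHERE id = @int_param AND label = @str_param"


def bigquery_type(value):
    """Map a Python scalar to a BigQuery Standard SQL type name (assumed mapping)."""
    # bool is checked before int because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return 'BOOL'
    if isinstance(value, int):
        return 'INT64'
    if isinstance(value, float):
        return 'FLOAT64'
    if isinstance(value, str):
        return 'STRING'
    raise TypeError('unsupported parameter type: {}'.format(type(value).__name__))


def parse_query_params(raw):
    """Turn the JSON string into sorted (name, type, value) triples."""
    params = json.loads(raw)
    return [(name, bigquery_type(value), value)
            for name, value in sorted(params.items())]


print(parse_query_params('{"int_param": 1, "str_param": "one"}'))
```

The (name, type, value) shape matches the arguments that google-cloud-bigquery's ScalarQueryParameter accepts, which is why it is used here.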

To invoke the BQPipeline.run_queries method from within your Python module, use the following pattern.

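from ox_bqpipeline import bqpipeline

bqp = bqpipeline.BQPipeline(
    job_name='testjob', default_project='project_name',
    default_dataset='dataset_name')
# Each tuple is (query path, table or GCS destination, query parameters).
# Replace the <...> placeholders with your own values.
qj_list = bqp.run_queries(
    [('<query1_path>', '<table_or_gcs_dest>', {'<param_name>': '<param_value>'}),
     ('<query2_path>', '<table_or_gcs_dest>', {'<param_name>': '<param_value>'}),
    ],
    batch=False, overwrite=False,
    dry_run=False)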

Creating Service Account JSON Credentials

  1. Visit the Service Account Console
  2. Select a service account
  3. Select "Create Key"
  4. Select "JSON"
  5. Click "Create" to download the file
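After downloading the key, a quick sanity check can catch a wrong or truncated file before it is passed as json_credentials_path. The sketch below is not part of ox_bqpipeline; check_service_account_key is a hypothetical helper that only confirms the file looks like a service account key and reports its project.

```python
import json

# Fields present in a service account JSON key file.
REQUIRED_FIELDS = ('type', 'project_id', 'private_key', 'client_email')


def check_service_account_key(path):
    """Return the project_id from a service account key file, or raise ValueError."""
    with open(path) as f:
        info = json.load(f)
    missing = [field for field in REQUIRED_FIELDS if field not in info]
    if missing:
        raise ValueError('missing fields: {}'.format(', '.join(missing)))
    if info['type'] != 'service_account':
        raise ValueError('not a service account key: type={!r}'.format(info['type']))
    return info['project_id']
```

For example, check_service_account_key('credentials.json') would return the project that the key belongs to, or raise if the file is some other kind of credential.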

Installation

Optional: Install in virtualenv

python3 -m virtualenv venv
source venv/bin/activate

Install with pipenv

pipenv install --python 3

Install with pip

python3 -m pip install -r requirements.txt

or

pip3 install -r requirements.txt

Run test suite

python3 -m unittest discover

Run test suite using pipenv

pipenv run python -m unittest discover

Requirements

You'll need Python 3.4 or later.

Google Cloud Python Client

Disclaimer

This is not an official Google project.

References

  • Python Example Code
  • google-cloud-bigquery
  • Jinja2
  • ox_bqpipeline Reference
