BigQuery Pipeline
Utility class for building data pipelines in BigQuery.
Provides methods for running queries, copying tables, deleting tables, and exporting tables to GCS.
Supports Jinja2 templated SQL.
Usage
Create an instance of BQPipeline. By setting query_project, default_project and default_dataset, you can omit project and dataset from table references in your SQL statements.
default_project is the project used when a tablespec does not specify a project.
default_dataset is the dataset used when a tablespec does not specify a project or dataset.
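The two resolution rules can be sketched as a small function. This is a hypothetical helper written for illustration, not the code inside ox_bqpipeline (the library exposes this behaviour as BQPipeline.resolve_table_spec); it assumes a tablespec is a dot-separated string with one to three parts.

```python
def resolve_table_spec(spec, default_project, default_dataset):
    """Expand a partial tablespec using pipeline-wide defaults (hypothetical helper)."""
    parts = spec.split(".")
    if len(parts) == 3:
        # Already fully qualified: project.dataset.table
        return spec
    if len(parts) == 2:
        # dataset.table: only the project is missing
        return "{}.{}".format(default_project, spec)
    if len(parts) == 1:
        # table: both project and dataset are missing
        return "{}.{}.{}".format(default_project, default_dataset, spec)
    raise ValueError("invalid tablespec: {!r}".format(spec))


print(resolve_table_spec("events", "my-project", "mydataset"))  # my-project.mydataset.events
```

str.format is used instead of f-strings so the sketch also runs on the Python 3.4 minimum listed under Requirements.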
Place each BigQuery Standard SQL statement in its own file (one statement per file).
Note that if you reference a project whose ID contains a '-', you must enclose the tablespec in backticks in your SQL: `my-project.dataset_id.table_id`
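If SQL strings are assembled in Python, a tiny helper can apply the backtick rule consistently. quote_tablespec is a hypothetical name used only for this sketch; it is not part of ox_bqpipeline.

```python
def quote_tablespec(spec):
    """Wrap a tablespec in backticks so hyphenated project IDs parse in Standard SQL."""
    # Strip any existing backticks first so calling the helper twice is harmless.
    return "`{}`".format(spec.strip("`"))


sql = "SELECT * FROM {}".format(quote_tablespec("my-project.dataset_id.table_id"))
print(sql)  # SELECT * FROM `my-project.dataset_id.table_id`
```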
Writing scripts to be easily portable between environments
- Use {{ project }} in all your SQL queries.
- In your replacements dictionary, set 'project' to the value of BQPipeline.infer_project().
This infers the project from the credentials: in your local shell it will use your Application Default Credentials, and on the cron box it will use the project of the cron box's service account.
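A minimal sketch of the templating step, assuming Jinja2 is installed (it is the templating library this utility uses). The project IDs are made up, and render_sql is a hypothetical helper rather than part of ox_bqpipeline; in a real pipeline 'project' would come from bq.infer_project().

```python
from jinja2 import Template

# One SQL template, written once and reused in every environment.
SQL = "SELECT COUNT(*) AS n FROM `{{ project }}.{{ dataset }}.events`"


def render_sql(template_text, **replacements):
    """Render a Jinja2 SQL template with the given replacement values."""
    return Template(template_text).render(**replacements)


# Hard-coded project IDs for illustration only.
dev_sql = render_sql(SQL, project="my-dev-project", dataset="mydataset")
prod_sql = render_sql(SQL, project="my-prod-project", dataset="mydataset")
print(dev_sql)  # SELECT COUNT(*) AS n FROM `my-dev-project.mydataset.events`
```

Because only the replacement values differ, the same .sql file can be committed once and run unchanged against each environment.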
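```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ox_bqpipeline.bqpipeline import BQPipeline

# Table names without a dataset resolve against default_dataset.
bq = BQPipeline(job_name='myjob',
                default_dataset='mydataset',
                json_credentials_path='credentials.json')

# Values substituted into the Jinja2 placeholders in the SQL files.
replacements = {
    'project': bq.infer_project(),
    'dataset': 'mydataset'
}

bq.copy_table('source_table', 'dest_table')

# q1.sql is given as a bare path; q2.sql writes its results to tmp_table_1.
bq.run_queries(['../sql/q1.sql', ('../sql/q2.sql', 'tmp_table_1')], **replacements)

# tmp_table_2 is assumed to exist by now (e.g. created by one of the queries above).
bq.export_csv_to_gcs('tmp_table_2', 'gs://my-bucket/path/to/tmp_table_2-*.csv')

# Clean up the intermediate tables.
bq.delete_tables(['tmp_table_1', 'tmp_table_2'])
```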
Note that the run_queries method provided by this utility can also take tuples in its list, where the first element is the SQL path and the second is a destination table. You can see an example of this in example_pipeline.py.
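The list passed to run_queries may mix bare paths and (path, destination) tuples. The hypothetical helper below shows one way to normalize that shape into uniform pairs; it is an illustration, not the library's internal code, and treating a bare path as "no explicit destination" is an assumption of this sketch.

```python
def normalize_query_paths(query_paths):
    """Turn each entry into a (sql_path, dest_tablespec_or_None) pair."""
    pairs = []
    for entry in query_paths:
        if isinstance(entry, tuple):
            sql_path, dest = entry
            pairs.append((sql_path, dest))
        else:
            # In this sketch a bare path means the query has no explicit destination table.
            pairs.append((entry, None))
    return pairs


pairs = normalize_query_paths(['../sql/q1.sql', ('../sql/q2.sql', 'tmp_table_1')])
```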
For detailed documentation of the methods provided by this utility class, see docs.md.
Creating Service Account JSON Credentials
- Visit the Service Account Console
- Select a service account
- Select "Create Key"
- Select "JSON"
- Click "Create" to download the file
Installation
Optional: Install in virtualenv
python3 -m virtualenv venv
source venv/bin/activate
Install with pip
python3 -m pip install -r requirements.txt
Requirements
You'll need Python 3.4 or later.
Disclaimer
This is not an official Google project.
References
- Python Example Code
- google-cloud-bigquery
- Jinja2
ox_bqpipeline
ox_bqpipeline.bqpipeline
get_logger
get_logger(name, fmt='%(asctime)-15s %(levelname)s %(message)s')
Creates a Logger that logs to stdout.
:param name: name of the logger
:param fmt: format string for log messages
:return: Logger
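One way a function with this signature could be written using only the standard logging module. This is a sketch, not the library's source; the INFO level and the guard against attaching duplicate handlers are assumptions.

```python
import logging
import sys


def get_logger(name, fmt='%(asctime)-15s %(levelname)s %(message)s'):
    """Return a logger that writes formatted records to stdout (sketch)."""
    logger = logging.getLogger(name)
    # Only attach a handler once, so repeated calls do not duplicate output.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(fmt))
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


log = get_logger('myjob')
log.info('pipeline started')
```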
read_sql
read_sql(path)
Reads UTF-8 encoded SQL from a file.
:param path: path to SQL file
:return: str contents of file
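Passing the encoding explicitly is what makes the read UTF-8 regardless of the machine's locale. A minimal sketch of such a reader, with a round-trip through a temporary file containing non-ASCII text (the file name is illustrative):

```python
import os
import tempfile


def read_sql(path):
    """Read a UTF-8 encoded SQL file and return its contents as a str."""
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()


# Round-trip demo: write a query with a non-ASCII literal, then read it back.
with tempfile.TemporaryDirectory() as tmp:
    sql_path = os.path.join(tmp, 'q1.sql')
    with open(sql_path, 'w', encoding='utf-8') as f:
        f.write("SELECT 'café' AS word")
    text = read_sql(sql_path)
```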
tableref
tableref(project, dataset_id, table_id)
Creates a TableReference from project, dataset and table.
:param project: project id containing the dataset
:param dataset_id: dataset id
:param table_id: table id
:return: bigquery.table.TableReference
to_tableref
to_tableref(tablespec_str)
Creates a TableReference from a TableSpec.
:param tablespec_str: BigQuery TableSpec in format 'project.dataset_id.table_id'
:return: bigquery.table.TableReference
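The parsing step behind to_tableref can be sketched with the standard library. TableRef below is a hypothetical stand-in for bigquery.table.TableReference so the example runs without google-cloud-bigquery; with that package installed, TableReference.from_string('my-project.mydataset.mytable') performs a comparable parse.

```python
from collections import namedtuple

# Hypothetical stand-in for bigquery.table.TableReference, for illustration only.
TableRef = namedtuple('TableRef', ['project', 'dataset_id', 'table_id'])


def to_tableref(tablespec_str):
    """Split 'project.dataset_id.table_id' into its three components."""
    parts = tablespec_str.split('.')
    if len(parts) != 3:
        raise ValueError(
            'expected project.dataset_id.table_id, got {!r}'.format(tablespec_str))
    return TableRef(*parts)


ref = to_tableref('my-project.mydataset.mytable')
```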
create_copy_job_config
create_copy_job_config(overwrite=True)
Creates a CopyJobConfig.
:param overwrite: if set to False, target table must not exist
:return: bigquery.job.CopyJobConfig
exception_logger
exception_logger(func)
A decorator that wraps the passed-in function and logs any exception it raises.
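A minimal sketch of such a decorator. The docs above do not say whether the library re-raises after logging; this version does, so failures still stop the pipeline. The logger name is an assumption.

```python
import functools
import logging

logger = logging.getLogger('bqpipeline')  # hypothetical logger name


def exception_logger(func):
    """Log any exception raised by func, then re-raise it (sketch)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and includes the traceback.
            logger.exception('%s failed', func.__name__)
            raise
    return wrapper


@exception_logger
def divide(a, b):
    return a / b
```

functools.wraps keeps the wrapped function's name and docstring, which keeps log messages and tracebacks readable.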
gcs_export_job_poller
gcs_export_job_poller(func)
A decorator that waits for an export job to complete.
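A sketch of how a decorator like this might block on the job returned by an export method. It assumes the wrapped function returns a job object with a result(timeout=...) method, as google-cloud-bigquery jobs do, and that wait and timeout are passed as keyword arguments. FakeJob and export are hypothetical stand-ins so the example runs offline.

```python
import functools


def gcs_export_job_poller(func):
    """Block until the job returned by func finishes, unless wait=False (sketch)."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        job = func(*args, **kwargs)
        # Only keyword arguments are inspected in this sketch.
        if kwargs.get('wait', True):
            job.result(timeout=kwargs.get('timeout'))
        return job
    return wrapper


class FakeJob(object):
    """Hypothetical job stand-in that records whether result() was called."""
    def __init__(self):
        self.waited = False

    def result(self, timeout=None):
        self.waited = True
        return self


@gcs_export_job_poller
def export(table, gcs_path, wait=True, timeout=None):
    return FakeJob()


job = export('tmp_table_2', 'gs://my-bucket/path/to/tmp_table_2-*.csv')
```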
BQPipeline
BQPipeline(self, job_name, query_project=None, location='US', default_project=None, default_dataset=None, json_credentials_path=None)
BigQuery Python SDK client wrapper. Provides methods for running queries, copying and deleting tables. Supports Jinja2 templated SQL and enables default project and dataset to be set for an entire pipeline.
get_client
BQPipeline.get_client(self)
Initializes bigquery.Client.
:return: bigquery.Client
infer_project
BQPipeline.infer_project(self)
Infers project based on client's credentials.
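When a json_credentials_path is supplied, the project can be read from the key file itself, because service account JSON keys include a project_id field. The helper below is a hypothetical sketch of that idea, not the library's implementation. With Application Default Credentials, google.auth.default() returns a (credentials, project_id) tuple that serves the same purpose.

```python
import json
import os
import tempfile


def project_from_key_file(json_credentials_path):
    """Return the project_id stored in a service account JSON key file."""
    with open(json_credentials_path, encoding='utf-8') as f:
        return json.load(f)['project_id']


# Demo with a fake, stripped-down key file (real keys contain more fields).
with tempfile.TemporaryDirectory() as tmp:
    key_path = os.path.join(tmp, 'credentials.json')
    with open(key_path, 'w', encoding='utf-8') as f:
        json.dump({'type': 'service_account', 'project_id': 'my-project'}, f)
    project = project_from_key_file(key_path)
```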
resolve_table_spec
BQPipeline.resolve_table_spec(self, dest)
Resolves a full TableSpec from a partial TableSpec by adding default project and dataset.
:param dest: TableSpec string or partial TableSpec string
:return: str TableSpec
resolve_dataset_spec
BQPipeline.resolve_dataset_spec(self, dataset)
Resolves a full DatasetSpec from a partial DatasetSpec by adding default project.
:param dataset: DatasetSpec string or partial DatasetSpec string
:return: str DatasetSpec
create_dataset
BQPipeline.create_dataset(self, dataset, exists_ok=False)
Creates a BigQuery Dataset from a full or partial dataset spec.
:param dataset: DatasetSpec string or partial DatasetSpec string
create_job_config
BQPipeline.create_job_config(self, batch=False, dest=None, create=True, overwrite=True, append=False)
Creates a QueryJobConfig.
:param batch: use QueryPriority.BATCH if true
:param dest: tablespec of destination table, or a GCS wildcard to write query results to.
:param create: if False, destination table must already exist
:param overwrite: if False, destination table must not exist
:param append: if True, destination table will be appended to
:return: bigquery.QueryJobConfig
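The create, overwrite and append flags correspond naturally to BigQuery's create and write dispositions, which QueryJobConfig exposes as create_disposition and write_disposition. The mapping below is an assumption, not read from the library's source, including the choice that append takes precedence over overwrite. Note that WRITE_EMPTY, the closest disposition to "must not exist", strictly only requires the destination table to be empty.

```python
def dispositions(create=True, overwrite=True, append=False):
    """Map boolean flags to BigQuery (create_disposition, write_disposition) strings."""
    create_disposition = 'CREATE_IF_NEEDED' if create else 'CREATE_NEVER'
    if append:
        # Assumed precedence: append wins over overwrite.
        write_disposition = 'WRITE_APPEND'
    elif overwrite:
        write_disposition = 'WRITE_TRUNCATE'
    else:
        # WRITE_EMPTY fails the job if the destination table already holds data.
        write_disposition = 'WRITE_EMPTY'
    return create_disposition, write_disposition


print(dispositions(create=False, overwrite=False))  # ('CREATE_NEVER', 'WRITE_EMPTY')
```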
run_query
BQPipeline.run_query(self, path, batch=False, wait=True, create=True, overwrite=True, timeout=None, gcs_export_format='CSV', **kwargs)
Executes a SQL query from a Jinja2 template file.
:param path: path to sql file or tuple of (path to sql file, destination tablespec)
:param batch: run query with batch priority
:param wait: wait for job to complete before returning
:param create: if False, destination table must already exist
:param overwrite: if False, destination table must not exist
:param timeout: time in seconds to wait for job to complete
:param gcs_export_format: CSV, AVRO, or JSON.
:param kwargs: replacements for Jinja2 template
:return: bigquery.job.QueryJob
run_queries
BQPipeline.run_queries(self, query_paths, batch=True, wait=True, create=True, overwrite=True, timeout=1200, **kwargs)
:param query_paths: List[Union[str,Tuple[str,str]]] path to sql file or tuple of (path, destination tablespec)
:param batch: run query with batch priority
:param wait: wait for job to complete before returning
:param create: if False, destination table must already exist
:param overwrite: if False, destination table must not exist
:param timeout: time in seconds to wait for job to complete
:param kwargs: replacements for Jinja2 template
copy_table
BQPipeline.copy_table(self, src, dest, wait=True, overwrite=True, timeout=None)
:param src: tablespec 'project.dataset.table'
:param dest: tablespec 'project.dataset.table'
:param wait: block until job completes
:param overwrite: overwrite destination table
:param timeout: time in seconds to wait for operation to complete
:return: bigquery.job.CopyJob
delete_table
BQPipeline.delete_table(self, table)
Deletes a table
:param table: table spec project.dataset.table
delete_tables
BQPipeline.delete_tables(self, tables)
Deletes multiple tables
:param tables: List[str] of table spec project.dataset.table
export_csv_to_gcs
BQPipeline.export_csv_to_gcs(self, table, gcs_path, delimiter=',', header=True, wait=True, timeout=None)
Export a table to GCS as CSV.
:param table: str of table spec project.dataset.table
:param gcs_path: str of destination GCS path
:param delimiter: str field delimiter for output data.
:param header: bool; if True, the output CSV files include a header row.
export_json_to_gcs
BQPipeline.export_json_to_gcs(self, table, gcs_path, wait=True, timeout=None)
Export a table to GCS as a Newline Delimited JSON file.
:param table: str of table spec project.dataset.table
:param gcs_path: str of destination GCS path
export_avro_to_gcs
BQPipeline.export_avro_to_gcs(self, table, gcs_path, compression='snappy', wait=True, timeout=None)
Export a table to GCS as an Avro file.
:param table: str of table spec project.dataset.table
:param gcs_path: str of destination GCS path
main
main()
Handles CLI invocations of bqpipelines.