
A lightweight wrapper for the GCS and BigQuery client libraries


Google Cloud Fluent Client


Version: 1.2.1

This is a lightweight wrapper on top of the Google Cloud Platform Python SDK client libraries for BigQuery, Storage and Spreadsheet. It is aimed at Data Engineers who build data pipelines with BigQuery and Storage as the main Google Cloud Platform services.

The purposes of this package are:

  • Having a consistent way of using the GCP client libraries
  • Managing the library versions in a single place when multiple teams use the GCP client libraries
  • Making typical Data Engineering tasks (copying, loading and exporting data) easier to accomplish
  • Writing code that explains what it does
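
The last point comes from the fluent (method-chaining) style used in the examples further down. As a rough illustration of that idea only, and not gfluent's actual implementation, the sketch below uses a hypothetical `LoadJob` class whose setter methods return `self` so that calls can be chained:

```python
class LoadJob:
    """Hypothetical stand-in, used only to illustrate method chaining."""

    def __init__(self, project):
        self.project = project
        self.settings = {}

    def table(self, name):
        self.settings["table"] = name
        return self  # returning self is what makes chaining possible

    def mode(self, write_mode):
        self.settings["mode"] = write_mode
        return self

    def describe(self):
        # Terminal call: ends the chain and returns a plain value.
        table = self.settings["table"]
        mode = self.settings["mode"]
        return f"{self.project}: load into {table} ({mode})"


summary = LoadJob("my-project").table("sales.products").mode("WRITE_APPEND").describe()
print(summary)
```

Each chained call reads as one step of the job, which is why a chain written this way tends to document itself.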

The currently embedded client library versions are:

  • google-api-python-client==2.36.0
  • google-cloud-bigquery==2.32.0
  • google-cloud-storage==2.1.0
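
To check whether an environment matches these pins, a small script along the following lines may help. It is a sketch that relies only on the standard library's `importlib.metadata`; the helper name `installed_versions` is an assumption made for illustration and is not part of gfluent.

```python
from importlib import metadata

# Versions listed above for gfluent 1.2.1.
PINNED = {
    "google-api-python-client": "2.36.0",
    "google-cloud-bigquery": "2.32.0",
    "google-cloud-storage": "2.1.0",
}


def installed_versions(packages):
    """Return {package: installed version, or None if it is not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            found[name] = None
    return found


for name, version in installed_versions(PINNED).items():
    status = "ok" if version == PINNED[name] else "differs"
    print(f"{name}: pinned {PINNED[name]}, installed {version} ({status})")
```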

Build Data Pipeline on BigQuery

You (a Data Engineer) are asked to:

  • Upload multiple JSON files from your local drive to GCS
  • Import those files into a BigQuery staging table
  • Run a SQL query that joins the staging table with existing tables, and store the result in a new table

Here is the source code that accomplishes the task:

from gfluent import BQ, GCS

project_id = "here-is-your-project-id"
bucket_name = "my-bucket"
dataset = "sales"
table_name = "products"
prefix = "import"
local_path = "/user/tom/products/" # there are a few *.json files in this directory

# upload files to the GCS bucket
(
    GCS(project_id)
    .local(path=local_path, suffix=".json")
    .bucket(bucket_name)
    .prefix(prefix)
    .upload()
)

# create the target dataset (in case it does not exist)
BQ(project_id).create_dataset(dataset, location="US")

# load the JSON files into the BigQuery table
uri = f"gs://{bucket_name}/{prefix}/*.json"
number_of_rows = (
    BQ(project_id)
    .table(f"{dataset}.{table_name}")
    .mode("WRITE_APPEND")               # optional, this is the default mode
    .create_mode("CREATE_IF_NEEDED")    # optional, this is the default mode
    .format("NEWLINE_DELIMITED_JSON")   # optional, this is the default format
    .gcs(uri).load(location="US")
)

print(f"{number_of_rows} rows are loaded")


# run a SQL query and save to a final table
final_table = "sales_summary"
sql = """
    select t1.col1, t2.col2, t2.col3
    FROM
        sales.products t1
    JOIN
        other.category t2
    ON  t1.prod_id = t2.prod_id
"""

number_of_rows = (
    BQ(project_id)
    .table(f"{dataset}.{final_table}")
    .sql(sql)
    .create_mode("CREATE_NEVER")    # required here: never create the table, it must already exist
    .query()
)

print(f"{number_of_rows} rows are loaded to {final_table}")


# now let's query the new table
rows = (
    BQ(project_id)
    .sql(f"select col1, col2 from {dataset}.{final_table} limit 10")
    .query()
)

for row in rows:
    print(row.col1, row.col2)

Loading data from Spreadsheet to BigQuery

Here is another example, which uses the Sheet class to load data from a Google Spreadsheet.

import os
from gfluent import Sheet, BQ

project_id = 'your-project-id'
sheet_id = 'the-google-spreadsheet-id'

# assume the data is on the sheet `data` and the range is `A1:B4`
sheet = Sheet(
    os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
).sheet_id(sheet_id).worksheet("data!A1:B4")

bq = BQ(project=project_id).table("target_dataset.table")

sheet.bq(bq).load(location="EU")
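
Note that `os.getenv` returns `None` when `GOOGLE_APPLICATION_CREDENTIALS` is not set, so it can be worth validating the value before handing it to `Sheet`. The helper below is a minimal sketch of such a check; `credentials_path` is a hypothetical name and not part of gfluent.

```python
import os


def credentials_path(env_var="GOOGLE_APPLICATION_CREDENTIALS"):
    """Return the credentials file path from the environment, or raise if unusable."""
    path = os.getenv(env_var)
    if not path:
        # Unset or empty variable: fail early with a clear message.
        raise RuntimeError(f"{env_var} is not set")
    if not os.path.isfile(path):
        raise FileNotFoundError(f"{env_var} points to a missing file: {path}")
    return path
```

With this in place, the example above could pass `credentials_path()` to `Sheet` instead of calling `os.getenv` directly.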

Documentation

See the documentation, and refer to the test cases for more real-world examples.

Installation

Install from PyPI:

pip install -U gfluent

Or build and install from source:

git clone git@github.com:simple-dev-tools/gfluent.git
cd gfluent
make test-ut
python setup.py install

Contribution

Contributions of any kind are welcome, including bug reports, new features and documentation improvements. Please note:

  • Unit testing with mock is used extensively, because we don't want to connect to a real GCP project
  • Please install pre-commit by running pip install pre-commit and then pre-commit install
  • bump2version is used to update the version tag in various files
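
As an illustration of the mock-based approach, here is a minimal sketch using the standard library's `unittest.mock`. The function `summarise_sales` and the test are hypothetical and are not taken from the gfluent test suite; the client is a `MagicMock`, so no GCP project is contacted.

```python
from unittest import mock


def summarise_sales(bq_client, table, sql):
    """Hypothetical pipeline step: run `sql` into `table` through a fluent client."""
    return bq_client.table(table).sql(sql).query()


def test_summarise_sales_uses_fluent_chain():
    # MagicMock creates the chained attributes on demand; only the final
    # return value of the chain needs to be configured.
    client = mock.MagicMock()
    client.table.return_value.sql.return_value.query.return_value = 42

    result = summarise_sales(client, "sales.sales_summary", "select 1")

    assert result == 42
    client.table.assert_called_once_with("sales.sales_summary")
    client.table.return_value.sql.assert_called_once_with("select 1")


test_summarise_sales_uses_fluent_chain()
```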
