
A lightweight wrapper for the GCS and BigQuery client libraries


Google Cloud Fluent Client


Version: 1.2.1

This is a lightweight wrapper on top of the Google Cloud Platform Python SDK client libraries for BigQuery, Storage, and Spreadsheet. It is a great package for Data Engineers to craft data pipelines using BigQuery and Storage as the major services from Google Cloud Platform.

The purposes of this package are:

  • Provide a consistent way of using the GCP client libraries
  • Manage the version in a single place when multiple teams use the GCP client libraries
  • Make it easier to accomplish typical Data Engineering tasks (copying, loading, and exporting data)
  • Write code that explains what it does

The currently embedded client library versions are:

  • google-api-python-client==2.36.0
  • google-cloud-bigquery==2.32.0
  • google-cloud-storage==2.1.0
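Since the wrapper pins these distributions, it can be useful to confirm which versions are actually installed in your environment. A minimal sketch using only the standard library (the `installed_version` helper is our own illustration, not part of gfluent):

```python
from importlib.metadata import version, PackageNotFoundError


def installed_version(dist: str):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(dist)
    except PackageNotFoundError:
        return None


# The pinned client libraries listed above:
for dist in ("google-api-python-client", "google-cloud-bigquery", "google-cloud-storage"):
    print(dist, installed_version(dist) or "not installed")
```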

Build Data Pipeline on BigQuery

You (a Data Engineer) are asked to:

  • Upload multiple JSON files from your local drive to GCS
  • Import those files into a BigQuery staging table
  • Run a SQL query that joins the staging table with existing tables, and store the result in a new table

To accomplish the task, here is the source code:

from gfluent import BQ, GCS

project_id = "here-is-your-project-id"
bucket_name = "my-bucket"
dataset = "sales"
table_name = "products"
prefix = "import"
local_path = "/user/tom/products/"  # there are a few *.json files in this directory

# upload files to the GCS bucket
(
    GCS(project_id)
    .local(path=local_path, suffix=".json")
    .bucket(bucket_name)
    .prefix(prefix)
    .upload()
)

# create the target dataset (if it does not exist)
BQ(project_id).create_dataset(dataset, location="US")

# load json files to BigQuery table
uri = f"gs://{bucket_name}/{prefix}/*.json"
number_of_rows = (
    BQ(project_id)
    .table(f"{dataset}.{table_name}")
    .mode("WRITE_APPEND")               # optional, this is the default mode
    .create_mode("CREATE_IF_NEEDED")    # optional, this is the default create mode
    .format("NEWLINE_DELIMITED_JSON")   # optional, this is the default format
    .gcs(uri).load(location="US")
)

print(f"{number_of_rows} rows are loaded")


# run a SQL query and save to a final table
final_table = "sales_summary"
sql = """
    SELECT t1.col1, t2.col2, t2.col3
    FROM
        sales.products t1
    JOIN
        other.category t2
    ON  t1.prod_id = t2.prod_id
"""

number_of_rows = (
    BQ(project_id)
    .table(f"{dataset}.{final_table}")
    .sql(sql)
    .create_mode("CREATE_NEVER")    # required, we don't want to create a new table
    .query()
)

print(f"{number_of_rows} rows are loaded to {final_table}")


# now let's query the new table
rows = (
    BQ(project_id)
    .sql(f"select col1, col2 from {dataset}.{final_table} limit 10")
    .query()
)

for row in rows:
    print(row.col1, row.col2)

Loading data from a Spreadsheet to BigQuery

Here is another example, using the Sheet class to load data from a Google Spreadsheet.

import os
from gfluent import Sheet, BQ

project_id = 'your-project-id'
sheet_id = 'the-google-spreadsheet-id'

# assume the data is on the sheet `data` in the range `A1:B4`
sheet = Sheet(
    os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
).sheet_id(sheet_id).worksheet("data!A1:B4")

bq = BQ(project=project_id).table("target_dataset.table")

sheet.bq(bq).load(location="EU")

Documents

Here is the documentation; please refer to the test cases to see more real examples.

Installation

Install from PyPI:

pip install -U gfluent

Or build and install from the source code:

git clone git@github.com:simple-dev-tools/gfluent.git
cd gfluent
make test-ut
python setup.py install

Contribution

Any kind of contribution is welcome, including reporting bugs, adding features, or enhancing the documentation. Please note:

  • Unit testing with mock is used intensively, because we don't want to connect to a real GCP project
  • Please install pre-commit with pip install pre-commit, then run pre-commit install
  • bump2version is used to update the version tag in various files
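The mock-based testing style can be sketched like this. This is a generic illustration, not taken from the repository's actual test suite; `load_rows` is a hypothetical helper standing in for a pipeline step that calls a BigQuery-like client:

```python
from unittest.mock import MagicMock


def load_rows(client):
    """Toy stand-in for a pipeline step that calls a BigQuery-like client."""
    job = client.query("select 1")
    return list(job.result())


def test_load_rows_without_gcp():
    # Replace the network-bound client with a MagicMock,
    # so no real GCP project is ever contacted.
    fake_client = MagicMock()
    fake_client.query.return_value.result.return_value = iter([{"col1": 1}])

    rows = load_rows(fake_client)

    assert rows == [{"col1": 1}]
    fake_client.query.assert_called_once_with("select 1")


test_load_rows_without_gcp()
print("mock-based test passed")
```

The same pattern (patching the underlying client and asserting on the calls it receives) keeps the whole suite runnable offline.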

