A lightweight wrapper for the GCS and BigQuery client libraries
Google Cloud Fluent Client
Version: 1.2.1
This is a lightweight wrapper on top of the Google Cloud Platform Python SDK client libraries for BigQuery, Storage, and Spreadsheet. It is a great package for Data Engineers to craft data pipelines using BigQuery and Storage as the major services of Google Cloud Platform.
The purposes of this package are:
- Provide a consistent way of using the GCP client libraries
- Manage the client library versions in a single place when multiple teams use them
- Make it easier to accomplish typical Data Engineering tasks (copying, loading, and exporting data)
- Write code that explains what it does
The currently embedded client library versions are:
- google-api-python-client==2.36.0
- google-cloud-bigquery==2.32.0
- google-cloud-storage==2.1.0
Build a Data Pipeline on BigQuery
You (a Data Engineer) are asked to:
- Upload multiple JSON files from your local drive to GCS
- Import those files into a BigQuery staging table
- Run a SQL query that joins the staging table with existing tables, and store the result in a new table
To accomplish these tasks, here is the source code:
from gfluent import BQ, GCS
project_id = "here-is-your-project-id"
bucket_name = "my-bucket"
dataset = "sales"
table_name = "products"
prefix = "import"
local_path = "/user/tom/products/"  # there are a few *.json files in this directory
# upload files to GCS bucket
(
    GCS(project_id)
    .local(path=local_path, suffix=".json")
    .bucket(bucket_name)
    .prefix(prefix)
    .upload()
)
# create the target dataset (if it does not exist)
BQ(project_id).create_dataset(dataset, location="US")
# load the json files into a BigQuery table
uri = f"gs://{bucket_name}/{prefix}/*.json"
number_of_rows = (
    BQ(project_id)
    .table(f"{dataset}.{table_name}")
    .mode("WRITE_APPEND")              # optional, this is the default mode
    .create_mode("CREATE_IF_NEEDED")   # optional, this is the default create mode
    .format("NEWLINE_DELIMITED_JSON")  # optional, this is the default format
    .gcs(uri)
    .load(location="US")
)
print(f"{number_of_rows} rows are loaded")
# run a SQL query and save to a final table
final_table = "sales_summary"
sql = """
SELECT t1.col1, t2.col2, t2.col3
FROM sales.products t1
JOIN other.category t2
  ON t1.prod_id = t2.prod_id
"""
number_of_rows = (
    BQ(project_id)
    .table(f"{dataset}.{final_table}")
    .sql(sql)
    .create_mode("CREATE_NEVER")  # required, we do not want to create a new table
    .query()
)
print(f"{number_of_rows} rows are loaded to {final_table}")
# now let's query the new table
rows = (
    BQ(project_id)
    .sql(f"select col1, col2 from {dataset}.{final_table} limit 10")
    .query()
)
for row in rows:
    print(row.col1, row.col2)
Loading data from Spreadsheet to BigQuery
Here is another example of using the Sheet class to load data from a Google Spreadsheet.
import os
from gfluent import Sheet, BQ
project_id = 'your-project-id'
sheet_id = 'the-google-spreadsheet-id'
# assume the data is on the sheet `data` and range is `A1:B4`
sheet = Sheet(
    os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
).sheet_id(sheet_id).worksheet("data!A1:B4")
bq = BQ(project=project_id).table("target_dataset.table")
sheet.bq(bq).load(location="EU")
Documents
Here is the document; please also refer to the test cases to see more real examples.
Installation
Install from PyPI:
pip install -U gfluent
Or build and install from the source code:
git clone git@github.com:simple-dev-tools/gfluent.git
cd gfluent
make test-ut
python setup.py install
Contribution
Any kind of contribution is welcome, including reporting bugs, adding features, or improving the documentation. Please note:
- Unit testing with mock is used extensively, because we do not want to connect to a real GCP project (see the sketch after this list)
- Please install pre-commit by running pip install pre-commit and then pre-commit install
- bump2version is used to update the version tag in various files (for example, bump2version patch)
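To illustrate the mocked-testing convention, below is a minimal sketch of such a unit test. The patch target gfluent.bq.bigquery.Client and the assertion on its create_dataset call are assumptions about the internal module layout, not the project's actual test API; please check the existing test cases for the real patch paths.
# a minimal sketch of a mocked unit test; the patch path below is hypothetical
from unittest import mock

from gfluent import BQ


@mock.patch("gfluent.bq.bigquery.Client")  # assumed import path of the wrapped client
def test_create_dataset_without_real_gcp(mock_client):
    # the mocked Client ensures no call reaches a real GCP project
    BQ("fake-project").create_dataset("staging", location="US")

    # assumed behavior: BQ delegates to the underlying client's create_dataset
    mock_client.return_value.create_dataset.assert_called_once()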