A tool to fetch tables from BigQuery as pandas DataFrames, fast.
Project description
bqfetch
A lightweight tool to fetch tables from BigQuery as pandas DataFrames very fast, using the BigQuery Storage API combined with multiprocessing. This module can also fetch large tables that do not fit into memory, by chunking the table in a smart and scalable way.
Installation
pip install bqfetch
pip install -r requirements.txt
Algorithm
- Fetch all distinct values from the given index column.
- Divide these values into chunks based on the available memory and the number of cores on the machine.
- If multiprocessing:
  - Each chunk is divided into multiple sub-chunks based on the nb_cores parameter and the available memory.
  - For each sub-chunk, create a temporary table containing all the matching rows of the whole table.
  - Fetch these temporary tables as dataframes using BigQuery Storage.
  - Merge the dataframes.
  - Delete the temporary tables.
- If !multiprocessing:
  - Same process, with only one temporary table and no parallel processes created.
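As a rough illustration of these steps, the non-parallel path could look like the sketch below, written directly against the google-cloud-bigquery client. This is not bqfetch's actual implementation: the temporary table naming, the query strings and the fetch_in_chunks function are assumptions made only for the example.

from google.cloud import bigquery

# Minimal sketch of the non-parallel path described above (illustrative only,
# not bqfetch's actual code); project, dataset, table and column are placeholders.
def fetch_in_chunks(client: bigquery.Client, project, dataset, table, column, nb_chunks):
    full_table = f"`{project}.{dataset}.{table}`"

    # 1. Fetch all distinct values of the index column.
    values = [row[0] for row in client.query(
        f"SELECT DISTINCT {column} FROM {full_table}").result()]

    # 2. Divide the values into chunks.
    size = max(1, len(values) // nb_chunks)
    chunks = [values[i:i + size] for i in range(0, len(values), size)]

    for i, chunk_values in enumerate(chunks):
        # 3. Create a temporary table containing only the matching rows.
        tmp_id = f"{project}.{dataset}.bqfetch_tmp_{i}"
        in_list = ", ".join(repr(v) for v in chunk_values)
        client.query(
            f"CREATE TABLE `{tmp_id}` AS "
            f"SELECT * FROM {full_table} WHERE {column} IN ({in_list})"
        ).result()

        # 4. Fetch the temporary table as a DataFrame; to_dataframe() uses the
        #    BigQuery Storage API when google-cloud-bigquery-storage is installed.
        df = client.list_rows(tmp_id).to_dataframe()

        # 5. Delete the temporary table and yield the DataFrame to the caller.
        client.delete_table(tmp_id)
        yield df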
Use case
Fetching a huge table of users using multiple cores
id | Name | Age |
---|---|---|
187 | Bartolomé | 30 |
188 | Tristan | 22 |
... | ... | ... |
>>> table = BigQueryTable("PROJECT", "DATASET", "TABLE")
>>> fetcher = BigQueryFetcher('/path/to/service_account.json', table)
>>> chunks = fetcher.chunks('id', by_chunk_size_in_GB=5)
>>> for chunk in chunks:
        df = fetcher.fetch(chunk, nb_cores=-1, parallel_backend='billiard')
        # ...
- First, we have to create a BigQueryTable object which contains the path to the BigQuery table stored in GCP.
- A fetcher is created, given as parameter the absolute path to the service_account.json file; this file is mandatory in order to perform operations on GCP.
- The whole table is chunked, given the column name and the chunk size. In this case, choosing the id column is perfect because each value of this column appears the same number of times: once. Concerning the chunk size, with by_chunk_size_in_GB=5 each chunk fetched on the machine will be about 5GB in size, so it has to fit into memory. You need to reserve about 1/3 more memory, because a DataFrame object is larger than the raw fetched data (a small memory check is sketched below this list).
- For each chunk, fetch it. nb_cores=-1 will use the number of cores available on the machine. parallel_backend='billiard' | 'joblib' | 'multiprocessing' specifies the backend framework to use.
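As a rough sanity check before choosing by_chunk_size_in_GB, you can compare the requested chunk size, plus the ~1/3 DataFrame overhead mentioned above, against the memory available on the machine. The helper below is only a sketch, not part of the bqfetch API, and assumes psutil is installed.

import psutil

def fits_in_memory(chunk_size_gb: float, overhead_ratio: float = 1 / 3) -> bool:
    # Illustrative check: a chunk of `chunk_size_gb`, plus roughly one third of
    # DataFrame overhead, should fit into the currently available memory.
    available_gb = psutil.virtual_memory().available / 1024 ** 3
    return chunk_size_gb * (1 + overhead_ratio) <= available_gb

# e.g. check fits_in_memory(5) before calling fetcher.chunks('id', by_chunk_size_in_GB=5)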
Fetch by number of chunks
It is also possible to use by_nb_chunks instead of by_chunk_size_in_GB. It will divide the table into N chunks, which gives you less fine-grained control over the size of each chunk.
>>> table = BigQueryTable("PROJECT", "DATASET", "TABLE")
>>> fetcher = BigQueryFetcher('/path/to/service_account.json', table)
>>> chunks = fetcher.chunks('id', by_nb_chunks=10)
>>> for chunk in chunks:
        df = fetcher.fetch(chunk, nb_cores=-1, parallel_backend='billiard')
        # ...
Verbose mode
>>> chunks = fetcher.chunks(column='id', by_nb_chunks=1, verbose=True)
# Available memory on device: 7.04GB
# Size of table: 2.19GB
# Prefered size of chunk: 3GB
# Size per chunk: 3GB
# Nb chunks: 1
# Nb values in "id": 96
# Chunk size: 3GB
# Nb chunks: 1
>>> for chunk in chunks:
        df = fetcher.fetch(chunk=chunk, nb_cores=1, parallel_backend='joblib', verbose=True)
# Use multiprocessing : False
# Nb cores: 1
# Parallel backend: joblib
# Time to fetch: 43.21s
# Nb lines in dataframe: 3375875
# Size of dataframe: 2.83GB
Warning
We recommend using this tool only when the table to fetch contains a column that can easily be chunked (divided into small parts). The ideal column for this contains distinct values, or values that each appear approximately the same number of times. If some values appear thousands of times and others only a few, the chunking will not be reliable, because the estimation of the memory needed to fetch the table efficiently assumes that every chunk has approximately the same size. A quick way to check a candidate column is sketched after the examples below.
A good index column:
This column contains distinct values, so it can easily be divided into chunks.
Card number |
---|
4390 3849 ... |
2903 1182 ... |
0562 7205 ... |
... |
A bad index column:
The frequency of the values in this column can vary a lot, so the chunking will not be reliable.
Age |
---|
18 |
18 |
64 |
18 |
... |
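A quick way to judge a candidate index column is to look at how evenly its values are distributed, for example with a simple GROUP BY. The snippet below is only an illustration, not part of bqfetch; the table path and the id column are placeholders taken from the examples above.

from google.cloud import bigquery

# Count how many rows each value of the candidate index column covers.
client = bigquery.Client.from_service_account_json('/path/to/service_account.json')
rows = client.query("""
    SELECT id AS value, COUNT(*) AS occurrences
    FROM `PROJECT.DATASET.TABLE`
    GROUP BY id
""").result()
counts = [row.occurrences for row in rows]

# For a good index column, the minimum and maximum should be close to each other.
print(f"min occurrences: {min(counts)}, max occurrences: {max(counts)}")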
More cores != faster
Keep in mind that adding more cores to the fetching process will not necessarily improve performance; most of the time it will even be slower. The reason is that fetching speed depends directly on the Internet bandwidth available on your network, not on the number of working cores or the compute power. However, you can run your own tests: in some cases multiprocessing does save time, for example when cloud machines only allocate a given amount of bandwidth per core, so that multiplying the number of cores also multiplies the bandwidth (e.g. GCP compute engines).
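If you want to verify whether multiprocessing pays off on your own network, a quick timing comparison is enough. The snippet below is only a sketch reusing the fetcher and chunks objects from the examples above; it fetches the same chunk twice just to compare elapsed times.

import time

# Rough benchmark sketch: fetch the same chunk with one core, then with all cores.
first_chunk = next(iter(chunks))
for nb_cores in (1, -1):
    start = time.perf_counter()
    df = fetcher.fetch(first_chunk, nb_cores=nb_cores, parallel_backend='billiard')
    print(f"nb_cores={nb_cores}: {time.perf_counter() - start:.1f}s for {len(df)} rows")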
Contribution
bqfetch is open to new contributors, especially for bug fixing or implementation of new features. Do not hesitate to open an issue/pull request :)
License
Copyright (c) 2021-present, Tristan Bilot
Project details
File details
Details for the file bqfetch-1.1.0.tar.gz.
File metadata
- Download URL: bqfetch-1.1.0.tar.gz
- Upload date:
- Size: 12.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.8.15
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | eca5408511f0dde73fc8776c5d870a5898cc93ecc5a55747842cd66551f6059a |
MD5 | 868991bf67888caf2fd3d84ac521e098 |
BLAKE2b-256 | 3f88021334ee02d20de390fa8eb706c4e042713c548d476f7fdd46c407ac324e |
File details
Details for the file bqfetch-1.1.0-py3-none-any.whl.
File metadata
- Download URL: bqfetch-1.1.0-py3-none-any.whl
- Upload date:
- Size: 11.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.8.15
File hashes
Algorithm | Hash digest |
---|---|
SHA256 | 1fb9f8f11f5f2cfaceef3855903f5775ec309ca17bf91f1c8a2700660ad8fb9c |
MD5 | 33a3ea137d66a8298a7aeee402bcaf67 |
BLAKE2b-256 | 88cc216d657913fadc30b3d29e30674b4d4961b303f7cb77fbd626307f486e12 |