DLT is an open-source, Python-native, scalable data loading framework that requires no DevOps effort to run.
Quickstart Guide: Data Load Tool (DLT)
TL;DR: This guide shows you how to load a JSON document into Google BigQuery using DLT.
If there is something you can improve about this quickstart, please open a pull request against the example repository below.
1. Grab the demo
a. Clone the example repository:
git clone https://github.com/scale-vector/dlt-quickstart-example.git
b. Enter the directory:
cd dlt-quickstart-example
c. Open the files in your favorite IDE / text editor:
data.json (the JSON document you will load; a sketch of its shape follows this list)
credentials.json (the credentials for our demo Google BigQuery warehouse)
quickstart.py (the script that uses DLT)
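For orientation, the query results shown in step 9 imply that data.json holds a list of parent records, each with a nested children list, roughly like this (reconstructed from those results; your own file can of course look different):
[
  {
    "name": "Ana",
    "age": 30,
    "id": 456,
    "children": [
      {"name": "Bill", "id": 625},
      {"name": "Elli", "id": 591}
    ]
  },
  {
    "name": "Bob",
    "age": 30,
    "id": 455,
    "children": [
      {"name": "Bill", "id": 625},
      {"name": "Dave", "id": 621}
    ]
  }
]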
2. Set up a virtual environment
a. Ensure you are using either Python 3.8 or 3.9:
python3 --version
b. Create a new virtual environment:
python3 -m venv ./env
c. Activate the virtual environment:
source ./env/bin/activate
3. Install DLT and support for the target data warehouse
a. Install DLT using pip:
pip3 install python-dlt
b. Install support for Google BigQuery:
pip3 install python-dlt[gcp]
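Note: some shells (zsh, for example) treat square brackets as glob patterns, so quote the package spec if your shell complains:
pip3 install "python-dlt[gcp]"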
4. Configure DLT
a. Import the necessary libraries
import base64
import json
from dlt.common.utils import uniq_id
from dlt.pipeline import Pipeline, GCPPipelineCredentials
b. Create a unique prefix for your demo Google BigQuery table
schema_prefix = 'demo_' + uniq_id()[:4]
c. Name your schema
schema_name = 'example'
d. Name your table
parent_table = 'json_doc'
e. Specify your schema file location
schema_file_path = 'schema.yml'
f. Load credentials
with open('credentials.json', 'r', encoding="utf-8") as f:
    gcp_credentials_json = json.load(f)
# Private key needs to be decoded (because we don't want to store it as plain text)
gcp_credentials_json["private_key"] = bytes([_a ^ _b for _a, _b in zip(base64.b64decode(gcp_credentials_json["private_key"]), b"quickstart-sv"*150)]).decode("utf-8")
credentials = GCPPipelineCredentials.from_services_dict(gcp_credentials_json, schema_prefix)
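The XOR decoding above is only needed because the demo key in credentials.json is stored obfuscated. If you later point the pipeline at your own warehouse (step 10c), a standard Google service-account key file is plain JSON and can be passed to from_services_dict directly; a minimal sketch, assuming a hypothetical file named my_service_account.json:

import json

from dlt.pipeline import GCPPipelineCredentials

# my_service_account.json is a hypothetical, unobfuscated service-account key file
with open('my_service_account.json', 'r', encoding="utf-8") as f:
    my_credentials_json = json.load(f)

# No decoding step is needed for a plain key
credentials = GCPPipelineCredentials.from_services_dict(my_credentials_json, schema_prefix)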
5. Create a DLT pipeline
a. Instantiate a pipeline
pipeline = Pipeline(schema_name)
b. Create the pipeline with your credentials
pipeline.create_pipeline(credentials)
6. Load the data from the JSON document
a. Load the JSON document into a Python object
with open('data.json', 'r', encoding="utf-8") as f:
    data = json.load(f)
7. Pass the data to the DLT pipeline
a. Extract the data into the parent table
pipeline.extract(iter(data), table_name=parent_table)
b. Unpack the nested JSON into a relational structure
pipeline.unpack()
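unpack() is what turns the nested JSON into a relational structure: every top-level object becomes a row in json_doc, every element of a nested list (here, children) becomes a row in the child table json_doc__children, and the rows are linked through the auto-generated _dlt_id / _dlt_parent_id keys, with the original list position kept in _dlt_list_idx. You will query exactly these tables and keys in step 9.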
c. Save the schema to the schema.yml file
schema = pipeline.get_default_schema()
schema_yaml = schema.as_yaml(remove_default=True)
with open(schema_file_path, 'w', encoding="utf-8") as f:
    f.write(schema_yaml)
8. Use DLT to load the data
a. Load
pipeline.load()
b. Make sure there are no errors
completed_loads = pipeline.list_completed_loads()
# print(completed_loads)

# Enumerate all completed loads and check them for failed jobs:
# a load can complete even though individual jobs failed, and no exception is raised for those.
for load_id in completed_loads:
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in pipeline.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")
c. Run the script:
python3 quickstart.py
d. Inspect the generated schema.yml:
vim schema.yml
9. Query the Google BigQuery table
a. Run SQL queries
def run_query(query):
    df = c._execute_sql(query)
    print(query)
    print(list(df))
    print()

with pipeline.sql_client() as c:
    # Query the parent table
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc`"
    run_query(query)

    # Query the child table
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000"
    run_query(query)

    # Join the two tables via the auto-generated keys
    query = f"""
        select p.name, p.age, p.id as parent_id,
            c.name as child_name, c.id as child_id, c._dlt_list_idx as child_order_in_list
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children` as c
        on p._dlt_id = c._dlt_parent_id
        """
    run_query(query)
b. See results like the following
table: json_doc
{ "name": "Ana", "age": "30", "id": "456", "_dlt_load_id": "1654787700.406905", "_dlt_id": "5b018c1ba3364279a0ca1a231fbd8d90"}
{ "name": "Bob", "age": "30", "id": "455", "_dlt_load_id": "1654787700.406905", "_dlt_id": "afc8506472a14a529bf3e6ebba3e0a9e"}
table: json_doc__children
# {"name": "Bill", "id": "625", "_dlt_parent_id": "5b018c1ba3364279a0ca1a231fbd8d90", "_dlt_list_idx": "0", "_dlt_root_id": "5b018c1ba3364279a0ca1a231fbd8d90",
# "_dlt_id": "7993452627a98814cc7091f2c51faf5c"}
# {"name": "Bill", "id": "625", "_dlt_parent_id": "afc8506472a14a529bf3e6ebba3e0a9e", "_dlt_list_idx": "0", "_dlt_root_id": "afc8506472a14a529bf3e6ebba3e0a9e",
# "_dlt_id": "9a2fd144227e70e3aa09467e2358f934"}
# {"name": "Dave", "id": "621", "_dlt_parent_id": "afc8506472a14a529bf3e6ebba3e0a9e", "_dlt_list_idx": "1", "_dlt_root_id": "afc8506472a14a529bf3e6ebba3e0a9e",
# "_dlt_id": "28002ed6792470ea8caf2d6b6393b4f9"}
# {"name": "Elli", "id": "591", "_dlt_parent_id": "5b018c1ba3364279a0ca1a231fbd8d90", "_dlt_list_idx": "1", "_dlt_root_id": "5b018c1ba3364279a0ca1a231fbd8d90",
# "_dlt_id": "d18172353fba1a492c739a7789a786cf"}
SQL result:
# { "name": "Ana", "age": "30", "parent_id": "456", "child_name": "Bill", "child_id": "625", "child_order_in_list": "0"}
# { "name": "Ana", "age": "30", "parent_id": "456", "child_name": "Elli", "child_id": "591", "child_order_in_list": "1"}
# { "name": "Bob", "age": "30", "parent_id": "455", "child_name": "Bill", "child_id": "625", "child_order_in_list": "0"}
# { "name": "Bob", "age": "30", "parent_id": "455", "child_name": "Dave", "child_id": "621", "child_order_in_list": "1"}
10. Next steps
a. Replace data.json with data you want to explore
b. Check that the inferred types are correct in schema.yml
c. Set up your own Google BigQuery warehouse (and replace the credentials)
d. Use this new clean staging layer as the starting point for a semantic layer / analytical model (e.g. using dbt)
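As a taste of step d, here is a minimal dbt model sketch that rebuilds the join from step 9. The model and source names are assumptions: it presumes you have declared the <schema_prefix>_example dataset as a dbt source named example containing the two tables produced above.

-- models/staging/stg_json_doc_children.sql (hypothetical model name)
select
    p.name as parent_name,
    p.age,
    p.id as parent_id,
    c.name as child_name,
    c.id as child_id,
    c._dlt_list_idx as child_order_in_list
from {{ source('example', 'json_doc') }} as p
left join {{ source('example', 'json_doc__children') }} as c
    on p._dlt_id = c._dlt_parent_id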