
DLT is an open-source, Python-native, scalable data loading framework that requires no DevOps effort to run.


Quickstart Guide: Data Load Tool (DLT)

TL;DR: This guide shows you how to load a JSON document into Google BigQuery using DLT.

Please open a pull request here if you see anything in this quickstart that can be improved.

1. Grab the demo

a. Clone the example repository:

git clone https://github.com/scale-vector/dlt-quickstart-example.git

b. Enter the directory:

cd dlt-quickstart-example

c. Open the files in your favorite IDE / text editor:

  • data.json (i.e. the JSON document you will load; a sketch of its shape is shown after this list)
  • credentials.json (i.e. contains the credentials to our demo Google BigQuery warehouse)
  • quickstart.py (i.e. the script that uses DLT)
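
For reference, here is a sketch of the shape of data.json, reconstructed from the query results in step 9 (the actual file in the example repository may differ in its details):

[
    {
        "name": "Ana",
        "age": "30",
        "id": "456",
        "children": [
            {"name": "Bill", "id": "625"},
            {"name": "Elli", "id": "591"}
        ]
    },
    {
        "name": "Bob",
        "age": "30",
        "id": "455",
        "children": [
            {"name": "Bill", "id": "625"},
            {"name": "Dave", "id": "621"}
        ]
    }
]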

2. Set up a virtual environment

a. Ensure you are using either Python 3.8 or 3.9:

python3 --version

b. Create a new virtual environment:

python3 -m venv ./env

c. Activate the virtual environment:

source ./env/bin/activate

3. Install DLT and support for the target data warehouse

a. Install DLT using pip:

pip3 install python-dlt

b. Install support for Google BigQuery:

pip3 install python-dlt[gcp]
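
Note: on shells such as zsh, the square brackets are glob characters and need to be quoted:

pip3 install "python-dlt[gcp]"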

4. Configure DLT

a. Import the necessary libraries

import base64
import json
from dlt.common.utils import uniq_id
from dlt.pipeline import Pipeline, GCPPipelineCredentials

b. Create a unique prefix for your demo Google BigQuery dataset

schema_prefix = 'demo_' + uniq_id()[:4]

c. Name your schema

schema_name = 'example'

d. Name your table

parent_table = 'json_doc'

e. Specify your schema file location

schema_file_path = 'schema.yml'

f. Load credentials

with open('credentials.json', 'r', encoding="utf-8") as f:
    gcp_credentials_json = json.load(f)

# Private key needs to be decoded (because we don't want to store it as plain text)
obfuscated_key = base64.b64decode(gcp_credentials_json["private_key"])
gcp_credentials_json["private_key"] = bytes(
    [_a ^ _b for _a, _b in zip(obfuscated_key, b"quickstart-sv" * 150)]
).decode("utf-8")
credentials = GCPPipelineCredentials.from_services_dict(gcp_credentials_json, schema_prefix)

5. Create a DLT pipeline

a. Instantiate a pipeline

pipeline = Pipeline(schema_name)

b. Create the pipeline with your credentials

pipeline.create_pipeline(credentials)

6. Load the data from the JSON document

a. Load the JSON document into a Python object (a list of dictionaries)

with open('data.json', 'r', encoding="utf-8") as f:
    data = json.load(f)

7. Pass the data to the DLT pipeline

a. Extract the data into a table

pipeline.extract(iter(data), table_name=parent_table)

b. Unpack the extracted data into a relational structure

pipeline.unpack()

c. Save schema to schema.yml file

schema = pipeline.get_default_schema()
schema_yaml = schema.as_yaml(remove_default=True)
with open(schema_file_path, 'w', encoding="utf-8") as f:
    f.write(schema_yaml)

8. Use DLT to load the data

a. Load

pipeline.load()

b. Make sure there are no errors

completed_loads = pipeline.list_completed_loads()
# print(completed_loads)
# enumerate all completed loads and check each one for failed jobs;
# a load that completed with failed jobs will not raise an exception on its own
for load_id in completed_loads:
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in pipeline.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")

c. Run the script:

python3 quickstart.py

d. Inspect schema.yml that has been generated:

vim schema.yml

9. Query the Google BigQuery table

a. Run SQL queries

# helper that runs a query on the SQL client `c` bound in the `with` block below
def run_query(query):
    df = c._execute_sql(query)
    print(query)
    print(list(df))
    print()

with pipeline.sql_client() as c:

    # Query table for parents
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc`"
    run_query(query)

    # Query table for children
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000"
    run_query(query)

    # Join previous two queries via auto generated keys
    query = f"""
        select p.name, p.age, p.id as parent_id,
            c.name as child_name, c.id as child_id, c._dlt_list_idx as child_order_in_list
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children`  as c
            on p._dlt_id = c._dlt_parent_id
    """
    run_query(query)

b. You should see results like the following:

table: json_doc

{  "name": "Ana",  "age": "30",  "id": "456",  "_dlt_load_id": "1654787700.406905",  "_dlt_id": "5b018c1ba3364279a0ca1a231fbd8d90"}
{  "name": "Bob",  "age": "30",  "id": "455",  "_dlt_load_id": "1654787700.406905",  "_dlt_id": "afc8506472a14a529bf3e6ebba3e0a9e"}

table: json_doc__children

    # {"name": "Bill", "id": "625", "_dlt_parent_id": "5b018c1ba3364279a0ca1a231fbd8d90", "_dlt_list_idx": "0", "_dlt_root_id": "5b018c1ba3364279a0ca1a231fbd8d90",
    #   "_dlt_id": "7993452627a98814cc7091f2c51faf5c"}
    # {"name": "Bill", "id": "625", "_dlt_parent_id": "afc8506472a14a529bf3e6ebba3e0a9e", "_dlt_list_idx": "0", "_dlt_root_id": "afc8506472a14a529bf3e6ebba3e0a9e",
    #   "_dlt_id": "9a2fd144227e70e3aa09467e2358f934"}
    # {"name": "Dave", "id": "621", "_dlt_parent_id": "afc8506472a14a529bf3e6ebba3e0a9e", "_dlt_list_idx": "1", "_dlt_root_id": "afc8506472a14a529bf3e6ebba3e0a9e",
    #   "_dlt_id": "28002ed6792470ea8caf2d6b6393b4f9"}
    # {"name": "Elli", "id": "591", "_dlt_parent_id": "5b018c1ba3364279a0ca1a231fbd8d90", "_dlt_list_idx": "1", "_dlt_root_id": "5b018c1ba3364279a0ca1a231fbd8d90",
    #   "_dlt_id": "d18172353fba1a492c739a7789a786cf"}

SQL result:

    # {  "name": "Ana",  "age": "30",  "parent_id": "456",  "child_name": "Bill",  "child_id": "625",  "child_order_in_list": "0"}
    # {  "name": "Ana",  "age": "30",  "parent_id": "456",  "child_name": "Elli",  "child_id": "591",  "child_order_in_list": "1"}
    # {  "name": "Bob",  "age": "30",  "parent_id": "455",  "child_name": "Bill",  "child_id": "625",  "child_order_in_list": "0"}
    # {  "name": "Bob",  "age": "30",  "parent_id": "455",  "child_name": "Dave",  "child_id": "621",  "child_order_in_list": "1"}

10. Next steps

a. Replace data.json with data you want to explore

b. Check that the inferred types are correct in schema.yml

c. Set up your own Google BigQuery warehouse (and replace the credentials)

d. Use this new clean staging layer as the starting point for a semantic layer / analytical model (e.g. using dbt)
