
DLT is an open-source, Python-native, scalable data loading framework that requires no DevOps effort to run.

Project description

Quickstart Guide: Data Load Tool (DLT)

TL;DR: This guide shows you how to load a JSON document into Google BigQuery using DLT.

Please open a pull request against the example repository if there is something you can improve about this quickstart.

1. Grab the demo

a. Clone the example repository:

git clone https://github.com/scale-vector/dlt-quickstart-example.git

b. Enter the directory:

cd dlt-quickstart-example

c. Open the files in your favorite IDE / text editor:

  • data.json (i.e. the JSON document you will load; a sample of its shape is sketched below)
  • credentials.json (i.e. the credentials for our demo Google BigQuery warehouse)
  • quickstart.py (i.e. the script that uses DLT)
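
For reference, the query results at the end of this guide suggest that data.json holds a small list of person records, each with a nested list of children, roughly like the sketch below (reconstructed from those results; the exact values and types in the file may differ):

[
  {
    "name": "Ana",
    "age": 30,
    "id": 456,
    "children": [
      {"name": "Bill", "id": 625},
      {"name": "Elli", "id": 591}
    ]
  },
  {
    "name": "Bob",
    "age": 30,
    "id": 455,
    "children": [
      {"name": "Bill", "id": 625},
      {"name": "Dave", "id": 621}
    ]
  }
]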

2. Set up a virtual environment

a. Ensure you are using either Python 3.8 or 3.9:

python3 --version

b. Create a new virtual environment:

python3 -m venv ./env

c. Activate the virtual environment:

source ./env/bin/activate

3. Install DLT and support for the target data warehouse

a. Install DLT using pip:

pip3 install python-dlt

b. Install support for Google BigQuery:

pip3 install python-dlt[gcp]

(If your shell complains about the square brackets, quote the spec: pip3 install "python-dlt[gcp]".)
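
To quickly confirm the install worked, you can try importing the classes this guide uses later (an optional check, not part of the quickstart script):

python3 -c "from dlt.pipeline import Pipeline, GCPPipelineCredentials; print('dlt is installed')"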

4. Configure DLT

a. Import the necessary libraries

import base64
import json
from dlt.common.utils import uniq_id
from dlt.pipeline import Pipeline, GCPPipelineCredentials

b. Create a unique prefix for your demo Google BigQuery dataset (schema)

schema_prefix = 'demo_' + uniq_id()[:4]

c. Name your schema

schema_name = 'example'

d. Name your table

parent_table = 'json_doc'

e. Specify your schema file location

schema_file_path = 'schema.yml'

f. Load credentials

with open('credentials.json', 'r') as f:
    gcp_credentials_json = json.load(f)

# The private key in credentials.json is base64-encoded and XOR-obfuscated so it is not stored as plain text; decode it here
gcp_credentials_json["private_key"] = bytes([_a ^ _b for _a, _b in zip(base64.b64decode(gcp_credentials_json["private_key"]), b"quickstart-sv"*150)]).decode("utf-8")
credentials = GCPPipelineCredentials.from_services_dict(gcp_credentials_json, schema_prefix)
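
The one-liner above reverses a simple obfuscation scheme: the demo key was XOR-ed with the repeating phrase quickstart-sv and then base64-encoded. Here is a minimal sketch of both directions, reconstructed from that decode step (the helper names are mine and this is not part of the quickstart script):

import base64

def obfuscate(plaintext, phrase=b"quickstart-sv"):
    # XOR with the repeating phrase, then base64-encode
    xored = bytes(a ^ b for a, b in zip(plaintext.encode("utf-8"), phrase * 150))
    return base64.b64encode(xored).decode("ascii")

def deobfuscate(encoded, phrase=b"quickstart-sv"):
    # base64-decode, then XOR with the same repeating phrase
    raw = base64.b64decode(encoded)
    return bytes(a ^ b for a, b in zip(raw, phrase * 150)).decode("utf-8")

# Round trip: the decode step in the quickstart is equivalent to deobfuscate()
assert deobfuscate(obfuscate("-----BEGIN PRIVATE KEY-----")) == "-----BEGIN PRIVATE KEY-----"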

5. Create a DLT pipeline

a. Instantiate a pipeline

pipeline = Pipeline(schema_name)

b. Create the pipeline with your credentials

pipeline.create_pipeline(credentials)

6. Load the data from the JSON document

a. Load the JSON document into a Python object

with open('data.json', 'r') as f:
    data = json.load(f)
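
If you want to peek at what was read before handing it to DLT, a quick optional check (plain Python, assuming data.json holds a list of records as in this demo):

print(type(data), len(data))          # expect something like: <class 'list'> 2
print(json.dumps(data[0], indent=2))  # inspect the first record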

7. Pass the data to the DLT pipeline

a. Extract the data into a table

pipeline.extract(iter(data), table_name=parent_table)

b. Unpack the nested data into a relational structure (the top-level records become the json_doc table and the nested children lists become json_doc__children, linked by auto-generated keys)

pipeline.unpack()

c. Save schema to schema.yml file

schema = pipeline.get_default_schema()
schema_yaml = schema.as_yaml(remove_default=True)
with open(schema_file_path, 'w') as f:
    f.write(schema_yaml)

8. Use DLT to load the data

a. Load

pipeline.load()

b. Make sure there are no errors

completed_loads = pipeline.list_completed_loads()
# print(completed_loads)
# Check each completed load for failed jobs: a load can complete even if
# individual jobs in it failed, and that does not raise an exception by itself.
for load_id in completed_loads:
    print(f"Checking failed jobs in {load_id}")
    for job, failed_message in pipeline.list_failed_jobs(load_id):
        print(f"JOB: {job}\nMSG: {failed_message}")

c. Run the script:

python3 quickstart.py

d. Inspect the generated schema.yml:

vim schema.yml

9. Query the Google BigQuery table

a. Run SQL queries

def run_query(client, query):
    df = client._execute_sql(query)
    print(query)
    print(list(df))
    print()

with pipeline.sql_client() as c:

    # Query the parent table
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc`"
    run_query(c, query)

    # Query the child table
    query = f"SELECT * FROM `{schema_prefix}_example.json_doc__children` LIMIT 1000"
    run_query(c, query)

    # Join the previous two queries via the auto-generated keys
    query = f"""
        select p.name, p.age, p.id as parent_id,
            c.name as child_name, c.id as child_id, c._dlt_list_idx as child_order_in_list
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children` as c
            on p._dlt_id = c._dlt_parent_id
    """
    run_query(c, query)
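
    # Optional follow-up (not in the original script): count children per parent
    # using the same auto-generated keys as the join above.
    query = f"""
        select p.name, count(c._dlt_id) as child_count
        from `{schema_prefix}_example.json_doc` as p
        left join `{schema_prefix}_example.json_doc__children` as c
            on p._dlt_id = c._dlt_parent_id
        group by p.name
    """
    run_query(c, query)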

b. See results like the following

table: json_doc

{  "name": "Ana",  "age": "30",  "id": "456",  "_dlt_load_id": "1654787700.406905",  "_dlt_id": "5b018c1ba3364279a0ca1a231fbd8d90"}
{  "name": "Bob",  "age": "30",  "id": "455",  "_dlt_load_id": "1654787700.406905",  "_dlt_id": "afc8506472a14a529bf3e6ebba3e0a9e"}

table: json_doc__children

    # {"name": "Bill", "id": "625", "_dlt_parent_id": "5b018c1ba3364279a0ca1a231fbd8d90", "_dlt_list_idx": "0", "_dlt_root_id": "5b018c1ba3364279a0ca1a231fbd8d90",
    #   "_dlt_id": "7993452627a98814cc7091f2c51faf5c"}
    # {"name": "Bill", "id": "625", "_dlt_parent_id": "afc8506472a14a529bf3e6ebba3e0a9e", "_dlt_list_idx": "0", "_dlt_root_id": "afc8506472a14a529bf3e6ebba3e0a9e",
    #   "_dlt_id": "9a2fd144227e70e3aa09467e2358f934"}
    # {"name": "Dave", "id": "621", "_dlt_parent_id": "afc8506472a14a529bf3e6ebba3e0a9e", "_dlt_list_idx": "1", "_dlt_root_id": "afc8506472a14a529bf3e6ebba3e0a9e",
    #   "_dlt_id": "28002ed6792470ea8caf2d6b6393b4f9"}
    # {"name": "Elli", "id": "591", "_dlt_parent_id": "5b018c1ba3364279a0ca1a231fbd8d90", "_dlt_list_idx": "1", "_dlt_root_id": "5b018c1ba3364279a0ca1a231fbd8d90",
    #   "_dlt_id": "d18172353fba1a492c739a7789a786cf"}

SQL result:

    # {  "name": "Ana",  "age": "30",  "parent_id": "456",  "child_name": "Bill",  "child_id": "625",  "child_order_in_list": "0"}
    # {  "name": "Ana",  "age": "30",  "parent_id": "456",  "child_name": "Elli",  "child_id": "591",  "child_order_in_list": "1"}
    # {  "name": "Bob",  "age": "30",  "parent_id": "455",  "child_name": "Bill",  "child_id": "625",  "child_order_in_list": "0"}
    # {  "name": "Bob",  "age": "30",  "parent_id": "455",  "child_name": "Dave",  "child_id": "621",  "child_order_in_list": "1"}

10. Next steps

a. Replace data.json with data you want to explore

b. Check that the inferred types are correct in schema.yml

c. Set up your own Google BigQuery warehouse and replace the demo credentials (see the sketch after this list)

d. Use this new clean staging layer as the starting point for a semantic layer / analytical model (e.g. using dbt)
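
For step (c), once you have your own BigQuery project and service account, the demo-specific key decoding goes away: you would load your own service-account JSON as-is and build the credentials with the same helper used in step 4. A minimal sketch, assuming a hypothetical file named my_service_account.json:

with open('my_service_account.json', 'r') as f:
    my_gcp_credentials = json.load(f)

# No private-key decoding needed: your own key file is already plain JSON
credentials = GCPPipelineCredentials.from_services_dict(my_gcp_credentials, schema_prefix)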


Download files


Source Distribution

python-dlt-0.1.0rc4.tar.gz (400.3 kB)

Built Distribution

python_dlt-0.1.0rc4-py3-none-any.whl (458.7 kB)

File details

Details for the file python-dlt-0.1.0rc4.tar.gz.

File metadata

  • Download URL: python-dlt-0.1.0rc4.tar.gz
  • Upload date:
  • Size: 400.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.12 CPython/3.8.11 Linux/4.19.128-microsoft-standard

File hashes

Hashes for python-dlt-0.1.0rc4.tar.gz

  • SHA256: 4e6a02b3ddb9dcb5fdaa9fcc0b20110da870a675b6359c150ab8359bdeba1b02
  • MD5: c69d2308a5681065b6a86bb4348ebaec
  • BLAKE2b-256: 8501824d521150fec069fb0c8fec1216376133437978c900d2eb805ba7779951


File details

Details for the file python_dlt-0.1.0rc4-py3-none-any.whl.

File metadata

  • Download URL: python_dlt-0.1.0rc4-py3-none-any.whl
  • Upload date:
  • Size: 458.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.12 CPython/3.8.11 Linux/4.19.128-microsoft-standard

File hashes

Hashes for python_dlt-0.1.0rc4-py3-none-any.whl

  • SHA256: 59f20989ccf12b60351003e47b13579ce21a2b6317d0e734e5202dd920df4add
  • MD5: 97e963b66736dabcdf0b02ed6feb402b
  • BLAKE2b-256: 0495e68ad30738501aaa8c8ceb98df3b752061baa1dc8546ecac75c21fba0896

