
Data Access Platform Client Library

Data Access Platform (DAP) acts as a single source of data for analytics at Instructure. It provides efficient access to data collected across various educational products in bulk with high fidelity and low latency, adhering to a canonical data model.

The outgoing interface for DAP is the Query API, which is an HTTP REST service. Users initiate asynchronous queries to retrieve data associated with their account. The client library is a Python wrapper around the DAP API, allowing users to fetch an initial snapshot and incremental changes, or initialize a database and keep it synchronized with data in DAP.

Each DAP user acts as a data administrator for the organization they represent. They have full read access to the top-level account and all descendant sub-accounts. For example, in Canvas, the top of the organization hierarchy is uniquely identified by a root account ID, and each data record is associated with a root account ID. A DAP user with Canvas access can query data assigned to the user's root account ID.

DAP API requires authentication. The client library takes care of authentication behind the scenes, provided you have the appropriate API key, and passes the token to each API operation it invokes. Refer to the documentation of the Instructure API Gateway Service to learn more about the authentication process.

Under the hood, API users must first acquire a JSON Web Token (JWT) from the authentication endpoint of the Instructure API Gateway Service, and pass this JWT to all subsequent calls to DAP API endpoints.

Major features

  • List the names of tables available for querying
  • Download the JSON schema of a selected table
  • Fetch a full table snapshot
  • Fetch incremental updates since a specific point in time
  • Save data in several output formats: CSV, TSV, JSONL, Parquet
  • Download output to a local directory
  • Load a table snapshot into a database or data warehouse
  • Synchronize data with a table in a database or data warehouse

For synchronizing with a database or data warehouse, the library currently only supports PostgreSQL.

Getting started

Accessing DAP API requires an endpoint URL and an API key. Once obtained, they can be set as environment variables (recommended) or passed as command-line arguments:

Use environment variables for authentication

First, set the endpoint URL and your API key as environment variables:

export DAP_API_URL=https://api-gateway.instructure.com
export DAP_CLIENT_ID=us-east-1#0c59cade-...-2ac120002
export DAP_CLIENT_SECRET=xdEC0lI...4X4QBOhM

With environment variables set, you can issue dap commands directly:

dap incremental --namespace canvas --table accounts --since 2022-07-13T09:30:00+02:00

Use command-line for authentication

Unless you set environment variables, you need to pass the endpoint URL and API key to the dap command explicitly:

dap --base-url https://api-gateway.instructure.com --client-id=us-east-1#0c59cade-...-2ac120002 --client-secret=xdEC0lI...4X4QBOhM incremental --namespace canvas --table accounts --since 2022-07-13T09:30:00+02:00

Command-line usage

Invoking the command-line utility with --help shows usage, required and optional arguments:

dap --help
dap incremental --help
dap snapshot --help
dap list --help
dap schema --help
dap initdb --help
dap syncdb --help
dap dropdb --help

Common use cases

Obtain a full snapshot of a table in a database or data warehouse

The command-line utility is capable of automatically copying a full table snapshot to a (local) database with the initdb command. Along with the parameters --table and --namespace, you have to specify your target database with a connection string using the --connection-string parameter.

Typically, the connection string looks as follows:

dialect://username:password@host:port/database

The dialect is the database protocol, e.g. postgresql. The port is optional; if omitted, the default port for the given dialect is assumed (e.g. 5432 in the case of postgresql).

Currently, the only dialect the library supports is postgresql.

Examples:

postgresql://scott:password@localhost:5432/testdb
postgresql://scott:password@localhost/testdb

Example for a complete initdb command:

dap initdb --connection-string postgresql://scott:password@localhost/testdb --namespace canvas --table accounts

The tool automatically fetches the schema and the data from the DAP API, connects to your database, creates the necessary table based on the published schema and inserts the downloaded data into the created table.

If the target database already has a table with the same name as the table whose snapshot is about to be obtained, an error is triggered.

Synchronize data with a table in a database or data warehouse

After obtaining a full table snapshot with initdb, you can keep it up to date using the syncdb command. This replicates the inserts, updates and deletes that took place in the data source. The syncdb command has the same parameters as initdb. The tool automatically gets an incremental update from the DAP API, connects to your database and applies the incremental update to the target table. All this happens in one atomic transaction, so in case of an error your database retains consistent data. Only database tables previously created by initdb can be synchronized with syncdb.

Example:

dap syncdb --connection-string postgresql://scott:password@localhost/testdb --namespace canvas --table accounts

The timestamp used for performing the incremental query on the DAP API is maintained in the dap_meta meta-information table together with other data about the synchronized tables. This dap_meta table is owned by the DAP client library and should not be dropped or changed.
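
If you are curious about the synchronization state, you can inspect dap_meta with any PostgreSQL client. Below is a minimal read-only sketch; psycopg2 is an arbitrary driver choice, and the exact columns of dap_meta may vary between library versions:

import psycopg2  # any PostgreSQL driver works; psycopg2 is just an example

# read-only peek at the synchronization metadata; never modify this table
conn = psycopg2.connect("postgresql://scott:password@localhost/testdb")
try:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM dap_meta")
        for row in cur.fetchall():
            print(row)
finally:
    conn.close()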

Drop a synchronized table in a database or data warehouse

With the dropdb command, you can completely drop a table from your database that was previously created with initdb. An error is triggered if the given table does not exist in the target database.

Example:

dap dropdb --connection-string postgresql://scott:password@localhost/testdb --namespace canvas --table accounts

This command not only drops the specified table from the target database but also removes meta-information used for synchronization.

Use environment variables for database connection

You can (and in most cases, should) configure the database connection string as an environment variable:

export DAP_CONNECTION_STRING=postgresql://scott:password@localhost/testdb

With environment variables set, you can issue initdb, syncdb and dropdb commands directly without explicitly passing your database credentials to the terminal:

dap initdb --namespace canvas --table accounts
dap syncdb --namespace canvas --table accounts
dap dropdb --namespace canvas --table accounts

Chain a snapshot query with an incremental query

Usually, you should prefer initializing and synchronizing a database or data warehouse with the high-level commands initdb and syncdb. However, if your database engine is not supported, or you want to transform the (CSV, TSV, JSONL or Parquet) output as part of an ETL process, you may want to use the low-level commands snapshot and incremental.

When you start using DAP, you will first want to download a snapshot for the table(s) you need. In the snapshot query response body, you will find a field called at, which captures the point in time the snapshot corresponds to. Copy this timestamp into the since field of an incremental query request. This guarantees that you have chained the two queries and will not miss any data.

Note that if a table has not received updates for a while (e.g. user profiles have not changed over the weekend), the value of at might be well behind the current time.
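
Using the Python classes shown in the code examples later in this document, the chaining could look roughly as follows. This is a sketch that assumes the query result exposes the snapshot timestamp as result.at:

from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery, SnapshotQuery

async def chain_snapshot_with_incremental() -> None:
    async with DAPClient() as session:
        # 1. fetch a full snapshot of the table
        snapshot_query = SnapshotQuery(format=Format.JSONL, filter=None)
        snapshot_result = await session.get_table_data("canvas", "accounts", snapshot_query)
        # 2. copy the snapshot's `at` timestamp into `since` of the incremental query
        incremental_query = IncrementalQuery(
            format=Format.JSONL,
            filter=None,
            since=snapshot_result.at,
            until=None,
        )
        incremental_result = await session.get_table_data("canvas", "accounts", incremental_query)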

Chain an incremental query with another

To fetch the most recent changes since a previous incremental query, chain the next request to the previous response using since and until. The until of a previous response becomes the since of the next request. The until of the next request should typically be omitted; it is automatically populated by DAP API. This allows you to fetch the most recent changes for a table. If a table has not received updates for a while, the timestamps you see in the response may lag behind the current time.

For example, suppose you submit an incremental query job #82, and receive a response whose until is 2021-07-28T19:00. You can then pass 2021-07-28T19:00 as the value for since in your next incremental query job #83. Job #83 would then return 2021-07-28T19:00 as the value of since (the exact value you submitted), and might return 2021-07-28T21:00 as until (the latest point in time for which data is available).

If you choose to fill in until in a request (which is not necessary in most cases), its value must be in the time range DAP has data for. Otherwise, your request is rejected.
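
A sketch of the same chaining in Python, assuming (as above) that the query result exposes the end of the window as result.until:

from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery

async def fetch_changes_since(last_seen):
    async with DAPClient() as session:
        query = IncrementalQuery(
            format=Format.JSONL,
            filter=None,
            since=last_seen,
            until=None,
        )
        result = await session.get_table_data("canvas", "accounts", query)
        # the `until` of this response becomes the `since` of the next request
        return result.until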

Get the list of tables available for querying

The list command returns the names of all tables available in a given namespace.
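
From Python, the same list is returned by the get_tables method, which also appears in the code examples later in this document:

from dap.api import DAPClient

async def list_tables() -> None:
    async with DAPClient() as session:
        tables = await session.get_tables("canvas")
        for table in tables:
            print(table)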

Download the latest schema for a table

The schema endpoint returns the latest schema of a table as a JSON Schema document. The schema command enables you to download the schema of a specified table as a JSON file.

Code examples

While basic functionality of the DAP client library is exposed over its command-line interface (CLI), more advanced functionality requires interacting with classes and functions defined in the module dap.api. This enables seamless integration into workflow management platforms like Apache Airflow.

First, we need to instantiate the DAPClient class:

import os

from dap.api import DAPClient
from dap.dap_types import Credentials

base_url: str = os.environ["DAP_API_URL"]
client_id: str = os.environ["DAP_CLIENT_ID"]
client_secret: str = os.environ["DAP_CLIENT_SECRET"]

credentials = Credentials.create(client_id, client_secret)
async with DAPClient(base_url, credentials) as session:
    ...

However, DAPClient can automatically extract the value of these parameters from the above environment variables, allowing us to write:

async with DAPClient() as session:
    ...

Note that DAPClient uses an asynchronous context manager. Keywords such as async with are permitted only in an asynchronous context. We can enter such a context by invoking asyncio.run(my_function(arg1, arg2, ...)).
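
For example, a minimal asynchronous entry point might look like this:

import asyncio

from dap.api import DAPClient

async def main() -> None:
    async with DAPClient() as session:
        # interact with DAP API within the asynchronous context
        ...

asyncio.run(main())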

Let's explore a few common use cases with DAPClient.

Obtaining the latest schema

Before we obtain data, we need to get the latest schema of a table. The following example retrieves the JSON schema of the table accounts in the namespace canvas as a JSON object. A JSON object is a recursive Python data structure whose outermost layer is a Python dict whose keys are strings (type str) and whose values are JSON objects or scalars. We can use the Python package jsonschema to validate data against this JSON schema.

async with DAPClient() as session:
    schema = await session.get_table_schema("canvas", "accounts")
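
For instance, a record parsed from a table's JSONL output could be checked against the retrieved schema. This is a sketch that assumes schema is the plain dict described above and record is a previously parsed row:

import jsonschema

async with DAPClient() as session:
    schema = await session.get_table_schema("canvas", "accounts")
    # raises jsonschema.ValidationError if the record does not conform
    jsonschema.validate(instance=record, schema=schema)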

We can also save the schema of each table in a namespace to a file.

output_directory: str = os.getcwd()

async with DAPClient() as session:
    tables = await session.get_tables("canvas")
    for table in tables:
        await session.download_table_schema("canvas", table, output_directory)

Fetching table data with a snapshot query

In order to get an initial copy of the full table contents, we need to perform a snapshot query. The parameter format determines the output data format; the options include CSV, TSV, JSONL and Parquet. We recommend JSONL or Parquet. With JSONL, each line in the output can be parsed into a JSON object conforming to the JSON schema returned above.

import os

from dap.api import DAPClient
from dap.dap_types import Format, SnapshotQuery

output_directory = os.getcwd()

async with DAPClient() as session:
    query = SnapshotQuery(format=Format.JSONL, filter=None)
    await session.download_table_data("canvas", "accounts", query, output_directory)

Getting latest changes with an incremental query

Once an initial snapshot has been obtained, we need to keep the data synchronized with DAP. This is possible with incremental queries. The following, more complex example gets all changes since a specified since timestamp, and saves each data file on the server to an output file in the local filesystem. The last_seen timestamp is typically the until returned by a previous incremental query.

import os
from datetime import datetime, timezone
from urllib.parse import ParseResult, urlparse

import aiofiles

from dap.api import DAPClient
from dap.dap_types import Format, IncrementalQuery

# timestamp returned by last snapshot or incremental query
last_seen = datetime(2023, 2, 1, 0, 0, 0, tzinfo=timezone.utc)

async with DAPClient() as session:
    query = IncrementalQuery(
        format=Format.JSONL,
        filter=None,
        since=last_seen,
        until=None,
    )
    result = await session.get_table_data("canvas", "accounts", query)
    resources = await session.get_resources(result.objects)
    # ensure the target directory exists before writing files
    os.makedirs(os.path.join(os.getcwd(), "data"), exist_ok=True)
    for resource in resources:
        components: ParseResult = urlparse(str(resource.url))
        file_path = os.path.join(
            os.getcwd(), "data", os.path.basename(components.path)
        )
        async with session.stream_resource(resource) as stream:
            async with aiofiles.open(file_path, "wb") as file:
                # save gzip data to file without decompressing
                async for chunk in stream.iter_chunked(64 * 1024):
                    await file.write(chunk)
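
The files saved by the loop above are gzip-compressed JSONL. Once downloaded, each one can be parsed with the standard library alone; a sketch, reusing file_path from the example above:

import gzip
import json

with gzip.open(file_path, "rt", encoding="utf-8") as file:
    for line in file:
        record = json.loads(line)  # a JSON object conforming to the table schema
        print(record)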

Replicating data in a local database

Earlier sections have shown how to obtain the latest schema, fetch data with a snapshot query, or get the latest changes with an incremental query. These are low-level operations that give you full control over what you do with the data.

However, in most cases we want high-level operations that ensure our database (either running locally or in the cloud) is synchronized with the data in DAP, without paying attention to specifics of data transfer. This is possible with two operations that

  1. initialize a database, and
  2. synchronize a database with the data in DAP.

In order to replicate data in DAP locally, we must first initialize a database:

from dap.integration.database import DatabaseConnection
from dap.replicator.sql import SQLReplicator

connection_string: str = "postgresql://scott:password@localhost/testdb"
namespace: str = "canvas"
table_name: str = "accounts"

async with DatabaseConnection(connection_string).open() as db_connection:
    async with DAPClient(base_url, credentials) as session:
        await SQLReplicator(session, db_connection).initialize(namespace, table_name)

Initialization creates a database schema for the DAP namespace, and a corresponding database table for each DAP table. In addition, it creates a meta-table, which is a special database table that holds synchronization information, e.g. the last time the data was synchronized with DAP, and the schema version that the locally stored data conforms to. Finally, it issues a snapshot query to DAP API, and populates the database table with output returned by the snapshot query.

Synchronizing data in a local database

Once the table has been initialized, it can be kept up to date using the synchronize operation:

async with DatabaseConnection(connection_string).open() as db_connection:
    async with DAPClient(base_url, credentials) as session:
        await SQLReplicator(session, db_connection).synchronize(namespace, table_name)

This inspects the information in the meta-table, and issues an incremental query to DAP API with a since timestamp corresponding to the last synchronization time. Based on the results of the incremental query, it inserts, updates and deletes records in the local table, mirroring the records that have been added to, updated in, or removed from the DAP service.

If the local schema version in the meta-table is identical to the remote schema version in DAP, inserting, updating and deleting records proceeds normally. However, if there is a mismatch, the table structure of the local database has to evolve to match the current structure of the data in DAP. This includes the following schema changes in the back-end:

  • A new required (a.k.a. non-nullable) field (column) is added. The new field has a default value assigned to it in the schema.
  • A new optional (a.k.a. nullable) field (column) is added to a table.
  • A new enumeration value is added to an existing enumeration type.
  • A new enumeration type is introduced.
  • A field (column) is removed from a table.

Behind the scenes, the client library uses SQL commands such as ALTER TABLE ... ADD COLUMN ... or ALTER TYPE ... ADD VALUE ... to replicate schema changes in DAP in our local database. If the JSON schema change couldn't be mapped to a series of these SQL statements, the client library wouldn't be able to synchronize with DAP using incremental queries, and would have to issue an expensive snapshot query.

Once the local database table structure has been reconciled with the new schema in DAP, and the meta-table has been updated, data synchronization proceeds normally with insert, update and delete SQL statements.
