Provider for Apache Airflow. Implements JSON normalization to nested tables by a YAML specification

Project description

Normalizer Provider for Apache Airflow

This package contains an operator that normalizes JSON, stored as a text or JSONB field, into nested tables according to a YAML specification.

Installation

You can install the provider with any package manager compatible with PyPI:

pip install apache-airflow-provider-normalizer

Features

  • Declarative specification in YAML.
  • Can extract only the specified fields.
  • Works with json/jsonb fields as well as general (non-JSON) fields.
  • Resistant to the absence of any fields.
  • Automatically creates all nested tables.
  • Supports incremental and full-refresh modes.
  • One YAML file can contain multiple root tables.
  • The root table is normalized into a snowflake schema.

Example

Two specification styles are available.

# Body-style mapping
postgres.public.orders:                                         # full or short table name is supported
  staging.order:                                                # the table can be renamed in the destination
    id*:                      { origin_id: bigint }             # use "*" to mark the primary key
    dt_created:               { date_created: timestamp }       # fields can be renamed too
    date_created:             { date_created: timestamp }       # use an alias if the field name has changed
    state:                    { state: varchar(5) }
    details:                  { details: text }                 # a json field saved as stringified text
    details.id:               { business_key: integer }         # conflicts can be resolved with dot notation
    details.order__id:        { order__id: integer }            # nested fields can be selected with "__"
    details.client__list:                                       # a list will be placed into another table
    details.purchase__list:   { client__list: text }            # a list can also be saved as stringified text
    extra**.id:               { extra_id: integer }             # a text field containing valid json is accessible with "**"
    extra**.user__name:       { username: varchar(255) }        # after unpacking, the field is selectable as generic json
    options**:                                                  # a text field with json can be unpacked into a separate table

postgres.orders.options:                                        # the last field of the previous specification is placed
  staging.order__options:                                       # into a dedicated nested table with pk/fk relationships
    ...                                                         # generated automatically


# Header-style mapping
postgres.orders[date_created + state + details** + extra**]:    # use an explicit list of fields to unpack in the header
  staging.orders:                                               # in this case json is merged into the top level with the generic fields
    id*:                      { origin_id: bigint }             # it simplifies the declaration and makes it clearer
    dt_created:               { date_created: timestamp }       # (be careful with duplicate general and json fields!)
    date_created:             { date_created: timestamp }
    state:                    { state: varchar(5) }
    order__id:                { order__id: integer }
    user_name:                { username: varchar(255) }
    ...
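
For orientation, the body-style mapping above could translate into destination tables roughly like the sketch below. This is a hand-written illustration, not the operator's exact output: the column names and types come from the mapping, but the generated pk/fk column names, the quoting, and any constraints are assumptions.

-- root table (sketch): one column per mapped field of "staging.order"
CREATE TABLE staging."order" (                  -- "order" quoted here because it is a reserved word
    origin_id     bigint,                       -- "id*" marks the primary key
    date_created  timestamp,                    -- from dt_created / date_created
    state         varchar(5),
    details       text,                         -- json saved as stringified text
    business_key  integer,                      -- from details.id
    order__id     integer,                      -- from details.order__id
    client__list  text,                         -- from details.purchase__list
    extra_id      integer,                      -- from extra**.id
    username      varchar(255)                  -- from extra**.user__name
);
-- details.client__list goes to its own nested table (not shown)

-- nested table (sketch) for "options**", linked back to the root table
CREATE TABLE staging.order__options (
    pk bigint,                                  -- generated primary key (column name is an assumption)
    fk bigint                                   -- generated foreign key to the root table (name is an assumption)
    -- ... plus the columns unpacked from the "options" json
);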

Usage

The following example demonstrates a typical use case:

  1. Use Postgres as the source system
  2. Save the nested tables to a MySQL database
  3. Use the mapping specification shown above

from airflow import DAG
from airflow.providers.normalizer.operators.normalizer import NormalizerOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="normalize_example",
    description="Normalize nested jsons to analytical database",
    start_date=days_ago(1),
    schedule="0 * * * *",
) as dag:

    # Simple normalize with mapping file
    NormalizerOperator(
        task_id="normalize_json",
        source_conn_id="postgres_default",
        destination_conn_id="mysql_default",
        mapping="mappings/mapping.yaml",
    )

Another example demonstrates a custom preprocessing use case with full-refresh mode:

from airflow import DAG
from airflow.providers.normalizer.operators.normalizer import NormalizerOperator
from airflow.utils.dates import days_ago


def preprocessing(doc):
    """ Fix bad datetime format. """
    if "contract" in doc:
        if str(doc["contract"]["date"]) == "0001-01-01T00:00:00+00:00":
            doc["contract"]["date"] = "1970-01-01 00:00:00"
    return [doc]


with DAG(
    dag_id="normalize_example",
    description="Normalize nested jsons with custom preprocessing",
    start_date=days_ago(1),
    schedule="0 * * * *",
) as dag:

    # Normalize with a full refresh (tables dropped first) and document preprocessing
    NormalizerOperator(
        task_id="normalize_json",
        source_conn_id="postgres_default",
        destination_conn_id="mysql_default",
        mapping="""
            postgres.orders[order_details + state + dt_created + dt_changed]:
              staging.orders:
                # general columns
                state:                       { state: String }
                dt_created:                  { created_date: DateTime }
                dt_changed:                  { changed_date: DateTime }
                init_system:                 { init_system: String }
                reference_id:                { reference_id: String }
                # json-fields
                client__id:                  { client__id: String }
                client__name:                { client__name: String }
                contract__id:                { contract__id: String }
                contract__date:              { contract__date: DateTime }
                contract__branch__id:        { contract__branch__id: String }
        """,
        preprocessing=preprocessing,
        commit_every=100,
    )

The last example demonstrates custom SQL templates with extended SQL syntax:

from airflow import DAG
from airflow.providers.normalizer.operators.normalizer import NormalizerOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="normalize_example",
    description="Normalize nested jsons with custom SQL-templates",
    start_date=days_ago(1),
    schedule="0 * * * *",
) as dag:

    # Customize a specific SQL dialect in depth
    NormalizerOperator(
        task_id="normalize_json",
        source_conn_id="postgres_default",
        destination_conn_id="clickhouse_default",
        mapping="mapping/clickhouse.yaml",
        select_count_query=(
            '''
                SELECT count(id)
                FROM {table}
            '''
        ),
        select_all_query=(
            '''
                SELECT {fields}
                FROM {table}
                WHERE date_system > {{ date_system }}
            '''
        ),
        create_table_query=(
            '''
                DROP TABLE IF EXISTS {table}
            ''',
            '''
                CREATE TABLE IF NOT EXISTS {table} (
                    {definition}
                )
                ENGINE = MergeTree
                ORDER BY {fk or pk}
            ''',
        ),
        insert_into_query=(
            '''
                INSERT INTO {table} (
                    {fields}
                )
                VALUES {values}
            ''',
        ),
        incremental=True,
        retries=2,
    )

NormalizerOperator Reference

To import NormalizerOperator, use:

from airflow.providers.normalizer.operators.normalizer import NormalizerOperator

Supported kwargs:

  • source_conn_id: source connection id for the table that contains the text/json/jsonb field.
  • destination_conn_id: destination connection id where the nested tables are placed.
  • mapping: YAML specification, given either as a path to a file or as an inline string (both forms are shown above).
  • select_count_query: overwrites the base query used to calculate the total row count for pagination.
  • select_all_query: overwrites the base query used for LIMIT/OFFSET pagination.
  • create_table_query: overwrites the base CREATE TABLE template query.
  • insert_into_query: overwrites the base INSERT template query.
  • preprocessing: optional callable applied to each source document before normalization (see the second example above).
  • incremental: enables incremental update mode; defaults to False (full refresh).
  • Other kwargs are inherited from the Airflow BaseOperator.

The following placeholders are available in the query templates:

  • {table}: placeholder for the table name
  • {definition}: placeholder for field name and type pairs (e.g. in CREATE TABLE)
  • {fields}: placeholder for a comma-separated list of columns (e.g. in SELECT or INSERT)
  • {values}: placeholder for the comma-separated values of a multi-value INSERT
  • {pk}: name of the generated primary key
  • {fk}: name of the generated foreign key
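
As an illustration, here is roughly how those placeholders could expand for the nested staging.order__options table from the mapping example. The extra columns (name, value) and the literal values are illustrative assumptions, not output produced by the operator.

-- create_table_query with {table}, {definition}, {pk} and {fk} substituted (sketch)
CREATE TABLE IF NOT EXISTS staging.order__options (
    pk bigint,                      -- {pk}: generated primary key
    fk bigint,                      -- {fk}: generated foreign key to the root table
    name varchar(255),              -- remaining pairs come from {definition} (illustrative)
    value text
);

-- insert_into_query with {table}, {fields} and {values} substituted (sketch)
INSERT INTO staging.order__options (pk, fk, name, value)
VALUES (1, 10, 'gift_wrap', 'true'), (2, 10, 'priority', 'express');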
