Provider for Apache Airflow. Implements JSON normalization into nested tables driven by a YAML specification.
Project description
Normalizer Provider for Apache Airflow
This package contains an operator that normalizes JSON stored in text or JSONB fields into nested tables according to a YAML specification.
Installation
You can install the provider with any PyPI-compatible package manager:
pip install apache-airflow-providers-normalizer
Features
- Declarative specification in YAML.
- Extracts only the specified fields.
- Works with json/jsonb columns or plain text fields.
- Tolerant of missing fields in source documents.
- Automatically creates all nested tables.
- Supports incremental and full-refresh modes.
- One YAML file can contain multiple root tables.
- Root tables are normalized into a snowflake schema.
Example
Two specification styles are available.
# Body-style mapping
postgres.public.orders:                               # full or short table name is supported
  staging.order:                                      # the table can be renamed in the destination
    id*: { origin_id: bigint }                        # use "*" to mark the primary key
    dt_created: { date_created: timestamp }           # fields can be renamed too
    date_created: { date_created: timestamp }         # use an alias for a field if it was renamed
    state: { state: varchar(5) }
    details: { details: text }                        # a json field saved as stringified text
    details.id: { business_key: integer }             # name conflicts can be resolved with dot notation
    details.order__id: { order__id: integer }         # nested fields can be selected with "__"
    details.client__list:                             # a list is placed into another table
    details.purchase__list: { client__list: text }    # a list can also be saved as stringified text
    extra**.id: { extra_id: integer }                 # a text field containing valid json is accessible with "**"
    extra**.user__name: { username: varchar(255) }    # after unpacking, fields are selectable as generic json
    options**:                                        # a text field with json can be unpacked to a separate table
postgres.orders.options:                              # the last field of the previous specification is placed
  staging.order__options:                             # into a dedicated nested table with pk/fk relationships
    ...                                               # generated automatically
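For orientation, here is a hypothetical source row that the body-style specification above could consume (all values are invented; "details" is a json/jsonb column, while "extra" and "options" are text columns holding stringified JSON, hence the "**" markers):

# Hypothetical source row for the body-style mapping above (values invented).
source_row = {
    "id": 1,                                 # -> origin_id, marked as primary key
    "dt_created": "2024-01-01 10:00:00",     # -> date_created
    "state": "NEW",
    "details": {                             # json/jsonb column, also saved as text
        "id": 7,                             # -> business_key (dot notation)
        "order": {"id": 42},                 # -> order__id via "__"
        "client": {"list": [1, 2]},          # -> placed into another table
        "purchase": {"list": [3, 4]},        # -> saved as stringified text
    },
    "extra": '{"id": 5, "user": {"name": "alice"}}',  # text with json, unpacked by "**"
    "options": '{"gift": true}',             # text with json -> staging.order__options
}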
# Header-style mapping
postgres.orders[date_created + state + details** + extra**]: # list the fields to unpack explicitly in the header
  staging.orders:                             # the json is then merged to the top level with the plain fields
    id*: { origin_id: bigint }                # this simplifies the declaration and makes it clearer
    dt_created: { date_created: timestamp }   # (beware of duplicates between plain and json fields!)
    date_created: { date_created: timestamp }
    state: { state: varchar(5) }
    order__id: { order__id: integer }
    user_name: { username: varchar(255) }
    ...
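To make the header-style merge concrete, here is a purely illustrative sketch (not provider API) of how a row would presumably be seen by the mapping after details** and extra** are unpacked into the top level:

# Illustrative only: a source row before unpacking...
row = {
    "id": 1,
    "date_created": "2024-01-01 10:00:00",
    "state": "NEW",
    "details": '{"order": {"id": 42}}',      # stringified json, listed as details** in the header
    "extra": '{"user": {"name": "alice"}}',  # stringified json, listed as extra**
}
# ...and the flat view the mapping presumably selects from after the merge:
flattened = {
    "id": 1,
    "date_created": "2024-01-01 10:00:00",
    "state": "NEW",
    "order__id": 42,         # nested key from details**, addressed with "__"
    "user__name": "alice",   # nested key from extra**
}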
Usage
The following example demonstrates a typical use case:
- Postgres as the source system
- Nested tables saved to a MySQL database
- The mapping specification shown above
from airflow import DAG
from airflow.providers.normalizer.operators.normalizer import NormalizerOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="normalize_example",
    description="Normalize nested jsons to analytical database",
    start_date=days_ago(1),
    schedule="0 * * * *",
) as dag:
    # Simple normalize with a mapping file
    NormalizerOperator(
        task_id="normalize_json",
        source_conn_id="postgres_default",
        destination_conn_id="mysql_default",
        mapping="mappings/mapping.yaml",
    )
Another example demonstrates custom preprocessing combined with full-refresh mode:
from airflow import DAG
from airflow.providers.normalizer.operators.normalizer import NormalizerOperator
from airflow.utils.dates import days_ago


def preprocessing(doc):
    """Fix a bad datetime format."""
    if "contract" in doc:
        if str(doc["contract"]["date"]) == "0001-01-01T00:00:00+00:00":
            doc["contract"]["date"] = "1970-01-01 00:00:00"
    return [doc]


with DAG(
    dag_id="normalize_example",
    description="Normalize nested jsons with custom preprocessing",
    start_date=days_ago(1),
    schedule="0 * * * *",
) as dag:
    # Normalize with tables dropped beforehand and documents preprocessed
    NormalizerOperator(
        task_id="normalize_json",
        source_conn_id="postgres_default",
        destination_conn_id="mysql_default",
        mapping="""
        postgres.orders[order_details + state + dt_created + dt_changed]:
          staging.orders:
            # general columns
            state: { state: String }
            dt_created: { created_date: DateTime }
            dt_changed: { changed_date: DateTime }
            init_system: { init_system: String }
            reference_id: { reference_id: String }
            # json-fields
            client__id: { client__id: String }
            client__name: { client__name: String }
            contract__id: { contract__id: String }
            contract__date: { contract__date: DateTime }
            contract__branch__id: { contract__branch__id: String }
        """,
        preprocessing=preprocessing,
        commit_every=100,
    )
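Note that preprocessing returns a list of documents. A minimal sketch of what that contract presumably allows (assuming, and this is an assumption not confirmed by the provider, that an empty list skips a row and several entries fan a row out):

def split_by_items(doc):
    # Assumption: returning an empty list skips the document entirely.
    if not doc.get("items"):
        return []
    # Assumption: returning several documents produces several destination rows.
    return [{**doc, "item": item} for item in doc["items"]]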
The last example demonstrates custom SQL templates with extended SQL syntax:
from airflow import DAG
from airflow.providers.normalizer.operators.normalizer import NormalizerOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="normalize_example",
    description="Normalize nested jsons with custom SQL-templates",
    start_date=days_ago(1),
    schedule="0 * * * *",
) as dag:
    # Customize a specific SQL dialect in depth
    NormalizerOperator(
        task_id="normalize_json",
        source_conn_id="postgres_default",
        destination_conn_id="clickhouse_default",
        mapping="mapping/clickhouse.yaml",
        select_count_query=(
            '''
            SELECT count(id)
            FROM {table}
            '''
        ),
        select_all_query=(
            '''
            SELECT {fields}
            FROM {table}
            WHERE date_system > {{ date_system }}
            '''
        ),
        create_table_query=(
            '''
            DROP TABLE IF EXISTS {table}
            ''',
            '''
            CREATE TABLE IF NOT EXISTS {table} (
                {definition}
            )
            ENGINE = MergeTree
            ORDER BY {fk or pk}
            ''',
        ),
        insert_into_query=(
            '''
            INSERT INTO {table} (
                {fields}
            )
            VALUES {values}
            ''',
        ),
        incremental=True,
        retries=2,
    )
NormalizerOperator Reference
To import NormalizerOperator, use:
from airflow.providers.normalizer.operators.normalizer import NormalizerOperator
Supported kwargs:
source_conn_id
: source connection id for the table containing the text/json/jsonb field
destination_conn_id
: destination connection id where the nested tables are placed
mapping
: the YAML specification (a file path or an inline string)
select_count_query
: overrides the base query used to calculate the total for pagination
select_all_query
: overrides the base query used for LIMIT/OFFSET pagination
create_table_query
: overrides the base table-creation template
insert_into_query
: overrides the base multi-value INSERT template (see the ClickHouse example above)
incremental
: sets the update mode; False by default
- Other kwargs are inherited from the Airflow BaseOperator
The following placeholders are available in the query templates:
{table}
: placeholder for the table name
{definition}
: placeholder for "field name, type" pairs (e.g. in CREATE)
{fields}
: placeholder for a comma-separated list of columns (e.g. in SELECT)
{values}
: placeholder for comma-separated multi-value INSERT values
{pk}
: name of the generated primary key
{fk}
: name of the generated foreign key
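As an illustration only (the provider's rendering code is not shown here), the single-braced placeholders behave like Python str.format fields, while the double-braced {{ date_system }} in the example above is presumably left intact for Airflow's Jinja templating:

# Hypothetical illustration of filling the single-braced placeholders.
template = "INSERT INTO {table} ({fields}) VALUES {values}"
rendered = template.format(
    table="staging.orders",
    fields="origin_id, state",
    values="(1, 'NEW'), (2, 'PAID')",
)
# rendered: INSERT INTO staging.orders (origin_id, state) VALUES (1, 'NEW'), (2, 'PAID')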
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution: apache-airflow-providers-normalizer-1.0.0.tar.gz
Built Distribution: apache_airflow_providers_normalizer-1.0.0-py3-none-any.whl
File details
Details for the file apache-airflow-providers-normalizer-1.0.0.tar.gz.
File metadata
- Download URL: apache-airflow-providers-normalizer-1.0.0.tar.gz
- Upload date:
- Size: 16.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.7
File hashes
Algorithm | Hash digest
---|---
SHA256 | fcbabae64369da33697478cbf466521e1eda0fe7d8b58bd3940d6e25a99f49de
MD5 | feb1f1aec1f8c07ed909ba4505120e8f
BLAKE2b-256 | 169422606b6c68121a1466ea12960076c17ab2d9ae21ce9d265564200242ce9a
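To check a downloaded file against the digests above, a small Python snippet is enough (the filename matches the metadata above):

import hashlib

# Compare the downloaded sdist against the published SHA256 digest.
expected = "fcbabae64369da33697478cbf466521e1eda0fe7d8b58bd3940d6e25a99f49de"
with open("apache-airflow-providers-normalizer-1.0.0.tar.gz", "rb") as f:
    actual = hashlib.sha256(f.read()).hexdigest()
assert actual == expected, "SHA256 mismatch: do not install this file"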
File details
Details for the file apache_airflow_providers_normalizer-1.0.0-py3-none-any.whl.
File metadata
- Download URL: apache_airflow_providers_normalizer-1.0.0-py3-none-any.whl
- Upload date:
- Size: 15.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.10.7
File hashes
Algorithm | Hash digest
---|---
SHA256 | 6c84ff2bb5995f7b5ea897a2adb3e57c9e1e187fa1fbcdcf47f3956fd0aa1a3e
MD5 | 2c5a74c67e7fbe26fd2d2edb1ebb7133
BLAKE2b-256 | 817a64f73c606730b6f83db85a30854070e6b2a3b0d1f9956f519c46c8396543