Welcome to Spooq

Spooq is a PySpark-based helper library for ETL data ingestion pipelines in Data Lakes.
The main components are:

  • Extractors
  • Transformers
  • Loaders

These components can be plugged into a pipeline instance or used separately.
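
For example, a single Transformer can be applied to an existing DataFrame without building a full Pipeline. A minimal sketch (df stands for any Spark DataFrame; the column name and threshold values are purely illustrative):

from spooq.transformer import ThresholdCleaner

# Replace values outside the allowed range with null
cleaned_df = ThresholdCleaner(
    thresholds={"age": {"min": 0, "max": 120, "default": None}}
).transform(df)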


Installation

pip install spooq

Examples

Pipeline with JSON source, various transformations and Hive target database

from pyspark.sql import functions as F
import datetime
from spooq.pipeline import Pipeline
import spooq.extractor as   E
import spooq.transformer as T
import spooq.loader as      L

# Schema of input data
# 
#  |-- attributes: struct (nullable = true)
#  |    |-- birthday: string (nullable = true)
#  |    |-- email: string (nullable = true)
#  |    |-- first_name: string (nullable = true)
#  |    |-- friends: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- first_name: string (nullable = true)
#  |    |    |    |-- id: long (nullable = true)
#  |    |    |    |-- last_name: string (nullable = true)
#  |    |-- gender: string (nullable = true)
#  |    |-- ip_address: string (nullable = true)
#  |    |-- last_name: string (nullable = true)
#  |    |-- university: string (nullable = true)
#  |-- guid: string (nullable = true)
#  |-- id: long (nullable = true)
#  |-- location: struct (nullable = true)
#  |    |-- latitude: string (nullable = true)
#  |    |-- longitude: string (nullable = true)
#  |-- meta: struct (nullable = true)
#  |    |-- created_at_ms: long (nullable = true)
#  |    |-- created_at_sec: long (nullable = true)
#  |    |-- version: long (nullable = true)

#  Definition of how the output table should look and where the attributes come from
#  (each tuple: target column name, source column or Column expression, data type or custom transformation):
partition_value = F.lit(datetime.date.today().strftime("%Y%m%d"))
users_mapping = [
    ("id",              "id",                                "IntegerType"),
    ("guid",            "guid",                              "StringType"),
    ("forename",        "attributes.first_name",             "StringType"),
    ("surename",        "attributes.last_name",              "StringType"),
    ("gender",          F.upper(F.col("attributes.gender")), "StringType"),
    ("has_email",       "attributes.email",                  "has_value"),
    ("has_university",  "attributes.university",             "has_value"),
    ("created_at",      "meta.created_at_ms",                "extended_string_to_timestamp"),
    ("dt",              partition_value,                     "IntegerType"),
]

#  The main object where all steps are defined:
users_pipeline = Pipeline()

#  Defining the EXTRACTION (SequenceFiles with timestamp & JSON string as bytes):
users_pipeline.set_extractor(E.JSONExtractor(
    input_path="tests/data/schema_v1/sequenceFiles"
))

#  Defining the TRANSFORMATIONS:
users_pipeline.add_transformers([
    T.Mapper(mapping=users_mapping),
    T.ThresholdCleaner(
        thresholds={
            "created_at": {"min": 0, "max": 1580737513, "default": None}
        },
        column_to_log_cleansed_values="cleansed_values_threshold"),
    T.EnumCleaner(
        cleaning_definitions={
            "gender": {
                "elements": ["F", "M", "X"], 
                "mode": "allow", 
                "default": "undefined"
            }
        },
        column_to_log_cleansed_values="cleansed_values_enum"
    ),
    T.NewestByGroup(group_by="id", order_by="created_at")
])

#  Defining the LOADING:
users_pipeline.set_loader(L.HiveLoader(
    db_name="users_and_friends",
    table_name="users",
    partition_definitions=[{
        "column_name": "dt",
        "column_type": "IntegerType",
        "default_value": -1}],
    repartition_size=10,
))

#  Executing the whole ETL pipeline
users_pipeline.execute()
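
After the pipeline has run, the loaded data can be checked directly via Spark SQL. A quick, illustrative query (assuming the active SparkSession has access to the same Hive metastore):

spark.table("users_and_friends.users").show(10, truncate=False)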

Structured Streaming with selected Transformers

from pyspark.sql import functions as F
import datetime
from spooq.transformer import Mapper, EnumCleaner

# Input Stream
input_stream = (
    spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)
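
# Note: the socket source yields a single string column named "value"; in a
# real job the JSON payload would typically be parsed first (e.g. with
# F.from_json and an explicit schema) so that the nested columns referenced
# in the mapping below exist. That parsing step is omitted here for brevity.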

#  Definition of how the output stream should look and where the attributes come from:
partition_value = F.lit(datetime.date.today().strftime("%Y%m%d"))
users_mapping = [
    ("id",              "id",                                "IntegerType"),
    ("guid",            "guid",                              "StringType"),
    ("forename",        "attributes.first_name",             "StringType"),
    ("surename",        "attributes.last_name",              "StringType"),
    ("gender",          F.upper(F.col("attributes.gender")), "StringType"),
    ("has_email",       "attributes.email",                  "has_value"),
    ("has_university",  "attributes.university",             "has_value"),
    ("created_at",      "meta.created_at_ms",                "timestamp_ms_to_s"),
    ("dt",              partition_value,                     "IntegerType"),
]
mapped_stream = Mapper(mapping=users_mapping).transform(input_stream)

# Cleansing gender attribute
cleansed_stream = EnumCleaner(
    cleaning_definitions={
        "gender": {
            "elements": ["F", "M", "X"], 
            "mode": "allow", 
            "default": "undefined"
        }
    },
    column_to_log_cleansed_values="cleansed_values_enum"
).transform(mapped_stream)

# Output Stream
output_stream = (
    cleansed_stream
    .writeStream
    .outputMode("append")  # "complete" mode requires a streaming aggregation, which this query does not have
    .format("console")
    .start()
)

output_stream.awaitTermination()

Online Documentation

For more details, please consult the online documentation at spooq.readthedocs.io.

Changelog

The full changelog is included below.

Contribution

Please see CONTRIBUTING.rst for more information.

License

This library is licensed under the MIT License.

Changelog

3.3.7 (2022-03-15)

  • [FIX] Fix long overflow in extended_string_to_timestamp

3.3.6 (2021-11-19)

  • [FIX] Fix Cleaners logs in case of field type different than string

3.3.5 (2021-11-16)

  • [ADD] Add Null Cleaner spooq.transformer.NullCleaner

3.3.4 (2021-07-21)

  • [MOD] Store null value instead of an empty Map in case no cleansing was necessary

3.3.3 (2021-06-30)

  • [MOD] Change logic for storing cleansed values as MapType column to not break Spark (logical plan got too big)
  • [MOD] Add streaming tests (parquet & delta) to the EnumCleaner unit tests

3.3.2

  • Left out intentionally as there is already a yanked version 3.3.2 on PyPI

3.3.1 (2021-06-22)

  • [MOD] Add option to store logged cleansed values as MapType (Enum & Threshold based cleansers)
  • [FIX] Fix TOC on PyPI, add more links to PyPI

3.3.0 (2021-04-22)

  • [MOD] (BREAKING CHANGE!) rename package back to Spooq (dropping "2"). This means you need to update all imports from spooq2.xxx.yyy to spooq.xxx.yyy in your code!
  • [MOD] Prepare for PyPI release
  • [MOD] Drop official support for Spark 2.x (it most probably still works without issues, but some tests fail on Spark 2 due to different column ordering, and the effort to maintain both versions with respect to tests is too high)

3.2.0 (2021-04-13)

  • [MOD] Add functionality to log cleansed values into separate struct column (column_to_log_cleansed_values)
  • [MOD] Add ignore_ambiguous_columns to Mapper
  • [MOD] Log spooq version when importing
  • [REM] Drop separate spark package (bin-folder) as pip package can now handle all tests as well
  • [ADD] Github action to test on label (test-it) or merge into master

3.1.0 (2021-01-27)

  • [ADD] EnumCleaner Transformer
  • [MOD] Add support for dynamic default values with the ThresholdCleaner

3.0.1 (2021-01-22)

  • [MOD] extended_string_to_timestamp: now keeps milliseconds (no more cast to LongType) when converting to Timestamp

3.0.0b (2020-12-09)

  • [ADD] Spark 3 support (different handling in tests via only_sparkX decorators)
  • [FIX] Fix null types in schema for custom transformations on missing columns
  • [MOD] (BREAKING CHANGE!) set default for ignore_missing_columns of Mapper to False (fails on missing input columns)

2.3.0 (2020-11-23)

  • [MOD] extended_string_to_timestamp: it can now handle unix timestamps in seconds and in milliseconds
  • [MOD] extended_string_to_date: it can now handle unix timestamps in seconds and in milliseconds

2.2.0 (2020-10-02)

  • [MOD] Add support for prepending and appending mappings on input dataframe (Mapper)

  • [MOD] Add support for custom spark sql functions in mapper without injecting methods

  • [MOD] Add support for "on"/"off" and "enabled"/"disabled" in extended_string_to_boolean custom mapper transformations

  • [ADD] New custom mapper transformations:

    • extended_string_to_date
    • extended_string_unix_timestamp_ms_to_date
    • has_value

2.1.1 (2020-09-04)

  • [MOD] drop_rows_with_empty_array flag to allow keeping rows with empty array after explosion

  • [MOD] Additional test-cases for extended_string mappings (non string inputs)

  • [FIX] Remove STDERR logging, don't touch root logging level anymore (needs to be done outside spooq to see some lower log levels)

  • [ADD] New custom mapper transformations:

    • extended_string_unix_timestamp_ms_to_timestamp

2.1.0 (2020-08-17)

  • [ADD] Python 3 support

  • [MOD] ignore_missing_columns flag to fail on missing input columns with Mapper transformer (https://github.com/Breaka84/Spooq/pull/6)

  • [MOD] Timestamp support for threshold cleaner

  • [ADD] New custom mapper transformations:

    • meters_to_cm
    • unix_timestamp_ms_to_spark_timestamp
    • extended_string_to_int
    • extended_string_to_long
    • extended_string_to_float
    • extended_string_to_double
    • extended_string_to_boolean
    • extended_string_to_timestamp

2.0.0 (2020-05-22)

  • [UPDATE] Upgrade to use Spark 2 (tested for 2.4.3) -> will no longer work for Spark 1
  • Breaking changes (severe refactoring)

0.6.2 (2019-05-13)

  • [FIX] Logger now writes to stdout and stderr & logger instance is shared across all spooq instances
  • [FIX] PyTest version locked to 3.10.1 as 4+ broke the tests
  • [MOD] Removes id_function to create names for parameters in test methods (fallback to built-in)
  • [ADD] Change SelectNewestByGroup from string eval to pyspark objects
  • [FIX] json_string can now handle None values

0.6.1 (2019-03-26)

  • [FIX] PassThrough Extractor (input df now defined at instantiation time)
  • [ADD] json_string new custom data type
