
Spooq is a PySpark-based helper library for ETL data ingestion pipelines in Data Lakes.

Project description


Welcome to Spooq!

Spooq is your PySpark-based helper library for ETL data ingestion pipelines in Data Lakes.

The main components are:
  • Extractors

  • Transformers

  • Loaders

These components are independent and can be used separately or plugged into a pipeline instance. You can also use the custom functions of the Mapper transformer directly with PySpark (e.g. in select or withColumn).
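Because each component only hands data to the next one, the extractor, transformers, loader composition can be pictured with the minimal sketch below. It is an illustration under stated assumptions, not spooq's API: the class and method names (ListExtractor, FunctionTransformer, ListLoader, SimplePipeline, extract, transform, load, execute) are hypothetical, and plain lists of dicts stand in for Spark DataFrames so the sketch runs without Spark.

```python
# Hypothetical sketch of the extractor -> transformers -> loader pattern.
# None of these class names come from spooq; lists of dicts stand in for
# Spark DataFrames so the example runs without a Spark session.
from typing import Callable, Dict, List

Record = Dict[str, object]


class ListExtractor:
    """Hypothetical extractor: returns records from an in-memory list."""

    def __init__(self, records: List[Record]) -> None:
        self.records = records

    def extract(self) -> List[Record]:
        return list(self.records)


class FunctionTransformer:
    """Hypothetical transformer: applies a function to every record."""

    def __init__(self, func: Callable[[Record], Record]) -> None:
        self.func = func

    def transform(self, records: List[Record]) -> List[Record]:
        return [self.func(record) for record in records]


class ListLoader:
    """Hypothetical loader: appends records to an in-memory sink."""

    def __init__(self) -> None:
        self.sink: List[Record] = []

    def load(self, records: List[Record]) -> None:
        self.sink.extend(records)


class SimplePipeline:
    """Hypothetical pipeline: one extractor, any number of transformers, one loader."""

    def __init__(self, extractor: ListExtractor,
                 transformers: List[FunctionTransformer], loader: ListLoader) -> None:
        self.extractor = extractor
        self.transformers = transformers
        self.loader = loader

    def execute(self) -> None:
        records = self.extractor.extract()
        for transformer in self.transformers:  # each step feeds the next
            records = transformer.transform(records)
        self.loader.load(records)


loader = ListLoader()
pipeline = SimplePipeline(
    extractor=ListExtractor([{"idx": "000_123_456"}, {"idx": "000_654_321"}]),
    transformers=[FunctionTransformer(lambda r: {"index": int(str(r["idx"]).replace("_", ""))})],
    loader=loader,
)
pipeline.execute()
print(loader.sink)  # [{'index': 123456}, {'index': 654321}]
```

Since every component is used only through its own method, any one of them can also be called on its own, which mirrors the independence described above.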

Example of Mapper Transformer

from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F, types as T
from spooq.transformer import Mapper
from spooq.transformer import mapper_transformations as spq

spark = SparkSession.builder.getOrCreate()  # reuse the active Spark session or start one

# The Row field names (a, b, c) must match the top-level names of the DDL schema below
input_df = spark.createDataFrame(
   [
      Row(
         a=Row(idx="000_123_456", sts="enabled", ts="1597069446000"),
         b=Row(itms="1,2,4", sts="whitelisted", ts="2020-08-12T12:43:14+0000"),
         c=Row(email="abc@def.com", gndr="F", dt="2020-08-05", cmt="fine"),
      ),
      Row(
         a=Row(idx="000_654_321", sts="off", ts="1597069500784"),
         b=Row(itms="5", sts="blacklisted", ts="2020-07-01T12:43:14+0000"),
         c=Row(email="", gndr="m", dt="2020-06-27", cmt="faulty"),
      ),
   ],
   schema="""
      a: struct<idx string, sts string, ts string>,
      b: struct<itms string, sts string, ts string>,
      c: struct<email string, gndr string, dt string, cmt string>
   """
)
input_df.printSchema()
root
 |-- a: struct (nullable = true)
 |    |-- idx: string (nullable = true)
 |    |-- sts: string (nullable = true)
 |    |-- ts: string (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- itms: string (nullable = true)
 |    |-- sts: string (nullable = true)
 |    |-- ts: string (nullable = true)
 |-- c: struct (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- gndr: string (nullable = true)
 |    |-- dt: string (nullable = true)
 |    |-- cmt: string (nullable = true)


mapping = [
   # output_name     # source                # transformation
   ("index",          "a.idx",                spq.to_int),  # removes leading zeros and underscores
   ("is_enabled",     "a.sts",                spq.to_bool),  # also recognizes words like "on", "off", "enabled", "disabled", ...
   ("a_updated_at",   "a.ts",                 spq.to_timestamp),  # supports unix timestamps in ms or seconds, and strings
   ("items",          "b.itms",               spq.str_to_array(cast="int")),  # splits a comma-delimited string into an array and casts its elements
   ("block_status",   "b.sts",                spq.map_values(mapping={"whitelisted": "allowed", "blacklisted": "blocked"})),  # applies a lookup dictionary
   ("b_updated_at",   "b.ts",                 spq.to_timestamp),  # supports unix timestamps in ms or seconds, and strings
   ("has_email",      "c.email",              spq.has_value),  # also treats empty strings as missing; zeros, however, count as values
   ("gender",         "c.gndr",               spq.apply(func=F.lower)),  # applies the provided function to all values
   ("creation_date",  "c.dt",                 spq.to_timestamp(cast="date")),  # explicitly casts the result after the transformation
   ("processed_at",   F.current_timestamp(),  spq.as_is),  # source is a Column expression; results are passed through unchanged
   ("comment",        "c.cmt",                "string"),  # no transformation, only a cast; alternatively: spq.to_str or spq.as_is(cast="string")
]
output_df = Mapper(mapping).transform(input_df)

output_df.show(truncate=False)
+------+----------+-----------------------+---------+------------+-------------------+---------+------+-------------+----------------------+-------+
|index |is_enabled|a_updated_at           |items    |block_status|b_updated_at       |has_email|gender|creation_date|processed_at          |comment|
+------+----------+-----------------------+---------+------------+-------------------+---------+------+-------------+----------------------+-------+
|123456|true      |2020-08-10 16:24:06    |[1, 2, 4]|allowed     |2020-08-12 14:43:14|true     |f     |2020-08-05   |2022-08-12 09:17:09.83|fine   |
|654321|false     |2020-08-10 16:25:00.784|[5]      |blocked     |2020-07-01 14:43:14|false    |m     |2020-06-27   |2022-08-12 09:17:09.83|faulty |
+------+----------+-----------------------+---------+------------+-------------------+---------+------+-------------+----------------------+-------+


output_df.printSchema()
root
 |-- index: integer (nullable = true)
 |-- is_enabled: boolean (nullable = true)
 |-- a_updated_at: timestamp (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- block_status: string (nullable = true)
 |-- b_updated_at: timestamp (nullable = true)
 |-- has_email: boolean (nullable = false)
 |-- gender: string (nullable = true)
 |-- creation_date: date (nullable = true)
 |-- processed_at: timestamp (nullable = false)
 |-- comment: string (nullable = true)
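To make the per-value behavior described in the mapping comments concrete, the sketch below approximates several of those transformations in plain Python on single values from the input rows. These are NOT spooq's implementations: the true/false word lists, the milliseconds-vs-seconds threshold, the accepted string format, and the pass-through of unmapped values are all assumptions. The sketch returns UTC datetimes, whereas the DataFrame output above shows 16:25:00.784 for the same epoch value (14:25:00.784 UTC), which suggests the Spark session time zone was UTC+2 when that output was captured.

```python
# Plain-Python approximations of several Mapper transformations, applied to
# single values. NOT spooq's code: the word lists, the ms-vs-seconds
# threshold, and the accepted string format are assumptions.
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Union

TRUE_WORDS = {"true", "on", "enabled", "yes", "1"}     # assumed word list
FALSE_WORDS = {"false", "off", "disabled", "no", "0"}  # assumed word list
MS_THRESHOLD = 10 ** 11  # assumption: epoch values at or above this are milliseconds


def to_int_like(value: str) -> Optional[int]:
    """Drop underscores, then parse; leading zeros disappear in the integer."""
    try:
        return int(value.replace("_", ""))
    except ValueError:
        return None


def to_bool_like(value: str) -> Optional[bool]:
    """Map known words to booleans; anything else becomes None."""
    word = value.strip().lower()
    if word in TRUE_WORDS:
        return True
    if word in FALSE_WORDS:
        return False
    return None


def to_timestamp_like(value: Union[int, str]) -> datetime:
    """Epoch seconds or milliseconds (int or digit string), or 'YYYY-MM-DDTHH:MM:SS+HHMM'."""
    text = str(value)
    if text.isdigit():
        epoch = int(text)
        if epoch >= MS_THRESHOLD:
            seconds, millis = divmod(epoch, 1000)  # integer math avoids float rounding
            return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)
        return datetime.fromtimestamp(epoch, tz=timezone.utc)
    return datetime.strptime(text, "%Y-%m-%dT%H:%M:%S%z")


def str_to_int_array(value: str) -> List[int]:
    """Split a comma-delimited string and cast each non-empty element to int."""
    return [int(item) for item in value.split(",") if item.strip()]


def map_values(value: str, mapping: Dict[str, str]) -> str:
    """Look the value up in a dictionary; unmapped values pass through (assumption)."""
    return mapping.get(value, value)


def has_value(value: object) -> bool:
    """None and empty strings count as missing; 0 still counts as a value."""
    return value is not None and value != ""


print(to_int_like("000_654_321"))         # 654321
print(to_bool_like("off"))                # False
print(to_timestamp_like("1597069500784")) # 2020-08-10 14:25:00.784000+00:00
print(str_to_int_array("5"))              # [5]
print(map_values("blacklisted", {"whitelisted": "allowed", "blacklisted": "blocked"}))  # blocked
print(has_value(""), has_value(0))        # False True
```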

Features / Components

Transformers

Extractors

Loaders

Installation

pip install spooq
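After installing, one way to confirm which release of the spooq distribution is present in the current environment is the standard library's importlib.metadata (Python 3.8+). This is a small sketch; the helper name installed_version is hypothetical.

```python
# Report the installed version of a distribution, or None if it is missing.
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Hypothetical helper: look up the distribution's version metadata."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


print(installed_version("spooq"))  # a version string if spooq is installed, otherwise None
```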

Online Documentation

For more details, please consult the Online Documentation.

Changelog

Please see the Changelog.

Contribution

Please see Contribute for more information.

License

This library is licensed under the Project License.




Download files


Source Distribution

spooq-4.0.1.tar.gz (41.4 kB)

Uploaded Source

Built Distribution


spooq-4.0.1-py3-none-any.whl (51.8 kB)

Uploaded Python 3

File details

Details for the file spooq-4.0.1.tar.gz.

File metadata

  • Download URL: spooq-4.0.1.tar.gz
  • Upload date:
  • Size: 41.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for spooq-4.0.1.tar.gz
Algorithm Hash digest
SHA256 71b924658be3c69c9174a0a2edc5db63b1a69b319f1594ed6eb87d9fd1aecafa
MD5 6aecb3f7ab4f3301fb9486cc67acdf0c
BLAKE2b-256 9e1aa75dd180ec2548abda61f25299cdb39de47eeb58844925ce849c6c6ceb61


File details

Details for the file spooq-4.0.1-py3-none-any.whl.

File metadata

  • Download URL: spooq-4.0.1-py3-none-any.whl
  • Upload date:
  • Size: 51.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for spooq-4.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 d57c3e988d2e6d8227f488b78ef6f7504cd7ec7196d4a408b2f24db8ffdc53c6
MD5 a0a56480778d8bfcdc2fb9b51ceec62c
BLAKE2b-256 42430ac784e85e5ddeb66db741524db7384e7b3b6346e83519f9765f0b3033e7
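The SHA256 digests listed above can be used to check that a downloaded file is intact. The sketch below hashes a file in chunks with hashlib and compares the result with a published digest; the helper names and the local file path are assumptions for illustration. pip can also enforce digests itself in hash-checking mode (--require-hashes, with --hash=sha256:... entries in a requirements file).

```python
# Verify a downloaded file against a published SHA256 digest.
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Hypothetical helper: stream the file so large downloads need little memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_published(path: Path, expected_hex: str) -> bool:
    """Hypothetical helper: compare case-insensitively with the listed digest."""
    return sha256_of(path) == expected_hex.strip().lower()


wheel = Path("spooq-4.0.1-py3-none-any.whl")  # assumed local download location
expected = "d57c3e988d2e6d8227f488b78ef6f7504cd7ec7196d4a408b2f24db8ffdc53c6"
if wheel.exists():
    print("OK" if matches_published(wheel, expected) else "MISMATCH")
```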

