A basic set of data pipeline tools for data engineers handling CRM or loyalty data

Project description

About CETL

CETL is a Python library that provides a comprehensive set of tools for building and managing data pipelines. It is designed to assist data engineers in handling Extract, Transform, and Load (ETL) tasks more effectively by simplifying the process and reducing the amount of manual labor involved.

CETL is particularly useful for Python developers who work with data on a regular basis. It uses popular data containers such as pandas dataframes, JSON objects, and PySpark dataframes to provide a familiar interface for developers. This allows users to easily integrate CETL into their existing data pipelines and workflows.

The library is intended to make the ETL process more straightforward by automating many of the technical details involved in data processing and movement. CETL includes a wide range of functions and tools for handling complex data formats, such as CSV, Excel, and JSON files, as well as for working with a variety of data sources, including databases, APIs, and cloud storage services.

One of the key benefits of CETL is its ability to handle large datasets, making it suitable for use in high-performance data processing environments. CETL also includes features for data profiling, data validation, data transformation, and data mapping, allowing users to build sophisticated data pipelines that can handle a wide range of data processing tasks.

Overall, CETL is a powerful data pipeline tool that can help data engineers to improve their productivity and streamline the ETL process. By providing a comprehensive set of functions and tools for working with data, CETL makes it easier to develop and maintain complex ETL pipelines, reducing the amount of time and effort required to manage data processing tasks.
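
Installation

CETL is published on PyPI and can be installed with pip:

pip install cetl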


User Guide

Example 1

GenerateDataFrame is a Python class that represents a transformation step in a data pipeline. It can be used to generate a dummy dataframe without reading actual data from a file. Its main purpose is to help developers test their data processing pipelines.

With GenerateDataFrame, developers can quickly and easily create test data that mimics the structure of their actual data. This can be particularly useful when working with large datasets or when data is not readily available. By generating dummy data, developers can test their pipeline's functionality without having to rely on real data sources.

GenerateDataFrame is particularly useful in situations where developers need to test their pipeline's ability to handle different types of data and perform various data transformations. This can include testing the pipeline's ability to handle missing data, data outliers, and data formatting issues.

Overall, GenerateDataFrame is a powerful tool that can help developers to streamline the testing process and ensure the accuracy and efficiency of their data processing pipelines. By allowing developers to generate dummy data, it provides a quick and easy way to test their pipeline's functionality and identify any potential issues before deploying to production.

from cetl import make_pipeline
from cetl.pandas_modules import generateDataFrame
pipe = make_pipeline(generateDataFrame())
df = pipe.transform("")
print(df)
   customer_id first_name last_name title
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.

Example 2

from cetl import build_pipeline
from cetl.pandas_modules import generateDataFrame, unionAll
from cetl.functional_modules import dummyStart, parallelTransformer

pipe = build_pipeline(dummyStart(),
                      parallelTransformer([generateDataFrame(), generateDataFrame()]),
                      unionAll())
df = pipe.transform("")
print(df)
   customer_id first_name last_name title
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.
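
Conceptually, parallelTransformer fans the pipeline input out to each of its child transformers, and unionAll concatenates their outputs row-wise. The plain-pandas sketch below illustrates the same semantics; it is illustrative only, not CETL's internal implementation:

import pandas as pd

def generate_df():
    # same dummy rows that generateDataFrame produces
    return pd.DataFrame({"customer_id": [111, 222, 333],
                         "first_name": ["peter", "YuCheung", "Cindy"],
                         "last_name": ["Hong", "Wong", "Wong"],
                         "title": ["Mr.", "Mr.", "Mrs."]})

# fan-out: each branch receives the pipeline input independently
branches = [generate_df(), generate_df()]

# union all: concatenate the branch outputs row-wise; the original
# indexes are kept, which is why 0, 1, 2 repeat in the output above
df = pd.concat(branches)
print(df)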

Alternatively, you can achieve the same result by passing a JSON-style configuration to the DataPipeline object:

from cetl import DataPipeline
cfg = {"pipeline": [{"type": "dummyStart", "module_type": "functional"},
                    {"type": "parallelTransformer", "transformers": [
                        {"type": "generateDataFrame"},
                        {"type": "generateDataFrame"}
                    ]},
                    {"type": "unionAll"}]}

pipe = DataPipeline(cfg)
df = pipe.transform("")
print(df)
   customer_id first_name last_name title
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.
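
Because the configuration is plain JSON-compatible data, it can also be kept in a file and loaded with the standard json module before constructing the pipeline. A small sketch (the file name pipeline_cfg.json is arbitrary):

import json

# persist the configuration for reuse
with open("pipeline_cfg.json", "w") as f:
    json.dump(cfg, f, indent=4)

# later: load it back and build the same pipeline
with open("pipeline_cfg.json") as f:
    loaded_cfg = json.load(f)

pipe = DataPipeline(loaded_cfg)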

Example 3: Using Kafka for data transfer between transformers

from cetl import DataPipeline

fernet_key = "20230315"
pipe_topic_name = "kafka_media_test"

cfg = {"pipeline": [{"type": "dummyStartEmpty"},
                    {"type": "parallelTransformer", "transformers": [
                        {"type": "generateDataFrame"},
                        {"type": "generateDataFrame"},
                        {"type": "generateDataFrame"}
                    ]},
                    {"type": "unionAll"}],

       "pipeline_settings": {"print_cfg": 1,
                             "print_task_result": 1,
                             "exchange_media": "kafka",
                             "bootstrap_servers": ["localhost:9092"],
                             "fernet_key": fernet_key,
                             "pipe_topic_name": pipe_topic_name}}

pipe = DataPipeline(cfg)
result = pipe.transform("")

# get the output of a specific transformer by its task_id
task_id = b"5.unionAll"
df = pipe.kafka_media.read_kafka(task_id=task_id)

# print the final output
print(df)
   customer_id first_name last_name title
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.
0          111      peter      Hong   Mr.
1          222   YuCheung      Wong   Mr.
2          333      Cindy      Wong  Mrs.
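
A note on fernet_key: if CETL relies on the cryptography package's Fernet scheme to encrypt messages on the Kafka topic (an assumption; the documentation does not specify), a valid Fernet key is a URL-safe base64-encoded 32-byte value rather than a short literal like "20230315". Such a key can be generated with:

from cryptography.fernet import Fernet

# generate a URL-safe base64-encoded 32-byte key
fernet_key = Fernet.generate_key().decode()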

Render the graph

Note: make sure the Graphviz executable is installed.
Both a PNG file and an SVG file will be exported.
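
You can check that Graphviz's dot command is available on your PATH with:

dot -V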

pipe = pipe.build_digraph()
pipe.save_png("./sample.png")

[sample.png: rendered pipeline graph]

This version fixes the issue of UnboundLocalError: local variable 'pre_transformer_key' referenced before assignment.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cetl-0.3.0.tar.gz (40.9 kB)

Uploaded Source

Built Distribution

cetl-0.3.0-py3-none-any.whl (60.0 kB)

Uploaded Python 3

File details

Details for the file cetl-0.3.0.tar.gz.

File metadata

  • Download URL: cetl-0.3.0.tar.gz
  • Upload date:
  • Size: 40.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.13

File hashes

Hashes for cetl-0.3.0.tar.gz

Algorithm    Hash digest
SHA256       eb607693da67279fc9d0606e4a6792131fff89263420bbc10cc7b16cf20db6ef
MD5          5c880a3139f4994eaac65af331f6fd54
BLAKE2b-256  386dfe6ea78354f907f566d72818408a0a54651d989dc31ca018518cb848829d

See more details on using hashes here.

File details

Details for the file cetl-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: cetl-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 60.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.13

File hashes

Hashes for cetl-0.3.0-py3-none-any.whl

Algorithm    Hash digest
SHA256       7674b8ca2604f7f4d58e3486555a8eec0571875f9371743bf23df3ed98df450c
MD5          24053f822b900da81826b02ab77f1437
BLAKE2b-256  62db9f430beb5a0d2421c384203d5e96470d6c95672fa49dba859e0a95ada878

See more details on using hashes here.
