Basic data pipeline tools for data engineers handling CRM or loyalty data
About CETL
CETL is a Python library that provides a comprehensive set of tools for building and managing data pipelines. It is designed to assist data engineers in handling Extract, Transform, and Load (ETL) tasks more effectively by simplifying the process and reducing the amount of manual labor involved.
CETL is particularly useful for Python developers who work with data on a regular basis. It uses popular data containers such as pandas dataframes, JSON objects, and PySpark dataframes to provide a familiar interface for developers. This allows users to easily integrate CETL into their existing data pipelines and workflows.
The library is intended to make the ETL process more straightforward by automating many of the technical details involved in data processing and movement. CETL includes a wide range of functions and tools for handling complex data formats, such as CSV, Excel, and JSON files, as well as for working with a variety of data sources, including databases, APIs, and cloud storage services.
One of the key benefits of CETL is its ability to handle large datasets, making it suitable for use in high-performance data processing environments. CETL also includes features for data profiling, data validation, data transformation, and data mapping, allowing users to build sophisticated data pipelines that can handle a wide range of data processing tasks.
Overall, CETL is a powerful data pipeline tool that can help data engineers to improve their productivity and streamline the ETL process. By providing a comprehensive set of functions and tools for working with data, CETL makes it easier to develop and maintain complex ETL pipelines, reducing the amount of time and effort required to manage data processing tasks.
User Guide
Example 1
generateDataFrame is a Python class that represents a transformation step in a data pipeline. It generates a dummy dataframe without reading data from a file, and its main purpose is to help developers test their data processing pipelines.
With generateDataFrame, developers can quickly create test data that mimics the structure of their actual data. This is particularly useful when working with large datasets or when data is not readily available: by generating dummy data, developers can test a pipeline's functionality without relying on real data sources.
It is also useful when developers need to test a pipeline's ability to handle different types of data and perform various transformations, including its handling of missing data, outliers, and formatting issues.
Overall, generateDataFrame helps developers streamline testing and identify potential issues in their data processing pipelines before deploying to production.
from cetl import make_pipeline
from cetl.pandas_modules import generateDataFrame
pipe = make_pipeline(generateDataFrame())
df = pipe.transform("")
print(df)
| | customer_id | first_name | last_name | title |
|---|---|---|---|---|
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
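To make the idea of a dummy-data step concrete outside of CETL, here is a minimal sketch in plain Python. Everything in it is an assumption for illustration: `DummyCustomers`, `UppercaseLastName`, and `run_steps` are hypothetical names, and lists of dicts stand in for pandas dataframes. It only shows the pattern of feeding fixed test rows into a downstream step, not how CETL implements it.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


class DummyCustomers:
    """Hypothetical stand-in for a dummy-data step; not part of CETL."""

    def transform(self, _input: Any) -> List[Row]:
        # Ignore the input and return fixed rows that mirror the table above.
        return [
            {"customer_id": 111, "first_name": "peter", "last_name": "Hong", "title": "Mr."},
            {"customer_id": 222, "first_name": "YuCheung", "last_name": "Wong", "title": "Mr."},
            {"customer_id": 333, "first_name": "Cindy", "last_name": "Wong", "title": "Mrs."},
        ]


class UppercaseLastName:
    """Hypothetical downstream step that we want to test."""

    def transform(self, rows: List[Row]) -> List[Row]:
        # Return new rows with the last name upper-cased.
        return [{**row, "last_name": row["last_name"].upper()} for row in rows]


def run_steps(steps, data=""):
    # Feed each step's output into the next step.
    for step in steps:
        data = step.transform(data)
    return data


rows = run_steps([DummyCustomers(), UppercaseLastName()])
print([row["last_name"] for row in rows])  # ['HONG', 'WONG', 'WONG']
```

Because the dummy step needs no file or database, the downstream step can be exercised in a unit test with no external setup.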
Example 2
from cetl import build_pipeline
from cetl.pandas_modules import generateDataFrame, unionAll
from cetl.functional_modules import dummyStart, parallelTransformer
pipe = build_pipeline(
    dummyStart(),
    parallelTransformer([generateDataFrame(), generateDataFrame()]),
    unionAll(),
)
df = pipe.transform("")
print(df)
| | customer_id | first_name | last_name | title |
|---|---|---|---|---|
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
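The fan-out and union pattern in this example can be sketched with the standard library alone. This is an illustration under stated assumptions, not CETL's implementation: `make_rows`, `run_parallel`, and `union_all` are hypothetical helpers, lists of dicts stand in for dataframes, and the use of threads is my assumption (the source does not say how CETL schedules the branches).

```python
from concurrent.futures import ThreadPoolExecutor


def make_rows(_input):
    # Hypothetical branch: return the same three dummy rows every time.
    return [
        {"customer_id": 111, "first_name": "peter"},
        {"customer_id": 222, "first_name": "YuCheung"},
        {"customer_id": 333, "first_name": "Cindy"},
    ]


def run_parallel(branches, data):
    # Run every branch on the same input; pool.map keeps the branch order.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(lambda branch: branch(data), branches))


def union_all(results):
    # Concatenate the per-branch row lists into one list.
    return [row for rows in results for row in rows]


branch_outputs = run_parallel([make_rows, make_rows], "")
combined = union_all(branch_outputs)
print(len(combined))  # 6
```

As in the output above, the combined result simply repeats the rows of each branch in order, which is why the index values 0 to 2 appear twice.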
Alternatively, you can build the same pipeline by passing a JSON-style configuration to the DataPipeline object:
from cetl import DataPipeline
cfg = {
    "pipeline": [
        {"type": "dummyStart", "module_type": "functional"},
        {"type": "parallelTransformer", "transformers": [
            {"type": "generateDataFrame"},
            {"type": "generateDataFrame"},
        ]},
        {"type": "unionAll"},
    ]
}
pipe = DataPipeline(cfg)
df = pipe.transform("")
print(df)
| | customer_id | first_name | last_name | title |
|---|---|---|---|---|
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
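One common way to turn a configuration like this into a pipeline is a registry that maps each "type" string to a class. The sketch below shows that general technique only; it is an assumption about how such a loader could work, not a description of CETL's DataPipeline internals. `REGISTRY`, `register`, `build_from_config`, and the step names `generateRows` and `addFlag` are all hypothetical.

```python
from typing import Any, Callable, Dict, List

# Maps a "type" string from the config to the class that implements it.
REGISTRY: Dict[str, Callable[..., Any]] = {}


def register(name: str):
    # Decorator that records a step class under the given type name.
    def decorator(cls):
        REGISTRY[name] = cls
        return cls
    return decorator


@register("generateRows")
class GenerateRows:
    def transform(self, _data):
        return [{"customer_id": 111}, {"customer_id": 222}]


@register("addFlag")
class AddFlag:
    def __init__(self, flag: str = "checked"):
        self.flag = flag

    def transform(self, rows):
        return [{**row, "flag": self.flag} for row in rows]


def build_from_config(cfg: Dict[str, Any]) -> List[Any]:
    steps = []
    for entry in cfg["pipeline"]:
        # Every key except "type" is passed to the step's constructor.
        params = {key: value for key, value in entry.items() if key != "type"}
        steps.append(REGISTRY[entry["type"]](**params))
    return steps


cfg = {"pipeline": [{"type": "generateRows"}, {"type": "addFlag", "flag": "ok"}]}
data = ""
for step in build_from_config(cfg):
    data = step.transform(data)
print(data)  # [{'customer_id': 111, 'flag': 'ok'}, {'customer_id': 222, 'flag': 'ok'}]
```

The appeal of a config-driven pipeline is that the same structure can be stored as JSON, versioned, and changed without touching Python code.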
Example 3: using Kafka for data transfer between transformers
from cetl import DataPipeline

fernet_key = "20230315"
pipe_topic_name = "kafka_media_test"
cfg = {
    "pipeline": [
        {"type": "dummyStartEmpty"},
        {"type": "parallelTransformer", "transformers": [
            {"type": "generateDataFrame"},
            {"type": "generateDataFrame"},
            {"type": "generateDataFrame"},
        ]},
        {"type": "unionAll"},
    ],
    "pipeline_settings": {
        "print_cfg": 1,
        "print_task_result": 1,
        "exchange_media": "kafka",  # pass transformer outputs through Kafka
        "bootstrap_servers": ["localhost:9092"],
        "fernet_key": fernet_key,
        "pipe_topic_name": pipe_topic_name,
    },
}
pipe = DataPipeline(cfg)
result = pipe.transform("")
# Read the output of a transformer from Kafka by its task_id;
# here the task is the final unionAll step.
task_id = b"5.unionAll"
df = pipe.kafka_media.read_kafka(task_id=task_id)
print(df)
| | customer_id | first_name | last_name | title |
|---|---|---|---|---|
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
| 0 | 111 | peter | Hong | Mr. |
| 1 | 222 | YuCheung | Wong | Mr. |
| 2 | 333 | Cindy | Wong | Mrs. |
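The idea of reading a step's output back by its task id can be illustrated without a Kafka broker. The sketch below is a loose analogy under stated assumptions: `InMemoryExchange` is a hypothetical class that keeps serialized results in a dict, the task ids are made up, and nothing here reflects how CETL's kafka_media object or its encryption actually work.

```python
import json
from typing import Any, Dict, List


class InMemoryExchange:
    """Hypothetical stand-in for a message topic; not CETL's kafka_media."""

    def __init__(self) -> None:
        self._messages: Dict[bytes, bytes] = {}

    def write(self, task_id: bytes, rows: List[Dict[str, Any]]) -> None:
        # Serialize the rows so they could travel over a wire as bytes.
        self._messages[task_id] = json.dumps(rows).encode("utf-8")

    def read(self, task_id: bytes) -> List[Dict[str, Any]]:
        # Look up the message for this task and deserialize it.
        return json.loads(self._messages[task_id].decode("utf-8"))


exchange = InMemoryExchange()
# Each step publishes its result under its own (made-up) task id.
exchange.write(b"1.generateRows", [{"customer_id": 111}])
exchange.write(b"2.unionAll", [{"customer_id": 111}, {"customer_id": 111}])

final_rows = exchange.read(b"2.unionAll")
print(final_rows)  # [{'customer_id': 111}, {'customer_id': 111}]
```

Keying results by task id is what makes it possible to inspect any intermediate step after the run, not only the final output.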
Render the graph
Note: make sure the Graphviz executable is installed.
Both a PNG file and an SVG file will be exported.
pipe = pipe.build_digraph()
pipe.save_png("./sample.png")
[Image: sample.png]
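For readers who want to see what a pipeline graph looks like in Graphviz terms, here is a minimal sketch that writes DOT source by hand. It does not use CETL: `to_dot` is a hypothetical helper, and the node labels are made up to resemble the pipeline from Example 2.

```python
def to_dot(edges, name="pipeline"):
    # Build Graphviz DOT source for a directed graph from (source, target) pairs.
    lines = [f"digraph {name} {{"]
    for source, target in edges:
        lines.append(f'    "{source}" -> "{target}";')
    lines.append("}")
    return "\n".join(lines)


# Hypothetical node labels: one start node fans out to two branches,
# which are then merged by a union step.
edges = [
    ("dummyStart", "generateDataFrame_1"),
    ("dummyStart", "generateDataFrame_2"),
    ("generateDataFrame_1", "unionAll"),
    ("generateDataFrame_2", "unionAll"),
]
dot_source = to_dot(edges)
print(dot_source)
```

Saved to a file such as pipeline.dot, this text can be rendered with the Graphviz command line, for example `dot -Tpng pipeline.dot -o pipeline.png` or `dot -Tsvg pipeline.dot -o pipeline.svg`, which is why the Graphviz executable has to be installed.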
This version fixes the following issue: UnboundLocalError: local variable 'pre_transformer_key' referenced before assignment