this is an example of a simple data pipeline
tgedr-simplepipe
data pipeline use case: ticker news and its sentiment
We want to create a data pipeline that retrieves news about a specific set of tickers and applies an NLP model to determine the sentiment of each news article.
graph TD
    Start([Start]) --> Process1["fetch news<br/>create articles"]
    Process1 --> Process2["sentiment analysis<br/>data quality validation<br/>append data"]
    Process2 --> End([End])
Tasks
- get news
- apply sentiment + validate data + store data
The solution involves
- business logic
  - the news extraction feature is created in this project module, src/tgedr/simplepipe/news, for simplicity. In an organisation setting we can think of it as something provided by a different team that owns this business logic; that team could eventually package the feature in a separate library that we would consume here in the pipeline;
  - the sentiment analysis is typically owned by a data science/ML team, which can provide this feature in a separate library. In this pipeline we consume an external public library for the NLP sentiment analysis implementation; even though it does not come from an internal team, it ends up being a simple example of what can be achieved with this rationale (see src/tgedr/simplepipe/etl/sentiment_etl.py);
- business logic coupling
  - business logic features should work as pure functions as much as possible: they should be provided with the data and tools required to perform their transformation but kept uncoupled from how and where it is provided;
  - pipeline tasks enclosing business logic are defined as implementations of an ETL abstract class (see src/tgedr/simplepipe/etl/etl.py). The main purpose of this abstract class is to provide scaffolding for any kind of data transformation, whether it is data science or pure data engineering (check the ETL implementations: src/tgedr/simplepipe/etl/news_etl.py and src/tgedr/simplepipe/etl/sentiment_etl.py);
- data quality validation: the Great Expectations library provides solutions for data quality validation; check the component in src/tgedr/simplepipe/utils/validation/data_validation.py and its pandas-specific implementation (src/tgedr/simplepipe/utils/validation/pandas_validation.py), which uses a legacy GE library to validate data expectations against a JSON specification;
- data storage: for simplicity the data is stored in the repository runtime/data/ folder, and there is a persistence Store abstract component with a specific parquet implementation (src/tgedr/simplepipe/store/parquet_store.py) that is used to persist the data
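The coupling rationale above can be sketched roughly as follows. This is only an illustration under assumptions, not the project's code: the `Etl` base class with `extract`/`transform`/`load` methods, the `ToySentimentEtl` subclass, and the `score_headline` function are hypothetical names, and the real abstractions in src/tgedr/simplepipe/etl/etl.py may look different. The word-counting scorer is a stand-in for a real NLP model.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List


def score_headline(headline: str) -> float:
    """Toy pure function standing in for a sentiment model (hypothetical)."""
    positive = {"gain", "beat", "surge"}
    negative = {"loss", "miss", "drop"}
    words = headline.lower().split()
    return float(sum(w in positive for w in words) - sum(w in negative for w in words))


class Etl(ABC):
    """Hypothetical scaffolding: subclasses only fill in the three steps."""

    def __init__(self, configuration: Dict[str, Any]) -> None:
        self._configuration = configuration

    @abstractmethod
    def extract(self) -> Any:
        ...

    @abstractmethod
    def transform(self, data: Any) -> Any:
        ...

    @abstractmethod
    def load(self, data: Any) -> None:
        ...

    def run(self) -> Any:
        # The fixed sequence lives here, so tasks stay small and uniform.
        data = self.transform(self.extract())
        self.load(data)
        return data


class ToySentimentEtl(Etl):
    """Wires a pure scoring function to a source and a sink it does not know about."""

    def __init__(
        self,
        configuration: Dict[str, Any],
        scorer: Callable[[str], float],
        sink: List[Dict[str, Any]],
    ) -> None:
        super().__init__(configuration)
        self._scorer = scorer
        self._sink = sink

    def extract(self) -> List[Dict[str, Any]]:
        return self._configuration["articles"]

    def transform(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        return [{**article, "sentiment": self._scorer(article["title"])} for article in data]

    def load(self, data: List[Dict[str, Any]]) -> None:
        self._sink.extend(data)


sink: List[Dict[str, Any]] = []
articles = [
    {"ticker": "NVDA", "title": "Shares surge after earnings beat"},
    {"ticker": "ARM", "title": "Guidance miss triggers drop"},
]
result = ToySentimentEtl({"articles": articles}, score_headline, sink).run()
```

Because the scorer is injected, the business logic can be tested on its own and swapped for a library provided by another team without touching the ETL scaffolding.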
The pipeline
- is defined by a sequence of tasks in a github actions pipeline named execution (see .github/workflows/execution.yml) that can be triggered manually in the repository page
- task implementations are bundled in a library published to PyPi
- the tasks are invoked in the execution pipeline using the library entrypoint, which allows parameterizing the module, class and params to be used:
      .venv/bin/run --module tgedr.simplepipe.etl.news_etl \
        --classname NewsEtl \
        --callable run \
        --classparams "{\"configuration\": {\"tickers\": \"NVDA,ARM\"}}"
- the final step in the pipeline run is to commit and push the updated data back to the repository
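To make the entrypoint idea more concrete, here is a minimal sketch of how a runner accepting `--module`, `--classname`, `--callable` and `--classparams` might resolve and call a task. It is an assumption about the general technique (dynamic import plus JSON-decoded constructor arguments), not the library's actual implementation; the `invoke` and `main` names are hypothetical. The demo call uses a standard-library class so that it runs without the package installed.

```python
import argparse
import importlib
import json
from typing import Any, List, Optional


def invoke(module: str, classname: str, callable_name: str, classparams: str = "{}") -> Any:
    """Import `module`, build `classname` from JSON kwargs, call the named method."""
    mod = importlib.import_module(module)
    cls = getattr(mod, classname)
    instance = cls(**json.loads(classparams))
    return getattr(instance, callable_name)()


def main(argv: Optional[List[str]] = None) -> Any:
    parser = argparse.ArgumentParser(description="hypothetical task runner")
    parser.add_argument("--module", required=True)
    parser.add_argument("--classname", required=True)
    parser.add_argument("--callable", dest="callable_name", required=True)
    parser.add_argument("--classparams", default="{}")
    args = parser.parse_args(argv)
    return invoke(args.module, args.classname, args.callable_name, args.classparams)


# Stand-in target from the standard library: datetime.date(...).isoformat()
outcome = main([
    "--module", "datetime",
    "--classname", "date",
    "--callable", "isoformat",
    "--classparams", '{"year": 2024, "month": 1, "day": 2}',
])
```

With this shape, adding a new task to the workflow means publishing a new class in the library and changing only the command-line arguments in the workflow file.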
Further improvements to be made (regardless of simplistic approach):
- storage:
- currently data is being appended and there will be duplicates if the pipeline is run in overlapping time windows
- data contracts implementation
- observability: the pipeline should send telemetry to a collector; the integration of logger, metrics and tracer should be abstracted in the ETL component behind a conventional interface (OpenTelemetry)
- coupling ds/ml team code and ETL component
- this needs collaboration to create the simplest possible solution that provides abstraction without stifling each team's development specificities, preventing wheel reinvention and preserving maintainability while keeping development as agile as possible;
- example: currently there are the ETL class implementations; we could liaise with the teams and try a mixin approach;
- main goal: enable secure and fast change while keeping maintainability
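For the storage improvement listed above, one possible direction is to merge new records into the stored ones on a key instead of blindly appending. The sketch below uses plain Python lists in place of the parquet store so that it stays self-contained; the `merge_without_duplicates` helper and the choice of (`ticker`, `url`) as the identifying key are assumptions, since the article schema is not described here.

```python
from typing import Dict, Iterable, List, Sequence, Tuple

Record = Dict[str, object]


def merge_without_duplicates(
    existing: Iterable[Record],
    incoming: Iterable[Record],
    key_fields: Sequence[str] = ("ticker", "url"),  # assumed identifying fields
) -> List[Record]:
    """Merge two batches; when keys collide, the incoming record wins."""
    merged: Dict[Tuple[object, ...], Record] = {}
    for record in list(existing) + list(incoming):
        key = tuple(record[field] for field in key_fields)
        merged[key] = record
    return list(merged.values())


stored = [
    {"ticker": "NVDA", "url": "https://example.com/a", "sentiment": 0.1},
    {"ticker": "ARM", "url": "https://example.com/b", "sentiment": -0.3},
]
new_batch = [
    {"ticker": "ARM", "url": "https://example.com/b", "sentiment": -0.2},
    {"ticker": "ARM", "url": "https://example.com/c", "sentiment": 0.5},
]
result = merge_without_duplicates(stored, new_batch)
```

Running the pipeline twice over overlapping time windows would then leave one row per article, at the cost of rewriting the stored data rather than appending to it.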
development
- clone the repository: git clone git@github.com:jtviegas/simplepipe
- open VSCode in the repository folder
- check the operations sequence in the cicd pipeline (see .github/workflows/cicd.yml), which uses the helper.sh bash script
- if you can run all steps up to ./helper.sh build then your system is ready to develop
- the test coverage is checked for 100%
- to publish a new version to PyPi, bump it up in pyproject.toml to version X, push the code, and then tag it with the latest commit hash using the ./helper.sh script as ./helper.sh tag X <COMMIT_HASH>