this is an example of a simple data pipeline

tgedr-simplepipe

data pipeline use case: ticker news and its sentiment

We want to create a data pipeline that retrieves news about a specific set of tickers and applies an NLP model to determine the sentiment of each news article.

    graph TD
        Start([Start]) --> Process1["fetch news<br/>create articles"]
        Process1 --> Process2["sentiment analysis<br/>data quality validation<br/>append data"]
        Process2 --> End([End])

Tasks

  • get news
    • currently, financial news from the last 24 hours about the tickers NVDA and ARM
  • apply sentiment + validate data + store data

The solution involves

  • business logic
    • the news extraction feature lives in this project's module src/tgedr/simplepipe/news for simplicity. In an organisation setting, think of it as something provided by a different team that owns this business logic; that team could eventually package the feature in a separate library for the pipeline to consume;
    • the sentiment analysis is typically owned by a data science/ML team, which can provide the feature in a separate library. This pipeline consumes an external public library for the NLP sentiment analysis implementation; even though it does not come from an internal team, it serves as a simple example of what this rationale can achieve (see src/tgedr/simplepipe/etl/sentiment_etl.py);
  • business logic coupling
    • business logic features should work as pure functions as much as possible: each should be given the data and tools it needs to perform its transformation, but stay decoupled from how and where they are provided;
    • pipeline tasks enclosing business logic are defined as implementations of an ETL abstract class (see src/tgedr/simplepipe/etl/etl.py). The main purpose of this abstract class is to provide scaffolding for any kind of data transformation, whether it is data science or pure data engineering (check the ETL implementations: src/tgedr/simplepipe/etl/news_etl.py and src/tgedr/simplepipe/etl/sentiment_etl.py);
  • data quality validation: the Great Expectations library provides solutions for data quality validation. Check the component in src/tgedr/simplepipe/utils/validation/data_validation.py and its pandas-specific implementation (src/tgedr/simplepipe/utils/validation/pandas_validation.py), which uses a legacy GE library to validate data expectations against a JSON specification;
  • data storage: for simplicity the data is stored in the repository's runtime/data/ folder; a persistence Store abstract component with a parquet implementation (src/tgedr/simplepipe/store/parquet_store.py) is used to persist it
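
The decoupling described above can be sketched roughly as follows. This is a minimal illustration, not the code in src/tgedr/simplepipe: the method names (extract, transform, load, append, get), the InMemoryStore stand-in for the parquet store, and the toy_sentiment function are all assumptions made for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


class Store(ABC):
    """Persistence abstraction: tasks do not know where the data ends up."""

    @abstractmethod
    def append(self, key: str, rows: List[Row]) -> None: ...

    @abstractmethod
    def get(self, key: str) -> List[Row]: ...


class InMemoryStore(Store):
    """Hypothetical stand-in for a parquet-backed store, for illustration only."""

    def __init__(self) -> None:
        self._data: Dict[str, List[Row]] = {}

    def append(self, key: str, rows: List[Row]) -> None:
        self._data.setdefault(key, []).extend(rows)

    def get(self, key: str) -> List[Row]:
        return list(self._data.get(key, []))


class Etl(ABC):
    """Scaffolding: subclasses supply the steps, run() fixes their order."""

    def __init__(self, configuration: Optional[Dict[str, Any]] = None) -> None:
        self._configuration = configuration or {}

    @abstractmethod
    def extract(self) -> List[Row]: ...

    @abstractmethod
    def transform(self, data: List[Row]) -> List[Row]: ...

    @abstractmethod
    def load(self, data: List[Row]) -> None: ...

    def run(self) -> List[Row]:
        data = self.transform(self.extract())
        self.load(data)
        return data


def toy_sentiment(text: str) -> str:
    """Pure function standing in for the NLP model (hypothetical rule)."""
    return "positive" if "surge" in text.lower() else "neutral"


class SentimentEtl(Etl):
    """The business logic (scorer) is injected; the ETL only wires it to storage."""

    def __init__(self, store: Store, scorer: Callable[[str], str],
                 configuration: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(configuration)
        self._store = store
        self._scorer = scorer

    def extract(self) -> List[Row]:
        return self._store.get("articles")

    def transform(self, data: List[Row]) -> List[Row]:
        return [{**row, "sentiment": self._scorer(row["title"])} for row in data]

    def load(self, data: List[Row]) -> None:
        self._store.append("sentiment", data)


store = InMemoryStore()
store.append("articles", [{"ticker": "NVDA", "title": "Shares surge after earnings"}])
result = SentimentEtl(store, toy_sentiment).run()
```

Because the scorer is a plain function passed in at construction time, a DS/ML team could replace it without touching the ETL or the store.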

The pipeline

  • is defined by a sequence of tasks in a GitHub Actions pipeline named execution (see .github/workflows/execution.yml) that can be triggered manually from the repository page
  • task implementations are bundled in a library published to PyPI
  • the tasks are invoked in the execution pipeline through the library entrypoint, which lets you parameterize the module, class and params to be used:
    .venv/bin/run --module tgedr.simplepipe.etl.news_etl \
          --classname NewsEtl \
          --callable run \
          --classparams "{\"configuration\": {\"tickers\": \"NVDA,ARM\"}}" 
    
  • the final step in the pipeline run is to commit and push the updated data back to the repository
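
As an illustration of how an entrypoint taking --module, --classname, --callable and --classparams could work, here is a minimal sketch. It is an assumption about the general technique (dynamic import plus JSON-decoded constructor arguments), not the library's actual implementation; the function names invoke and main are hypothetical, and a standard-library class is used for the demo so the snippet runs without the package installed.

```python
import argparse
import importlib
import json
from typing import Any, List, Optional


def invoke(module: str, classname: str, callable_name: str, classparams: str = "{}") -> Any:
    """Import `module`, build `classname` from JSON kwargs, call `callable_name`."""
    cls = getattr(importlib.import_module(module), classname)
    instance = cls(**json.loads(classparams))
    return getattr(instance, callable_name)()


def main(argv: Optional[List[str]] = None) -> Any:
    # Flag names mirror the command shown above; the parsing itself is a sketch.
    parser = argparse.ArgumentParser(description="hypothetical task runner")
    parser.add_argument("--module", required=True)
    parser.add_argument("--classname", required=True)
    parser.add_argument("--callable", dest="callable_name", default="run")
    parser.add_argument("--classparams", default="{}")
    args = parser.parse_args(argv)
    return invoke(args.module, args.classname, args.callable_name, args.classparams)


# Demo with a standard-library class instead of NewsEtl.
result = main([
    "--module", "collections",
    "--classname", "Counter",
    "--callable", "most_common",
    "--classparams", '{"a": 2, "b": 1}',
])
```

Keeping the class parameters as a JSON string means the workflow file only needs to change arguments, not code, to run a different task.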

Further improvements to be made (regardless of the simplistic approach):

  • storage:
    • currently data is being appended and there will be duplicates if the pipeline is run in overlapping time windows
    • data contracts implementation
  • observability: the pipeline should send telemetry to a collector; the integration of logger, metrics and tracer should be abstracted in the ETL component behind a conventional interface (OpenTelemetry)
  • coupling of DS/ML team code and the ETL component
    • this needs collaboration to create the simplest possible solution that provides abstraction without stifling team-specific development practices, avoiding wheel reinvention and preserving maintainability while keeping development as agile as possible;
    • example: there are currently the ETL class implementations; we could liaise with the teams and try a mixin approach;
    • main goal: enable secure and fast change while keeping maintainability
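
For the duplicate-on-append issue listed under storage, one possible direction is to make the append idempotent by keying rows on a natural identifier. The sketch below assumes each article row carries `ticker` and `url` fields; those column names, and the helper append_unique, are hypothetical and not taken from the project.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, str]


def append_unique(existing: List[Row], incoming: Iterable[Row],
                  key: Tuple[str, ...] = ("ticker", "url")) -> List[Row]:
    """Return existing rows plus the incoming rows whose key is not yet present."""
    seen = {tuple(row[k] for k in key) for row in existing}
    merged = list(existing)
    for row in incoming:
        identity = tuple(row[k] for k in key)
        if identity not in seen:
            seen.add(identity)
            merged.append(row)
    return merged


# Two runs over overlapping time windows return the same article twice.
first_run = [{"ticker": "NVDA", "url": "https://example.com/a"}]
second_run = [
    {"ticker": "NVDA", "url": "https://example.com/a"},
    {"ticker": "ARM", "url": "https://example.com/b"},
]
stored = append_unique(append_unique([], first_run), second_run)
```

With pandas data frames, the same effect could be had by concatenating old and new data and calling `drop_duplicates(subset=[...])` before writing.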

development

  • clone the repository:

    git clone git@github.com:jtviegas/simplepipe
    
  • open VSCode in the repository folder

  • check the sequence of helper.sh operations used in the cicd pipeline (see .github/workflows/cicd.yml)

  • if you can run every step up to ./helper.sh build, your system is ready for development

  • test coverage is checked against a 100% threshold

  • to publish a new version to PyPI, bump the version in pyproject.toml to X, push the code, and then tag the latest commit using the ./helper.sh script: ./helper.sh tag X <COMMIT_HASH>
