
A tool for end-to-end data tests


DDataFlow

DDataFlow is an end-to-end testing and local development solution for machine learning and data pipelines built with PySpark. It samples your data so that slow pipelines run fast in CI.

You can find our documentation, along with the complete code reference, in the docs folder.

Features

  • Read a subset of your data to speed up pipeline runs during tests
  • Write artifacts to a test location so you don't pollute production
  • Download data to enable development on a local machine
  • Run your pipelines in the CI

1. Install DDataFlow

pip install ddataflow 

ddataflow --help will give you an overview of the available commands.

Getting Started (<5min Tutorial)

This tutorial shows you the core features. For the complete reference, see the integration manual in the docs.

1. Set up some synthetic data

See the examples folder.
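If you just want something to run locally, a minimal sketch like the one below produces the two Parquet files used in the rest of this tutorial. The schemas and row counts are made up for illustration only; any data works.

# filename: create_synthetic_data.py
# minimal sketch: write two small parquet datasets for the demo pipeline
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# illustrative schemas; the column names are arbitrary assumptions
locations = spark.createDataFrame(
    [(i, f"city_{i}") for i in range(200)],
    ["location_id", "city"],
)
tours = spark.createDataFrame(
    [(i, i % 200, float(i)) for i in range(2000)],
    ["tour_id", "location_id", "price"],
)

locations.write.mode("overwrite").parquet("/tmp/demo_locations.parquet")
tours.write.mode("overwrite").parquet("/tmp/demo_tours.parquet")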

2. Create a ddataflow_config.py file

The command ddataflow setup_project creates a file like this for you.

from ddataflow import DDataflow

config = {
    # add here your tables or paths with customized sampling logic
    "data_sources": {
        "demo_tours": {
            "source": lambda spark: spark.table('demo_tours'),
            "filter": lambda df: df.limit(500)
        },
        "demo_locations": {
            "source": lambda spark: spark.table('demo_locations'),
            "default_sampling": True,
        }
    },
    "project_folder_name": "ddataflow_demo",
}

# initialize the application and validate the configuration
ddataflow = DDataflow(**config)
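The filter entry is simply a function from a DataFrame to a DataFrame, so the limit(500) above can be swapped for whatever sampling logic suits your tables. As an illustrative alternative (not part of the generated config), a fractional sample could look like this:

        "demo_tours": {
            "source": lambda spark: spark.table('demo_tours'),
            # keep roughly 1% of the rows instead of a fixed limit
            "filter": lambda df: df.sample(fraction=0.01, seed=42),
        },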

3. Use ddataflow in a pipeline

# filename: pipeline.py
from pyspark.sql import SparkSession
from ddataflow_config import ddataflow

spark = SparkSession.builder.getOrCreate()

# register the tables to mimic a real environment
# when you use ddataflow for real you will have your production tables in place already
spark.read.parquet("/tmp/demo_locations.parquet").createOrReplaceTempView("demo_locations")
spark.read.parquet("/tmp/demo_tours.parquet").createOrReplaceTempView("demo_tours")

# pyspark code: ddataflow.name() resolves the source name (sampled when ddataflow is enabled)
total_locations = spark.table(ddataflow.name('demo_locations')).count()
# sql code also works
total_tours = spark.sql(f""" SELECT COUNT(1) from {ddataflow.name('demo_tours')}""").collect()[0]['count(1)']
print("Totals follow below:")
print({
    "total_locations": total_locations,
    "total_tours": total_tours,
})

Now run it twice and observe the difference in the number of records:

python pipeline.py
ENABLE_DDATAFLOW=True python pipeline.py

You will see that the dataframes are sampled when ddataflow is enabled and full when the tool is disabled.
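In a CI job you can wrap the same run in an ordinary test. The snippet below is only a sketch: the file name and the smoke-test assertion are assumptions, and it relies solely on the ENABLE_DDATAFLOW variable shown above.

# filename: test_pipeline.py (hypothetical)
import os
import subprocess

def test_pipeline_runs_on_sampled_data():
    # run the demo pipeline with ddataflow enabled so only the sampled data is read
    env = {**os.environ, "ENABLE_DDATAFLOW": "True"}
    result = subprocess.run(
        ["python", "pipeline.py"], env=env, capture_output=True, text=True, check=True
    )
    # the pipeline prints a dict with both totals; a cheap smoke check is enough here
    assert "total_locations" in result.stdout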

You completed the short demo!

How to develop

The recommended way to develop with ddataflow is the offline mode, which lets you test your pipelines without an active cluster. This is especially useful for development and debugging, since you can quickly run a pipeline and spot issues.
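A quick local smoke test might then look like the line below; the environment variable name here is an assumption for illustration, so check the integration manual for the exact way to enable offline mode in your version.

# assumption: offline mode is toggled via an environment variable (verify the exact name in the docs)
ENABLE_OFFLINE_MODE=True python pipeline.py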

Alternatively, you can use Databricks Connect to test your pipelines on an active cluster. However, our experience with this approach has not been great: memory issues are common and there is a risk of overwriting production data, so we recommend the offline mode instead.

If you have any questions or need any help, please don't hesitate to reach out. We are here to help you get the most out of ddataflow.

Support

In case of questions, feel free to reach out or create an issue.

Check out our FAQ if you run into problems.

Contributing

This project requires a manual release at the moment. See the docs and request PyPI access if you want to contribute.
