dtest
A library for testing data inside data pipelines. Test results are published to a message queue, where applications, persistence layers, and other consumers can pick them up.
Supported messaging queues / streaming platforms
- RabbitMQ
- MQTT
- Redis
- Kafka
- Kinesis
Installation
pip3 install dtest-framework
Unit Tests
The tests are written with pytest.
Install pytest with pip3 install -U pytest, then run pytest in the root directory.
Circle CI
There is a .circleci/config.yml file that executes the build and the unit tests against Python 3.6.
Quick Start
from dtest.dtest import Dtest
from hamcrest import *
# If publishing to a RabbitMQ queue, specify 'queue'.
# If publishing to a key-value store, specify 'kv-store'.
# Or specify both.
connectionConfig = {
    "queue": {
        "host": "localhost",
        "username": "guest",
        "password": "guest",
        "exchange": "test.dtest",
        "exchange_type": "fanout"
    },
    "kv-store": {
        "api_url": "localhost:8080/api/",
        "retrieve_path": "getKeyValue/",
        "publish_path": "postKeyValue/"
    }
}
metadata = {
    "description": "This is a test of the assertCondition",
    "topic": "test.dtest",
    "ruleSet": "Testing some random data",
    "dataSet": "random_data_set_123912731.csv"
}
dt = Dtest(connectionConfig, metadata)
dsQubert = [0, 1]
dt.assert_that(dsQubert, has_length(2))
# True
dt.publish()
# Publishes the test suite to the MQ server
# ----------------------------------------
# Store a value in the KV store for later use
dt.publishKeyValue('some-descriptor-dsQubert-length', len(dsQubert))
# Retrieve the value from the KV store to compare other files against
avg_count = dt.retrieveKeyValue('some-descriptor-dsQubert-length')
dt.assert_that(dsQubert, has_length(avg_count))
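The comments in the Quick Start say that connectionConfig may contain a 'queue' section, a 'kv-store' section, or both. As a minimal sketch of checking that shape before constructing Dtest, the helper below validates the keys used in the example. Both check_connection_config and REQUIRED_KEYS are hypothetical and not part of dtest; the key names are simply the ones shown in the Quick Start, and treating all of them as required is an assumption, since the library may accept or require others.

```python
# Hypothetical helper, not part of dtest: sanity-check a connectionConfig dict
# against the keys that appear in the Quick Start example.
# Assumption: every key shown in the Quick Start is required.
REQUIRED_KEYS = {
    "queue": {"host", "username", "password", "exchange", "exchange_type"},
    "kv-store": {"api_url", "retrieve_path", "publish_path"},
}


def check_connection_config(config):
    """Return a list of problems; an empty list means the shape looks fine."""
    problems = []
    sections = [name for name in REQUIRED_KEYS if name in config]
    if not sections:
        problems.append("specify 'queue', 'kv-store', or both")
    for name in sections:
        missing = REQUIRED_KEYS[name] - set(config[name])
        for key in sorted(missing):
            problems.append(f"'{name}' is missing '{key}'")
    return problems


# A config that only publishes to the key-value store.
kv_only = {
    "kv-store": {
        "api_url": "localhost:8080/api/",
        "retrieve_path": "getKeyValue/",
        "publish_path": "postKeyValue/",
    }
}
# A queue section with most keys left out.
incomplete = {"queue": {"host": "localhost"}}

print(check_connection_config(kv_only))
print(check_connection_config(incomplete))
```

Returning a list of problems instead of raising on the first one lets a pipeline report every missing key in a single pass.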
Custom handlers
It is possible to create custom message queue and key-value store handlers. Implement a class that inherits from dtest.handler.MqHandler or dtest.handler.KvHandler, depending on your needs.
from abc import abstractmethod

class MqHandler:
    @classmethod
    def version(cls):
        return "1.0"
    @abstractmethod
    def connect(self):
        raise NotImplementedError
    @abstractmethod
    def publishResults(self):
        raise NotImplementedError
    @abstractmethod
    def closeConnection(self):
        raise NotImplementedError

class KvHandler:
    @classmethod
    def version(cls):
        return "1.0"
    @abstractmethod
    def retrieve(self):
        raise NotImplementedError
    @abstractmethod
    def publish(self):
        raise NotImplementedError
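To illustrate the shape of a custom handler, here is a minimal sketch of an in-memory message queue handler and key-value handler, which may be handy for local experiments. So that the snippet runs on its own, it defines stand-in base classes that mirror the interface listed above instead of importing dtest.handler; in real use you would inherit from the library's classes. The interface above shows the methods without arguments, so the extra parameters (results, key, value), the class names InMemoryMqHandler and InMemoryKvHandler, and the example payload are all assumptions.

```python
from abc import ABC, abstractmethod


class MqHandler(ABC):
    """Stand-in for dtest.handler.MqHandler (assumed interface)."""

    @abstractmethod
    def connect(self): ...

    @abstractmethod
    def publishResults(self, results): ...

    @abstractmethod
    def closeConnection(self): ...


class KvHandler(ABC):
    """Stand-in for dtest.handler.KvHandler (assumed interface)."""

    @abstractmethod
    def retrieve(self, key): ...

    @abstractmethod
    def publish(self, key, value): ...


class InMemoryMqHandler(MqHandler):
    """Hypothetical handler that collects published results in a list."""

    def __init__(self):
        self.connected = False
        self.messages = []

    def connect(self):
        self.connected = True

    def publishResults(self, results):
        if not self.connected:
            raise RuntimeError("call connect() before publishing")
        self.messages.append(results)

    def closeConnection(self):
        self.connected = False


class InMemoryKvHandler(KvHandler):
    """Hypothetical handler that keeps key-value pairs in a dict."""

    def __init__(self):
        self.store = {}

    def retrieve(self, key):
        return self.store[key]

    def publish(self, key, value):
        self.store[key] = value


mq = InMemoryMqHandler()
mq.connect()
# The payload shape is invented for illustration only.
mq.publishResults({"ruleSet": "Testing some random data", "passed": True})
mq.closeConnection()

kv = InMemoryKvHandler()
kv.publish("some-descriptor-dsQubert-length", 2)
print(kv.retrieve("some-descriptor-dsQubert-length"))
```

How a custom handler is handed to Dtest is not described here, so that step is left out of the sketch.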