
th2_data_services



1. Introduction

This repository contains a library for creating th2-data-services applications.

Data Services is a tool for analyzing stream data from the Report Data Provider (rpt-data-provider) using aggregate operations. It lets users build a workflow of operations that extracts and analyzes exactly the data they need.

Current capabilities:

  • Filtering stream data
  • Transforming stream data
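Conceptually, filtering and transforming are lazy operations over an iterable of records. The following sketch uses plain Python generators, not the th2-data-services API, on made-up event dicts; the field names eventName, successful, and body are borrowed from the example in section 2.2, and the sample values are hypothetical.

```python
# Hypothetical sample records shaped like th2 events (not real provider output).
sample_events = [
    {"eventName": "login", "successful": True, "body": [{"type": "message"}]},
    {"eventName": "heartbeat", "successful": True, "body": []},
    {"eventName": "order", "successful": False, "body": [{"type": "table"}]},
]

# Filtering: keep only events whose body is non-empty (a lazy generator).
non_empty = (e for e in sample_events if e["body"] != [])

# Transforming: project each remaining event onto two fields (also lazy).
summaries = (
    {"eventName": e["eventName"], "successful": e["successful"]} for e in non_empty
)

# Nothing is computed until the generators are consumed here.
result = list(summaries)
print(result)
```

Nothing is evaluated until list() consumes the chain, which mirrors the "computed on demand" behaviour described in section 2.3.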

2. Getting started

2.1. Installation

  • From PyPI (pip)
    This package can be found on PyPI.

    pip install th2-data-services
    
  • From Source

    git clone https://github.com/th2-net/th2-data-services
    pip install th2-data-services/
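To confirm that either installation method worked, one option is to ask Python whether the th2_data_services package is importable. This is a generic standard-library check (importlib.util.find_spec), not a feature of the library itself.

```python
import importlib.util

# find_spec returns None when the package cannot be found on the import path.
installed = importlib.util.find_spec("th2_data_services") is not None
print("th2_data_services installed:", installed)
```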
    

2.2. Example

A good, short example is worth a thousand words.

This example works with Events, but you can do the same with Messages.

The same example is also available as a file.

from th2_data_services.data_source import DataSource
from th2_data_services.data import Data
from datetime import datetime

# [1] Create a DataSource object to connect to rpt-data-provider.
DEMO_HOST = "10.64.66.66"  # th2-kube-demo host where rpt-data-provider is located.
DEMO_PORT = "30999"  # Node port of rpt-data-provider.
data_source = DataSource(f"http://{DEMO_HOST}:{DEMO_PORT}")

START_TIME = datetime(year=2021, month=6, day=17, hour=12, minute=44, second=41, microsecond=692724)
END_TIME = datetime(year=2021, month=6, day=17, hour=15, minute=45, second=49, microsecond=28579)

# [2] Get events from START_TIME to END_TIME.
events: Data = data_source.get_events_from_data_provider(
    startTimestamp=START_TIME,
    endTimestamp=END_TIME,
    metadataOnly=False,
    attachedMessages=True,
)

# [3] Work with your Data object.
# [3.1] Filter.
filtered_events: Data = events.filter(lambda e: e['body'] != [])  # Keep only events with a non-empty body.

# [3.2] Map.
def transform_function(record):
    return {
        "eventName": record["eventName"],
        "successful": record["successful"]
    }

filtered_and_mapped_events = filtered_events.map(transform_function)

# [3.3] Data pipeline.
#       Instead of transforming data step by step, you can chain the operations into one pipeline.
filtered_and_mapped_events_by_pipeline = (
    events
    .filter(lambda e: e['body'] != [])
    .map(transform_function)
)

# Content of these two Data objects should be equal.
assert list(filtered_and_mapped_events) == list(filtered_and_mapped_events_by_pipeline)

# [3.4] Sift. Skip the first N items, or limit the output to the first N items.
events_from_11_to_end: Data = events.sift(skip=10)
only_first_10_events: Data = events.sift(limit=10)

# [3.5] Walk through data.
for event in events:
    # Do something with event (event is a dict).
    print(event)

# [3.6] Get the number of elements in the Data object.
number_of_events = len(events)

# [3.7] Convert the Data object to a list of elements (events or messages).
# Be careful: this loads every element into memory at once.
events_list = list(events)

# [3.8] Get event/message by id.
desired_event = '9ce8a2ff-d600-4366-9aba-2082cfc69901:ef1d722e-cf5e-11eb-bcd0-ced60009573f'
desired_events = [
    'deea079b-4235-4421-abf6-6a3ac1d04c76:ef1d3a20-cf5e-11eb-bcd0-ced60009573f',
    'a34e3cb4-c635-4a90-8f42-37dd984209cb:ef1c5cea-cf5e-11eb-bcd0-ced60009573f',
]
desired_message = 'demo-conn1:first:1619506157132265837'
desired_messages = [
    'demo-conn1:first:1619506157132265836',
    'demo-conn1:first:1619506157132265833',
]

data_source.find_events_by_id_from_data_provider(desired_event)  # Returns 1 event (dict).
data_source.find_events_by_id_from_data_provider(desired_events)  # Returns a list of 2 events (dicts).

data_source.find_messages_by_id_from_data_provider(desired_message)  # Returns 1 message (dict).
data_source.find_messages_by_id_from_data_provider(desired_messages)  # Returns a list of 2 messages (dicts).
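The comments in step [3.4] suggest that sift(skip=n) drops the first n records and sift(limit=n) keeps only the first n. Assuming those semantics, and further assuming that skip is applied before limit when both are given, the behaviour can be modelled with itertools.islice. The sift function below is a hypothetical stand-in for illustration, not the library's implementation.

```python
from itertools import islice
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def sift(items: Iterable[T], skip: int = 0, limit: Optional[int] = None) -> Iterator[T]:
    """Hypothetical model of Data.sift: drop `skip` items, then yield at most `limit`."""
    stop = None if limit is None else skip + limit
    return islice(items, skip, stop)


records = range(25)  # stand-in for 25 events
from_11_to_end = list(sift(records, skip=10))   # items 10..24 (the 11th onwards)
first_10 = list(sift(records, limit=10))        # items 0..9
window = list(sift(records, skip=5, limit=3))   # items 5, 6, 7
print(from_11_to_end, first_10, window)
```

Because islice is lazy, skipped elements are still consumed from the source but never materialized into a list.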

2.3. Theory

The library provides stream data and some tools for data manipulation.

What’s the definition of a stream?
A short definition is "a sequence of elements from a source that supports aggregate operations."
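To make "computed on demand" concrete, the sketch below uses a Python generator as the data-providing source and records which elements it has actually produced. The source function and the produced list are hypothetical illustration helpers, unrelated to the library's internals.

```python
from typing import Iterator, List

produced: List[int] = []  # records which elements the source has actually produced


def source() -> Iterator[int]:
    """Hypothetical data-providing source: yields elements one at a time."""
    for i in range(1_000_000):
        produced.append(i)
        yield i


# Aggregate operations (filter + map) over the source; nothing runs yet.
stream = (x * 2 for x in source() if x % 2 == 0)
assert produced == []  # no element has been computed so far

# Pull three results; the source produces only as many elements as needed.
first_three = [next(stream) for _ in range(3)]
print(first_three, produced)
```

Only five of the million possible elements are ever generated, because the stream stops pulling from the source once three results are available.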

  • Data object: An instance of the Data class, which is a wrapper around a stream.
  • Sequence of elements: A Data object provides an interface to a sequenced set of values of a specific element type. The stream inside a Data object doesn't actually store elements; they are computed on demand.
  • DataSource: Streams consume data from a data-providing source, such as Report Data Provider, but a source can also be a collection, an array, or an I/O resource. A DataSource object provides a connection to th2-rpt-provider or reads CSV files from cradle-viewer.
  • Aggregate operations: Common operations such as filter, map, find and so on.
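The concepts above can be modelled with a small, hypothetical wrapper class. MiniData is not th2_data_services.data.Data, and the real class's internals (caching, supported source types, additional methods) may differ. In this sketch the source is stored as a zero-argument factory so that the object can be iterated more than once, and each operation returns a new lazy wrapper instead of a list.

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, Optional


class MiniData:
    """Hypothetical, simplified model of a Data-like stream wrapper."""

    def __init__(self, source_factory: Callable[[], Iterable[Any]]):
        # A factory (not an iterator) so every iteration starts from the beginning.
        self._source_factory = source_factory

    def __iter__(self) -> Iterator[Any]:
        # Elements are produced only when someone iterates.
        return iter(self._source_factory())

    def filter(self, predicate: Callable[[Any], bool]) -> "MiniData":
        return MiniData(lambda: (x for x in self if predicate(x)))

    def map(self, func: Callable[[Any], Any]) -> "MiniData":
        return MiniData(lambda: (func(x) for x in self))

    def sift(self, skip: int = 0, limit: Optional[int] = None) -> "MiniData":
        stop = None if limit is None else skip + limit
        return MiniData(lambda: islice(self, skip, stop))


# Hypothetical events; the source here is an in-memory list.
events = MiniData(lambda: [
    {"eventName": "a", "body": [1]},
    {"eventName": "b", "body": []},
    {"eventName": "c", "body": [2]},
])
names = events.filter(lambda e: e["body"] != []).map(lambda e: e["eventName"])
print(list(names))
```

Chaining filter and map only builds nested generators; the list() call is what drives the iteration.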

Furthermore, stream operations have two fundamental characteristics that make them very different from collection operations:

  • Pipelining: Many stream operations return a stream themselves. This allows operations to be chained to form a larger pipeline.

[Figure: Data stream pipeline]

  • Internal iteration: In contrast to collections, which are iterated explicitly (external iteration), stream operations do the iteration behind the scenes for you. Note that this doesn't mean you cannot iterate over a Data object yourself.
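Both characteristics can be illustrated in plain Python. In the sketch below, which is an illustration rather than the library's code, a pipeline is a list of stages where each stage wraps the previous iterator (pipelining), and run_pipeline drives them without the caller writing a loop (internal iteration). An explicit for loop (external iteration) produces the same result. The helper names filter_stage, map_stage, and run_pipeline are hypothetical.

```python
from functools import reduce
from typing import Callable, Iterable, Iterator, List

Stage = Callable[[Iterable[dict]], Iterator[dict]]


def filter_stage(pred: Callable[[dict], bool]) -> Stage:
    return lambda items: (x for x in items if pred(x))


def map_stage(func: Callable[[dict], dict]) -> Stage:
    return lambda items: (func(x) for x in items)


def run_pipeline(source: Iterable[dict], stages: List[Stage]) -> Iterator[dict]:
    # Each stage wraps the iterator returned by the previous one.
    return reduce(lambda it, stage: stage(it), stages, iter(source))


# Hypothetical events.
events = [
    {"eventName": "a", "successful": True, "body": [1]},
    {"eventName": "b", "successful": False, "body": []},
    {"eventName": "c", "successful": False, "body": [2]},
]

# Internal iteration: the pipeline does the looping.
pipelined = list(run_pipeline(events, [
    filter_stage(lambda e: e["body"] != []),
    map_stage(lambda e: {"eventName": e["eventName"], "successful": e["successful"]}),
]))

# External iteration: the same result with an explicit loop.
explicit = []
for e in events:
    if e["body"] != []:
        explicit.append({"eventName": e["eventName"], "successful": e["successful"]})

assert pipelined == explicit
print(pipelined)
```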

2.4. Links

3. API

Documentation

4. Examples

4.1. Notebooks

4.2. *.py


Download files


Source Distribution

th2_data_services-0.5.0.dev1294968162.tar.gz (17.1 kB view details)

Uploaded Source

File details

Details for the file th2_data_services-0.5.0.dev1294968162.tar.gz.

File metadata

  • Download URL: th2_data_services-0.5.0.dev1294968162.tar.gz
  • Upload date:
  • Size: 17.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.7

File hashes

Hashes for th2_data_services-0.5.0.dev1294968162.tar.gz
Algorithm     Hash digest
SHA256        8eef26b1a4b12c2d297f3818971d1ca7ee57a48d191089e1f794bafdc596f1ac
MD5           5c3e8b8eba91bb175f68f33e61909a46
BLAKE2b-256   f715373f68ae06f51771c5dae6a13684b23d14e121a2623bd66026b2dc6cefc9

