Skip to main content

th2_data_services

Project description

Table of Contents

1. Introduction

This repository is a library for creating th2-data-services applications.

Data Services is a tool for analyzing stream data from "Report Data Provider" via aggregate operations. The tool allows the user to manipulate the workflow to analyze the required data.

Current capabilities:

  • Filtering stream data
  • Transforming stream data

2. Getting started

2.1. Installation

  • From PyPI (pip)
    This package can be found on PyPI.

    pip install th2-data-services
    
  • From Source

    git clone https://github.com/th2-net/th2-data-services
    pip install th2-data-services/
    

2.2. Example

A good, short example is worth a thousand words.

This example works with Events, but you also can do the same actions with Messages.

The following example as a file.

from collections import Generator
from datetime import datetime
from typing import Tuple, List, Optional
from th2_data_services import Data
from th2_data_services.events_tree import EventsTree
from th2_data_services.provider.v5.data_source.http import HTTPProvider5DataSource
from th2_data_services.provider.v5.commands import http
from th2_data_services.filter import Filter
from th2_data_services.provider.v5.events_tree import EventsTreeCollectionProvider5, ParentEventsTreeCollectionProvider5


# [1] Create DataSource object to connect to rpt-data-provider.
DEMO_HOST = "10.64.66.66"  # th2-kube-demo  Host port where rpt-data-provider is located.
DEMO_PORT = "30999"  # Node port of rpt-data-provider.
data_source = HTTPProvider5DataSource(f"http://{DEMO_HOST}:{DEMO_PORT}")

START_TIME = datetime(
    year=2021, month=6, day=17, hour=9, minute=44, second=41, microsecond=692724
)  # object given in utc format
END_TIME = datetime(year=2021, month=6, day=17, hour=12, minute=45, second=49, microsecond=28579)

# [2] Get events or messages from START_TIME to END_TIME.
# [2.1] Get events.
events: Data = data_source.command(
    http.GetEvents(
        start_timestamp=START_TIME,
        end_timestamp=END_TIME,
        attached_messages=True,
        # Use Filter class to apply rpt-data-provider filters.
        filters=[Filter("name", "ExecutionReport"), Filter("type", "Send message")],
    )
)

# [2.2] Get messages.
messages: Data = data_source.command(
    http.GetMessages(
        start_timestamp=START_TIME,
        attached_events=True,
        stream=["demo-conn2"],
        filters=Filter("body", "195"),
    )
)

# [3] Work with your Data object.
# [3.1] Filter.
filtered_events: Data = events.filter(lambda e: e["body"] != [])  # Filter events with empty body.


# [3.2] Map.
def transform_function(record):
    return {"eventName": record["eventName"], "successful": record["successful"]}


filtered_and_mapped_events = filtered_events.map(transform_function)

# [3.3] Data pipeline.
#       Instead of doing data transformations step by step you can do it in one line.
filtered_and_mapped_events_by_pipeline = events.filter(lambda e: e["body"] != []).map(transform_function)

# Content of these two Data objects should be equal.
assert list(filtered_and_mapped_events) == list(filtered_and_mapped_events_by_pipeline)

# [3.4] Sift. Skip the first few items or limit them.
events_from_11_to_end: Generator = events.sift(skip=10)
only_first_10_events: Generator = events.sift(limit=10)

# [3.5] Changing cache status.
events.use_cache(True)

# [3.6] Walk through data.
for event in events:
    # Do something with event (event is a dict).
    print(event)
# After first iteration the events has a cache file.
# Now they will be used the cache in following iteration.

# [3.7] Get number of the elements in the Data object.
number_of_events = events.len

# [3.8] Check that Data object isn't empty.
# The data source should be not empty.
assert events.is_empty is False

# [3.9] Convert Data object to the list of elements(events or messages).
# Be careful, this can take too much memory.
events_list = list(events)

# [3.10] Get event/message by id.
desired_event = "9ce8a2ff-d600-4366-9aba-2082cfc69901:ef1d722e-cf5e-11eb-bcd0-ced60009573f"
desired_events = [
    "deea079b-4235-4421-abf6-6a3ac1d04c76:ef1d3a20-cf5e-11eb-bcd0-ced60009573f",
    "a34e3cb4-c635-4a90-8f42-37dd984209cb:ef1c5cea-cf5e-11eb-bcd0-ced60009573f",
]
desired_message = "demo-conn1:first:1619506157132265837"
desired_messages = [
    "demo-conn1:first:1619506157132265836",
    "demo-conn1:first:1619506157132265833",
]

data_source.command(http.GetEventById(desired_event))  # Returns 1 event (dict).
data_source.command(http.GetEventsById(desired_events))  # Returns 2 events list(dict).

data_source.command(http.GetMessageById(desired_message))  # Returns 1 message (dict).
data_source.command(http.GetMessagesById(desired_messages))  # Returns 2 messages list(dict).

# [3.11] The cache inheritance.
# Creates a new Data object that will use cache from the events Data object.
events_with_batch = events.filter(lambda record: record.get("batchId"))

# New Data objects don't use their own cache by default but use the cache of the parent Data object.
# Use use_cache method to activate caching. After that, the Data object will create its own cache file.
events_with_batch.use_cache(True)

list(events_with_batch)

events_types_with_batch = events_with_batch.map(lambda record: {"eventType": record.get("eventType")})

events_without_types_with_batch = events_types_with_batch.filter(lambda record: not record.get("eventType"))
events_without_types_with_batch.use_cache(True)

# [4] Working with EventsTree and EventsTreeCollection.
# [4.1] Building the EventsTreeCollection.

# If you don't specify data_source for the tree then it won't recover detached events.
collection = EventsTreeCollectionProvider5(events)

# Detached events isn't empty.
assert collection.detached_events

collection = EventsTreeCollectionProvider5(events, data_source=data_source)
# Detached events are empty because they were recovered.
assert not collection.detached_events

# The collection has EventsTrees each with a tree of events.
# Using Collection and EventsTrees, you can work flexibly with events.

# [4.1.1] Get leaves of all trees.
leaves: Tuple[dict] = collection.get_leaves()

# [4.1.2] Get roots ids of all trees.
roots: List[str] = collection.get_roots_ids()

# [4.1.3] Find an event in all trees.
find_event: Optional[dict] = collection.find(lambda event: "Send message" in event["eventType"])

# [4.1.4] Find all events in all trees. There is also iterable version 'findall_iter'.
find_events: List[dict] = collection.findall(lambda event: event["successful"] is True)

# [4.1.5] Find an ancestor of the event.
ancestor: Optional[dict] = collection.find_ancestor(
    "8bbe3717-cf59-11eb-a3f7-094f904c3a62", filter=lambda event: "RootEvent" in event["eventName"]
)

# [4.1.6] Get children of the event. There is also iterable version 'get_children_iter'.
children: Tuple[dict] = collection.get_children("814422e1-9c68-11eb-8598-691ebd7f413d")

# [4.1.7] Get subtree for specified event.
subtree: EventsTree = collection.get_subtree("8e23774d-cf59-11eb-a6e3-55bfdb2b3f21")

# [4.1.8] Get full path to the event.
# Looks like [ancestor_root, ancestor_level1, ancestor_level2, event]
event_path: List[dict] = collection.get_full_path("8e2524fa-cf59-11eb-a3f7-094f904c3a62")

# [4.1.9] Get parent of the event.
parent = collection.get_parent("8e2524fa-cf59-11eb-a3f7-094f904c3a62")

# [4.1.10] Append new event to the collection.
collection.append_event(
    event={
        "eventId": "a20f5ef4-c3fe-bb10-a29c-dd3d784909eb",
        "parentEventId": "8e2524fa-cf59-11eb-a3f7-094f904c3a62",
        "eventName": "StubEvent",
    }
)

# [4.1.11] Show the entire collection.
collection.show()

# [4.2] Working with the EventsTree.
# EventsTree has the same methods as EventsTreeCollection, but only for its own tree.

# [4.2.1] Get collection trees.
trees: List[EventsTree] = collection.get_trees()
tree: EventsTree = trees[0]

# But EventsTree provides a work with the tree, but does not modify it.
# If you want to modify the tree, use EventsTreeCollections.

# [4.3] Working with ParentlessTree.
# ParentlessTree is EventsTree which has detached events with stubs.
parentless_trees: List[EventsTree] = collection.get_parentless_trees()

# [4.4] Working with ParentEventsTreeCollection.
# ParentEventsTreeCollection is a tree like EventsTreeCollection but it has only events that have references.
collection = ParentEventsTreeCollectionProvider5(events, data_source=data_source)

collection.show()

2.3. Theory

The library provides stream data and some tools for data manipulation.

What’s the definition of a stream?
A short definition is "a sequence of elements from a source that supports aggregate operations."

Terms

  • Data object: An object of Data class which is wrapper under stream.

  • Sequence of elements: A Data object provides an interface to a sequenced set of values of a specific element type. Stream inside the Data object don’t actually store elements; they are computed on demand.

  • DataSource: Streams consume from a data-providing source (Report Data Provider) but it also can be collections, arrays, or I/O resources. DataSource object provides connection to th2-rpt-provider or read csv files from cradle-viewer.

  • Aggregate operations: Common operations such as filter, map, find and so on.

Data caching

The Data object provides the ability to use cache. The cache works for each Data object, that is, you choose which Data object you want to save. The Data object cache is saved after the first iteration, but the iteration source may be different.

If you don't use the cache, your source will be the data source you have in the Data Object. But if you use cache, your source can be the data source, the parent cache, or own cache:

  • The data source: If the "Data Object" doesn't have a parent cache and its cache.
  • The parent cache: If the "Data Object" has a parent cache. It doesn't matter what position the parent cache has in inheritance. "Data Object" understands whose cache it is and executes the part of the workflow that was not executed.
  • The own cache: If it is not the first iteration of this Data object.

Note that the cache state of the Data object is not inherited.

Stream operations

Stream operations have two fundamental characteristics that make them very different from collection operations: Pipelining and Internal iteration.

Pipelining

Many stream operations return a stream themselves. This allows operations to be chained to form a larger pipeline.

Data stream pipeline

Internal iteration

In contrast to collections, which are iterated explicitly (external iteration), stream operations do the iteration behind the scenes for you. Note, it doesn't mean you cannot iterate the Data object.

EventsTree and collections

EventsTree

EventsTree is a tree-based data structure of events. It allows you get children and parents of event, display tree, get full path to event etc.

Details:

  • EventsTree contains all events in memory.
  • To reduce memory usage an EventsTreeCollection delete the 'body' field from events, but you can preserve it specify 'preserve_body'.
  • Tree has some important terms:
    1. Ancestor is any relative of the event up the tree (grandparent, parent etc.).
    2. Parent is only the first relative of the event up the tree.
    3. Child is the first relative of the event down the tree.

Take a look at the following HTML tree to understand them.

 <body> <!-- ancestor (grandparent), but not parent -->
     <div> <!-- parent & ancestor -->
         <p>Hello, world!</p> <!-- child -->
         <p>Goodbye!</p> <!-- sibling -->
     </div>
 </body>

Collections

EventsTreeCollection is a collection of EventsTrees. The collection builds a few EventsTree by passed Data object. Although you can change the tree directly, it's better to do it through collections because they are aware of detached_events and can solve some events dependencies. The collection has similar features like a single EventsTree but applying them for all EventsTrees.

ParentEventsTreeCollection is a collection similar to EventsTreeCollection but containing only parent events that are referenced in the data stream. It will be working data in the collection and trees of collection. The collection has features similar to EventsTreeCollection.

Details:

  • The collection has a feature to recover events. All events that are not in the received data stream, but which are referenced will be loaded from the data source.
  • If you haven't passed a DataSource object then the recovery of events will not occur.
  • You can take detached_events to see which events are missing. It looks like {parent_id: [events are referenced]}
  • If you want, you can build parentless trees where the missing events are stubbed instead. Just use get_parentless_trees().

Requirements:

  1. Events have to have event_name, event_id, parent_event_id fields, which are described in the passed event_struct object.

Hints

  • Remove all unnecessary fields from events before passing to a collection to reduce memory usage.
  • Use show() method to print the tree in tree-like view.
  • Note that the get_x methods will raise an exception if you pass an unknown event id, unlike the find_x methods (they return None).
  • If you want to know that specified event exists, use the python in keyword (e.g. 'event-id' in events_tree).
  • Use the python len keyword to get events number in the tree.

2.4. Links

3. API

Documentation

4. Examples

4.1. Notebooks

4.2. *.py

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

th2_data_services-1.0.0.dev2034352673.tar.gz (43.8 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page