Skip to main content

A custom MMS Analytics module for Python3 by the Touchpoint Analytics & Data Discovery

Project description

Doku - mms-pip

Public Python module from the Omnichannel-Analytics Team of MediaMarktSaturn Technology.

!!!!!!!! ATTENTION: BETA VERSION !!!!!!!!!!

pip install mms-pip --upgrade

Topics:

  1. gcp_logger
  2. datastore_handler
  3. bq_handler
  4. redis_handler
  5. gcp_bearer_token
  6. mms_fifa
  7. gcp_secret_manager
  8. pubsub_handler
  9. kubeflow_handler

1. gcp_logger

Log Module for standardized log purposes on GCP.

How to use:

1.1 Import module:

Import the module your app is running on GCP later on - at the moment the following products are supported:

  • app_engine_logger
  • cloud_function_logger
  • cloud_run_logger
  • composer_logger
  • compute_engine_logger
  • dataproc_logger
  • kubernetes_logger
# e.g. for using the cloud run logger

from mms.logger.cloud_run_logger import CloudRunLogger

1.2 Initalize the logger:

# Cloud Run (Serverless):
logger = CloudRunLogger(project_id, local_run)
logger = CloudRunLogger(project_id='my-project', local_run=True/False)

# **IMPORTANT**
# In every API route the following methods need to be called during initialization - currently only Flask is supported for this logger: 
from flask import Flask, request

logger.update_request_header(api_request=request)
logger.update_trace_id(str(uuid.uuid4()))

# Cloud Function Logging:
logger = CloudFunctionLogger(service_name, trace_id, project_id, function_name)
logger = CloudFunctionLogger(service_name='my-service', trace_id='lksjdf2', project_id='my-project-id', function_name='ppx-price-updates-de-gcs-bq')

# App Engine Logging:
logger = AppEngineLogger(service_name, trace_id, project_id, module_id, version_id)
logger = AppEngineLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', module_id='app-flex-sample-service', version_id='v0.0.1')

# Compute Engine:
logger = ComputeEngineLogger(service_name, trace_id, project_id)
logger = ComputeEngineLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore')

# Kubernetes Engine: 
logger = KubernetesLogger(service_name, trace_id, project_id, cluster_name, container_name, location, namespace_name)
logger = KubernetesLogger(service_name='my-service', trace_id='id12345', project_id='v135-5683-alice-ksk-explore', cluster_name='jg-k8-testcluster', container_name=CONTAINER_NAME, location=ZONE, namespace_name='default')

# Dataproc:
logger = DataprocLogger(service_name, trace_id, project_id, cluster_name, location)
logger = DataprocLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', cluster_name='my-cluster', location='europe-west4')

# Cloud Composer:
logger = ComposerLogger(service_name, trace_id, project_id, environment_name, location)
logger = ComposerLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', environment_name='my-composer-environment', location='europe-west4')

1.3 Use the logger:

logger.info('your message')
logger.warning('your message')
logger.error('your message')
logger.critical('your message')
logger.debug('your message')

Cloud Run logger -> you have the possibility to add custom attributes to the log entry just add them to the function call

logger.info('your message', custom_attribute="test")
logger.warning('your message', custom_attribute=test)
logger.error('your message', custom_attribute=test)
logger.critical('your message', custom_attribute=test)
logger.debug('your message', custom_attribute=test)

The logs are visible in Stackdriver Logging via:

  • GAE Application -> Module_id -> Version_id for App Engine.
  • Or under Cloudfunctions -> Function_id
  • Or under GCE VM Instance -> Instance_id
  • Or under Kubernetes Container -> cluster_name -> namespace_name -> container_name
  • Or under Cloud Run Revision -> service_name -> revision_name
  • Or under Global

Important

This log tool only works in App Engine Standard/Flexible, Cloud Function, Compute Engine and Kubernetes, Cloud Run (Serverless) environment.

For local testing please set the boolean flag 'local_run' during initialization to 'True'

1.4 How we log

We initialize the logger only in the "app.py" file. From there every log entry will be written - Modules used within app.py need to return the exceptions to the caller so error etc. get logged at one central point within app.py.

2. datastore_handler

Python module for easy interacting with Google Cloud Datastore (DS)

How to use:

2.1 Initialize the datastore handler:

from mms.datastore_handler import Datastore

ds_handler = Datastore("your-projectid")

2.2 Use the Datastore handler:

You can set/get an id or a name in Datastore. For this package if you specify the parameter name as a string, than it's a name in Datastore. If you specify the parameter name as an integer, than it's an id in Datastore.

Creating a new Entity:

ds_handler.put_new_entity(kind='kind_name', prop_df={'property1': 1, 'property2'='Test String'}, name='The Name/ID of the Entity', exclude_from_indexes=("property1", "property2"))

kind and prop_df is required, name and exclude_from_indexes is optional (Google generates an ID for you, by default all fields are indexed)

Getting and deleting Entity:

result_entitiy_dict = ds_handler.get_entity(kind='kind_name', name='The Name/ID of the Entity')
ds_handler.delete_entity(kind='kind_name', name='The Name/ID of the Entity')

Updating Entity:

ds_handler.update_entity(kind='kind_name', name='The Name/ID of the Entity', prop_df={'property1': 1, 'property2'='Test String'},  exclude_from_indexes=("property1", "property2"))

You cannot update specific properties of an entity. The Updates must include all properties that should be persisted. Whenever you need to do an update, you need to first retrieve the existing entity as a whole, then update one or more properties by setting new values on prop_df and update the entity.

Get all Entities of a specific kind:

result = ds_handler.get_all_of_a_kind('kind_name')

Query Datastore:

  • With only one Filter:

    with_one_filter = ["property1", "=", 1]
    result = ds_handler.query(kind="TestKind1", filter=with_one_filter, limit=None, extract_results=True)
    
  • With two or more Filter:

    with_more_filter = [["property1", "=", 1], ["property3", "<=", 3.33]]
    result = ds_handler.query(kind="TestKind1", filter=with_more_filter)
    

limit (int, how many entities you want to receive) and extract_results (bool, true: list as result, false: generator object result) are optional. Default values: limit=None, extract_results=True.

3. bq_handler

Python module for easy interacting with Google Cloud BigQuery (BQ)

3.1 Initialize the BigQuery handler:

There are two Options:

  • Without default values:

    from mms.bq_handler import BQ
    
    bq_handler = BQ("your-projectid")
    
  • With default values (i. e. if you only work with one dataset or table):

    from mms.bq_handler import BQ
    
    bq_handler = BQ("your-projectid", dataset_id='default_dataset_id', table_id='default_table_id')
    

    Every time a dataset_id and/or a table_id is not specified in a specific method of the BQ class, the method will use the default dataset_id and/or table_id of the bq_handler.

You can also specify cred_file_path (credential file path) if you don't want to use your default credentials from your gcloud sdk.

3.2 Use the BigQuery handler:

Creating a new Dataset:

# With default values from bq_handler
bq_handler.create_dataset()

# Without defaults:
bq_handler.create_dataset('test_dataset_id')

Creating a new Table (dataset have to exists):

from google.cloud import bigquery
SCHEMA = [bigquery.SchemaField('full_name', 'STRING', mode='required', description="Visitor's Name"),
          bigquery.SchemaField('visit_time', 'TIMESTAMP', mode='required', description="Visit Time"),
          bigquery.SchemaField('visit_length', 'INT64', mode='required', description="Length of Visit in Seconds"),
          bigquery.SchemaField('sentiment', 'FLOAT64', mode='required', description="Calculated Happiness Score")]

# With default values from bq_handler
bq_handler.create_table(SCHEMA)

# Without defaults:
bq_handler.create_table(SCHEMA, dataset_id='test_dataset_id', table_id='test_table_id')

# You can also create a table with day partitioning and/or with clustering:
bq_handler.create_table(SCHEMA, dataset_id='test_dataset_id', table_id='test_table_id', partitioning_field='_PARTITIONTIME', require_partition_filter=True, clustering_fields=['visit_time'])
# _PARTITIONTIME is the default partitioning of BigQuery

Checking if a table exists or not (ATTENTION: See source code.):

# With default values from bq_handler
check = bq_handler.check_if_table_exists()

# Without defaults:
check = bq_handler.check_if_table_exists(dataset_id='test_dataset_id', table_id='test_table_id')

# if check is True -> table exists

Getting a schema of an existing table:

# With default values from bq_handler
schema = bq_handler.get_schema()

# Without defaults:
schema = bq_handler.get_schema(dataset_id='test_dataset_id', table_id='test_table_id')

Streaming a row into an existing BQ table (append to table):

# Create Dictionary:  
data = {'full_name': 'Max Mustermann', 'visit_time': '2019-07-23 13:45:07.372826 UTC', 'visit_length': 100, 'sentiment': 1.111}

# With default values from bq_handler
errors = bq_handler.streaming_insert_single_json(data)

# Without defaults:
errors = bq_handler.streaming_insert_single_json(data, dataset_id='test_dataset_id', table_id='test_table_id')

Running a query and get results as a dataframe:

query = '''
SELECT * FROM `v135-5683-playground-goppold.test_dataset_id.test_table_id`
'''
df = bq_handler.run_query(query)

4. redis_handler

Python module for easy interacting with a redis instance.

How to use:

4.1 Initialize the redis handler:

from mms.redis_handler import Redis

redis_client = Redis(host, port, password, exp_s)

4.2 Use the redis handler:

There are only three methods available.

redis_client.set_redis(set_key, set_value)
redis_client.get_redis(get_key)
redis_client.delete_key(key_to_delete)

5. gcp_bearer_token

We support 5 different ways to create a bearer token:

  1. "gcloud-sdk": from local machine via gcloud sdk:

    from mms.gcp_bearer_token import GCPBearerToken
    token_handler = GCPBearerToken("gcloud-sdk")
    token = token_handler.get_token()
    
  2. "service-account-file": from a local service-account json file:

    from mms.gcp_bearer_token import GCPBearerToken
    
    token_handler = GCPBearerToken("service-account-file",
                                    target_url="https://example-cloud-run-service-q25crkrupq-ew.a.run.app",
                                    path_to_service_account_file="./sa.json")
    token = token_handler.get_token()
    
  3. "service-account-dictionary": from a parsed service-account json file:

    from mms.gcp_bearer_token import GCPBearerToken
    
    sa_dict = {
          "type": "service_account",
          "project_id": "...",
          "...": "..."
     }
    
    token_handler = GCPBearerToken("service-account-dictionary",
                                    target_url="https://example-cloud-run-service-q25crkrupq-ew.a.run.app",
                                    service_account_dictionary=sa_dict)
    token = token_handler.get_token()
    
  4. "cloudrun-to-cloudrun": This works only from a GCP CloudRun Service to another one. (https://cloud.google.com/run/docs/authenticating/service-to-service)

    from mms.gcp_bearer_token import GCPBearerToken
    
    token_handler = GCPBearerToken("cloudrun-to-cloudrun",
                                    target_url="https://example-cloud-run-service-q25crkrupq-ew.a.run.app")
    token = token_handler.get_token()
    
  5. "compute-default": App Engine, Cloud Run, ‘Compute Engine‘, or has application default credentials set via GOOGLE_APPLICATION_CREDENTIALS (os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./sa.json") environment variable

    from mms.gcp_bearer_token import GCPBearerToken
    
    token_handler = GCPBearerToken("compute-default",
                                    target_url="https://example-cloud-run-service-q25crkrupq-ew.a.run.app")
    token = token_handler.get_token()
    

The GCPBearerToken class saves the token and the expiration of the token as instance variables. The get_token() checks at every call, if there was a token already generated and if the token is still valid. Only if not, it generates a new one.

6. mms_fifa

Python module for easy interacting with the mms fifa bouncer.

How to use:

6.1 Initialize the mms fifa handler:

from mms.mms_fifa import Fifa

fifa_handler = Fifa()

# Optional paramaters (with default values):
#   - jwk_url="https://auth.mediamarktsaturn.com/.well-known/jwks.json",
#   - fifa_token_url="https://auth.mediamarktsaturn.com/oauth/token",
#   - max_num_retries=5

6.2 Use the fifa handler:

Get fifa jwt token:

error, token = fifa_handler.get_jwt_token(client_id="...", client_secret="...")
# (If error is None -> everything is ok)

Verify fifa jwt token:

error, payload = fifa_handler.verify_jwt_token(jwt_token=token, audience="..")
# (If error is None -> everything is ok)

audience can be a string or a tuple of strings.

7. gcp_secret_manager

Python module for easy interacting with the GCP Secret Manager.

How to use:

First, enable the secret manager API on GCP console

7.1 Initialize the secret manager handler:

from mms.gcp_secret_manager import SecretManager

# Init secretmanager object with your the default credentials from your environment:
secretmanager_handler = SecretManager("v135-5683-playground-goppold")
# Init secretmanager object with your a service account json file - specify the path to your file:
secretmanager_handler = SecretManager("v135-5683-playground-goppold", "test.json")

7.2 Use the secret manager handler:

Create Secret Value:

# Create a secret value:
secret_name = secretmanager_handler.create_secret("my-secret")
print(secret_name)
# Create a secret with value
secret_version_name = secretmanager_handler.create_secret("my-secret-with-value", "This is my secret value")
print(secret_version_name)

By default: regions=["europe-west4"] and labels=None. Please specify this params of you need to.

Add a secret value to a existing secret:

# Add a secret value to a existing secret:
secret_version_name_1 = secretmanager_handler.add_secret_version("my-secret", "This is another secret value")
print(secret_version_name_1)
secret_version_name_2 = secretmanager_handler.add_secret_version("my-secret", "This is a new verison of my secret value")
print(secret_version_name_2)

Accessing value of a secret version:

!!!WARNING: Do not print the secret in a production environment!!!

value_1 = secretmanager_handler.access_secret_version("my-secret", 1)
print(value_1)
value_2 = secretmanager_handler.access_secret_version("my-secret", 2)
print(value_2)
value_3 = secretmanager_handler.access_secret_version("my-secret-with-value", 1)
print(value_3)

List secrets and secret versions:

# List all secrets in my project:
secrets = secretmanager_handler.list_secrets()
print(secrets)

# List all version of a specified secret:
secret_verions = secretmanager_handler.list_secret_versions("my-secret")
print(secret_verions)

Get metadata of a specific secret:

# Get metadata of a specific secret:
metadata = secretmanager_handler.get_secret("my-secret")
print(metadata.name)

Delete secrets:

# Delete a secret with the given name and all of its versions:
secretmanager_handler.delete_secret("my-secret")
secretmanager_handler.delete_secret("my-secret-with-value")

8. pubsub_handler

Python module for easy interacting with the GCP Pubsub.

How to use:

First, enable the pubsub API on GCP console

8.1 Initialize the pubsub handler:

from mms.pubsub_handler import Pubsub

pubsub_handler = Pubsub()
'''
Optional default args in Pubsub(): 
project_id: str, default project_id
publish_batch_max_messages=100
publish_batch_max_baytes=1024 (in kB)
publish_batch_max_latency=10 (in ms)
'''

8.2 Use the pubsub handler:

Create Pubsub Topic:

# Create Pubsub Topic in default project (only if specified):
topic = pubsub_handler.create_topic("test-topic")
print(topic)

# Create Pubsub Topic in other project:
topic = pubsub_handler.create_topic("test-topic", "spielwiese-tobias")
print(topic)

Create pull subscription:

# Create pull Subscription in default project (only if specified):
subscription = pubsub_handler.create_pull_subscription(subscription_name="test-subs",
                                                       topic_path="projects/playground-josef/topics/temp-test",
                                                       ack_deadline_seconds=100)
print(subscription)

# Create pull Subscription in other project:
subscription = pubsub_handler.create_pull_subscription(subscription_name="test-subs",
                                                       topic_path="projects/spielwiese-tobias/topics/test-topic",
																											 project_id="spielwiese-tobias",
                                                       ack_deadline_seconds=30)
print(subscription)

Publish messages:

# Publish messages - batch (if one bundle to publish) - optional: encoding (default="utf-8"), wait_until_all_published (default=True):
data = [{"column1": i, "column2": "value {}".format(i)} for i in range(10000)]
pubsub_handler.publish_messages(data=data,
                                topic_path="projects/playground-josef/topics/temp-test")

# Publish messages - batch (if more than one bundle to publish) - optional: encoding (default="utf-8"):
data = [{"column1": i, "column2": "value {}".format(i)} for i in range(10000)]
for i in range(10):
		pubsub_handler.publish_messages(data=data,
                                    topic_path="projects/playground-josef/topics/temp-test",
                                    wait_until_all_published=False)
pubsub_handler.wait_for_publishing()

# Publish singe message - waiting until message is published - optional: encoding (default="utf-8")
data = {"column1": 1, "column2": "value 1"}
message_id = pubsub_handler.publish_message(data=data, topic_path="projects/playground-josef/topics/temp-test")
print(message_id)

9. kubeflow_handler

Can be used to load kubelflow pipeline components from a file placed in a specific GCS Bucket

9.1 init handler

kf = KubeFlowHandler()

9.2 load component from file

Returns components from kfp.v2

component = kf.load_kfp_component(filename: str, bucket_id: str = 'tadd-kubeflow-components')

TODOs:

  • complete and improve README
  • Add Docstrings to Python scripts

CHANGELOG

see changelog.txt


Josef Goppold, Tobias Hoke - 2021-07-20

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mms-pip-0.8.4.7.tar.gz (24.9 kB view hashes)

Uploaded Source

Built Distribution

mms_pip-0.8.4.7-py3-none-any.whl (26.4 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page