
A custom MMS Analytics module for Python3 by the Data Access Team


mms-pip

Public Python module from the Data-Access Team of MediaMarktSaturn Technology.

!!!!!!!! ATTENTION: BETA VERSION !!!!!!!!!!

pip install mms-pip --upgrade

Topics:

  1. gcp_logger
  2. datastore_handler
  3. bq_handler
  4. redis_handler
  5. gcp_api_authentication
  6. kms_handler
  7. gcp_secret_manager

1. gcp_logger

Log Module for standardized log purposes.

How to use:

1.1 Import module:

Import the logger module that matches the GCP product your app will run on - at the moment the following products are supported:

  • app_engine_logger
  • cloud_function_logger
  • cloud_run_logger
  • composer_logger
  • compute_engine_logger
  • dataproc_logger
  • kubernetes_logger
# e.g. for using the cloud run logger

from mms.logger.cloud_run_logger import CloudRunLogger

1.2 Initialize the logger:

# Cloud Run (Serverless):
logger = CloudRunLogger(project_id, local_run)
logger = CloudRunLogger(project_id='my-project', local_run=True/False)

**IMPORTANT**

In every API route the following methods need to be called at the beginning of the request handling - currently only Flask is supported for this logger:

    import uuid

    from flask import Flask, request

    logger.update_request_header(api_request=request)
    logger.update_trace_id(str(uuid.uuid4()))
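
A minimal sketch of how this looks inside a Flask route (the route path, response and payload handling are illustrative assumptions; the logger is initialized as shown in 1.2):

    import uuid

    from flask import Flask, request

    from mms.logger.cloud_run_logger import CloudRunLogger

    app = Flask(__name__)
    logger = CloudRunLogger(project_id='my-project', local_run=False)

    @app.route('/apis/test1', methods=['POST'])
    def test1():
        # Refresh the request header and trace id for this request:
        logger.update_request_header(api_request=request)
        logger.update_trace_id(str(uuid.uuid4()))

        logger.info('request received')
        return 'ok', 200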




# Cloud Function Logging:
logger = CloudFunctionLogger(service_name, trace_id, project_id, function_name)
logger = CloudFunctionLogger(service_name='my-service', trace_id='lksjdf2', project_id='my-project-id', function_name='ppx-price-updates-de-gcs-bq')

# App Engine Logging:
logger = AppEngineLogger(service_name, trace_id, project_id, module_id, version_id)
logger = AppEngineLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', module_id='app-flex-sample-service', version_id='v0.0.1')

# Compute Engine:
logger = ComputeEngineLogger(service_name, trace_id, project_id)
logger = ComputeEngineLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore')

# Kubernetes Engine: 
logger = KubernetesLogger(service_name, trace_id, project_id, cluster_name, container_name, location, namespace_name)
logger = KubernetesLogger(service_name='my-service', trace_id='id12345', project_id='v135-5683-alice-ksk-explore', cluster_name='jg-k8-testcluster', container_name=CONTAINER_NAME, location=ZONE, namespace_name='default')

# Dataproc:
logger = DataprocLogger(service_name, trace_id, project_id, cluster_name, location)
logger = DataprocLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', cluster_name='my-cluster', location='europe-west4')

# Cloud Composer:
logger = ComposerLogger(service_name, trace_id, project_id, environment_name, location)
logger = ComposerLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', environment_name='my-composer-environment', location='europe-west4')

1.3 Use the logger:

logger.info('your message')
logger.warning('your message')
logger.error('your message')
logger.critical('your message')
logger.debug('your message')

The logs are visible in Stackdriver Logging via:

  • GAE Application -> Module_id -> Version_id for App Engine.
  • Or under Cloud Functions -> Function_id
  • Or under GCE VM Instance -> Instance_id
  • Or under Kubernetes Container -> cluster_name -> namespace_name -> container_name
  • Or under Cloud Run Revision -> service_name -> revision_name
  • Or under Global

Important

This log tool only works in App Engine Standard/Flexible, Cloud Functions, Compute Engine, Kubernetes, Cloud Run (Serverless), Dataproc and Cloud Composer environments.

For local testing please set the boolean flag 'local_run' to 'True' during initialization.

How we log

We initialize the logger only in the "app.py" file. Every log entry is written from there - modules used within app.py need to return exceptions to the caller so that errors etc. get logged at one central point within app.py.
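
A minimal sketch of this pattern (file, function and message names are illustrative assumptions):

# my_module.py - no logger here, errors are returned to the caller:
def load_file(path):
    try:
        with open(path) as f:
            return f.read(), None
    except Exception as e:
        return None, e

# app.py - the single place where logging happens:
content, err = load_file('data.csv')
if err:
    logger.error(f'loading data.csv failed: {err}')
else:
    logger.info('data.csv loaded')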


2. datastore_handler

Python script for easy interaction with Google Cloud Datastore (DS)

How to use:

2.1 Import Datastore class:

from mms.datastore_handler import Datastore

2.2 Initialize the datastore handler:

ds_handler = Datastore()

2.3 Use the Datastore handler:

You can set/get an ID or a name in Datastore. For this package, if you specify the parameter name as a string, then it is a name in Datastore; if you specify the parameter name as an integer, then it is an ID in Datastore.
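
For example (kind and values are illustrative assumptions):

# A string is stored as a name in Datastore:
ds_handler.put_new_entity(kind='TestKind1', prop_df={'property1': 1}, name='my-entity-name')

# An integer is stored as an ID in Datastore:
ds_handler.put_new_entity(kind='TestKind1', prop_df={'property1': 1}, name=12345)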

  • Creating a new Entity:
ds_handler.put_new_entity(kind='kind_name', prop_df={'property1': 1, 'property2': 'Test String'}, name='The Name/ID of the Entity')

kind and prop_df are required, name is optional (Google generates an ID for you)

  • Getting and deleting Entity:
result_entity_dict = ds_handler.get_entity(kind='kind_name', name='The Name/ID of the Entity')
ds_handler.delete_entity(kind='kind_name', name='The Name/ID of the Entity')
  • Updating Entity:
ds_handler.update_entity(kind='kind_name', name='The Name/ID of the Entity', prop_df={'property1': 1, 'property2': 'Test String'})

You cannot update specific properties of an entity - an update must include all properties that should be persisted. Whenever you need to do an update, first retrieve the existing entity as a whole, then set new values for one or more properties in prop_df and update the entity (see the sketch after this list).

  • Get all Entities of a specific kind:
result = ds_handler.get_all_of_a_kind('kind_name')
  • Query Datastore: With only one filter:
with_one_filter = ["property1", "=", 1]
result1 = ds_handler.query(kind="TestKind1", filter=with_one_filter)

With two or more filters:

with_more_filter = [["property1", "=", 1], ["property3", "<=", 3.33]]
result2 = ds_handler.query(kind="TestKind1", filter=with_more_filter)
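
As mentioned above, a sketch of the read-modify-write pattern for updating an entity (kind, name and property values are illustrative assumptions; it is assumed the dictionary returned by get_entity can be passed back as prop_df):

# 1. Retrieve the existing entity as a whole:
entity = ds_handler.get_entity(kind='TestKind1', name='my-entity-name')

# 2. Set new values for the properties that change, keep the rest:
entity['property1'] = 2

# 3. Write the whole entity back:
ds_handler.update_entity(kind='TestKind1', name='my-entity-name', prop_df=entity)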

3. bq_handler

Python script for easy interaction with Google Cloud BigQuery (BQ)

How to use:

3.1 Import BigQuery class:

from mms.bq_handler import BQ

3.2 Initialize the BigQuery handler:

There are two options:

i) Without default values:

bq_handler = BQ()

ii) With default values (i.e. if you only work with one dataset or table):

bq_handler = BQ(dataset_id='default_dataset_id', table_id='default_table_id')

Every time a dataset_id and/or a table_id is not specified in a specific method of the BQ class, the method will use the default dataset_id and/or table_id of the bq_handler.

You can also specify a project_id and/or a cred_file_path (credential file path) if you need it.
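
For example (the project id and credential file path are illustrative assumptions):

bq_handler = BQ(project_id='my-project-id', cred_file_path='path/to/service_account.json', dataset_id='default_dataset_id', table_id='default_table_id')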

3.3 Use the BigQuery handler:

  • Creating a new Dataset:
# With default values from bq_handler
bq_handler.create_dataset()

# Without defaults:
bq_handler.create_dataset('test_dataset_id')
  • Creating a new Table (the dataset has to exist):
from google.cloud import bigquery
SCHEMA = [bigquery.SchemaField('full_name', 'STRING', mode='required', description="Visitor's Name"),
          bigquery.SchemaField('visit_time', 'TIMESTAMP', mode='required', description="Visit Time"),
          bigquery.SchemaField('visit_length', 'INT64', mode='required', description="Length of Visit in Seconds"),
          bigquery.SchemaField('sentiment', 'FLOAT64', mode='required', description="Calculated Happiness Score")]

# With default values from bq_handler
bq_handler.create_table(SCHEMA)

# Without defaults:
bq_handler.create_table(SCHEMA, dataset_id='test_dataset_id', table_id='test_table_id')

# You can also create a table with day partitioning and/or with clustering:
bq_handler.create_table(SCHEMA, dataset_id='test_dataset_id', table_id='test_table_id', partitioning_field='_PARTITIONTIME', require_partition_filter=True, clustering_fields=['visit_time'])
# _PARTITIONTIME is the default partitioning of BigQuery
  • Checking if a table exists or not (ATTENTION: No guarantees! See source code.):
# With default values from bq_handler
check = bq_handler.check_if_table_exists()

# Without defaults:
check = bq_handler.check_if_table_exists(dataset_id='test_dataset_id', table_id='test_table_id')
  • Getting a schema of an existing table:
# With default values from bq_handler
schema = bq_handler.get_schema()

# Without defaults:
schema = bq_handler.get_schema(dataset_id='test_dataset_id', table_id='test_table_id')
  • Streaming a row into an existing BQ table (append to table):
# Create Dictionary:  
data = {'full_name': 'Max Mustermann', 'visit_time': '2019-07-23 13:45:07.372826 UTC', 'visit_length': 100, 'sentiment': 1.111}

# With default values from bq_handler
bq_handler.streaming_insert_single_json(data)

# Without defaults:
bq_handler.streaming_insert_single_json(data, dataset_id='test_dataset_id', table_id='test_table_id')
  • Running a query and get results as a dataframe:
query = '''
SELECT * FROM `v135-5683-playground-goppold.test_dataset_id.test_table_id`
'''
df = bq_handler.run_query(query)

4. redis_handler

Python script for easy usage of the Python redis class.

How to use:

4.1 Import Redis class:

from mms.redis_handler import Redis

4.2 Initialize the redis handler:

redis_client = Redis(host, port, password)

4.3 Use the redis handler:

There are only three methods available:

  • set (a default expiration of 60*15 is used)
  • get
  • delete

Examples:

redis_client.set_redis(set_key, set_value, exp)

redis_client.get_redis(get_key)

redis_client.delete_key(key_to_delete)
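
A short usage sketch (key, value and expiration are illustrative assumptions):

redis_client.set_redis('my-key', 'my-value', 60)
value = redis_client.get_redis('my-key')
redis_client.delete_key('my-key')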

5. gcp_api_authentication

5.1 gcp_api_authentication.service_account.py

Python class for easily and securely interacting with GCP (Cloud Run) API endpoints via service accounts. All you have to do is

  • create a service account,
  • download the service account json file,
  • grant the service account permission on the Cloud Run service -> add the service account email as Cloud Run Invoker to your service.

After that you can make GET and/or POST requests to the routes of your Cloud Run service:

from mms.gcp_api_authentication.service_account import APICalls
import json 

# Init:
sa_api_auth_handler = APICalls(service_url="https://test-api-h3e6iof3xq-ew.a.run.app", sa_key_dict={ ... service account json file as dictionary ... })
# or
sa_api_auth_handler = APICalls(service_url="https://test-api-h3e6iof3xq-ew.a.run.app", sa_key_path="path/to/your/service_account.json")

# POST request: 
request_body = {"Your Request Body": "For Your API"}
r1 = sa_api_auth_handler.post_request(url="https://test-api-h3e6iof3xq-ew.a.run.app/apis/test1", request_body=request_body)
response_body = json.loads(r1.content.decode('utf-8'))
status_code = r1.status_code

# GET request: 
r2 = sa_api_auth_handler.get_request(url="https://test-api-h3e6iof3xq-ew.a.run.app/apis/test2")
response_body = json.loads(r2.content.decode('utf-8'))
status_code = r2.status_code

5.2 gcp_api_authentication.service_to_service.py

TODO

https://cloud.google.com/run/docs/authenticating/service-to-service

First, you'll need to configure the receiving service to accept requests from the calling service: Grant the Cloud Run Invoker (roles/run.invoker) role to the calling service identity on the receiving service.


6. kms_handler

There is a module available to easily use KMS:

6.1 Import KMS class:

from mms.kms_handler import KmsService

6.2 Use the KMS client:

decrypted = KmsService.decrypt(project_id, location_id, key_ring_id, crypto_key_id, ciphertext)

encrypted = KmsService.encrypt(project_id, location_id, key_ring_id, crypto_key_id, plaintext)
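
A round-trip sketch (all identifier values are illustrative assumptions, and it is assumed the parameters shown above can be passed as keywords):

ciphertext = KmsService.encrypt(project_id='my-project-id', location_id='europe-west4', key_ring_id='my-key-ring', crypto_key_id='my-key', plaintext='my plaintext value')
decrypted = KmsService.decrypt(project_id='my-project-id', location_id='europe-west4', key_ring_id='my-key-ring', crypto_key_id='my-key', ciphertext=ciphertext)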

7. gcp_secret_manager

First, enable the Secret Manager API in the GCP console.

7.1 Init Secret Manager:

from mms.gcp_secret_manager import SecretManager
# Init the secretmanager object with the default credentials from your environment:
secretmanager = SecretManager("v135-5683-playground-goppold")
# Init the secretmanager object with a service account json file - specify the path to your file:
#secretmanager = SecretManager("v135-5683-playground-goppold", "test.json")

7.2 Create Secret Value:

# Create a secret without a value:
secret_name = secretmanager.create_secret("my-secret")
print(secret_name)
# Create a secret with value
secret_version_name = secretmanager.create_secret("my-secret-with-value", "This is my secret value")
print(secret_version_name)

By default, regions=["europe-west4"] and labels=None. Please specify these params if you need to.
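
For example (region and label values are illustrative assumptions, as is the assumption that regions and labels are parameters of create_secret):

secret_name = secretmanager.create_secret("my-regional-secret", regions=["europe-west1"], labels={"team": "data-access"})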

7.3 Add a secret value to an existing secret:

# Add a secret value to an existing secret:
secret_version_name_1 = secretmanager.add_secret_version("my-secret", "This is another secret value")
print(secret_version_name_1)
secret_version_name_2 = secretmanager.add_secret_version("my-secret", "This is a new version of my secret value")
print(secret_version_name_2)

7.4 Accessing the value of a secret version:

!!!WARNING: Do not print the secret in a production environment!!!

value_1 = secretmanager.access_secret_version("my-secret", 1)
print(value_1)
value_2 = secretmanager.access_secret_version("my-secret", 2)
print(value_2)
value_3 = secretmanager.access_secret_version("my-secret-with-value", 1)
print(value_3)

7.5 List secrets and secret versions:

# List all secrets in my project:
secrets = secretmanager.list_secrets()
print(secrets)

# List all versions of a specified secret:
secret_versions = secretmanager.list_secret_versions("my-secret")
print(secret_versions)

7.6 Get metadata of a specific secret:

# Get metadata of a specific secret:
metadata = secretmanager.get_secret("my-secret")
print(metadata.name)

7.7 Delete secrets:

# Delete a secret with the given name and all of its versions:
secretmanager.delete_secret("my-secret")
secretmanager.delete_secret("my-secret-with-value")

7.8 Update Secret Version:

!! Attention !! This method is only applicable if there is only one version of the secret. It's not possible to update specific secret versions.

secretmanager.update_secret_version('my-secret', 'new value')

TODOs

  • Complete README
  • Add Docstrings to Python scripts

CHANGELOG:

see changelog.txt


Tobias Hoke - Josef Goppold - 17.06.2020
