
A custom MMS Thor module for Python by Data Access


mms-pip

Public Python module from the Data-Access Team of MediaMarktSaturn Technology.

!!!!!!!! ATTENTION: BETA VERSION !!!!!!!!!!

pip install mms-pip --upgrade

Topics:

  1. gcp_logger
  2. datastore_handler
  3. bq_handler
  4. redis_handler
  5. gcp_api_authentication
  6. kms_handler

1. gcp_logger

Logging module for standardized logging.

How to use:

1.1 Import module:

Import the logger module for the GCP product your app will run on - at the moment the following products are supported:

  • app_engine_logger
  • cloud_function_logger
  • cloud_run_logger
  • composer_logger
  • compute_engine_logger
  • dataproc_logger
  • kubernetes_logger
# e.g. for using the cloud run logger

from mms.logger.cloud_run_logger import CloudRunLogger

1.2 Initialize the logger:

# Cloud Run (Serverless):
logger = CloudRunLogger(project_id, local_run)
logger = CloudRunLogger(project_id='my-project', local_run=True/False)

**IMPORTANT**

In every API route the following methods need to be called during initialization - currently only Flask is supported for this logger:
    import uuid

    from flask import Flask, request

    logger.update_request_header(api_request=request)
    logger.update_trace_id(str(uuid.uuid4()))




# Cloud Function Logging:
logger = CloudFunctionLogger(service_name, trace_id, project_id, function_name)
logger = CloudFunctionLogger(service_name='my-service', trace_id='lksjdf2', project_id='my-project-id', function_name='ppx-price-updates-de-gcs-bq')

# App Engine Logging:
logger = AppEngineLogger(service_name, trace_id, project_id, module_id, version_id)
logger = AppEngineLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', module_id='app-flex-sample-service', version_id='v0.0.1')

# Compute Engine:
logger = ComputeEngineLogger(service_name, trace_id, project_id)
logger = ComputeEngineLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore')

# Kubernetes Engine: 
logger = KubernetesLogger(service_name, trace_id, project_id, cluster_name, container_name, location, namespace_name)
logger = KubernetesLogger(service_name='my-service', trace_id='id12345', project_id='v135-5683-alice-ksk-explore', cluster_name='jg-k8-testcluster', container_name=CONTAINER_NAME, location=ZONE, namespace_name='default')

# Dataproc:
logger = DataprocLogger(service_name, trace_id, project_id, cluster_name, location)
logger = DataprocLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', cluster_name='my-cluster', location='europe-west4')

# Cloud Composer:
logger = ComposerLogger(service_name, trace_id, project_id, environment_name, location)
logger = ComposerLogger(service_name='my-service', trace_id='lksjdfl98', project_id='v135-5683-alice-ksk-explore', environment_name='my-composer-environment', location='europe-west4')

1.3 Use the logger:

logger.info('your message')
logger.warning('your message')
logger.error('your message')
logger.critical('your message')
logger.debug('your message')

The logs are visible in Stackdriver Logging via:

  • GAE Application -> Module_id -> Version_id for App Engine.
  • Or under Cloud Functions -> Function_id
  • Or under GCE VM Instance -> Instance_id
  • Or under Kubernetes Container -> cluster_name -> namespace_name -> container_name
  • Or under Cloud Run Revision -> service_name -> revision_name
  • Or under Global

Important

This log tool only works in the App Engine Standard/Flexible, Cloud Functions, Compute Engine, Kubernetes and Cloud Run (Serverless) environments.

For local testing, please set the boolean flag 'local_run' to 'True' during initialization.

How we log

We initialize the logger only in the app.py file; every log entry is written from there. Modules used within app.py need to pass their exceptions back to the caller (return or raise them) so that errors etc. get logged at one central point within app.py.
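
For illustration, here is a minimal sketch of that pattern for a Cloud Run service (the /process route and the do_work helper are hypothetical; the per-route update calls are the ones shown in 1.2):

# app.py - minimal sketch of the central-logging pattern
# (the /process route and do_work helper are hypothetical).
import uuid

from flask import Flask, request

from mms.logger.cloud_run_logger import CloudRunLogger

app = Flask(__name__)
logger = CloudRunLogger(project_id='my-project', local_run=False)


def do_work(payload):
    # Hypothetical worker; real worker modules pass their exceptions back
    # to app.py instead of logging themselves.
    if payload is None:
        raise ValueError('empty payload')
    return {'status': 'done'}


@app.route('/process', methods=['POST'])
def process():
    logger.update_request_header(api_request=request)
    logger.update_trace_id(str(uuid.uuid4()))
    try:
        result = do_work(request.get_json(silent=True))
        logger.info('processing finished')
        return result, 200
    except Exception as e:
        # Errors from the worker end up here and are logged in one central place.
        logger.error('processing failed: {}'.format(e))
        return 'internal error', 500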


2. datastore_handler

Python script for easy interaction with Google Cloud Datastore (DS)

How to use:

2.1 Import Datastore class:

from mms.datastore_handler import Datastore

2.2 Initialize the datastore handler:

ds_handler = Datastore()

2.3 Use the Datastore handler:

You can set/get an ID or a name in Datastore. For this package, if you specify the parameter name as a string, then it is a name in Datastore; if you specify the parameter name as an integer, then it is an ID in Datastore.
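
For example (a sketch with made-up kind and key values), a string is used as the entity name and an integer as the numeric ID:

# 'customer-42' is interpreted as the entity's name (string key) ...
entity_by_name = ds_handler.get_entity(kind='TestKind1', name='customer-42')

# ... while 4711 is interpreted as the entity's numeric ID (integer key).
entity_by_id = ds_handler.get_entity(kind='TestKind1', name=4711)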

  • Creating a new Entity:
ds_handler.put_new_entity(kind='kind_name', prop_df={'property1': 1, 'property2': 'Test String'}, name='The Name/ID of the Entity')

kind and prop_df are required, name is optional (Google generates an ID for you)

  • Getting and deleting Entity:
result_entity_dict = ds_handler.get_entity(kind='kind_name', name='The Name/ID of the Entity')
ds_handler.delete_entity(kind='kind_name', name='The Name/ID of the Entity')
  • Updating Entity:
ds_handler.update_entity(kind='kind_name', name='The Name/ID of the Entity', prop_df={'property1': 1, 'property2': 'Test String'})

You cannot update specific properties of an entity; the update must include all properties that should be persisted. Whenever you need to do an update, first retrieve the existing entity as a whole, then update one or more properties by setting new values in prop_df and update the entity (see the read-modify-write sketch after the query examples below).

  • Get all Entities of a specific kind:
result = ds_handler.get_all_of_a_kind('kind_name')
  • Query Datastore: with only one filter:
with_one_filter = ["property1", "=", 1]
result1 = ds_handler.query(kind="TestKind1", filter=with_one_filter)

With two or more filters:

with_more_filter = [["property1", "=", 1], ["property3", "<=", 3.33]]
result2 = ds_handler.query(kind="TestKind1", filter=with_more_filter)
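
As announced above, a read-modify-write sketch for updating an entity (kind, key and property values are made up; it assumes get_entity returns the entity's properties as a dictionary, as shown in 2.3):

# Fetch the whole entity first - partial updates are not possible.
entity = ds_handler.get_entity(kind='TestKind1', name='customer-42')

# Change the fields you need and keep everything else as it was.
entity['property1'] = 2

# Write the complete property dictionary back.
ds_handler.update_entity(kind='TestKind1', name='customer-42', prop_df=entity)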

3. bq_handler

Python script for easy interaction with Google Cloud BigQuery (BQ)

How to use:

3.1 Import BigQuery class:

from mms.bq_handler import BQ

3.2 Initialize the BigQuery handler:

There are two options:

i) Without default values:

bq_handler = BQ()

ii) With default values (i.e. if you only work with one dataset or table):

bq_handler = BQ(dataset_id='default_dataset_id', table_id='default_table_id')

Every time a dataset_id and/or a table_id is not specified in a specific method of the BQ class, the method will use the default dataset_id and/or table_id of the bq_handler.

You can also specify a project_id and/or a cred_file_path (credential file path) if you need it.

3.3 Use the BigQuery handler:

  • Creating a new Dataset:
# With default values from bq_handler
bq_handler.create_dataset()

# Without defaults:
bq_handler.create_dataset('test_dataset_id')
  • Creating a new Table (the dataset has to exist):
from google.cloud import bigquery
SCHEMA = [bigquery.SchemaField('full_name', 'STRING', mode='required', description="Visitor's Name"),
          bigquery.SchemaField('visit_time', 'TIMESTAMP', mode='required', description="Visit Time"),
          bigquery.SchemaField('visit_length', 'INT64', mode='required', description="Length of Visit in Seconds"),
          bigquery.SchemaField('sentiment', 'FLOAT64', mode='required', description="Calculated Happiness Score")]

# With default values from bq_handler
bq_handler.create_table(SCHEMA)

# Without defaults:
bq_handler.create_table(SCHEMA, dataset_id='test_dataset_id', table_id='test_table_id')

# You can also create a table with day partitioning and/or with clustering:
bq_handler.create_table(SCHEMA, dataset_id='test_dataset_id', table_id='test_table_id', partitioning_field='_PARTITIONTIME', require_partition_filter=True, clustering_fields=['visit_time'])
# _PARTITIONTIME is the default partitioning of BigQuery
  • Checking if a table exists or not (ATTENTION: No guarantees! See source code.):
# With default values from bq_handler
check = bq_handler.check_if_table_exists()

# Without defaults:
check = bq_handler.check_if_table_exists(dataset_id='test_dataset_id', table_id='test_table_id')
  • Getting the schema of an existing table:
# With default values from bq_handler
schema = bq_handler.get_schema()

# Without defaults:
schema = bq_handler.get_schema(dataset_id='test_dataset_id', table_id='test_table_id')
  • Streaming a row into an existing BQ table (append to table):
# Create Dictionary:  
data = {'full_name': 'Max Mustermann', 'visit_time': '2019-07-23 13:45:07.372826 UTC', 'visit_length': 100, 'sentiment': 1.111}

# With default values from bq_handler
bq_handler.streaming_insert_single_json(data)

# Without defaults:
bq_handler.streaming_insert_single_json(data, dataset_id='test_dataset_id', table_id='test_table_id')
  • Running a query and getting the results as a dataframe:
query = '''
SELECT * FROM `v135-5683-playground-goppold.test_dataset_id.test_table_id`
'''
df = bq_handler.run_query(query)

4. redis_handler

Python script for easy usage of the Python redis class.

How to use:

4.1 Import Redis class:

from mms.redis_handler import Redis

4.2 Initialize the redis handler:

redis_client = Redis(host, port, password)

4.3 Use the redis handler:

There are only three methods available:

  • set (a default expiration of 60*15 is applied)
  • get
  • delete

Examples:

redis_client.set_redis(set_key, set_value, exp)

redis_client.get_redis(get_key)

redis_client.delete_key(key_to_delete)
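
A short usage sketch with placeholder values (host, password, key and value are made up; the expiration is assumed to be in seconds, matching the 60*15 default mentioned above):

from mms.redis_handler import Redis

redis_client = Redis(host='10.0.0.3', port=6379, password='my-password')

# Store a value for five minutes (expiration assumed to be in seconds).
redis_client.set_redis('session:abc123', 'some-value', 300)

# Read it back and delete it again.
value = redis_client.get_redis('session:abc123')
redis_client.delete_key('session:abc123')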

5. gcp_api_authentication

5.1 gcp_api_authentication.service_account.py

Python class for easy and secure interaction with GCP (Cloud Run) API endpoints using service accounts. All you have to do is:

  • create a service account,
  • download the service account JSON file,
  • grant the service account permission on the Cloud Run service -> add the service account email as Cloud Run Invoker to your service.

After that you can make GET and/or POST requests to the routes of your Cloud Run service:

import json

from mms.gcp_api_authentication.service_account import APICalls

# Init:
sa_api_auth_handler = APICalls(service_url="https://test-api-h3e6iof3xq-ew.a.run.app", sa_key_dict={ ... service account json file as dictionary ... })
# or
sa_api_auth_handler = APICalls(service_url="https://test-api-h3e6iof3xq-ew.a.run.app", sa_key_path="path/to/your/service_account.json")

# POST request: 
request_body = {"Your Request Body": "For Your API"}
r1 = sa_api_auth_handler.post_request(url="https://test-api-h3e6iof3xq-ew.a.run.app/apis/test1", request_body=request_body)
response_body = json.loads(r1.content.decode('utf-8'))
status_code = r1.status_code

# GET request: 
r2 = sa_api_auth_handler.get_request(url="https://test-api-h3e6iof3xq-ew.a.run.app/apis/test2")
response_body = json.loads(r2.content.decode('utf-8'))
status_code = r2.status_code

5.2 gcp_api_authentication.service_to_service.py

TODO

https://cloud.google.com/run/docs/authenticating/service-to-service

First, you'll need to configure the receiving service to accept requests from the calling service: Grant the Cloud Run Invoker (roles/run.invoker) role to the calling service identity on the receiving service.
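
Until this part of the module is available, here is a minimal sketch of the underlying pattern using google-auth directly (the receiving service URL is a placeholder; this follows the Google documentation linked above rather than this package):

import requests

import google.auth.transport.requests
import google.oauth2.id_token

# URL of the receiving Cloud Run service (placeholder).
receiving_service_url = 'https://receiving-service-xxxxx-ew.a.run.app'

# Fetch an ID token for the receiving service; on Cloud Run/GCE this uses the
# metadata server and the calling service's identity.
auth_req = google.auth.transport.requests.Request()
token = google.oauth2.id_token.fetch_id_token(auth_req, receiving_service_url)

# Call the receiving service with the ID token as a Bearer token.
response = requests.get(
    receiving_service_url,
    headers={'Authorization': 'Bearer {}'.format(token)})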


6. KMS handler

There is a module available to easily use KMS:

6.1 Import KMS class:

from mms.kms_handler import KmsService

6.2 Use the KMS client:

decrypted = KmsService.decrypt(project_id, location_id, key_ring_id, crypto_key_id, ciphertext)

encrypted = KmsService.encrypt(project_id, location_id, key_ring_id, crypto_key_id, plaintext)
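
A usage sketch with placeholder IDs (project, location, key ring and key are made up; it assumes decrypt accepts the ciphertext returned by encrypt):

from mms.kms_handler import KmsService

# Encrypt a secret with a KMS key (all IDs are placeholders).
ciphertext = KmsService.encrypt(
    project_id='my-project',
    location_id='europe-west4',
    key_ring_id='my-key-ring',
    crypto_key_id='my-key',
    plaintext='my-secret-value')

# Decrypt it again with the same key.
plaintext = KmsService.decrypt(
    project_id='my-project',
    location_id='europe-west4',
    key_ring_id='my-key-ring',
    crypto_key_id='my-key',
    ciphertext=ciphertext)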

TODOs

  • Complete README
  • Add Docstrings to Python scripts

CHANGELOG:

see changelog.txt


Tobias Hoke - Josef Goppold - 10.02.2020
