A set of utilities for DCI jobs

Project description

dci_utils

This package collects reusable classes to promote code reuse across the ETL jobs used in DCI.

Classes

Logger

  • provides a log method to log various milestones during the job execution.
# credentials (dict, optional): AWS credentials used to access CloudWatch.
#     If not specified, defaults to the computer's role.
# log_group_name (str): Name of the AWS Log Group.
#     Must be the name of the job being executed.
# region (str, optional): AWS region where logs are recorded.
#     If not specified, us-east-1 is assumed as default.
def __init__(self, credentials=None, log_group_name, region='us-east-1'):
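The `credentials` dict used in the Usage section comes from `sts.assume_role(...)['Credentials']`, which carries the keys `AccessKeyId`, `SecretAccessKey`, and `SessionToken`. The following is a minimal sketch of how such a dict could be mapped onto the keyword arguments of `boto3.client`, falling back to boto3's default credential chain when no credentials are given. The helper name `client_kwargs` is hypothetical and is not part of dci_utils; how the package handles credentials internally is not shown in this description.

```python
def client_kwargs(credentials=None, region='us-east-1'):
    """Build keyword arguments for boto3.client (hypothetical helper)."""
    kwargs = {'region_name': region}
    if credentials is not None:
        # Keys as returned under 'Credentials' by sts.assume_role.
        kwargs.update(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken'],
        )
    return kwargs

# Usage (requires boto3 and AWS access):
#   import boto3
#   logs = boto3.client('logs', **client_kwargs(credentials))
```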


# message : text to be logged
def log(self, message):
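For orientation, CloudWatch Logs accepts events through the boto3 call `put_log_events`, where each event is a dict with a millisecond `timestamp` and a `message`. The sketch below only builds such an event; the function name `build_log_event` is an assumption for illustration, and it does not claim to mirror what `Logger.log` does internally.

```python
import time

def build_log_event(message, now=None):
    """Return one event in the shape put_log_events expects (illustrative)."""
    if now is None:
        now = time.time()
    # CloudWatch Logs timestamps are milliseconds since the Unix epoch.
    return {'timestamp': int(now * 1000), 'message': str(message)}

# Usage (requires boto3 and AWS access; the stream name is a placeholder):
#   logs.put_log_events(
#       logGroupName='<job_name>',
#       logStreamName='<log_stream>',
#       logEvents=[build_log_event('Job started')],
#   )
```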

MetricRecorder

  • provides a record method to push metrics to CloudWatch during the job execution.
# credentials (dict): AWS Credentials used to access CloudWatch.
# namespace (str): Name of the AWS Metric Custom Namespace.
# region (str, optional): AWS region where metrics are recorded.
#     If not specified, us-east-1 is assumed as default.
def __init__(self, credentials, namespace, region='us-east-1'):


# metric_name (str): The name of the AWS metric.
# value (str): Actual value of the AWS metric.
# metric_dims (list, optional): A list of dimensions associated with the data.
#     Each dimension is a dict with 'Name' and 'Value' keys.
#     If not specified, empty list [] is assumed as default.
# metric_unit (str, optional): Unit of the AWS metric.
#     If not specified, Count is assumed as default.
def record(self, metric_name, value, metric_dims=None, metric_unit='Count'):
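The parameters above line up with the fields of a single entry in the `MetricData` list taken by the boto3 CloudWatch call `put_metric_data`. As a rough sketch, under the assumption that the arguments are passed through unchanged, one datum could be assembled as follows. The function name `build_metric_datum` is hypothetical and not part of dci_utils.

```python
def build_metric_datum(metric_name, value, metric_dims=None, metric_unit='Count'):
    """Assemble one MetricData entry for put_metric_data (illustrative)."""
    # None stands in for "no dimensions"; a fresh list is created per call
    # so that no mutable default is shared between calls.
    if metric_dims is None:
        metric_dims = []
    return {
        'MetricName': metric_name,
        'Dimensions': list(metric_dims),
        'Value': value,
        'Unit': metric_unit,
    }

# Usage (requires boto3 and AWS access):
#   cloudwatch.put_metric_data(
#       Namespace='<namespace>',
#       MetricData=[build_metric_datum('Success', 1)],
#   )
```

Using `None` as the default and substituting `[]` inside the function matches the signature shown above and avoids the usual pitfall of a mutable default argument.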

Usage

from pyspark.sql import SparkSession

import boto3

spark = SparkSession.builder.enableHiveSupport() \
    .appName("<application_name>").getOrCreate()

spark.sparkContext.addPyFile('s3://path/to/file/aws_cloudwatch_utils.py')

import aws_cloudwatch_utils

job_name = '<job_name>'

role = 'arn:aws:iam::<aws_account>:role/<aws_role_name>'
dims = [{'Name': 'JobName', 'Value': job_name}]
sts = boto3.client('sts')
credentials = sts.assume_role(RoleArn=role, RoleSessionName='<job_name>')['Credentials']

logger = aws_cloudwatch_utils.Logger(credentials, '<job_name>')
metric_recorder = aws_cloudwatch_utils.MetricRecorder(credentials, '<job_name>')
logger.log("Job Completed Successfully")
metric_recorder.record('Success', 1, dims, 'Count')
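The snippet above logs and records only the success path. One possible way to cover failures as well is to wrap the job body in a small helper, sketched below. The helper `run_job` and the `'Failure'` metric name are assumptions for illustration; they do not come from dci_utils. The helper relies only on the `log` and `record` methods described earlier.

```python
def run_job(job, logger, metric_recorder, dims):
    """Run job() and report the outcome (hypothetical helper)."""
    try:
        job()
    except Exception as exc:
        # 'Failure' is an assumed metric name, chosen to mirror 'Success'.
        logger.log("Job Failed: {}".format(exc))
        metric_recorder.record('Failure', 1, dims, 'Count')
        raise  # re-raise so the scheduler still sees the job as failed
    logger.log("Job Completed Successfully")
    metric_recorder.record('Success', 1, dims, 'Count')

# Usage, with logger, metric_recorder and dims created as shown above:
#   run_job(lambda: spark.sql('<query>').count(), logger, metric_recorder, dims)
```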

Download files

Source Distribution

dci_utils-0.0.11.tar.gz (3.5 kB)

Built Distribution

dci_utils-0.0.11-py3-none-any.whl (4.1 kB, Python 3)