dci_utils
=========
This package collects utility classes that promote code reuse across the ETL jobs used in DCI.
Classes
-------------
#### Logger
- Provides a `log` method for logging milestones during job execution; a usage sketch follows the signature block below.
```python
# credentials (dict, optional): AWS credentials used to access CloudWatch.
# Pass None to fall back to the IAM role of the host running the job.
# log_group_name (str): Name of the AWS Log Group.
# Must be the name of the job being executed.
# region (str, optional): AWS region where logs are recorded.
# If not specified, us-east-1 is assumed as default.
def __init__(self, credentials, log_group_name, region='us-east-1'):
# message (str): Text to be logged.
def log(self, message):
```
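A minimal sketch of using `Logger` on a host whose IAM role already grants CloudWatch Logs access, so `credentials` is passed as `None` and the default `us-east-1` region applies. The log group name `'my-etl-job'` is a placeholder:
```python
import aws_cloudwatch_utils

# Rely on the host's IAM role (credentials=None) and the default region.
# 'my-etl-job' is a placeholder; the log group name should match the job name.
logger = aws_cloudwatch_utils.Logger(None, 'my-etl-job')
logger.log('Starting extract step')
logger.log('Extract step finished')
```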
#### MetricRecorder
- Provides a `record` method for pushing custom metrics to CloudWatch during job execution; a usage sketch follows the signature block below.
```python
# credentials (dict): AWS Credentials used to access CloudWatch.
# namespace (str): Name of the AWS Metric Custom Namespace.
# region (str, optional): AWS region where metrics are recorded.
# If not specified, us-east-1 is assumed as default.
def __init__(self, credentials, namespace, region='us-east-1'):
# metric_name (str): The name of the AWS metric.
# value (int or float): Actual value of the AWS metric.
# metric_dims (list, optional): A list of dimensions associated with the data.
# Each dimension is a dict with 'Name' and 'Value' keys.
# If not specified, an empty list [] is assumed as default.
# metric_unit (str, optional): Unit of the AWS metric.
# If not specified, Count is assumed as default.
def record(self, metric_name, value, metric_dims=None, metric_unit='Count'):
```
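A minimal sketch of recording custom metrics with dimensions and a non-default unit. The role ARN placeholders match the Usage section below; the namespace `'MyETLJobs'`, the metric names, and the values are illustrative only:
```python
import boto3
import aws_cloudwatch_utils

# Assume the job's role to obtain temporary credentials (placeholders as in Usage).
sts = boto3.client('sts')
credentials = sts.assume_role(
    RoleArn='arn:aws:iam::<aws_account>:role/<aws_role_name>',
    RoleSessionName='my-etl-job')['Credentials']

# 'MyETLJobs' is a placeholder custom metric namespace.
recorder = aws_cloudwatch_utils.MetricRecorder(credentials, 'MyETLJobs')

dims = [{'Name': 'JobName', 'Value': 'my-etl-job'}]

# Default unit ('Count'), explicit dimensions.
recorder.record('RowsProcessed', 1203, dims)

# Non-default unit, e.g. the job duration in seconds.
recorder.record('JobDurationSeconds', 842.5, dims, 'Seconds')
```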
Usage
-------------
```python
from pyspark.sql import SparkSession
import boto3
spark = SparkSession.builder.enableHiveSupport() \
.appName("<application_name>").getOrCreate()
# Distribute the utilities module so it can be imported on the driver and executors
spark.sparkContext.addPyFile('s3://path/to/file/aws_cloudwatch_utils.py')
import aws_cloudwatch_utils
job_name = '<job_name>'
role = 'arn:aws:iam::<aws_account>:role/<aws_role_name>'
dims = [{'Name': 'JobName', 'Value': job_name}]
sts = boto3.client('sts')
credentials = sts.assume_role(RoleArn=role, RoleSessionName=job_name)['Credentials']
logger = aws_cloudwatch_utils.Logger(credentials, job_name)
metric_recorder = aws_cloudwatch_utils.MetricRecorder(credentials, job_name)
logger.log("Job Completed Successfully")
metric_recorder.record('Success', 1, dims, 'Count')
```
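The `Credentials` dict returned by `sts.assume_role` holds `AccessKeyId`, `SecretAccessKey`, and `SessionToken`. As a rough sketch of how such a dict maps onto a boto3 client (an assumption about what `aws_cloudwatch_utils` does internally, not its actual source):
```python
import boto3

# Hypothetical helper: build a CloudWatch client from an STS credentials dict.
# When credentials is None, boto3 falls back to the host's IAM role.
def cloudwatch_client(credentials, region='us-east-1'):
    if credentials is None:
        return boto3.client('cloudwatch', region_name=region)
    return boto3.client(
        'cloudwatch',
        region_name=region,
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )
```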