A Python Package for interacting with Cloudera Data Engineering Clusters
Project description
cdepy Package
cdepy is a package for interacting with Cloudera Data Engineering Virtual Clusters.
You can find out more about Cloudera Data Engineering in the Cloudera Documentation.
Usage
You can install this package with pip:
pip install cdepy
Features
- CDE Resources: create resources of type Files and Python-Environment
- CDE Jobs: create jobs of type Airflow and Spark
- Job Observability: monitor job status
Examples
BASICS
from cdepy import cdeconnection
from cdepy import cdejob
from cdepy import cdemanager
from cdepy import cderesource
from cdepy import utils
import json
Establish Connection to CDE Virtual Cluster
JOBS_API_URL = "https://<YOUR-CLUSTER>.cloudera.site/dex/api/v1"
WORKLOAD_USER = "<Your-CDP-Workload-User>"
WORKLOAD_PASSWORD = "<Your-CDP-Workload-Password>"
myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
myCdeConnection.setToken()
Create CDE Files Resource Definition
CDE_RESOURCE_NAME = "myFilesCdeResource"
myCdeFilesResource = cderesource.CdeFilesResource(CDE_RESOURCE_NAME)
myCdeFilesResourceDefinition = myCdeFilesResource.createResourceDefinition()
Create a CDE Spark Job Definition
CDE_JOB_NAME = "myCdeSparkJob"
APPLICATION_FILE_NAME = "pysparksql.py"
myCdeSparkJob = cdejob.CdeSparkJob(myCdeConnection)
myCdeSparkJobDefinition = myCdeSparkJob.createJobDefinition(CDE_JOB_NAME, CDE_RESOURCE_NAME, APPLICATION_FILE_NAME, executorMemory="2g", executorCores=2)
Create Resource and Job in CDE Cluster
LOCAL_FILE_PATH = "examples"
LOCAL_FILE_NAME = "pysparksql.py"
myCdeClusterManager = cdemanager.CdeClusterManager(myCdeConnection)
myCdeClusterManager.createResource(myCdeFilesResourceDefinition)
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
myCdeClusterManager.createJob(myCdeSparkJobDefinition)
Run Job with Default Configurations
myCdeClusterManager.runJob(CDE_JOB_NAME)
Update Runtime Configurations
overrideParams = {"spark": {"executorMemory": "4g"}}
myCdeClusterManager.runJob(CDE_JOB_NAME, SPARK_OVERRIDES=overrideParams)
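The overrides payload mirrors the Spark options accepted at job creation. As a sketch, the following assumes executorCores can be overridden under the same spark key (inferred from the createJobDefinition parameters above, not a documented guarantee):
overrideParams = {"spark": {"executorMemory": "4g", "executorCores": 4}}  # the executorCores key is an assumption
myCdeClusterManager.runJob(CDE_JOB_NAME, SPARK_OVERRIDES=overrideParams)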
Validate Job Runs
jobRuns = myCdeClusterManager.listJobRuns()
jobRunsParsed = json.loads(jobRuns)
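For a quick summary you can iterate over the parsed runs. This is a minimal sketch, assuming the parsed response is a list of run objects; the "id" and "status" field names are assumptions about the CDE Jobs API response:
for run in jobRunsParsed:
    # "id" and "status" are assumed field names; adjust to the actual response
    print(run.get("id"), run.get("status"))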
Download Spark Event Logs
JOB_RUN_ID = "1"
logTypes = myCdeClusterManager.showAvailableLogTypes(JOB_RUN_ID)
logTypesParsed = json.loads(logTypes)
LOGS_TYPE = "driver/event"
sparkEventLogs = myCdeClusterManager.downloadJobRunLogs(JOB_RUN_ID, LOGS_TYPE)
sparkEventLogsClean = utils.sparkEventLogParser(sparkEventLogs)
print(sparkEventLogsClean)
Delete Job and Validate Deletion
CDE_JOB_NAME = "myCdeSparkJob"
myCdeClusterManager.deleteJob(CDE_JOB_NAME)
myCdeClusterManager.listJobs()
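To confirm the deletion programmatically rather than by eye, you can parse the job list and check that the name is gone. A minimal sketch, assuming listJobs returns the raw JSON job list and that each entry carries a name field (both assumptions):
jobs = json.loads(myCdeClusterManager.listJobs())
# assumes the parsed response is a list of job objects with a "name" field
assert all(job.get("name") != CDE_JOB_NAME for job in jobs)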
Describe Cluster Meta
myCdeClusterManager.describeResource(CDE_RESOURCE_NAME)
Remove Files from Files Resource
RESOURCE_FILE_NAME = "pysparksql.py"
myCdeClusterManager.removeFileFromResource(CDE_RESOURCE_NAME, RESOURCE_FILE_NAME)
Upload File to Resource
myCdeClusterManager.uploadFileToResource(CDE_RESOURCE_NAME, LOCAL_FILE_PATH, LOCAL_FILE_NAME)
Download File from Resource
myPySparkScript = myCdeClusterManager.downloadFileFromResource(CDE_RESOURCE_NAME, RESOURCE_FILE_NAME)
from pprint import pprint
pprint(myPySparkScript)
Pause Single Job
myCdeClusterManager.pauseSingleJob(CDE_JOB_NAME)
Delete Resource
CDE_RESOURCE_NAME = "myFilesCdeResource"
myCdeClusterManager.deleteResource(CDE_RESOURCE_NAME)
CDE AIRFLOW PYTHON ENVIRONMENTS
NB: There is only one Airflow Python Environment per CDE Virtual Cluster.
from cdepy import cdeconnection
from cdepy import cdeairflowpython
import os
import json
Connect via CdeConnection Object
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"
myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
myCdeConnection.setToken()
Use CdeAirflowPythonEnv object to manage Airflow Python Environments
myAirflowPythonEnvManager = cdeairflowpython.CdeAirflowPythonEnv(myCdeConnection)
Create a Maintenance Session in order to perform any Airflow Python Environment-related actions
myAirflowPythonEnvManager.createMaintenanceSession()
First, create a pip repository
myAirflowPythonEnvManager.createPipRepository()
Check on Status of Maintenance Session
myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The status should be {"status":"pip-repos-defined"}.
Load requirements.txt file
pathToRequirementsTxt = "/examples/requirements.txt"
myAirflowPythonEnvManager.buildAirflowPythonEnv(pathToRequirementsTxt)
Note: the requirements.txt file must be customized.
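For example, a requirements.txt might contain pinned packages such as the following; the packages and versions here are purely illustrative, so list whatever your Airflow DAGs actually need:
pandas==1.5.3
requests==2.31.0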
myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
The response status should be {"status":"building"}. After about two minutes, repeat the request; the response status should eventually be {"status":"built"}.
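Rather than repeating the request by hand, you can poll until the build finishes. A minimal sketch, assuming checkAirflowPythonEnvStatus returns the raw JSON status string shown above:
import time

for _ in range(20):  # poll for up to ~10 minutes
    status = json.loads(myAirflowPythonEnvManager.checkAirflowPythonEnvStatus())
    if status.get("status") == "built":
        break
    time.sleep(30)  # wait 30 seconds between polls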
Validate status of Python environment
myAirflowPythonEnvManager.getAirflowPythonEnvironmentDetails()
Explore Maintenance Session logs
myAirflowPythonEnvManager.viewMaintenanceSessionLogs()
Activate the Python environment
myAirflowPythonEnvManager.activateAirflowPythonEnv()
Check on Python environment build status
myAirflowPythonEnvManager.checkAirflowPythonEnvStatus()
At first the response status should be {"status":"activating"}. After a couple of minutes the Maintenance Session will automatically end, which means the Airflow Python Environment has been activated.
Optional: Create a new Maintenance Session and then delete the Python environment
myAirflowPythonEnvManager.createMaintenanceSession()
myAirflowPythonEnvManager.deleteAirflowPythonEnv()
Optional: End the Maintenance Session once you have deleted the Python environment
myAirflowPythonEnvManager.deleteMaintenanceSession()
CDE REPOSITORIES
from cdepy import cdeconnection
from cdepy import cderepositories
import os
import json
JOBS_API_URL = "<myJobsAPIurl>"
WORKLOAD_USER = "<myusername>"
WORKLOAD_PASSWORD = "<mypwd>"
Connect via CdeConnection Object
myCdeConnection = cdeconnection.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
myCdeConnection.setToken()
Instantiate Repository Manager
myRepoManager = cderepositories.CdeRepositoryManager(myCdeConnection)
Provide example Git repository information. The repository below is publicly available for testing.
repoName = "exampleGitRepository"
repoPath = "https://github.com/pdefusco/cde_git_repo.git"
Create CDE Repository from Git Repository
myRepoManager.createRepository(repoName, repoPath, repoBranch="main")
Show available CDE repositories
myRepoManager.listRepositories()
Show CDE Repository Metadata
myRepoManager.describeRepository(repoName)
Download file from CDE Repository
filePath = "simple-pyspark-sql.py"
myRepoManager.downloadFileFromRepo(repoName, filePath)
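To keep a local copy, you can write the downloaded content to disk. A minimal sketch, assuming downloadFileFromRepo returns the file contents as a string (open the file in "wb" mode instead if it returns bytes):
fileContents = myRepoManager.downloadFileFromRepo(repoName, filePath)
with open(filePath, "w") as f:
    f.write(fileContents)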
Delete CDE Repository
myRepoManager.deleteRepository(repoName)
Validate CDE Repository Deletion
myRepoManager.listRepositories()
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution: cdepy-0.1.10.tar.gz (12.4 kB)
Built Distribution: cdepy-0.1.10-py3-none-any.whl (13.3 kB)
File details
Details for the file cdepy-0.1.10.tar.gz.
File metadata
- Download URL: cdepy-0.1.10.tar.gz
- Upload date:
- Size: 12.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest
---|---
SHA256 | 645d5f637a30c9ee57da1b4a9d4d0f7de92342e0b54ff3d59d444a834bcb9ed4
MD5 | aa28b833f1ded5ba53fe7a6f076c1523
BLAKE2b-256 | 73860b3d40528bc188e823825239e9650e99ca273e2c88242ea5e4fa57f7aab1
File details
Details for the file cdepy-0.1.10-py3-none-any.whl.
File metadata
- Download URL: cdepy-0.1.10-py3-none-any.whl
- Upload date:
- Size: 13.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.6
File hashes
Algorithm | Hash digest
---|---
SHA256 | 4b3138d0e6ba7bcac9357912f039782aebd0bbe95b65734e23dbb9df4849a296
MD5 | 865b92a17672c9bae6ce3fbc20b57f5d
BLAKE2b-256 | cad24c3802d91bc2beb807af75e5e02d54196369365ac85891e9795d109aad4e