A Python package to do cloud migration Coding challenge
Project description
Instructions
Python Test Migrate Challenge
A Python package to migrate cloud data
GitHub Link
https://github.com/alhadbhadekar/python-test-migrate-challenge
Installation
Use the package manager [pip] (https://pip.pypa.io/en/stable/) to install Python-Test-Migration-Challenge
Command to Install
pip install Python-Test-Migration-Challenge
Project Structure
Python_Test_Migration (Root folder)\ Provider\ |__Provider Classes serving for object Creation, backend structing\ Service\ |__Service Layer for business logic incorporation, calling backend Provider classes only through Service Layer, contains list of functions\ Tier_one\ |__Customer facing code to access Service Layer fucntions for writing Migration logics\ Persistence\ |__Modules to help store Classes in filesystems\ test\ |__Provider\ |__Unit Test cases for Provider Layer\ |__Service\ |__Unit Test cases for Service Layer
Usage
Example 1: Tier one Layer to do migration
#!/usr/bin/python
from Python_Test_Migration.service.app_service import *
from Python_Test_Migration.provider.constant import IP_ADDRESS
from Python_Test_Migration.persistence.make_dill import dump_classes
from Python_Test_Migration.persistence.un_dill import load_classes
import os
# Loading persistent classes
if os.stat("python_test_migration").st_size != 0:
load_classes()
# List of sources
sources = []
# Creates credentials object for source 1
ipaddress1 = IP_ADDRESS('1.1.1.1')
credentials1 = credentials_create('test', 'password', ipaddress1)
# Creates credentials object for source 2
ipaddress2 = IP_ADDRESS('2.2.2.2')
credentials2 = credentials_create('test2', 'password', ipaddress2)
# Create Source Mount points for target source 1
mountpoint1 = mountpoint_create('C:\\', 25)
mountpoint2 = mountpoint_create('D:\\', 25)
mountpoint3 = mountpoint_create('E:\\', 25)
# Create Source Mount points for target source 2
mountpoint4 = mountpoint_create('C:\\', 25)
mountpoint5 = mountpoint_create('N:\\', 25)
# Create Mountpoint list for source1
mountpoint_list1 = mountpoints_list(mountpoint1, mountpoint2, mountpoint3)
# Create Mountpoint list for source2
mountpoint_list2 = mountpoints_list(mountpoint4, mountpoint5)
# Create Workload for source 1
source1 = workload_create(credentials1, mountpoint_list1)
sources.append(source1)
# Create Workload for source 2
source2 = workload_create(credentials2, mountpoint_list2)
sources.append(source2)
# Create Target VM
cloud_type = 'aws'
ipaddress_cloud = IP_ADDRESS('3.3.3.3')
creds_cloud = Credentials('mukul', '234234', ipaddress_cloud.get_ipaddress())
cloud_workload = Workload(creds_cloud)
targetVM = migrationTarget_Create(cloud_type,
creds_cloud,
cloud_workload)
if targetVM.cloudtype == 'Not Supported':
sys.exit()
# Run Migration
# Check for constrains i.e. no source has same IP address and and ip address remains same during workload
# Above checks are being added to Service Layer Migration function
# Created selected mount types
selected_mountpoint = mountpoint_create('C:\\', 25)
selected_mountpoint_list = [selected_mountpoint.get_mountPoint_instances()]
run_migration = migration(selected_mountpoint_list,
source1, targetVM,
ipaddress1, sources, mountpoint_list1)
print(run_migration)
# Saving classes to filesystem
dump_classes()
Example 1: Service Layers functions available to call Provider Classes
#!/usr/bin/python
import sys
sys.path.append("..")
from provider.python_test_migration \
import Credentials, MountPoint, Workload, MigrationTarget, Migration
def credentials_create(username, password, ipaddress):
"""Service Layer to Create Credentials"""
credentials = Credentials(username, password, ipaddress.get_ipaddress())
return credentials
def credentials_get(credentials):
"""Service Layer to get credentials"""
return credentials.get_Credentials()
def mountpoint_create(name, size):
"""Service Layer to create mountpoint"""
mountpoint = MountPoint(name, size)
return mountpoint
def mountpoint_get(mountpoint):
"""Service Layer to get Mountpoint details"""
return mountpoint.get_mountPoint_instances()
def mountpoints_list(*args):
"""Service Layer to get mountpoint list"""
mountpoints_list = []
for instance in args:
mountpoints_list.append(instance.get_mountPoint_instances())
return mountpoints_list
def workload_create(creds, MountPoint):
"""Service Layer to create workload"""
source = Workload(creds, MountPoint)
return source
def workload_get(workload):
"""Service Layer to get workload details"""
workload = workload.get_workload()
return workload
def constrains(source, ipaddress):
"""Service Layer to check constrains"""
return source.constrains(ipaddress.get_ipaddress())
def check_duplicate_ip(sources):
"""Service Layer to check sources from duplicate IP addresses"""
sources_ips = []
for instance in sources:
sources_ips.append(instance.get_workload()['domain'])
return sources[0].checksourceip(sources_ips)
def migrationTarget_Create(cloud_type, creds_cloud, cloud_workload):
"""Service Layer to create Migration target"""
targetVM = MigrationTarget(cloud_type, creds_cloud, cloud_workload)
return targetVM
def migrationTarget_get(targerVM):
"""Service Layer to get Migration Target details"""
return targerVM.get_Migration_Target()
def check_Mounts(selected_mountpoint_list, mountpoint_list1):
"""Service Layer to check mounts if selected mount is present in provided mountpoints or no"""
selected_mountpoint = []
mountpoints_provided = []
for item in selected_mountpoint_list:
selected_mountpoint.append(item['mount_name'])
for item in mountpoint_list1:
mountpoints_provided.append(item['mount_name'])
for i in selected_mountpoint:
for j in mountpoints_provided:
if i == j:
return {
'key': 'success',
'message': 'Selected Mount type present in MountTypes provided'
}
else:
return {
'key': 'error',
'message': 'Selected Mount type not present in MountTypes provided'
}
def migration(selected_mountpoint, source,
targetVM, ipaddress,
sources, mountpoint_list1):
"""Service Layer to run migrations"""
check_ipchanged = constrains(source, ipaddress)
if check_ipchanged['key'] == 'error':
return {
'key': 'error',
'message': check_ipchanged['message'],
'migration_State': 'error'
}
check_mul_ips = check_duplicate_ip(sources)
if check_mul_ips['key'] == 'error':
return {
'key': 'error',
'message': check_mul_ips['message'],
'migration_State': 'error'
}
check_mount = check_Mounts(selected_mountpoint, mountpoint_list1)
if check_mount['key'] == 'error':
return {
'key': 'error',
'message': check_mount['message'],
'migration_State': 'error'
}
run_migration = Migration(selected_mountpoint, source, targetVM)
status = run_migration.run(source, targetVM)
return status
Example 3: Provider Layer Classes
#!/usr/bin/python
import time
import copy
class Credentials:
"""Class to create Credentials Objects"""
def __init__(self, username=None, password=None, domain=None):
"""Init method for initializing variables"""
self.username = username
self.password = password
self.domain = domain
def get_Credentials(self):
"""Method to get credentials"""
return {'username': self.username,
'password': self.password,
'domain': self.domain}
class MountPoint:
"""Class to create MountPoint Object"""
def __init__(self, mount_point_name=None, size=None):
"""Init method to initialize MountPoint object"""
self.mount_point_name = mount_point_name
self.size = size
def get_mountPoint_instances(self):
"""Method to get MountPoint instances with size"""
return {
'mount_name': self.mount_point_name,
'size': self.size
}
class Workload(object):
"""Class to create workload Object"""
def __init__(self, credentials=None, mountpoint=None):
"""Init method to initialize Workload Object"""
self.Credentials = credentials
self.storage = []
if mountpoint is not None:
for instance in mountpoint:
self.storage.append(instance)
def constrains(self, ipaddress=None):
"""Method to identify if there is an IP address changed over Workload"""
username = self.Credentials.username
password = self.Credentials.password
domain = self.Credentials.domain
storage = self.storage
if domain != ipaddress:
return {'key': 'error',
'message': 'IP address changed'}
if username is None or password is None or domain is None:
return {'key': 'error',
'message': 'Either IP address or password or domain is None'}
return {'key': 'success',
'message': 'Constrains passed'}
def checksourceip(self, source_ips):
"""Method to check if there are multiple sources with same IP address"""
if len(source_ips) != len(set(source_ips)):
return {'key': 'error',
'message': 'Duplicate IP Addresses exist'}
return {'key': 'success',
'message': 'All Sources have separate IP\'s'}
def get_credentials(self):
"""Method to get credentials"""
return {
'username': self.Credentials.username,
'password': self.Credentials.password
}
def get_workload(self):
"""Method to get workload details"""
return {
'username': self.Credentials.username,
'password': self.Credentials.password,
'domain': self.Credentials.domain,
'mount_point': self.storage
}
class MigrationTarget(object):
"""Class to create MigrationTarget object"""
def __init__(self, cloudtype=None, credentials=None, workload=None):
"""Init method to initialize MigrationTarget object"""
self.cloudtype = cloudtype
self.Credentials = credentials
self.Workload = workload
if self.cloudtype != 'aws':
if self.cloudtype != 'azure':
if self.cloudtype != 'vsphere':
if self.cloudtype != 'vcloud':
self.cloudtype = 'Not Supported'
def get_Migration_Target(self):
"""Method to get MigrationTarget Details"""
storage = self.Workload.storage
return {
'cloud_type': self.cloudtype, \
'username': self.Credentials.username, \
'password': self.Credentials.password, \
'domain': self.Credentials.domain, \
'mountpoint': storage}
class Migration(object):
"""Class to create Migration object"""
def __init__(self, mountpoint=None, sourceWorkload=None, targetMigrationTarget=None):
"""Init method for initializing Migration object"""
self.mountpoint = mountpoint
self.sourceWorkload = sourceWorkload
self.targetMigrationTarget = targetMigrationTarget
self.migration_State = 'not_started'
def run(self, sourceWorkload=None, targetMigrationTarget=None):
"""run method to start migration"""
time.sleep(1)
selected_mountpoints = []
#print(len(self.mountpoint))
for i in range(0, len(self.mountpoint)):
selected_mountpoints.append(self.mountpoint[i]['mount_name'])
# Checking if C:\\ is allowed to migrate
for directory in selected_mountpoints:
if 'C:\\' == directory:
allowed = True
break
else:
allowed = False
migration_State = 'not_started'
if allowed:
try:
migration_State = 'running'
# Copying source to TargetVM for migration
targetMigrationTarget = copy.copy(sourceWorkload)
# Logic to make sure target only has selected mountpoints
credentials = Credentials(targetMigrationTarget.get_credentials()['username'],
targetMigrationTarget.get_credentials()['password'])
targetMigrationTarget = Workload(credentials, self.mountpoint)
migration_State = 'success'
return {'key': 'success',
'message': 'Migration successfully completed',
'migration_State': migration_State}
except:
migration_State = 'error'
return {'key': 'error',
'message': 'Error happened during migration',
'migration_State': migration_State}
else:
migration_State = 'error'
return {'key': 'error',
'message': 'C:\\ not allowed.',
'migration_State': migration_State}
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file Python-Test-Migration-Challenge-1.0.2.tar.gz.
File metadata
- Download URL: Python-Test-Migration-Challenge-1.0.2.tar.gz
- Upload date:
- Size: 13.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
54b12b4d907d763e9bbe18595301e57348b202c9da3bb196c81f40aae43e59b8
|
|
| MD5 |
b58b70be9c9d8354be3d800eb701b8d9
|
|
| BLAKE2b-256 |
85e491e7f2617f5ff98f6af18dd6bd872109f957fd71c556afa931f0d2e51458
|
File details
Details for the file Python_Test_Migration_Challenge-1.0.2-py3-none-any.whl.
File metadata
- Download URL: Python_Test_Migration_Challenge-1.0.2-py3-none-any.whl
- Upload date:
- Size: 16.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/40.8.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.7.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1859ff54e8303731e13135daff7edc77149407e6e9e2459c6b48a6ae72171297
|
|
| MD5 |
d6ec1e85370037f92005f7b6488eb808
|
|
| BLAKE2b-256 |
dec73b5dceff8483b688c39c06fac328558586c33a1d41a1513d360dd2ce0670
|