Skip to main content

A Python package to do cloud migration Coding challenge

Project description


Python Test Migrate Challenge

A Python package to migrate cloud data

GitHub Link


Use the package manager [pip] ( to install Python-Test-Migration-Challenge

Command to Install

pip install Python-Test-Migration-Challenge

Project Structure

Python_Test_Migration (Root folder)\
|__Provider Classes serving for object Creation, backend structing\
|__Service Layer for business logic incorporation, calling backend Provider classes only through Service Layer, contains list of functions\
|__Customer facing code to access Service Layer fucntions for writing Migration logics\
|__Modules to help store Classes in filesystems\
   |__Unit Test cases for Provider Layer\
   |__Unit Test cases for Service Layer


Example 1: Tier one Layer to do migration

from Python_Test_Migration.service.app_service import *
from Python_Test_Migration.provider.constant import IP_ADDRESS
from Python_Test_Migration.persistence.make_dill import dump_classes
from Python_Test_Migration.persistence.un_dill import load_classes
import os

# Loading persistent classes
if os.stat("python_test_migration").st_size != 0:

# List of sources
sources = []

# Creates credentials object for source 1
ipaddress1 = IP_ADDRESS('')
credentials1 = credentials_create('test', 'password', ipaddress1)

# Creates credentials object for source 2
ipaddress2 = IP_ADDRESS('')
credentials2 = credentials_create('test2', 'password', ipaddress2)

# Create Source Mount points for target source 1
mountpoint1 = mountpoint_create('C:\\', 25)
mountpoint2 = mountpoint_create('D:\\', 25)
mountpoint3 = mountpoint_create('E:\\', 25)

# Create Source Mount points for target source 2
mountpoint4 = mountpoint_create('C:\\', 25)
mountpoint5 = mountpoint_create('N:\\', 25)

# Create Mountpoint list for source1
mountpoint_list1 = mountpoints_list(mountpoint1, mountpoint2, mountpoint3)

# Create Mountpoint list for source2
mountpoint_list2 = mountpoints_list(mountpoint4, mountpoint5)

# Create Workload for source 1
source1 = workload_create(credentials1, mountpoint_list1)
# Create Workload for source 2
source2 = workload_create(credentials2, mountpoint_list2)

# Create Target VM
cloud_type = 'aws'
ipaddress_cloud = IP_ADDRESS('')
creds_cloud = Credentials('mukul', '234234', ipaddress_cloud.get_ipaddress())
cloud_workload = Workload(creds_cloud)

targetVM = migrationTarget_Create(cloud_type,
if targetVM.cloudtype == 'Not Supported':

# Run Migration
# Check for constrains i.e. no source has same IP address and and ip address remains same during workload
# Above checks are being added to Service Layer Migration function

# Created selected mount types
selected_mountpoint = mountpoint_create('C:\\', 25)
selected_mountpoint_list = [selected_mountpoint.get_mountPoint_instances()]

run_migration = migration(selected_mountpoint_list,
                          source1, targetVM,
                          ipaddress1, sources, mountpoint_list1)

# Saving classes to filesystem

Example 1: Service Layers functions available to call Provider Classes

import sys
from provider.python_test_migration \
    import Credentials, MountPoint, Workload, MigrationTarget, Migration

def credentials_create(username, password, ipaddress):
    """Service Layer to Create Credentials"""
    credentials = Credentials(username, password, ipaddress.get_ipaddress())
    return credentials

def credentials_get(credentials):
    """Service Layer to get credentials"""
    return credentials.get_Credentials()

def mountpoint_create(name, size):
    """Service Layer to create mountpoint"""
    mountpoint = MountPoint(name, size)
    return mountpoint

def mountpoint_get(mountpoint):
    """Service Layer to get Mountpoint details"""
    return mountpoint.get_mountPoint_instances()

def mountpoints_list(*args):
    """Service Layer to get mountpoint list"""
    mountpoints_list = []
    for instance in args:
    return mountpoints_list

def workload_create(creds, MountPoint):
    """Service Layer to create workload"""
    source = Workload(creds, MountPoint)
    return source

def workload_get(workload):
    """Service Layer to get workload details"""
    workload = workload.get_workload()
    return workload

def constrains(source, ipaddress):
    """Service Layer to check constrains"""
    return source.constrains(ipaddress.get_ipaddress())

def check_duplicate_ip(sources):
    """Service Layer to check sources from duplicate IP addresses"""
    sources_ips = []
    for instance in sources:
    return sources[0].checksourceip(sources_ips)

def migrationTarget_Create(cloud_type, creds_cloud, cloud_workload):
    """Service Layer to create Migration target"""
    targetVM = MigrationTarget(cloud_type, creds_cloud, cloud_workload)
    return targetVM

def migrationTarget_get(targerVM):
    """Service Layer to get Migration Target details"""
    return targerVM.get_Migration_Target()

def check_Mounts(selected_mountpoint_list, mountpoint_list1):
    """Service Layer to check mounts if selected mount is present in provided mountpoints or no"""
    selected_mountpoint = []
    mountpoints_provided = []
    for item in selected_mountpoint_list:
    for item in mountpoint_list1:

    for i in selected_mountpoint:
        for j in mountpoints_provided:
            if i == j:
                return {
                    'key': 'success',
                    'message': 'Selected Mount type present in MountTypes provided'
                return {
                    'key': 'error',
                    'message': 'Selected Mount type not present in MountTypes provided'

def migration(selected_mountpoint, source,
              targetVM, ipaddress,
              sources, mountpoint_list1):
    """Service Layer to run migrations"""
    check_ipchanged = constrains(source, ipaddress)
    if check_ipchanged['key'] == 'error':
        return {
            'key': 'error',
            'message': check_ipchanged['message'],
            'migration_State': 'error'

    check_mul_ips = check_duplicate_ip(sources)
    if check_mul_ips['key'] == 'error':
        return {
            'key': 'error',
            'message': check_mul_ips['message'],
            'migration_State': 'error'

    check_mount = check_Mounts(selected_mountpoint, mountpoint_list1)
    if check_mount['key'] == 'error':
        return {
            'key': 'error',
            'message': check_mount['message'],
            'migration_State': 'error'

    run_migration = Migration(selected_mountpoint, source, targetVM)
    status =, targetVM)
    return status

Example 3: Provider Layer Classes

import time
import copy

class Credentials:
    """Class to create Credentials Objects"""

    def __init__(self, username=None, password=None, domain=None):
        """Init method for initializing variables"""
        self.username = username
        self.password = password
        self.domain = domain

    def get_Credentials(self):
        """Method to get credentials"""
        return {'username': self.username,
                'password': self.password,
                'domain': self.domain}

class MountPoint:
    """Class to create MountPoint Object"""

    def __init__(self, mount_point_name=None, size=None):
        """Init method to initialize MountPoint object"""
        self.mount_point_name = mount_point_name
        self.size = size

    def get_mountPoint_instances(self):
        """Method to get MountPoint instances with size"""
        return {
            'mount_name': self.mount_point_name,
            'size': self.size

class Workload(object):
    """Class to create workload Object"""

    def __init__(self, credentials=None, mountpoint=None):
        """Init method to initialize Workload Object"""
        self.Credentials = credentials = []
        if mountpoint is not None:
            for instance in mountpoint:

    def constrains(self, ipaddress=None):
        """Method to identify if there is an IP address changed over Workload"""
        username = self.Credentials.username
        password = self.Credentials.password
        domain = self.Credentials.domain
        storage =
        if domain != ipaddress:
            return {'key': 'error',
                    'message': 'IP address changed'}

        if username is None or password is None or domain is None:
            return {'key': 'error',
                    'message': 'Either IP address or password or domain is None'}
        return {'key': 'success',
                'message': 'Constrains passed'}

    def checksourceip(self, source_ips):
        """Method to check if there are multiple sources with same IP address"""

        if len(source_ips) != len(set(source_ips)):
            return {'key': 'error',
                    'message': 'Duplicate IP Addresses exist'}
        return {'key': 'success',
                'message': 'All Sources have separate IP\'s'}

    def get_credentials(self):
        """Method to get credentials"""
        return {
            'username': self.Credentials.username,
            'password': self.Credentials.password

    def get_workload(self):
        """Method to get workload details"""
        return {
            'username': self.Credentials.username,
            'password': self.Credentials.password,
            'domain': self.Credentials.domain,

class MigrationTarget(object):
    """Class to create MigrationTarget object"""

    def __init__(self, cloudtype=None, credentials=None, workload=None):
        """Init method to initialize MigrationTarget object"""
        self.cloudtype = cloudtype
        self.Credentials = credentials
        self.Workload = workload

        if self.cloudtype != 'aws':
            if self.cloudtype != 'azure':
                if self.cloudtype != 'vsphere':
                    if self.cloudtype != 'vcloud':
                        self.cloudtype = 'Not Supported'

    def get_Migration_Target(self):
        """Method to get MigrationTarget Details"""
        storage =
        return {
            'cloud_type': self.cloudtype, \
            'username': self.Credentials.username, \
            'password': self.Credentials.password, \
            'domain': self.Credentials.domain, \
            'mountpoint': storage}

class Migration(object):
    """Class to create Migration object"""

    def __init__(self, mountpoint=None, sourceWorkload=None, targetMigrationTarget=None):
        """Init method for initializing Migration object"""
        self.mountpoint = mountpoint
        self.sourceWorkload = sourceWorkload
        self.targetMigrationTarget = targetMigrationTarget
        self.migration_State = 'not_started'

    def run(self, sourceWorkload=None, targetMigrationTarget=None):
        """run method to start migration"""

        selected_mountpoints = []
        for i in range(0, len(self.mountpoint)):

        # Checking if C:\\ is allowed to migrate
        for directory in selected_mountpoints:
            if 'C:\\' == directory:
                allowed = True
                allowed = False

        migration_State = 'not_started'

        if allowed:
                migration_State = 'running'

                # Copying source to TargetVM for migration
                targetMigrationTarget = copy.copy(sourceWorkload)

                # Logic to make sure target only has selected mountpoints
                credentials = Credentials(targetMigrationTarget.get_credentials()['username'],

                targetMigrationTarget = Workload(credentials, self.mountpoint)
                migration_State = 'success'
                return {'key': 'success',
                        'message': 'Migration successfully completed',
                        'migration_State': migration_State}
                migration_State = 'error'
                return {'key': 'error',
                        'message': 'Error happened during migration',
                        'migration_State': migration_State}

            migration_State = 'error'
            return {'key': 'error',
                    'message': 'C:\\ not allowed.',
                    'migration_State': migration_State}

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for Python-Test-Migration-Challenge, version 1.0.2
Filename, size File type Python version Upload date Hashes
Filename, size Python_Test_Migration_Challenge-1.0.2-py3-none-any.whl (16.2 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size Python-Test-Migration-Challenge-1.0.2.tar.gz (13.1 kB) File type Source Python version None Upload date Hashes View

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring DigiCert DigiCert EV certificate Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page