Use Case

In this tutorial we put together what was learnt in the previous sections and show how AzureDSVM can be used to solve a clustering problem with the k-means algorithm.

It is common for data scientists to perform clustering or segmentation of data sets. This is particularly useful when there is an abundance of observations but no prior knowledge (e.g., labels) available in the data. Cluster analysis is a type of unsupervised machine learning, and one of the most popular algorithms for grouping unlabelled data into segments is k-means. The algorithm iteratively updates the cluster centers so that, within each cluster, the sum of squared distances from the data points to the center is minimized. The performance of k-means depends heavily on the initial positions of the centroids, so it is usually run several times and the model with the smallest error is selected as the optimal one.
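
As a toy illustration of this restart strategy, and separate from the DSVM workflow itself, base R's kmeans can be run several times on the built-in iris measurements and the best fit kept:

# Illustrative only: run k-means from several random starts and keep
# the fit with the smallest total within-cluster sum of squares.

measurements <- scale(iris[, 1:4])

fits <- lapply(1:10, function(i) kmeans(measurements, centers=3))
best <- fits[[which.min(sapply(fits, function(f) f$tot.withinss))]]

best$tot.withinss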

The demo in this tutorial shows how to perform k-means clustering on remote DSVMs. For scalability, the data used in the demo are stored in XDF format, which allows Microsoft RevoScaleR functions to perform external-memory (out-of-core) computation.
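
For example, a CSV file can be converted to XDF so that RevoScaleR functions process it chunk by chunk rather than holding everything in memory. The file names below are placeholders:

library(RevoScaleR)

# Convert a (placeholder) CSV into XDF for chunked, out-of-memory
# processing, then inspect the result without loading it into memory.

rxImport(inData="creditcard.csv",
         outFile="creditcard.xdf",
         overwrite=TRUE)

rxGetInfo("creditcard.xdf", getVarInfo=TRUE)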

Setup

As in the previous sections, credentials for authentication are required to fire up the DSVMs.

library(AzureDSVM)
library(AzureSMR)
library(dplyr)
library(stringr)
library(stringi)
library(magrittr)
library(readr)
library(rattle)
library(ggplot2)
# Load the required subscription resources: TID, CID, and KEY.
# Also includes the ssh PUBKEY for the user.

USER <- Sys.info()[['user']]

source(paste0(USER, "_credentials.R"))
COUNT <- 4  # Number of VMs to deploy.

SIZE <- "Standard_DS2_v2"

BASE  <- 
  runif(4, 1, 26) %>%
  round() %>%
  letters[.] %>%
  paste(collapse="") %T>%
  {sprintf("Base name:\t\t%s", .) %>% cat("\n")}

RG <-
  paste0("my_dsvm_", BASE,"_rg_sea") %T>%
  {sprintf("Resource group:\t\t%s", .) %>% cat("\n")}

# Choose a data centre location.

LOC <-
  "southeastasia"  %T>%
  {sprintf("Data centre location:\t%s", .) %>% cat("\n")}

# Include the random BASE in the hostname to reduce the likelihood of
# conflict.

HOST <-
  paste0("my", BASE) %T>%
  {sprintf("Hostname:\t\t%s", .) %>% cat("\n")}

cat("\n")

Deploy a cluster of DSVMs if none exists; otherwise start the existing machines.

# --------------------------------------------------------------------------
# Azure data science resource management
# --------------------------------------------------------------------------

# Connect to the Azure subscription and use this as the context for
# all of our activities.

context <- createAzureContext(tenantID=TID, clientID=CID, authKey=KEY)

# Check if the resource group already exists. Note that this script
# will not remove the resource group if it pre-existed.

rg_pre_exists <- existsRG(context, RG, LOC) %T>% print()

# Create Resource Group

if (! rg_pre_exists)
{
  # Create a new resource group into which we create the VMs and
  # related resources. Resource group name is RG. 

  # Note that to create a new resource group, the Active Directory
  # application needs access control at the subscription level.

  azureCreateResourceGroup(context, RG, LOC)

}

vm <- AzureSMR::azureListVM(context, RG)

if (!is.null(vm))
{

  AzureDSVM::operateDSVM(context, RG, vm$name, operation="Check")

  # Start the machines if they already exist in the resource group, and
  # reuse their details as the cluster information used below.

  AzureDSVM::operateDSVM(context, RG, vm$name, operation="Start")

  cluster <- vm

} else
{

  # Create a cluster of Linux Data Science Virtual Machines.

  # Keep the returned cluster details (VM names and locations) for use
  # below.

  cluster <- deployDSVMCluster(context,
                               resource.group=RG,
                               size=SIZE,
                               location=LOC,
                               hostname=BASE,
                               username=USER,
                               authen="Key",
                               pubkey=PUBKEY,
                               count=COUNT)

  # Confirm that each VM exists.

  for (i in 1:COUNT) {
    vm <- cluster[i, "name"]
    fqdn <- paste(cluster[i, "name"],
                  cluster[i, "location"],
                  "cloudapp.azure.com",
                  sep=".")

    cat(vm, "\n")

    operateDSVM(context, RG, vm, operation="Check")

    # Send a simple system() command across to the new server to test
    # its existence. Expect a single line with an indication of how long
    # the server has been up and running.

    cmd <- paste("ssh -q",
                 "-o StrictHostKeyChecking=no",
                 "-o UserKnownHostsFile=/dev/null\\\n   ",
                 fqdn,
                 "uptime") %T>%
                 {cat(., "\n")}
    cmd
    system(cmd)
    cat("\n")
  }
}

The data used in this demonstration record a number of credit card transactions, some of which are fraudulent. The original data are available on the Kaggle website, or directly from togaware (https://access.togaware.com/creditcard.xdf) in XDF format. The original data are labelled, so for the clustering analysis the label is removed.

The R code for the clustering is available from GitHub as workerCluster.R (./test/workerCluster.R). The analysis normalises the credit transaction data and then performs 10 repeated clustering analyses (each targeting 2 clusters) using the k-means algorithm. The repetitions are run in parallel under the specified computing context, using functionality from the RevoScaleR package. Note that the computing-context information is added automatically by the executeScript function, given the specified computing context.

The script is saved locally and its path is later used as a reference. In this demo the script is saved as "workerCluster.R" in the "vignettes/test" directory.
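
For orientation, a worker script along these lines might look roughly as follows. This is only a sketch: it assumes the data are read from an xdf file, that the label column is called Class (as in the public Kaggle data set), and that executeScript has already set the RevoScaleR compute context. The actual test/workerCluster.R may differ in detail.

# Hypothetical sketch of a worker script; the real test/workerCluster.R
# may differ. The compute context is assumed to be set by executeScript.

library(RevoScaleR)

# Import the credit card data (the path is an assumption).
credit <- rxImport(inData="creditcard.xdf",
                   stringsAsFactors=FALSE,
                   overwrite=TRUE)

# Drop the (assumed) label column and normalise the remaining features.
credit$Class <- NULL
credit <- as.data.frame(scale(credit))

# A single k-means run with 2 clusters, seeded so the repetitions differ.
run_kmeans <- function(seed, data)
{
  set.seed(seed)
  kmeans(data, centers=2)$cluster
}

# Repeat the clustering 10 times under the current compute context and
# save the cluster assignments for retrieval by fileTransfer().
results <- rxExec(run_kmeans, seed=rxElemArg(1:10), data=credit)

save(results, file="results.RData")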

The following code runs the clustering analysis in a specified computing environment, which is selected by setting the computing context. For comparison, two contexts, "localParallel" and "clusterParallel", are used in the demo. In the former the script runs in parallel on the available cores of a single machine, while in the latter it runs across the available nodes of a cluster.
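
For reference, the two contexts correspond roughly to the following RevoScaleR settings. executeScript injects the actual context-setting code into the worker script, so the snippet below is only an illustration of what the options mean, not code that needs to be added by hand:

# Illustration only: executeScript generates the equivalent code for
# the chosen compute.context.

library(RevoScaleR)

# "localParallel": parallelise rxExec calls across the cores of the
# current machine.
rxSetComputeContext(RxLocalParallel())

# "clusterParallel": distribute the rxExec calls across the nodes of
# the cluster; AzureDSVM sets up the distributed backend over the
# master and slave machines.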

The following is the configuration of the computing cluster, which is needed to specify a "clusterParallel" computing context.

# specify machine names, master, and slaves.

machines <- unlist(cluster$name)
dns_list <- paste0(machines, ".", LOC, ".cloudapp.azure.com")
master <- dns_list[1]
slaves <- dns_list[-1]

The following code runs the worker script in the "localParallel" computing context and then retrieves the results from the remote master node into the local R session.

# Run the analytics in parallel with the local parallel computing context.

time_1 <- Sys.time()

executeScript(context=context, 
              resource.group=RG, 
              hostname=machines[1], 
              remote=master, 
              username=USER, 
              script="test/workerCluster.R", 
              compute.context="localParallel")

time_2 <- Sys.time()

# get results from remote

fileTransfer(from=paste0(master, ":~"), 
             to=".", 
             user=USER, 
             file="results.RData")

load("./results.RData") 
results_local <- 
  results %T>%
  print()

For comparison, the same analysis is then run in the "clusterParallel" context.

# Run the analytics in parallel across the cluster.

time_3 <- Sys.time()

AzureDSVM::executeScript(context=context, 
                         resource.group=RG, 
                         hostname=machines, 
                         remote=master, 
                         username=USER, 
                         script="test/workerCluster.R", 
                         master=master, 
                         slaves=slaves, 
                         compute.context="clusterParallel")

time_4 <- Sys.time()

# get results from remote

AzureDSVM::fileTransfer(from=paste0(master, ":~"), 
                        to=".", 
                        user=USER, 
                        file="results.RData")

load("./results.RData") 
results_cluster <- results

Save the time points for later reference.

runtime <- list(time_1, time_2, time_3, time_4)
save(runtime, file = "./elapsed.RData")

Do some visualization with the data.

DATA_URL <- "https://zhledata.blob.core.windows.net/mldata/creditcard.xdf"

credit_data <- rxImport(inData=DATA_URL,
                        missingValueString="M",
                        stringsAsFactors=FALSE,
                        overwrite=TRUE)

# select one clustering result from results_local.

cluster_local <- 
  results_local[[1]] %>%
  factor()

# visualize first two dimensions as jitter plot.

ggplot(data=credit_data, aes(x=V1, y=V2, color=cluster_local)) +
  geom_jitter() +
  stat_ellipse(geom="polygon", alpha=0.5, aes(fill=cluster_local)) 

Once the analysis is finished, switch off the DSVMs.

# stop machines after the analysis.

for (vm in machines) {
  AzureDSVM::operateDSVM(context, RG, vm, operation="Stop")
}

The cost of running the above analytics can be obtained with the expenseCalculator function. One thing worth noting is that there is usually a delay between the execution of jobs and the recording of data consumption. The delay varies across data centre regions, so it is recommended to save the start and end times of the analytical jobs, so that expenseCalculator can later be called safely to retrieve the results.

# calculate expense on computations.

load("./elapsed.RData")

time_1 <- runtime[[1]]
time_2 <- runtime[[2]]
time_3 <- runtime[[3]]
time_4 <- runtime[[4]]

cost <- 0

if (length(vm$name) == 1) {
  cost <- AzureDSVM::expenseCalculator(context=context,
                                      instance=as.character(vm$name[1]), 
                                      time.start=time_1,
                                      time.end=time_2,
                                      granularity="Hourly",
                                      currency="currency",
                                      locale="your_locale",
                                      offerId="your_offer_id",
                                      region="your_location")
} else {
  for (name in as.character(vm$name)) {
    cost <- cost + AzureDSVM::expenseCalculator(context=context,
                                               instance=name, 
                                               time.start=time_1,
                                               time.end=time_2,
                                               granularity="Hourly",
                                               currency="currency",
                                               locale="your_locale",
                                               offerId="your_offer_id",
                                               region="your_location")
  }
}

Clean-up

Stop or delete the computing resources when they are no longer needed, to avoid unnecessary cost.

if (! rg_pre_exists)
  azureDeleteResourceGroup(context, RG)

