src_Case_Study/parallel_kriging.R

library(fdagstat)
library(dplyr)
library(parallel)
library(doParallel)

### Required functions
ini_variopar <- function(v,
                         model,
                         nugget_ini_user,
                         sill_ini_user,
                         range_ini_user,
                         nstr = 1) {
  # This function updates initial variogram and nugget, sill, range in case of NULL input
  # INPUT:
  # v - variogram
  # model - model type
  # nugget_ini_user, sill_ini_user, range_ini_user - initial values
  # nstr - number of models
  # OUTPUT:
  # v - variogram
  # nugget_ini_user, sill_ini_user, range_ini_user - updated values

  # If the initial parameters are not given, estimate from the data
  if (is.null(nugget_ini_user)) {
    nugget_ini <- .1*median(c(rep(v$gamma[1], v$np[1]),
                              rep(v$gamma[2], v$np[2])))
  }

  if (is.null(sill_ini_user)) {
    sill_ini <- median(unlist(purrr::map2(v$gamma[(length(v$gamma)-1):(length(v$gamma)-4)],
                                          v$np[(length(v$gamma)-1):(length(v$gamma)-4)], rep))) - nugget_ini
  }

  if (is.null(range_ini_user)) {
    if (nstr > 1) {
      # if nested structure are given, initial ranges are asked to the user
      print("Please provide initial range for variogram estimation")
      return(-1)
    }

    if (model == 'Exp') {
      range_ini <- v$dist[min(which(v$gamma > .95*(sill_ini + nugget_ini)))]/3
    } else {
      range_ini <- v$dist[min(which(v$gamma > (sill_ini + nugget_ini)))]
    }
  }

  # Build initial variogram
  vm <- vgm(sill_ini, model[1], range_ini[1], nugget_ini[1])
  nstr <- length(model)

  if (nstr > 1) {
    for (j in 2:nstr)
      vm <- vgm(psill = sill_ini,
                model = model[j],
                range = range_ini[j],
                add.to = vm)
  }

  return(list(vm = vm,
              sill_ini = sill_ini,
              range_ini = range_ini,
              nugget_ini = nugget_ini))
}


subdomain_kriging <- function(v,
                              data,
                              model,
                              grid,
                              sFstat,
                              kernel_data_points_dist,
                              ker.width,
                              subdomains_list,
                              punto_lag,
                              nugget_ini_user = NULL,
                              sill_ini_user = NULL,
                              range_ini_user = NULL,
                              suppressMes = F,
                              tol = 1e6){
  # This function provides kriging for a particular subdomain
  # INPUT:
  # v - initial variogram
  # model - kriging model
  # grid - grid points locations
  # ...
  # OUTPUT:
  # list variogram fit values and grid predictoins

  nstr <- length(model)
  datak <- data[subdomains_list$data_point_id, ]
  coordinates.datak <- datak[, 1:2]      # extract coordinates
  ### ------------- 1.1 Create object
  sFstatk  <-  fstat(NULL, 'Z1',
                     coordinates.datak,
                     datak[, -c(1, 2)] %>% t() %>% as.data.frame())
  sFstatk$KrigFormula <- sFstat$KrigFormula

  ### ------------- 1.2 Estimate empirical variogram
  if(ker.width == 0) {
    # no kernel is used
    sFstatk <- sFstat
    vkB <- vk <- sFstat$variogram
  }

  if(ker.width > 0) {
    # else compute the weighted estimator
    vkB <- sFstat$variogram

    KER <- kernel_data_points_dist[subdomains_list$center_id, 1:nrow(data)]
    KER <- KER%*%t(KER)

    for(l in 1:dim(punto_lag)[1]) {
      inVh <- which(v$dist >= punto_lag[l, 1] &
                      v$dist < punto_lag[l, 2])
      indKER <- as.data.frame(v)[inVh, 6:7]%>% as.matrix()
      vkB$gamma[l] <- sum(v$gamma[inVh]*KER[indKER])/(sum(KER[indKER]))
    }

    vkB <- vkB[complete.cases(vkB), ]
    sFstatk$variogram <- vk <- vkB
  }

  ### ------------- 2.3 Fit valid model
  # Initialize variogram
  #**new**
  vm.ini <- ini_variopar(vk, model, nugget_ini_user, sill_ini_user, range_ini_user, nstr)
  vm <- vm.ini$vm
  sill_ini <- vm.ini$sill_ini
  nugget_ini <- vm.ini$nugget_ini
  range_ini <- vm.ini$range_ini
  #**end new**

  # Fit model
  #-- attempt 1
  sFstatk <- fitVariograms(sFstatk,
                           model = vm,
                           fitSills = TRUE,
                           forceNugget = FALSE)

  #-- attempt 2
  if(sFstatk$model$omni$Z1$psill[1] < 0 |
     sFstatk$model$omni$Z1$psill[2] < 0 |
     sFstatk$model$omni$Z1$range[2] > tol) {
    if(suppressMes == F){
      #print("Trying with fixed nugget")
    }
    sill_ini <- sill_ini+nugget_ini
    nugget_ini <- 0
    vm <- vgm(sill_ini, model[1], range_ini[1], nugget_ini[1])
    nstr <- length(model)
    if(nstr > 1){
      for(j in 2:nstr)
        vm <- vgm(psill = sill_ini,
                  model = model[j],
                  range = range_ini[j],
                  add.to = vm)
    }

    sFstatk.tmp <- try(fitVariograms(sFstatk,
                                     model = vm,
                                     fitSills = TRUE,
                                     forceNugget = TRUE))
    if(class(sFstatk.tmp)[1] == "try-error"){
      if(suppressMes == F)
        print('Warning: unable to fit the variogram, using the initial one')
      sFstatk$model$omni$Z1 <- vm
    } else sFstatk <- sFstatk.tmp
  }

  #-- use initial variogram model
  sFstatk$model$omni$Z1$range[which(sFstatk$model$omni$Z1$range < 0)] <- range_ini[which(sFstatk$model$omni$Z1$range < 0) - 1]
  if (sum(sFstatk$model$omni$Z1$psill) == 0 |
      sFstatk$model$omni$Z1$psill[1] < 0 |
      sFstatk$model$omni$Z1$psill[2] < 0 |
      sFstatk$model$omni$Z1$range[2] < 0 |
      is.null(sFstatk$model) ){
    if (suppressMes == F)
      print('Warning: unable to fit the variogram, using the initial one')
    sFstatk$model$omni$Z1 <- vm
  }

  # ------------- 2. Perform kriging within the neighborhood
  # Add covariance
  cov.tmp <- try(addCovariance(sFstatk, 'omni'))

  if (class(cov.tmp)[1] == "try-error") {
    if (suppressMes == F)
      print('Warning: unable to fit the variogram, using the initial one')
    sFstatk$model$omni$Z1 <- vm
    sFstatk <- addCovariance(sFstatk, 'omni')
  }

  if (class(cov.tmp)[1] != "try-error") {
    sFstatk <- cov.tmp
  }

  if (is.null(sFstatk$covariance)) {
    if (suppressMes == F)
      print('Warning: unable to fit the variogram, using the initial one')
    sFstatk$model$omni$Z1 <- vm
    sFstatk <- addCovariance(sFstatk, 'omni')
  }

  # Kriging
  pFstat.predictTr <- predictFstat(.g=sFstatk,
                                   .newCoordinates = grid[subdomains_list$grid_point_id, ],
                                   .what = 'Z1',
                                   .type = "OK")
  fpred <- c()
  vfitk <- sFstatk$model$omni$Z1

  if (length(subdomains_list$grid_point_id) > 0) {
    vfit_res_list <- list(val = c(vfitk[1,  2],
                                  vfitk[-1, 2],
                                  vfitk[-1, 3]),
                          nrow = length(subdomains_list$grid_point_id),
                          ncol = 1 + 2*nstr)

    if (dim(pFstat.predictTr$Forecast)[1] > dim(data)[2] - 2) {
      fpred[['values']] <-  pFstat.predictTr$Forecast[-c(1, 2), ]
    }
    else {
      fpred[['values']] <- pFstat.predictTr$Forecast
    }
    fpred[['variance']] <- pFstat.predictTr$Variance
  }

  return(list(vfit = vfit_res_list, fpred = fpred ))
}


### Reading and adding to the global environment
initial_list <- readRDS("data/data_for_tests/initial_data.RDS")
list2env(initial_list, globalenv())
list2env(initial_variogram_list, globalenv())
list2env(distances_list, globalenv())
rm(initial_list, distances_list, initial_variogram_list)
suppressMes <- F


### Parallelization part
detectCores()
cl <- detectCores() - NU_cores
doParallel::registerDoParallel(cl)
#### This part should be parallelized using FPGA instead of CPU
t <- Sys.time()
kriging <- foreach(subdomains_list = bootstrap_subdomains,
                   .combine = list,
                   .export = c("v",'data', 'model','grid','sFstat','kernel_data_points_dist','ker','punto_lag','nugget_ini_user',
                               'sill_ini_user','range_ini_user', 'ini_variopar','suppressMes','subdomain_kriging'),
                   .packages = c('dplyr','fdagstat'),
                   .multicombine = T,
                   .maxcombine = length(bootstrap_subdomains))%dopar%{
                     subdomain_kriging(v, data, model, grid, sFstat, kernel_data_points_dist, ker, subdomains_list, punto_lag,
                                       nugget_ini_user, sill_ini_user, range_ini_user,  suppressMes, tol = tol)
                   }

print(Sys.time() - t )
doParallel::stopImplicitCluster()
alexdidkovskyi/RDDOOK3 documentation built on Oct. 16, 2019, 1:35 p.m.