R/RDD-OOK_Parallel_wrap_up.R

Defines functions get_mesh

get_mesh <- function(.df, .bl, .tr_ar =  0.01) {
  #This function provides a mesh with given data, border.length and maximum triangle area
  # .df = data.file
  # .bl = border.length
  # .tr_ar = max triangle file
  matrix(c(1:.bl,
           2:.bl, 1),
         .bl, 2) %>%
    pslg(P = .df[, 1:2], S = .) %>%
    RTriangle::triangulate(., a = .tr_ar)
}

get_initial_variogram <- function(.d, .bl, .t, .n){
  # .d = data
  # .bl = border.length
  # .t = t_step
  # .n = nsub
  #This function provides initial variogram, punto_lag and initial fstat
  .id <- which(!is.na(.d$z1))
  .v <- create.trvcloud(data   = .d[.id, ],
                       n1     = .n - .bl,
                       t_step = .t)


  # Creat sFstat object

  .sFstat  <-  fstat(NULL, 'Z1',
                    as.data.frame(.d[.id, c(1, 2)]),
                    as.data.frame(t(.d[.id, -c(1, 2)])))
  .tmp <- bbox(as.matrix(.d[, c(1, 2)]))
  .cutoff <- sqrt(diff(.tmp[1, ])^2 + diff(.tmp[2, ])^2)/3
  .sFstat <- fvariogram(formula = "~." ,
                        .sFstat        ,           # fstat structure.
                        Nlags        = 12   ,     # number of lags.
                        LagMax       = .cutoff,    # maximum lag distance.
                        directions   = "omni",    # it computes empirical variograms along each input direction.
                        ArgStep      = t_step,    # sampling step of the functions.
                        useResidual  = TRUE,
                        comments     = FALSE)

  # Initialize other parameters used below
  .vB <- .vk <- .sFstat$variogram
  .width <- .cutoff/(length(.vB$dist))
  .punto_lag <- cbind(c(0, (.5*(.vB$dist[-1] + .vB$dist[-length(.vB$dist)]))),
                     c((.5*(.vB$dist[-1] + .vB$dist[-length(.vB$dist)])), .cutoff))

  return(list(v = .v,
              sFstat = .sFstat,
              vk = .vk,
              punto_lag = .punto_lag))

}


get_distances_and_kernels <- function(data, grid, mesh) {
  #This function provides distances between grid points, vertex points and observed points and associated kernel values
  #data = data.file
  graph.distance.complete <- distance.on.graph.constrained(data[, c(1, 2)], mesh = mesh)
  output.grid2triangle    <- grid2triangle.constrained(grid.matrix = grid,
                                                        data.matrix = data[ ,c(1, 2)],
                                                        mesh = mesh)
  assign.matrix <- output.grid2triangle[[1]]
  no.assg.grid  <- output.grid2triangle[[2]]


  data.grid.distance      <- loccoords(coords = data[ ,c(1, 2)],
                                  locations = grid)

  vertices.grid.distances <- loccoords(coords = mesh$P[, 1:2],
                                       locations = grid)

  graph.distance <- graph.distance.complete[1:nsub, 1:nsub]

  grid_df <-
    data.frame(triangle = which(assign.matrix == T) %/% nrow(grid) +1,
               id       = which(assign.matrix == T) %%  nrow(grid) ) %>%
    mutate(id = ifelse(id == 0, nrow(grid), id)) %>%
    arrange(id)

  #Calculating distances.list  from each grid point
  dist_from_grid_to_vertex <-
    data.frame(t(apply(grid_df, 1, function(x){
      vertices.grid.distances[ mesh$T[x[1], ], x[2]]
      }))) %>%
    mutate(id = grid_df$id)
  #matrix of  distances.list to the points

  dist_from_grid_to_point_graph <- apply(dist_from_grid_to_vertex, 1, function(x) {
    temp_m <- graph.distance.complete[mesh$T[grid_df$triangle[grid_df$id == as.numeric(x[4])], ], ] + as.numeric(x[1:3])
    res <- apply(temp_m, 2, min)
  }) %>%
    `colnames<-`(c(dist_from_grid_to_vertex$id))

   dist_from_grid_to_point_graph <- matrix(NA, nrow(mesh$P), length(no.assg.grid)) %>%
     'colnames<-'(c(no.assg.grid)) %>%
     cbind(dist_from_grid_to_point_graph) %>%
     .[, order(colnames(.)
              %>% as.numeric()) ]

  #Calculating Gaussian kernel of distances.list between grid points and data points and data points and data points
  kernel_grid_points_dist <-
    vapply(dist_from_grid_to_point_graph, function(x) {
              exp(-1/(2*ker^2)*(x^2))
            },
            numeric(1)) %>%
    matrix( dim(dist_from_grid_to_point_graph))

  kernel_data_points_dist <-
    vapply(graph.distance.complete, function(x){
              exp(-1/(2*ker^2)*(x^2))
            }, numeric(1)) %>%
    matrix(dim(graph.distance.complete))

  return(list(kernel_grid_points_dist       = kernel_grid_points_dist,
              kernel_data_points_dist       = kernel_data_points_dist,
              dist_from_grid_to_point_graph = dist_from_grid_to_point_graph,
              graph.distance                = graph.distance,
              graph.distance.complete       = graph.distance.complete,
              no.assg.grid                  = no.assg.grid
              ))
  #ADD Later:
  #already.observed <- which(data.grid.distance == 0,
  #                         arr.ind = TRUE)
  #is.observed <- rep(0, ngrid)
  #is.observed[already.observed[, 2]] <- already.observed[, 1]
}


get_rdd_kriging <- function(data, grid, model = model, ker,
                            bootstrap_subdomains,
                            initial.variogram.list,
                            distances.list,
                            nugget_ini_user = NULL,
                            sill_ini_user = NULL,
                            range_ini_user = NULL,
                            tol = 1e6,
                            parallel = T, type = 'foreach', NU_cores = 0,
                            suppressMes = F){
  # Kriging
  # INPUT:
  # data - data
  # grid - grid points
  # ker - ker.width
  # initiail.variogram.list - get_initial_variogram output
  # distances.list - graph to data points distances/ data points to data points distances
  # type - foreach (parallel) or future_map (furrr)
  # OUTPUT:
  # subdomain kriging for all subdomains
              v <- initial.variogram.list$v
         sFstat <- initial.variogram.list$sFstat
      punto_lag <- initial.variogram.list$punto_lag
      kernel_data_points_dist <- distances.list$kernel_data_points_dist


      if (parallel == T) {
        if (type == 'foreach') {

          detectCores()
          cl <- detectCores() - NU_cores

          doParallel::registerDoParallel(cl)
          kriging <- foreach(subdomains_list = bootstrap_subdomains,
                             .combine = list,
                             .export = c("v",'data', 'model','grid','sFstat','kernel_data_points_dist','ker','punto_lag','nugget_ini_user',
                                         'sill_ini_user','range_ini_user', 'ini_variopar','suppressMes','subdomain_kriging'),
                             .packages = c('dplyr','fdagstat'),
                             .multicombine = T,
                             .maxcombine = length(bootstrap_subdomains))%dopar%{
                               subdomain_kriging(v, data, model, grid, sFstat,kernel_data_points_dist, ker, subdomains_list, punto_lag,
                                                 nugget_ini_user, sill_ini_user, range_ini_user,  suppressMes, tol=tol)
                             }

          doParallel::stopImplicitCluster()
        } else {
          plan(multiprocess)
          kriging <- furrr::future_map(bootstrap_subdomains,
                                       function(subdomains_list){
                                         subdomain_kriging(v, data, model, grid, sFstat,kernel_data_points_dist, ker, subdomains_list, punto_lag,
                                                           nugget_ini_user, sill_ini_user, range_ini_user, suppressMes = F, tol = tol)
                                       })

        }
      }

      if (parallel == F) {
        kriging <- purrr::map(bootstrap_subdomains,
                              function(subdomains_list){
                                subdomain_kriging(v, data, model, grid, sFstat,kernel_data_points_dist, ker, subdomains_list, punto_lag,
                                                  nugget_ini_user, sill_ini_user, range_ini_user, suppressMes = F, tol = tol)
                              })
      }

      return(kriging)
    }

get_domain_results <- function(kriging, bootstrap_subdomains,
                               list_of_domains_by_id,
                               parallel = F,
                               no.assg.grid,
                               type = 'foreach',
                               NU_cores = 0)  {
  if (length(bootstrap_subdomains) == 1) {
    parallel <- F

  }
  # Aggregating kriging results
  # INPUT:
  # kriging - subdomains kriging results
  # bootstrap_subdomains - list of subdomains data
  # ...
  # OUTPUT:
  # kriging_domain_aggregation for each bootstrap iteration
  if (parallel == T) {
    if (type == 'foreach') {
      detectCores()
      cl <- detectCores() - NU_cores

      doParallel::registerDoParallel(cl)
      .domain_results <- foreach(id = list_of_domains_by_id,
                                         .combine = list,
                                         .export = c('kriging', 'bootstrap_subdomains','kriging_domains_aggregation','no.assg.grid'),
                                         .packages = c('dplyr',"magrittr"),
                                         .multicombine = T,
                                         .maxcombine = length(bootstrap_names$id))%dopar%{
                                           return(kriging_domain_aggregation(id, bootstrap_subdomains, kriging, no.assg.grid ))
                                         }

      doParallel::stopImplicitCluster()
    }
    else {
      plan(multiprocess)
      .domain_results <- furrr::future_map(list_of_domains_by_id,
                                                   function(id){
                                                     kriging_domain_aggregation(id, bootstrap_subdomains, kriging, no.assg.grid )
                                                   })

    }
  }

  if (parallel == F) {
    .domain_results <- purrr::map(list_of_domains_by_id,
                                          function(id){
                                            kriging_domain_aggregation(id, bootstrap_subdomains, kriging, no.assg.grid )
                                          })

  }


  return(.domain_results)
}


get_kriging_variance <- function(kriging_domain_results) {
  #Calculating kriging variance
  # INPUT:
  # kriging results for each bootstrap iteration
  # OUTPUT:
  # kriging variance
  sapply(kriging_domain_results, '[', 'variance') %>% do.call('rbind', .) %>% colMeans(.)
}

get_bootstrap_variance <- function(kriging_domain_results, aggregated_results, B, t_step){
  #Calculating bootstrap variance

  # INPUT:
  # kriging results for each bootstrap iteration
  # aggregated kriging results
  # OUTPUT:
  # bootstrap variance

  ndim_aggr <- aggregated_results %>% dim()
  ndim_kr <- kriging_domain_results[[1]]$data %>% dim()
  start <- ndim_aggr[1] - ndim_kr[1] + 1

  tens_aggr_domain_results <- array(aggregated_results[start:ndim_aggr[1], ], c(ndim_kr[1], ndim_kr[2], B))
  tens_kriging_domain_results <- simplify2array(sapply(kriging_domain_results, '[', 'data'))

  return(apply((tens_aggr_domain_results - tens_kriging_domain_results)^2,
               c(1, 3),
               function(x)
                 trapzc(t_step, x))%>% rowSums()*1/(B-1)
         )

}

get_aggregated_results <- function(kriging_domain_results, no.assg.grid) {
  # Aggregate
  # INPUT:
  # kriging_domain_results
  # grid points out of domain
  # OUTPUT:
  # aggregated results
  if (length(kriging_domain_results) == 1) {
    return(kriging_domain_results[[1]]$data)
  } else {

  return(
    sapply(kriging_domain_results, '[','data') %>%
    simplify2array(.) %>%
    apply(c(1, 2), mean, na.rm = T)
  )
  }
}

get_rdd_kriging_general <- function(data.file, grid, border.length, model, t_step,
                                sill_ini = NULL, range_ini = NULL,
                                nugget_ini = NULL,
                                nk_min = 2,  b = 10, K = 8, parallel = F, type = 'foreach',
                                NU_cores = 0, max_triangle_area = 0.01, SuppresMes = F, tol = 1e6){


  #Making RDD_OOK for a given data, grid, subdomains number and bootstrap iterations number
  #This function returns all important results: information about bootstrap iterations, kriging results and variance, grid prediction e.t.c.
  #
  # INPUT:
  # data.file - data
  # grid - grid points
  # ker - ker.width
  # initiail.variogram.list - get_initial_variogram output
  # distances.list - graph to data points distances/ data points to data points distances
  # type - foreach (parallel) or future_map (furrr)
  # OUTPUT:
  # subdomain kriging for all subdomains
  data <- data.frame(data.file)
  colnames(data) <- c('x', 'y', paste0('z', 1:(ncol(data)-2)))
  nsub <- nrow(data)

  mesh <- matrix(c(1:border.length,
                     2:border.length, 1),
                   border.length,
                   2) %>%
      pslg(P = data.file[, 1:2], S = .) %>%
      triangulate(., a = max_triangle_area )


  graph.distance.complete <- distance.on.graph.constrained(data.file[, c(1, 2)], mesh = mesh)
  output.grid2triangle <- grid2triangle.constrained(grid.matrix = grid,
                                                      data.matrix = data.file[ ,c(1, 2)],
                                                      mesh = mesh)
  assign.matrix <- output.grid2triangle[[1]]
  no.assg.grid <- output.grid2triangle[[2]]

  data.grid.distance <- loccoords(coords = data.file[ ,c(1, 2)],
                                    locations = grid)
  already.observed <- which(data.grid.distance == 0,
                              arr.ind = TRUE)
  is.observed <- rep(0, ngrid)
  is.observed[already.observed[, 2]] <- already.observed[, 1]

  vertices.grid.distances <- loccoords(coords = mesh$P[, 1:2],
                                         locations = grid)

  graph.distance <- graph.distance.complete[1:nsub, 1:nsub]

  # Create variogram cloud
  border.length <- sum(is.na(data$z1))
  v <- create.trvcloud(data = data[which(!is.na(data$z1)),],
                       n1 = nsub - border.length, t_step = t_step)
  # Creat sFstat object
  sFstat  <-  fstat(NULL, 'Z1',
                    as.data.frame(data[which(!is.na(data$z1)), c(1, 2)]),
                    as.data.frame(t(data[which(!is.na(data$z1)), -c(1, 2)])))
  tmp <- bbox(as.matrix(data[, c(1, 2)])) #
  cutoff <- sqrt(diff(tmp[1, ])^2 + diff(tmp[2, ])^2)/3
  sFstat <- fvariogram(formula = "~." ,
                       sFstat        ,           # fstat structure.
                       Nlags        = 12   ,     # number of lags.
                       LagMax       = cutoff,    # maximum lag distance.
                       directions   = "omni",    # it computes empirical variograms along each input direction.
                       ArgStep      = t_step,    # sampling step of the functions.
                       useResidual  = TRUE,
                       comments     = FALSE)

  # Initialize other parameters used below
  vB <- vk <- sFstat$variogram

  width <- cutoff/(length(vB$dist))
  punto_lag <- cbind(c(0, (.5*(vB$dist[-1] + vB$dist[-length(vB$dist)]))),
                     c((.5*(vB$dist[-1] + vB$dist[-length(vB$dist)])), cutoff))

  pFstat.predictTrK <- fpred <- gridk <- vfitk <- vfit <- kergrid <- list()
  ngrid <- dim(grid)[1]
  #*variogram
  nstr <- length(model)
  nugget_ini_user <- nugget_ini
  sill_ini_user <- sill_ini
  range_ini_user <- range_ini

  centers_matrix <- get_centers_matrix(graph.distance.complete,
                                       replications_number = b,
                                       replication_coef = coef,
                                       min_point_number =  nk_min,
                                       start_id = border.length +1,
                                       end_id = nsub,
                                       K  = K)

  grid_df <- data.frame(triangle = which(assign.matrix == T) %/% nrow(grid) +1,
                        id = which(assign.matrix == T) %% nrow(grid) ) %>%
    mutate(id = ifelse(id == 0, nrow(grid), id)) %>%
    arrange(id)
###
  #Calculating distances.list  from each grid point
  dist_from_grid_to_vertex <-
    data.frame(t(apply(grid_df, 1, function(x){
      vertices.grid.distances[ mesh$T[x[1], ], x[2]]
    }))) %>%
    mutate(id = grid_df$id)
  #matrix of  distances.list to the points

###

  #Calculating distances  from each grid point
  dist_from_grid_to_point_graph <- apply(dist_from_grid_to_vertex, 1, function(x) {
        temp_m <- graph.distance.complete[mesh$T[grid_df$triangle[grid_df$id == as.numeric(x[4])], ], ] + as.numeric(x[1:3])
        res    <- apply(temp_m, 2, min)
      }) %>%
      `colnames<-`(c(dist_from_grid_to_vertex$id))

  dist_from_grid_to_point_graph <-
    matrix(NA, nrow(mesh$P), length(no.assg.grid)) %>%
    'colnames<-'(c(no.assg.grid)) %>%
    cbind(dist_from_grid_to_point_graph) %>%
    .[, order(colnames(.)
              %>% as.numeric()) ]


  #Calculating Gaussian kernel of distances between grid points and data points and data points and data points

  kernel_grid_points_dist <-
    vapply(dist_from_grid_to_point_graph, function(x) {
        exp(-1/(2*ker^2)*(x^2))
    },  numeric(1)) %>%
    matrix( dim(dist_from_grid_to_point_graph))

  kernel_data_points_dist <- vapply(graph.distance.complete, function(x){
        exp(-1/(2*ker^2)*(x^2))
    }, numeric(1)) %>%
    matrix(dim(graph.distance.complete))

  graph.distance <- graph.distance.complete[1:nsub,
                                            1:nsub]

  #Creating names for each iteration
  bootstrap_names <- get_bootstrap_names(b, K)

  list_of_subdomains_by_bootstrap_iteration <- lapply(1:b, function(x) which(grepl(paste0('^',x,'_'), bootstrap_names$id)))
  bootstrap_subdomains <- get_bootstrap_subdomains(centers_matrix, dist_from_grid_to_point_graph, graph.distance, bootstrap_names, border.length)
  suppressMes <- F
  ###
  kergrid <- lapply(bootstrap_subdomains, function(x){return(kernel_grid_points_dist[x$center_id, x$grid_point_id])})

  if (parallel == T) {
    if (type == 'foreach') {

      detectCores()
      cl <- detectCores() - NU_cores

      doParallel::registerDoParallel(cl)
      kriging <- foreach(subdomains_list = bootstrap_subdomains,
                         .combine = list,
                         .export = c("v",'data', 'model','grid','sFstat','kernel_data_points_dist','ker','punto_lag','nugget_ini_user',
                                     'sill_ini_user','range_ini_user', 'ini_variopar','suppressMes','subdomain_kriging'),
                         .packages = c('dplyr','fdagstat'),
                         .multicombine = T,
                         .maxcombine = length(bootstrap_subdomains))%dopar%{
                           subdomain_kriging(v, data, model, grid, sFstat,kernel_data_points_dist, ker, subdomains_list, punto_lag,
                                             nugget_ini_user, sill_ini_user, range_ini_user,  suppressMes, tol=tol)
                         }

      doParallel::stopImplicitCluster()
    }
    else {
      plan(multiprocess)
      kriging <- furrr::future_map(bootstrap_subdomains,
                                   function(subdomains_list){
                                     subdomain_kriging(v, data, model, grid, sFstat,kernel_data_points_dist, ker, subdomains_list, punto_lag,
                                                       nugget_ini_user, sill_ini_user, range_ini_user, suppressMes = F, tol = tol)
                                   })

    }
  }

  if (parallel == F) {
    kriging <- purrr::map(bootstrap_subdomains,
                          function(subdomains_list){
                            subdomain_kriging(v, data, model, grid, sFstat,kernel_data_points_dist, ker, subdomains_list, punto_lag,
                                              nugget_ini_user, sill_ini_user, range_ini_user, suppressMes = F, tol = tol)
                          })
  }

  names(kriging) <- bootstrap_names$id
  parallel <- F

  if (parallel == T) {
    if (type == 'foreach') {
      detectCores()
      cl <- detectCores() - NU_cores

      doParallel::registerDoParallel(cl)
      kriging_domain_results <- foreach(id = list_of_domains_by_id,
                                 .combine = list,
                                 .export = c('kriging', 'bootstrap_subdomains','kriging_domains_aggregation','no.assg.grid'),
                                 .packages = c('dplyr',"magrittr"),
                                 .multicombine = T,
                                 .maxcombine = length(bootstrap_names$id))%dopar%{
                                   return(kriging_domain_aggregation(id, bootstrap_subdomains, kriging, no.assg.grid ))
                                 }

      doParallel::stopImplicitCluster()
    }
    else {
      plan(multiprocess)
      kriging_domain_results <- furrr::future_map(list_of_domains_by_id,
                                             function(id){
                                               kriging_domain_aggregation(id, bootstrap_subdomains, kriging, no.assg.grid )
                                             }
                                           )

    }
  }

  if (parallel == F) {
    kriging_domain_results <- purrr::map(list_of_subdomains_by_bootstrap_iteration,
                                    function(id){
                                      kriging_domain_aggregation(id, bootstrap_subdomains, kriging, no.assg.grid )
                                    }
                                  )

  }



  aggregated_results <- get_aggregated_results(kriging_domain_results)

  fpred.d <- t(clr2density.mv(t(aggregated_results), t, t_step))
  mean.field  <- t(mean.nr(t(fpred.d), t, t_step))

  kriging_variance <- get_kriging_variance(kriging_domain_results)
  bootstrap_variance <- get_bootstrap_variance(kriging_domain_results, aggregated_results, t_step = t_step,B = b)

  return(list(kriging_domain_res = kriging_domain_results,
              fpred = fpred.d,
              mean.field = mean.field,
              kriging_variance = kriging_variance,
              bootstrap_variance = bootstrap_variance,
              centers_matrix = centers_matrix))

}

cv_kriging_long <- function( data, type_cv = 'LOO', k_fold = 1, grid, border.length, model, t_step,
                              sill_ini = NULL, range_ini = NULL,
                              nugget_ini = NULL,
                              nk_min = 2,  b = 10, K = 8, parallel = T,
                              NU_cores = 0, max_triangle_area = 0.01, SuppresMes = F, tol = 1e6) {
  # Long Version of kriging get_rdd_kriging_general
  # based on

  if (type_cv == 'LOO') {
   lapply((border.length + 1):nrow(data), function(row_id) {
     return(get_rdd_kriging_general(data[-row_id, ], rbind(grid, data[row_id, 1:2]), border.length, model, t_step,
                      sill_ini, range_ini,  nugget_ini ,
                      nk_min,  b, K, parallel,
                      NU_cores, max_triangle_area, SuppresMes, tol))

   })
  }
  if (type_cv == 'k_fold') {
    temp_vec <- cut(sample((border.length + 1):nrow(data)), k_fold, labels = F)
    temp_list <- lapply(unique(temp_vec), function(x){which(temp_vec == x)})
    return(
        list(lapply(temp_list, function(row_id) {
          get_rdd_kriging_general(data[-row_id, ], rbind(grid, data[row_id, 1:2]), model, border.length, t_step,
                                sill_ini, range_ini,  nugget_ini ,
                              nk_min,  b, K, parallel,
                              NU_cores, max_triangle_area, SuppresMes, tol)
            }),
            temp_list
          )
    )
  }
}
alexdidkovskyi/RDDOOK3 documentation built on Oct. 16, 2019, 1:35 p.m.