# R/development/review/parallel.R

# Defines functions sp_loaded sp_fork_init splitNS splitDT clusterDT

#' HDPS Internals
#'
#' Functions to assist with distributed cluster computing and local parallelization.
#'
#' @param n Number of clusters
#' @param DT A \code{data.table} object
#' @param vec A vector or list
#'
#' @import rlang
#' @import data.table
#' @import magrittr
#' @importFrom easydata splitn
#' @importFrom packrat search_path
#'
#' @name HDPS_INTERNAL
NULL

#' @describe HDPS_INTERNAL Get packages loaded by user and visible on search path
#' @export
sp_loaded <- function() {
  # Returns the names of the packages on the search path (as reported by
  # packrat::search_path()) whose library directory is one of the directories
  # named by the R_LIBS_USER environment variable.
  #
  # R_LIBS_USER can hold several directories separated by .Platform$path.sep,
  # and its spelling can differ from the paths on the search path (unexpanded
  # "~", back- vs forward slashes). Both sides are therefore normalised before
  # comparing; a raw string comparison silently matches nothing in those cases.
  user_dirs <- strsplit(
    Sys.getenv("R_LIBS_USER"), .Platform$path.sep, fixed = TRUE
  )[[1L]]
  user_dirs <- normalizePath(user_dirs, winslash = "/", mustWork = FALSE)

  attached <- packrat::search_path()
  attached_dirs <- normalizePath(
    as.character(attached$lib.dir), winslash = "/", mustWork = FALSE
  )

  # %in% never returns NA, so the subset cannot pick up NA entries.
  attached$package[attached_dirs %in% user_dirs]
}


#' @describe HDPS_INTERNAL Initialize a fork. This function returns the path to the
#'     cluster (or forked) main directory
#' @export
sp_fork_init <- function() {
  # Creates a new "fk_session_*" directory inside the R session's temporary
  # directory and returns its normalised path (forward slashes).
  #
  # tempfile() builds the path with the platform's separator and picks a name
  # that is not currently in use. The previous hard-coded "\\" separator only
  # worked on Windows, and a random number in 1..10000 could collide with an
  # existing session directory (and consumed the caller's RNG stream).
  fkdir <- tempfile(pattern = "fk_session_", tmpdir = tempdir())
  if (!dir.create(fkdir, recursive = TRUE)) {
    stop("could not create session directory: ", fkdir, call. = FALSE)
  }
  message("save global variable to track sessions")
  normalizePath(fkdir, "/")
}


#' @describe HDPS_INTERNAL Returns the namespace environments (repeated for n clusters), for every
#'     pkg returned by \code{sp_loaded}
#' @export
splitNS <- function(n) {
  # Returns a list of length `n` in which every element is the same list of
  # namespace environments, one per package name returned by sp_loaded().
  pkgnam <- sp_loaded()

  # simplify = FALSE always returns a list (named after `pkgnam` when it is a
  # character vector) instead of relying on sapply()'s simplification rules.
  nsList <- sapply(pkgnam, rlang::ns_env, simplify = FALSE)

  # rep() returns an empty list for n = 0, whereas iterating over 1:n would
  # visit c(1, 0) and produce two entries.
  rep(list(nsList), n)
}


#' @describe HDPS_INTERNAL Given a \code{data.table} object, this splits the data into \code{n}
#'     environments for cluster distribution
#' @export
splitDT <- function(DT, n) {
  # Partitions the row indices of `DT` with easydata::splitn(., n) and returns
  # a list with one environment per index group, each created by
  # rlang::new_env() from the corresponding rows of `DT`.
  #
  # seq_len() gives an empty index for a zero-row table; 1:nrow(DT) would give
  # c(1, 0) and select a row that does not exist.
  INDEX <- easydata::splitn(seq_len(nrow(DT)), n)
  lapply(INDEX, function(i) rlang::new_env(data = DT[i]))
}


# @describe HDPS_INTERNAL Given a \code{vector} object, this splits the data into \code{n} environments
#     for cluster distribution
# @export
# splitVector <- function(DT, n){
#   INDEX  <- easydata::splitn(1:nrow(DT), n)
#   DT_env <- lapply(INDEX, function(i) rlang::new_env(data = DT[i]))
#   DT_env
# }


#' @describe HDPS_INTERNAL Given a data.table and desired number of clusters, this function does most of
#'    the initialization work and returns a list of paths to temporary directories, one per cluster.
#'    NOTE: These temp directories are destroyed when the R session ends.
#' @export
clusterDT <- function(DT, n) {
  # For each of the `n` clusters: creates a session directory, writes that
  # cluster's share of `DT` to "DATA.csv" and the package namespaces to
  # "loadns" (RDS format) inside it. Returns a list of the `n` directory paths.
  envList <- splitDT(DT, n)                    # one data environment per cluster
  nsList  <- splitNS(n)                        # identical namespace list per cluster

  # seq_len() iterates zero times for n = 0 (1:n would visit 1 and 0).
  lapply(seq_len(n), function(i) {
    part_env <- envList[[i]]                   # data environment for cluster i
    # Kept under its own name: the original assigned this to `n`, shadowing
    # the cluster count used for nThread below, and then saved an undefined
    # `ns`.
    ns <- nsList[[i]]

    # NOTE(review): the original called fork_init(), which is not defined in
    # this file; sp_fork_init() is the session-directory initialiser defined
    # here. Confirm no separate fork_init() exists elsewhere in the package.
    fkPath <- sp_fork_init()

    dtPath <- file.path(fkPath, "DATA.csv")    # path to data saved as csv
    nsPath <- file.path(fkPath, "loadns")      # path to ns binary file

    # Convert the environment's contents to a data.table. The original
    # `%$% setDT` evaluated the bare name and returned the setDT function
    # itself rather than calling it.
    # NOTE(review): assumes the environment's bindings are the table's
    # columns; as.list.environment() does not guarantee their order - confirm
    # against rlang::new_env().
    part <- as.list.environment(part_env)
    data.table::setDT(part)

    data.table::fwrite(part, dtPath, sep = ",", nThread = n)
    saveRDS(ns, nsPath)                        # save namespaces as binary file
    fkPath                                     # the directory path for cluster i
  })
}





# env <- DATA %))% FUN
# bfatemi/ninjar documentation built on Sept. 8, 2019, 7:37 p.m.