#' HDPS Internals
#'
#' Functions to assist with distributed cluster computing and local parallelization.
#'
#' @param n Number of clusters
#' @param DT A \code{data.table} object
#' @param vec A vector or list
#'
#' @import rlang
#' @import data.table
#' @import magrittr
#' @importFrom easydata splitn
#' @importFrom packrat search_path
#'
#' @name HDPS_INTERNAL
NULL
#' @describe HDPS_INTERNAL Get the names of packages that are visible on the search path and were
#' loaded from the user library (the directories listed in \code{R_LIBS_USER})
#' @export
sp_loaded <- function(){
  # Table of attached packages; only its `package` and `lib.dir` columns are
  # used here.
  spDF <- packrat::search_path()

  # R_LIBS_USER may list several directories (separated by the platform path
  # separator) and may be written with "~" or backslashes. Split and normalize
  # it so that the comparison below is not defeated by spelling differences.
  usrLib <- strsplit(Sys.getenv("R_LIBS_USER"), .Platform$path.sep, fixed = TRUE)[[1L]]
  usrLib <- normalizePath(usrLib[nzchar(usrLib)], winslash = "/", mustWork = FALSE)

  # Normalize the library directories the same way, leaving unknown (NA)
  # entries untouched.
  libDir <- as.character(spDF$lib.dir)
  known <- !is.na(libDir)
  libDir[known] <- normalizePath(libDir[known], winslash = "/", mustWork = FALSE)

  # %in% never yields NA, so a package with an unknown library is dropped
  # rather than showing up as an NA package name.
  spDF$package[libDir %in% usrLib]
}
#' @describe HDPS_INTERNAL Initialize a fork. This function creates a fresh session directory
#' inside the R session's temporary directory and returns the normalized path (forward
#' slashes) to that cluster (or forked) main directory
#' @export
sp_fork_init <- function(){
  # tempfile() builds the path with the correct separator for the platform and
  # picks a name that is not already in use, so repeated calls cannot collide
  # and the caller's random-number stream is left untouched.
  fkdir <- tempfile(pattern = "fk_session_", tmpdir = tempdir())

  # dir.create() returns FALSE on failure; turn that into an error.
  stopifnot(dir.create(fkdir, recursive = TRUE))
  message("save global variable to track sessions")

  # Return the path with forward slashes on every platform.
  normalizePath(fkdir, winslash = "/")
}
#' @describe HDPS_INTERNAL Returns the namespace environments (repeated for n clusters), for every
#' pkg returned by \code{sp_loaded}. The result is a list of length \code{n}; each element is the
#' same list of namespace environments, named by package
#' @export
splitNS <- function(n){
  pkgnam <- sp_loaded()

  # lapply() always returns a list, whatever the input looks like; the names
  # are set explicitly so each namespace can be looked up by package name.
  nsList <- lapply(pkgnam, rlang::ns_env)
  names(nsList) <- pkgnam

  # rep() yields exactly n copies, including none at all when n is 0
  # (iterating over 1:n would visit 1 and 0 in that case).
  rep(list(nsList), n)
}
#' @describe HDPS_INTERNAL Given a \code{data.table} object, this splits the data into \code{n}
#' environments for cluster distribution. Returns a list with one environment per group of
#' row indices produced by \code{easydata::splitn}
#' @export
splitDT <- function(DT, n){
  # seq_len() gives an empty index for a zero-row table, whereas 1:nrow(DT)
  # would give c(1, 0) and address rows that do not exist.
  # NOTE(review): splitn() is assumed to return a list of index vectors, one
  # per cluster -- confirm against easydata.
  INDEX <- easydata::splitn(seq_len(nrow(DT)), n)

  # Wrap each row subset in its own environment.
  lapply(INDEX, function(i) rlang::new_env(data = DT[i]))
}
# @describe HDPS_INTERNAL Given a \code{vector} object, this splits the data into \code{n} environments
# for cluster distribution
# @export
# splitVector <- function(DT, n){
# INDEX <- easydata::splitn(1:nrow(DT), n)
# DT_env <- lapply(INDEX, function(i) rlang::new_env(data = DT[i]))
# DT_env
# }
#' @describe HDPS_INTERNAL Given a data.table and desired number of clusters, this function does most of
#' the initialization work. For each cluster it creates a session directory containing that
#' cluster's share of the rows (\code{DATA.csv}) and the package namespaces (\code{loadns}), and it
#' returns a list with one directory path per cluster. NOTE: These temp directories
#' are destroyed when the R session ends.
#' @export
clusterDT <- function(DT, n){
  envList <- splitDT(DT, n)   # split data.table into one environment per cluster
  nsList <- splitNS(n)        # package namespaces, repeated once per cluster
  colOrder <- names(DT)       # remembered so each chunk keeps the original column order

  lapply(seq_len(n), function(i){
    e <- envList[[i]]         # data environment for cluster i
    ns <- nsList[[i]]         # package namespaces for cluster i (same for all)

    fkPath <- sp_fork_init()                  # start a new session directory
    dtPath <- file.path(fkPath, "DATA.csv")   # path to data saved as csv
    nsPath <- file.path(fkPath, "loadns")     # path to ns binary file

    # Pull the bindings back out of the environment. Environments do not keep
    # insertion order and hide dot-prefixed names by default, so ask for all
    # names and put them back in the order of the input table (names not found
    # in the input, if any, go last).
    # NOTE(review): assumes splitDT() stored each column as one binding in the
    # environment -- confirm against rlang.
    cols <- as.list.environment(e, all.names = TRUE)
    cols <- cols[order(match(names(cols), colOrder))]
    chunk <- data.table::setDT(cols)

    # n here is the cluster count passed to clusterDT().
    data.table::fwrite(chunk, dtPath, sep = ",", nThread = n)
    saveRDS(ns, nsPath)       # save namespaces as binary file

    fkPath                    # the directory path for cluster i
  })
}
# env <- DATA %))% FUN
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.