#' Detect number of free cores (on UNIX)
#'
#' @description Reads the first field of \code{/proc/loadavg} (the 1-minute
#' load average on Linux) and subtracts it from the number of processing units
#' reported by \code{nproc}.
#' @param machine character vector of ssh targets, e.g. "user@@localhost".
#' If \code{NULL} (default), the local machine is queried without ssh.
#' @return Numeric vector with one element per machine (a single value for the
#' local machine) holding the rounded number of free cores, never below 0.
#' Attribute \code{"ncores"} holds the core counts and attribute \code{"used"}
#' the load values read from \code{/proc/loadavg}.
#' @export
detectFreeCores <- function(machine = NULL) {
  # Run both shell probes for one host and summarise them.
  probe <- function(loadavg_cmd, nproc_cmd) {
    loadavg_line <- system(loadavg_cmd, intern = TRUE)
    load1 <- as.numeric(strsplit(loadavg_line, split = " ", fixed = TRUE)[[1]][1])
    n_cpu <- as.numeric(system(nproc_cmd, intern = TRUE))
    list(free = max(c(0, round(n_cpu - load1))), total = n_cpu, load = load1)
  }

  # Local machine: query directly and return a single value.
  if (is.null(machine)) {
    host <- probe("cat /proc/loadavg", "nproc")
    result <- host$free
    attr(result, "ncores") <- host$total
    attr(result, "used") <- host$load
    return(result)
  }

  # Remote machines: run the same probes through ssh, one host at a time.
  hosts <- lapply(machine, function(target) {
    probe(paste("ssh", target, "cat /proc/loadavg"), paste("ssh", target, "nproc"))
  })
  result <- unlist(lapply(hosts, `[[`, "free"))
  attr(result, "ncores") <- unlist(lapply(hosts, `[[`, "total"))
  attr(result, "used") <- unlist(lapply(hosts, `[[`, "load"))
  return(result)
}
#' Run an R expression in the background (only on UNIX)
#'
#' @description Generate an R script from the expression that is copied via \code{scp}
#' to any machine (ssh-key needed). Then collect the results.
#' @details \code{runbg()} generates a workspace from the \code{input} argument
#' and copies the workspace and all C files or .so files to the remote machines via
#' \code{scp}. This will only work if *an ssh-key had been generated and added
#' to the authorized keys on the remote machine*. The
#' code snippet, i.e. the \code{...} argument, can include several intermediate results
#' but only the last call which is not redirected into a variable is returned via the
#' variable \code{.runbgOutput}, see example below. Inside the snippet, the variable
#' \code{.node} holds the position of the executing machine in \code{machine}.
#' @param ... Some R code
#' @param filename Character, defining the filename of the temporary file. A random
#' file name is chosen if NULL.
#' @param machine Character vector, e.g. \code{"localhost"} or \code{"knecht1.fdm.uni-freiburg.de"}
#' or \code{c("localhost", "localhost")}.
#' @param input Character vector, the objects in the workspace that are stored
#' into an R data file and copied to the remote machine.
#' @param compile Logical. If \code{TRUE}, C and C++ files are copied and compiled on the remote machine.
#' Otherwise, the .so files are copied.
#' @param wait Logical. Wait until executed. If \code{TRUE}, the code checks if the result file
#' is already present in which case it is loaded. If not present, \code{runbg()} starts, produces
#' the result and loads it as \code{.runbgOutput} directly into the workspace. If \code{wait = FALSE},
#' \code{runbg()} starts in the background and the result is only loaded into the workspace
#' when the \code{get()} function is called, see Value section.
#' @param recover Logical. This option is useful to recover the three functions check(), get() and purge(),
#' e.g. when a session has crashed. Then, the three functions are recreated without restarting the job.
#' They can then be used to get the results of a job without having to do it manually.
#' Requires the correct filename, so if the previous runbg was run with filename = NULL, you have to
#' specify the tmp_filename manually.
#'
#' @return List of functions \code{check()}, \code{get()} and \code{purge()}.
#' \code{check()} checks, if the result is ready.
#' \code{get()} copies the result file
#' to the working directory and loads it into the workspace as an object called \code{.runbgOutput}.
#' This object is a list named according to the machines that contains the results returned by each
#' machine (\code{NULL} for machines whose result could not be loaded).
#' \code{purge()} deletes the temporary folder
#' from the working directory and the remote machines.
#' If \code{wait = TRUE} and the job has to be run, the results are collected and purged
#' right away and the value of \code{purge()} is returned instead of the list of functions.
#' @export
#' @examples
#' \dontrun{
#' out_job1 <- runbg({
#' M <- matrix(rnorm(1e2), 10, 10)
#' solve(M)
#' }, machine = c("localhost", "localhost"), filename = "job1")
#' out_job1$check()
#' out_job1$get()
#' result <- .runbgOutput
#' print(result)
#' out_job1$purge()
#' }
#' \dontrun{
#' # Recover a runbg job with the option "recover"
#' out_job1 <- runbg({
#' M <- matrix(rnorm(1e2), 10, 10)
#' solve(M)
#' }, machine = c("localhost", "localhost"), filename = "job1")
#' Sys.sleep(1)
#' remove(out_job1)
#' try(out_job1$check())
#' out_job1 <- runbg({
#' "This code is not run"
#' }, machine = c("localhost", "localhost"), filename = "job1", recover = TRUE)
#' out_job1$get()
#' result <- .runbgOutput
#' print(result)
#' out_job1$purge()
#' }
runbg <- function(..., machine = "localhost", filename = NULL, input = ls(.GlobalEnv), compile = FALSE, wait = FALSE, recover = FALSE) {
  expr <- as.expression(substitute(...))
  nmachines <- length(machine)
  # Set file name: one file stem per machine, <filename>_<m>
  if (is.null(filename))
    filename <- paste0("tmp_", paste(sample(c(0:9, letters), 5, replace = TRUE), collapse = ""))
  filename0 <- filename
  filename <- paste(filename, seq_len(nmachines), sep = "_")
  # Initialize output
  out <- structure(vector("list", 3), names = c("check", "get", "purge"))
  # Check: for each machine, count whether its result file exists remotely
  out[[1]] <- function() {
    check.out <- vapply(seq_len(nmachines), function(m) length(suppressWarnings(
      system(paste0("ssh ", machine[m], " ls ", filename[m], "_folder/ | grep -x ", filename[m], "_result.RData"),
             intern = TRUE))), integer(1))
    if (all(check.out > 0)) {
      cat("Result is ready!\n")
      return(TRUE)
    }
    if (any(check.out > 0)) {
      cat("Result from machines", paste(which(check.out > 0), collapse = ", "), "are ready.\n")
      return(FALSE)
    }
    cat("Not ready!\n")
    FALSE
  }
  # Get: copy every result file back and load it into the global environment
  out[[2]] <- function() {
    result <- structure(vector(mode = "list", length = nmachines), names = machine)
    for (m in seq_len(nmachines)) {
      .runbgOutput <- NULL
      system(paste0("scp ", machine[m], ":", filename[m], "_folder/", filename[m], "_result.RData ./"), ignore.stdout = TRUE, ignore.stderr = TRUE)
      check <- try(load(file = paste0(filename[m], "_result.RData")), silent = TRUE)
      # inherits(x, what): the object comes first. Assign via `[<-` with list()
      # so that a NULL result keeps its slot instead of deleting the element.
      if (!inherits(check, "try-error")) result[m] <- list(.runbgOutput)
    }
    .GlobalEnv$.runbgOutput <- result
  }
  # Purge: remove remote folders and local temporary files
  out[[3]] <- function() {
    for (m in seq_len(nmachines)) {
      system(paste0("ssh ", machine[m], " rm -r ", filename[m], "_folder"))
    }
    system(paste0("rm ", filename0, "*"))
  }
  # Recover the three functions check, get, purge without letting the job being evaluated again.
  if (recover) return(out)
  # Check if result files exist locally and load them (only if wait == TRUE)
  resultfile <- paste(filename, "result.RData", sep = "_")
  if (wait && all(file.exists(resultfile))) {
    # Initialise once, outside the loop, so every machine's result is kept
    result <- structure(vector(mode = "list", length = nmachines), names = machine)
    for (m in seq_len(nmachines)) {
      .runbgOutput <- NULL
      load(file = resultfile[m])
      result[m] <- list(.runbgOutput)
    }
    .GlobalEnv$.runbgOutput <- result
    return(out)
  }
  # Save current workspace
  save(list = input, file = paste0(filename0, ".RData"), envir = .GlobalEnv)
  # Get attached packages so the remote script can attach them as well
  pack <- vapply(strsplit(search(), "package:", fixed = TRUE), function(v) v[2], character(1))
  pack <- pack[!is.na(pack)]
  pack <- paste(paste0("try(library(", pack, "))"), collapse = "\n")
  # Define outputs
  output <- ".runbgOutput"
  compile.line <- NULL
  if (compile)
    # Regex patterns are anchored on the literal extension (written as '\\.c$'
    # into the generated script) so other files are not picked up.
    compile.line <- paste(
      "cfiles <- list.files(pattern = '\\\\.c$')",
      "cppfiles <- list.files(pattern = '\\\\.cpp$')",
      "filelist <- paste(paste(cfiles, collapse = ' '), paste(cppfiles, collapse = ' '))",
      paste0("filename0 <- '", filename0, "'"),
      "system(paste0('R CMD SHLIB ', filelist, ' -o ', filename0, '.so'))",
      sep = "\n")
  # Write program into character (one script per machine)
  program <- lapply(seq_len(nmachines), function(m) paste(
    pack,
    paste0("setwd('~/", filename[m], "_folder')"),
    "rm(list = ls())",
    compile.line,
    paste0("load('", filename0, ".RData')"),
    # Only shared objects: an unanchored '.so' would also match e.g. '*_solve.R'
    "files <- list.files(pattern = '\\\\.so$')",
    "for (f in files) dyn.load(f)",
    paste0(".node <- ", m),
    paste0(".runbgOutput <- try(", as.character(expr), ")"),
    paste0("save(", output, ", file = '", filename[m], "_result.RData')"),
    sep = "\n"
  ))
  # Write program code into file
  for (m in seq_len(nmachines)) cat(program[[m]], file = paste0(filename[m], ".R"))
  # Copy files to temporary folder
  for (m in seq_len(nmachines)) {
    system(paste0("ssh ", machine[m], " mkdir ", filename[m], "_folder/"), ignore.stdout = TRUE, ignore.stderr = TRUE)
    system(paste0("ssh ", machine[m], " rm ", filename[m], "_folder/*"), ignore.stdout = TRUE, ignore.stderr = TRUE)
    system(paste0("scp ", getwd(), "/", filename0, ".RData* ", machine[m], ":", filename[m], "_folder/"))
    system(paste0("scp ", getwd(), "/", filename[m], ".R* ", machine[m], ":", filename[m], "_folder/"))
    if (compile) {
      system(paste0("scp ", getwd(), "/*.c ", getwd(), "/*.cpp ", machine[m], ":", filename[m], "_folder/"))
    } else {
      system(paste0("scp ", getwd(), "/*.so ", machine[m], ":", filename[m], "_folder/"))
    }
  }
  # Run in background (or sequentially in the foreground when wait = TRUE)
  for (m in seq_len(nmachines)) system(paste0("ssh ", machine[m], " R CMD BATCH --vanilla ", filename[m], "_folder/", filename[m], ".R"), intern = FALSE, wait = wait)
  if (wait) {
    out$get()
    out$purge()
  } else {
    return(out)
  }
}
#' Run an R expression on the bwForCluster
#'
#' @description Generate an R script from the expression that is copied via \code{scp}
#' to the bwForCluster (ssh-key needed). Then collect the results.
#' @details \code{runbg_bwfor()} generates a workspace from the \code{input} argument
#' and copies the workspace and all C files or .so files to the remote machine via
#' \code{scp}. One MOAB job per node is submitted with \code{msub}.
#' This will only work if *an ssh-key had been generated and added
#' to the authorized keys on the remote machine*. The
#' code snippet, i.e. the \code{...} argument, can include several intermediate results
#' but only the last call which is not redirected into a variable is returned via the
#' variable \code{.runbgOutput}, see example below. Inside the snippet, the variable
#' \code{.node} holds the index of the job (1 to \code{nodes}).
#' @param ... Some R code
#' @param machine e.g. \code{fr_dk846@@bwfor.cluster.uni-mannheim.de}
#' @param filename Character, defining the filename of the temporary file. A random
#' file name is chosen if NULL.
#' @param nodes Number of nodes (one job is submitted per node), e.g. 10
#' @param cores Number of cores per node, e.g. 16
#' @param walltime estimated runtime in the format \code{hh:mm:ss}, e.g. \code{01:30:00}.
#' Jobs with a walltime up to 30 min are sent to a quick queue. When the walltime
#' is exceeded, all jobs are automatically killed by the queue.
#' @param input Character vector, the objects in the workspace that are stored
#' into an R data file and copied to the remote machine.
#' @param compile Logical. If \code{TRUE}, C and C++ files are copied and compiled on the remote machine.
#' Otherwise, the .so files are copied.
#' @param recover Logical, If \code{TRUE}, the scripts will not be started again.
#' Can be used to get back the check and get functions for an already started process, e.g. after local session has aborted.
#' @return List of functions \code{check()}, \code{get()} and \code{purge()}.
#' \code{check()} checks, if the result is ready.
#' \code{get()} copies the result files
#' to the working directory and loads them into the workspace as an object called \code{.runbgOutput}.
#' This object is a list with one entry per node containing the result returned by that node
#' (\code{NULL} if the result could not be loaded).
#' \code{purge()} deletes the temporary files and folder
#' from the working directory and the remote machine.
#' @examples
#' \dontrun{
#' out_job1 <- runbg_bwfor({
#' mstrust(obj, center, fits = 10, cores = 2)
#' },
#' machine = "bwfor", nodes = 2, cores = "2:best",
#' walltime = "00:01:00",
#' filename = "job1")
#' out_job1$check()
#' out_job1$get()
#' out_job1$purge()
#' result <- .runbgOutput
#' print(result)
#' }
#'
#' @export
runbg_bwfor <- function(..., machine, filename = NULL, nodes = 1, cores = 1, walltime = "01:00:00", input = ls(.GlobalEnv), compile = TRUE, recover = FALSE) {
  expr <- as.expression(substitute(...))
  # Set file name: one file stem per node, <filename>_<m>
  if (is.null(filename))
    filename <- paste0("tmp_", paste(sample(c(0:9, letters), 5, replace = TRUE), collapse = ""))
  filename0 <- filename
  filename <- paste(filename, seq_len(nodes), sep = "_")
  # Initialize output
  out <- structure(vector("list", 3), names = c("check", "get", "purge"))
  # Check: count result files present in the remote folder
  out[[1]] <- function() {
    check.out <- length(suppressWarnings(
      system(paste0("ssh ", machine, " ls ", filename0, "_folder/ | grep result.RData"),
             intern = TRUE)))
    if (check.out >= nodes) {
      cat("Result is ready!\n")
      return(TRUE)
    }
    cat("Result from", check.out, "out of", nodes, "nodes are ready.\n")
    FALSE
  }
  # Get: copy all result files and load them into the global environment
  out[[2]] <- function() {
    result <- structure(vector(mode = "list", length = nodes))
    system(paste0("scp ", machine, ":", filename0, "_folder/*", "_result.RData ./"), ignore.stdout = TRUE, ignore.stderr = TRUE)
    for (m in seq_len(nodes)) {
      .runbgOutput <- NULL
      check <- try(load(file = paste0(filename[m], "_result.RData")), silent = TRUE)
      # inherits(x, what): the object comes first. Assign via `[<-` with list()
      # so that a NULL result keeps its slot instead of deleting the element.
      if (!inherits(check, "try-error")) result[m] <- list(.runbgOutput)
    }
    .GlobalEnv$.runbgOutput <- result
  }
  # Purge: remove local and remote files starting with filename0
  out[[3]] <- function() {
    system(paste0("rm ", filename0, "*"), wait = TRUE)
    system(paste0("ssh ", machine, " rm -r ", filename0, "*"), wait = TRUE)
  }
  if (recover) return(out)
  # Save current workspace
  save(list = input, file = paste0(filename0, ".RData"), envir = .GlobalEnv)
  # Get attached packages so the remote script can attach them as well
  pack <- vapply(strsplit(search(), "package:", fixed = TRUE), function(v) v[2], character(1))
  pack <- pack[!is.na(pack)]
  pack <- paste(paste0("try(library(", pack, "))"), collapse = "\n")
  # Define outputs
  output <- ".runbgOutput"
  # Write program into character (one script per node)
  program <- lapply(seq_len(nodes), function(m) {
    paste(
      pack,
      paste0("setwd('~/", filename0, "_folder')"),
      "rm(list = ls())",
      "library(doParallel)",
      "procs <- as.numeric(Sys.getenv('MOAB_PROCCOUNT'))",
      "registerDoParallel(cores=procs)",
      paste0("load('", filename0, ".RData')"),
      # Only shared objects: an unanchored '.so' would also match e.g. '*_solve.c'
      "files <- list.files(pattern = '\\\\.so$')",
      "for (f in files) dyn.load(f)",
      paste0(".node <- ", m),
      paste0(".runbgOutput <- try(", as.character(expr), ")"),
      paste0("save(", output, ", file = '", filename[m], "_result.RData')"),
      sep = "\n"
    )
  })
  # Write program code into file
  for (m in seq_len(nodes)) cat(program[[m]], file = paste0(filename[m], ".R"))
  # Write job file to be called by msub
  job <- lapply(seq_len(nodes), function(m) {
    paste(
      "#!/bin/sh",
      "########## Begin MOAB/Slurm header ##########",
      "#",
      "# Give job a reasonable name",
      paste0("#MOAB -N ", filename[m]),
      "#",
      "# Request number of nodes and CPU cores per node for job",
      paste0("#MOAB -l nodes=1:ppn=", cores),
      "#",
      "# Estimated wallclock time for job",
      paste0("#MOAB -l walltime=", walltime),
      "#",
      "# Write standard output and errors in same file",
      "#MOAB -j oe ",
      "#",
      "########### End MOAB header ##########",
      "",
      "# Setup R Environment",
      "module load math/R",
      "export OPENBLAS_NUM_THREADS=1",
      "# Start program",
      paste0("R CMD BATCH --no-save --no-restore --slave ", filename0, "_folder/", filename[m], ".R"),
      sep = "\n"
    )
  })
  # Write job file to file
  for (m in seq_len(nodes)) cat(job[[m]], file = paste0(filename[m], ".moab"))
  # Copy files to temporary folder (job files go to the remote home directory)
  system(paste0("ssh ", machine, " mkdir ", filename0, "_folder/"), ignore.stdout = TRUE, ignore.stderr = TRUE)
  system(paste0("ssh ", machine, " rm ", filename0, "_folder/*"), ignore.stdout = TRUE, ignore.stderr = TRUE)
  system(paste0("scp ", getwd(), "/", filename0, ".RData* ", machine, ":", filename0, "_folder/"))
  system(paste0("scp ", getwd(), "/", filename0, "*.R* ", machine, ":", filename0, "_folder/"))
  system(paste0("scp ", getwd(), "/", filename0, "*.moab ", machine, ":"))
  if (compile) {
    # Source files are those found in the local working directory
    sourcefiles <- paste(
      paste0(
        filename0, "_folder/",
        c(list.files(pattern = glob2rx("*.c")), list.files(pattern = glob2rx("*.cpp")))
      ),
      collapse = " "
    )
    system(paste0("scp ", getwd(), "/*.c ", getwd(), "/*.cpp ", machine, ":", filename0, "_folder/"))
    system(paste0("ssh ", machine, " 'module load math/R; R CMD SHLIB ", sourcefiles, " -o ", filename0, "_folder/", filename0, ".so'"))
  } else {
    system(paste0("scp ", getwd(), "/*.so ", machine, ":", filename0, "_folder/"))
  }
  # Submit one job per node
  for (m in seq_len(nodes)) system(paste0("ssh ", machine, " msub ", filename[m], ".moab"), intern = FALSE)
  return(out)
}
#' Run an R expression on the bwForCluster via sshpass
#'
#' @description Generate an R script from the expression that is copied via \code{scp}
#' to the bwForCluster. Then collect the results. ssh-key not needed. Password can be provided via an additional argument.
#' sshpass needs to be installed on your local machine.
#' @details \code{runbg_bwfor_sshpass()} generates a workspace from the \code{input} argument
#' and copies the workspace and all C files or .so files to the remote machine via
#' \code{sshpass} + \code{scp}. One MOAB job per node is submitted with \code{msub}. The
#' code snippet, i.e. the \code{...} argument, can include several intermediate results
#' but only the last call which is not redirected into a variable is returned via the
#' variable \code{.runbgOutput}, see example below. Inside the snippet, the variable
#' \code{.node} holds the index of the job (1 to \code{nodes}).
#' @param ... Some R code
#' @param machine e.g. \code{fr_dk846@@bwfor.cluster.uni-mannheim.de}
#' @param filename Character, defining the filename of the temporary file. A random
#' file name is chosen if NULL.
#' @param nodes Number of nodes (one job is submitted per node), e.g. 10
#' @param cores Number of cores per node, e.g. 16
#' @param walltime estimated runtime in the format \code{hh:mm:ss}, e.g. \code{01:30:00}.
#' Jobs with a walltime up to 30 min are sent to a quick queue. When the walltime
#' is exceeded, all jobs are automatically killed by the queue.
#' @param input Character vector, the objects in the workspace that are stored
#' into an R data file and copied to the remote machine.
#' @param compile Logical. If \code{TRUE}, C and C++ files are copied and compiled on the remote machine.
#' Otherwise, the .so files are copied.
#' @param recover Logical, If \code{TRUE}, the scripts will not be started again.
#' Can be used to get back the check and get functions for an already started process, e.g. after local session has aborted.
#' @param password Your ssh password in plain text (yes, no joke unfortunately), the password is handed over to sshpass for automatic login on the cluster.
#' The value is pasted verbatim into a shell command (\code{sshpass -p <password>}), so it should be shell-quoted,
#' as in the default \code{"'begin__end'"}. Note that command-line arguments may be visible to other users of the local machine.
#' @return List of functions \code{check()}, \code{get()} and \code{purge()}.
#' \code{check()} checks, if the result is ready.
#' \code{get()} copies the result files
#' to the working directory and loads them into the workspace as an object called \code{.runbgOutput}.
#' This object is a list with one entry per node containing the result returned by that node
#' (\code{NULL} if the result could not be loaded).
#' \code{purge()} deletes the temporary files and folder
#' from the working directory and the remote machine.
#' @examples
#' \dontrun{
#' out_job1 <- runbg_bwfor_sshpass({
#' mstrust(obj, center, fits = 10, cores = 2)
#' },
#' machine = "bwfor", nodes = 2, cores = "2:best",
#' walltime = "00:01:00",
#' filename = "job1", password = "'mypassword'")
#' out_job1$check()
#' out_job1$get()
#' out_job1$purge()
#' result <- .runbgOutput
#' print(result)
#' }
#'
#' @export
runbg_bwfor_sshpass <- function(..., machine, filename = NULL, nodes = 1, cores = 1, walltime = "01:00:00", input = ls(.GlobalEnv), compile = TRUE, recover = FALSE, password = "'begin__end'") {
  expr <- as.expression(substitute(...))
  # Set file name: one file stem per node, <filename>_<m>
  if (is.null(filename))
    filename <- paste0("tmp_", paste(sample(c(0:9, letters), 5, replace = TRUE), collapse = ""))
  filename0 <- filename
  filename <- paste(filename, seq_len(nodes), sep = "_")
  # Initialize output
  out <- structure(vector("list", 3), names = c("check", "get", "purge"))
  # Check: count result files present in the remote folder
  out[[1]] <- function() {
    check.out <- length(suppressWarnings(
      system(paste0("sshpass -p ", password, " ssh ", machine, " ls ", filename0, "_folder/ | grep result.RData"),
             intern = TRUE)))
    if (check.out >= nodes) {
      cat("Result is ready!\n")
      return(TRUE)
    }
    cat("Result from", check.out, "out of", nodes, "nodes are ready.\n")
    FALSE
  }
  # Get: copy all result files and load them into the global environment
  out[[2]] <- function() {
    result <- structure(vector(mode = "list", length = nodes))
    system(paste0("sshpass -p ", password, " scp ", machine, ":", filename0, "_folder/*", "_result.RData ./"), ignore.stdout = TRUE, ignore.stderr = TRUE)
    for (m in seq_len(nodes)) {
      .runbgOutput <- NULL
      check <- try(load(file = paste0(filename[m], "_result.RData")), silent = TRUE)
      # inherits(x, what): the object comes first. Assign via `[<-` with list()
      # so that a NULL result keeps its slot instead of deleting the element.
      if (!inherits(check, "try-error")) result[m] <- list(.runbgOutput)
    }
    .GlobalEnv$.runbgOutput <- result
  }
  # Purge: remove local and remote files starting with filename0
  out[[3]] <- function() {
    system(paste0("rm ", filename0, "*"), wait = TRUE)
    system(paste0("sshpass -p ", password, " ssh ", machine, " rm -r ", filename0, "*"), wait = TRUE)
  }
  if (recover) return(out)
  # Save current workspace
  save(list = input, file = paste0(filename0, ".RData"), envir = .GlobalEnv)
  # Get attached packages so the remote script can attach them as well
  pack <- vapply(strsplit(search(), "package:", fixed = TRUE), function(v) v[2], character(1))
  pack <- pack[!is.na(pack)]
  pack <- paste(paste0("try(library(", pack, "))"), collapse = "\n")
  # Define outputs
  output <- ".runbgOutput"
  # Write program into character (one script per node)
  program <- lapply(seq_len(nodes), function(m) {
    paste(
      pack,
      paste0("setwd('~/", filename0, "_folder')"),
      "rm(list = ls())",
      "library(doParallel)",
      "procs <- as.numeric(Sys.getenv('MOAB_PROCCOUNT'))",
      "registerDoParallel(cores=procs)",
      paste0("load('", filename0, ".RData')"),
      # Only shared objects: an unanchored '.so' would also match e.g. '*_solve.c'
      "files <- list.files(pattern = '\\\\.so$')",
      "for (f in files) dyn.load(f)",
      paste0(".node <- ", m),
      paste0(".runbgOutput <- try(", as.character(expr), ")"),
      paste0("save(", output, ", file = '", filename[m], "_result.RData')"),
      sep = "\n"
    )
  })
  # Write program code into file
  for (m in seq_len(nodes)) cat(program[[m]], file = paste0(filename[m], ".R"))
  # Write job file to be called by msub
  job <- lapply(seq_len(nodes), function(m) {
    paste(
      "#!/bin/sh",
      "########## Begin MOAB/Slurm header ##########",
      "#",
      "# Give job a reasonable name",
      paste0("#MOAB -N ", filename[m]),
      "#",
      "# Request number of nodes and CPU cores per node for job",
      paste0("#MOAB -l nodes=1:ppn=", cores),
      "#",
      "# Estimated wallclock time for job",
      paste0("#MOAB -l walltime=", walltime),
      "#",
      "# Write standard output and errors in same file",
      "#MOAB -j oe ",
      "#",
      "########### End MOAB header ##########",
      "",
      "# Setup R Environment",
      "module load math/R",
      "export OPENBLAS_NUM_THREADS=1",
      "# Start program",
      paste0("R CMD BATCH --no-save --no-restore --slave ", filename0, "_folder/", filename[m], ".R"),
      sep = "\n"
    )
  })
  # Write job file to file
  for (m in seq_len(nodes)) cat(job[[m]], file = paste0(filename[m], ".moab"))
  # Copy files to temporary folder (job files go to the remote home directory)
  system(paste0("sshpass -p ", password, " ssh ", machine, " mkdir ", filename0, "_folder/"), ignore.stdout = TRUE, ignore.stderr = TRUE)
  system(paste0("sshpass -p ", password, " ssh ", machine, " rm ", filename0, "_folder/*"), ignore.stdout = TRUE, ignore.stderr = TRUE)
  system(paste0("sshpass -p ", password, " scp ", getwd(), "/", filename0, ".RData* ", machine, ":", filename0, "_folder/"))
  system(paste0("sshpass -p ", password, " scp ", getwd(), "/", filename0, "*.R* ", machine, ":", filename0, "_folder/"))
  system(paste0("sshpass -p ", password, " scp ", getwd(), "/", filename0, "*.moab ", machine, ":"))
  if (compile) {
    # Source files are those found in the local working directory
    sourcefiles <- paste(
      paste0(
        filename0, "_folder/",
        c(list.files(pattern = glob2rx("*.c")), list.files(pattern = glob2rx("*.cpp")))
      ),
      collapse = " "
    )
    system(paste0("sshpass -p ", password, " scp ", getwd(), "/*.c ", getwd(), "/*.cpp ", machine, ":", filename0, "_folder/"))
    system(paste0("sshpass -p ", password, " ssh ", machine, " 'module load math/R; R CMD SHLIB ", sourcefiles, " -o ", filename0, "_folder/", filename0, ".so'"))
  } else {
    system(paste0("sshpass -p ", password, " scp ", getwd(), "/*.so ", machine, ":", filename0, "_folder/"))
  }
  # Submit one job per node
  for (m in seq_len(nodes)) system(paste0("sshpass -p ", password, " ssh ", machine, " msub ", filename[m], ".moab"), intern = FALSE)
  return(out)
}
#' Run an R expression on the bwForCluster via sshpass and slurm
#'
#' @description Generate an R script from the expression that is copied via \code{scp}
#' to the bwForCluster. Then collect the results. ssh-key not needed. Password can be provided via an additional argument.
#' sshpass needs to be installed on your local machine.
#' @details \code{runbg_bwfor_slurm()} generates a workspace from the \code{input} argument
#' and copies the workspace and all C files or .so files to the remote machine via
#' \code{scp}. One Slurm job per node is submitted with \code{sbatch}. Without a password,
#' plain ssh/scp is used, which works without prompts only if *an ssh-key had been generated
#' and added to the authorized keys on the remote machine*. The
#' code snippet, i.e. the \code{...} argument, can include several intermediate results
#' but only the last call which is not redirected into a variable is returned via the
#' variable \code{.runbgOutput}, see example below. Inside the snippet, the variable
#' \code{.node} holds the index of the job (1 to \code{nodes}).
#' @param ... Some R code
#' @param machine e.g. \code{fr_dk846@@bwfor.cluster.uni-mannheim.de}
#' @param filename Character, defining the filename of the temporary file. A random
#' file name is chosen if NULL. Must not contain the string "Minus".
#' @param nodes Number of nodes (one job is submitted per node), e.g. 10
#' @param cores Number of cores (tasks) per job, e.g. 16
#' @param partition character, the partition where to start the job
#' @param walltime estimated runtime in the format \code{hh:mm:ss}, e.g. \code{01:30:00}.
#' Jobs with a walltime up to 30 min are sent to a quick queue. When the walltime
#' is exceeded, all jobs are automatically killed by the queue.
#' @param input Character vector, the objects in the workspace that are stored
#' into an R data file and copied to the remote machine.
#' @param compile Logical. If \code{TRUE}, C and C++ files are copied and compiled on the remote machine.
#' Otherwise, the .so files are copied.
#' @param recover Logical, If \code{TRUE}, the scripts will not be started again.
#' Can be used to get back the check and get functions for an already started process, e.g. after local session has aborted.
#' @param password Your ssh password in plain text (yes, no joke unfortunately), the password is handed over to sshpass for automatic login on the cluster. If NULL, the standard ssh/scp is used and you will be asked for your password multiple times while uploading the scripts.
#' The value is pasted verbatim into a shell command (\code{sshpass -p <password>}), so it should be shell-quoted,
#' as in the default \code{"'begin__end'"}. Note that command-line arguments may be visible to other users of the local machine.
#' @return List of functions \code{check()}, \code{get()} and \code{purge()}.
#' \code{check()} checks, if the result is ready.
#' \code{get()} copies the result files
#' to the working directory and loads them into the workspace as an object called \code{.runbgOutput}.
#' This object is a list with one entry per node containing the result returned by that node
#' (\code{NULL} if the result could not be loaded).
#' \code{purge()} deletes the temporary files and folder
#' from the working directory and the remote machine.
#' @examples
#' \dontrun{
#' out_job1 <- runbg_bwfor_slurm({
#' mstrust(obj, center, fits = 10, cores = 2)
#' },
#' machine = "bwfor", nodes = 2, cores = 2,
#' walltime = "00:01:00",
#' filename = "job1")
#' out_job1$check()
#' out_job1$get()
#' out_job1$purge()
#' result <- .runbgOutput
#' print(result)
#' }
#'
#' @export
runbg_bwfor_slurm <- function(..., machine, filename = NULL, nodes = 1, cores = 1, partition = "single", walltime = "01:00:00", input = ls(.GlobalEnv), compile = TRUE, recover = FALSE, password = "'begin__end'") {
  # Command prefixes: plain ssh/scp, or wrapped in sshpass when a password is given
  if (is.null(password)) {
    ssh_command <- "ssh "
    scp_command <- "scp "
  } else {
    ssh_command <- paste0("sshpass -p ", password, " ssh ")
    scp_command <- paste0("sshpass -p ", password, " scp ")
  }
  expr <- as.expression(substitute(...))
  # Set file name: one file stem per node, <filename>_<m>
  if (is.null(filename))
    filename <- paste0("tmp_", paste(sample(c(0:9, letters), 5, replace = TRUE), collapse = ""))
  filename0 <- filename
  filename <- paste(filename, seq_len(nodes), sep = "_")
  # Initialize output
  out <- structure(vector("list", 3), names = c("check", "get", "purge"))
  # Check: count result files present in the remote folder
  out[[1]] <- function() {
    check.out <- length(suppressWarnings(
      system(paste0(ssh_command, machine, " ls ", filename0, "_folder/ | grep result.RData"),
             intern = TRUE)))
    if (check.out >= nodes) {
      cat("Result is ready!\n")
      return(TRUE)
    }
    cat("Result from", check.out, "out of", nodes, "nodes are ready.\n")
    FALSE
  }
  # Get: copy all result files and load them into the global environment
  out[[2]] <- function() {
    result <- structure(vector(mode = "list", length = nodes))
    system(paste0(scp_command, machine, ":", filename0, "_folder/*", "_result.RData ./"), ignore.stdout = TRUE, ignore.stderr = TRUE)
    for (m in seq_len(nodes)) {
      .runbgOutput <- NULL
      check <- try(load(file = paste0(filename[m], "_result.RData")), silent = TRUE)
      # inherits(x, what): the object comes first. Assign via `[<-` with list()
      # so that a NULL result keeps its slot instead of deleting the element.
      if (!inherits(check, "try-error")) result[m] <- list(.runbgOutput)
    }
    .GlobalEnv$.runbgOutput <- result
  }
  # Purge: remove local and remote files starting with filename0
  out[[3]] <- function() {
    system(paste0("rm ", filename0, "*"), wait = TRUE)
    system(paste0(ssh_command, machine, " rm -r ", filename0, "*"), wait = TRUE)
  }
  if (recover) return(out)
  # Save current workspace
  save(list = input, file = paste0(filename0, ".RData"), envir = .GlobalEnv)
  # Get attached packages so the remote script can attach them as well
  pack <- vapply(strsplit(search(), "package:", fixed = TRUE), function(v) v[2], character(1))
  pack <- pack[!is.na(pack)]
  pack <- paste(paste0("try(library(", pack, "))"), collapse = "\n")
  # Define outputs
  output <- ".runbgOutput"
  # Write program into character (one script per node)
  program <- lapply(seq_len(nodes), function(m) {
    paste(
      pack,
      paste0("setwd('~/", filename0, "_folder')"),
      "rm(list = ls())",
      "library(doParallel)",
      "procs <- as.numeric(Sys.getenv('SLURM_NTASKS'))",
      "registerDoParallel(cores=procs)",
      paste0("load('", filename0, ".RData')"),
      # Only shared objects: an unanchored '.so' would also match e.g. '*_solve.c'
      "files <- list.files(pattern = '\\\\.so$')",
      "for (f in files) dyn.load(f)",
      paste0(".node <- ", m),
      paste0(".runbgOutput <- try(", as.character(expr), ")"),
      paste0("save(", output, ", file = '", filename[m], "_result.RData')"),
      sep = "\n"
    )
  })
  # Write program code into file
  for (m in seq_len(nodes)) cat(program[[m]], file = paste0(filename[m], ".R"))
  # Write job file to be submitted with sbatch
  job <- lapply(seq_len(nodes), function(m) {
    paste(
      "#!/bin/sh",
      "########## Begin MOAB/Slurm header ##########",
      "#",
      "# Give job a reasonable name",
      paste0("#SBATCH -J ", filename[m]),
      "#",
      "# Request number of nodes and CPU cores per node for job",
      paste0("#SBATCH -n ", cores),
      "#",
      "# Estimated wallclock time for job",
      paste0("#SBATCH -t ", walltime),
      "#",
      "# Request correct partition",
      paste0("#SBATCH --partition ", partition),
      "#",
      "########### End MOAB header ##########",
      "",
      "# Setup R Environment",
      "module load math/R",
      "export OPENBLAS_NUM_THREADS=1",
      "# Start program",
      paste0("R CMD BATCH --no-save --no-restore --slave ", filename0, "_folder/", filename[m], ".R"),
      sep = "\n"
    )
  })
  # Write job file to file
  for (m in seq_len(nodes)) cat(job[[m]], file = paste0(filename[m], ".sh"))
  # Copy files to temporary folder (job files go to the remote home directory)
  system(paste0(ssh_command, machine, " mkdir ", filename0, "_folder/"), ignore.stdout = TRUE, ignore.stderr = TRUE)
  system(paste0(ssh_command, machine, " rm ", filename0, "_folder/*"), ignore.stdout = TRUE, ignore.stderr = TRUE)
  system(paste0(scp_command, getwd(), "/", filename0, ".RData* ", machine, ":", filename0, "_folder/"))
  system(paste0(scp_command, getwd(), "/", filename0, "*.R* ", machine, ":", filename0, "_folder/"))
  system(paste0(scp_command, getwd(), "/", filename0, "*.sh ", machine, ":"))
  if (compile) {
    # Source files are those found in the local working directory
    sourcefiles <- paste(
      paste0(
        filename0, "_folder/",
        c(list.files(pattern = glob2rx("*.c")), list.files(pattern = glob2rx("*.cpp")))
      ),
      collapse = " "
    )
    system(paste0(scp_command, getwd(), "/*.c ", getwd(), "/*.cpp ", machine, ":", filename0, "_folder/"))
    system(paste0(ssh_command, machine, " 'module load math/R; R CMD SHLIB ", sourcefiles, " -o ", filename0, "_folder/", filename0, ".so'"))
  } else {
    system(paste0(scp_command, getwd(), "/*.so ", machine, ":", filename0, "_folder/"))
  }
  # Submit one job per node
  for (m in seq_len(nodes)) system(paste0(ssh_command, machine, " sbatch ", filename[m], ".sh"), intern = FALSE)
  return(out)
}
#' Remote install dMod to a ssh-reachable host
#'
#' @description Install your local dMod version to a remote host via ssh.
#' The source is built with \code{R CMD build}, the tarball is copied to the
#' remote home directory via \code{scp} and installed there with
#' \code{R CMD INSTALL}. The local tarball is removed afterwards.
#' @param sshtarget The ssh host url.
#' @param source If type = local, source must point to the source directory of
#' your dMod version. This is most probably your local dMod git repository.
#' @param type Which dMod to install. At the moment, only your local version
#' (\code{"local"}) is supported; other values raise an error.
#'
#' @author Wolfgang Mader, \email{Wolfgang.Mader@@fdm.uni-freiburg.de}
#' @importFrom utils packageVersion
#' @export
runbgInstall <- function(sshtarget, source = NULL, type = "local") {
  if (!identical(type, "local")) {
    stop("Only type = \"local\" is supported at the moment.")
  }
  # Build dMod package
  if (is.null(source)) {
    stop("dMod source location not specified.")
  }
  cat("* Preparing local dMod version for remote installation:\n")
  status <- system(paste("R CMD build --no-build-vignettes", shQuote(source)))
  if (status != 0) {
    stop("R CMD build failed for source '", source, "'.")
  }
  # Figure out the tarball name. R CMD build names it <Package>_<Version>.tar.gz
  # after the DESCRIPTION of the *source* tree, which may differ from the
  # currently installed dMod version, so read it from there when possible.
  descfile <- file.path(source, "DESCRIPTION")
  if (file.exists(descfile)) {
    desc <- read.dcf(descfile, fields = c("Package", "Version"))
    dModPkg <- paste0(desc[1, "Package"], "_", desc[1, "Version"], ".tar.gz")
  } else {
    dModPkg <- paste0("dMod_", packageVersion("dMod"), ".tar.gz")
  }
  # Install to remote host
  cat(paste("* Installing to remote host", sshtarget, ":\n"))
  system(paste0("scp ", shQuote(dModPkg), " ", sshtarget, ":~/"))
  system(paste("ssh", sshtarget, "R CMD INSTALL", shQuote(dModPkg)))
  unlink(dModPkg)
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.