# ******************** Constructors ********************
# Abstract base class constructor for general workers.
#
# @param ssh [\code{logical(1)}]\cr
# Use ssh for remote node?
# @param nodename [\code{character(1)}]\cr
# Host name of node.
# @param ssh.cmd [\code{character(1)}]\cr
# CLI command to ssh into remote node.
# @param ssh.args [\code{character}]\cr
# CLI args for \code{ssh.cmd}.
# @param rhome [\code{character(1)}]\cr
# Path to R installation on worker.
# \dQuote{} means R installation on the PATH is used.
# @param r.options [\code{character}]\cr
# Options for R and Rscript, one option per element of the vector,
# a la \dQuote{--vanilla}.
# @param script [\code{character(1)}]\cr
# Path to helper script on worker.
# Default means to call \code{\link{findHelperScriptLinux}}.
# @param ncpus [\code{integer(1)}]\cr
# Number of CPUs of worker.
# Default means to query the worker via \code{\link{getWorkerNumberOfCPUs}}.
# @param max.jobs [\code{integer(1)}]\cr
# Maximal number of jobs that can run concurrently for the current registry.
# Default is \code{ncpus}.
# @param max.load [\code{numeric(1)}]\cr
# Load average (of the last 5 min) at which the worker is considered occupied,
# so that no job can be submitted.
# Default is \code{ncpus-1}.
# @param nice [\code{integer(1)}]\cr
# Process priority to run R with set via nice. Integers between -20 and 19 are allowed.
# If missing, processes are not nice'd and the system default applies (usually 0).
# @param classes [\code{character}]\cr
# Extra classes, more specific than \dQuote{Worker}.
# Will be added to the class attribute of the object.
# @return [\code{\link{Worker}}].
makeWorker = function(ssh, nodename, ssh.cmd, ssh.args, rhome, r.options = c("--no-save", "--no-restore", "--no-init-file", "--no-site-file"),
  script, ncpus, max.jobs, max.load, nice, classes) {
  # Validate connection settings. ssh.cmd and ssh.args are only checked
  # when they are not NULL.
  assertFlag(ssh)
  assertString(nodename)
  if (!is.null(ssh.cmd)) {
    assertString(ssh.cmd)
  }
  if (!is.null(ssh.args)) {
    assertCharacter(ssh.args, any.missing = FALSE)
  }
  assertString(rhome)
  assertCharacter(r.options, any.missing = FALSE)

  if (missing(script)) {
    # FIXME: dont use linux specific in base class
    script = findHelperScriptLinux(rhome, r.options, ssh, ssh.cmd, ssh.args, nodename)
  } else {
    assertString(script)
  }

  # Construct the object partially so that getWorkerNumberOfCPUs() can be
  # dispatched on it below. An environment is used, so the fields assigned
  # at the end modify this very object.
  w = as.environment(list(
    ssh = ssh,
    ssh.cmd = ssh.cmd,
    ssh.args = ssh.args,
    nodename = nodename,
    rhome = rhome,
    r.options = r.options,
    script = script,
    last.update = -Inf,
    available = "A", # worker is available, we can submit, set in update loop in scheduleWorkerJobs.R
    status = NULL))
  class(w) = c(classes, "Worker")

  if (missing(ncpus)) {
    # Not supplied: ask the worker itself.
    ncpus = getWorkerNumberOfCPUs(w)
    messagef("Setting for worker %s: ncpus=%i", w$nodename, ncpus)
  } else {
    ncpus = asCount(ncpus)
  }

  if (missing(max.jobs)) {
    max.jobs = ncpus
  } else {
    max.jobs = asCount(max.jobs)
  }

  if (missing(max.load)) {
    # Derive the default from the worker's own CPU count (ncpus - 1).
    # The core count of the machine running this constructor says nothing
    # about a remote worker, and parallel::detectCores() may return NA.
    max.load = ncpus - 1L
  } else {
    assertNumber(max.load, lower = 0)
  }

  if (missing(nice)) {
    # Empty string marks "do not nice the process".
    nice = ""
  } else {
    nice = asInt(nice, lower = -20, upper = 19)
  }

  w$ncpus = ncpus
  w$max.jobs = max.jobs
  w$max.load = max.load
  w$nice = nice
  return(w)
}
# ******************** Interface definition ********************
# Return number of cores on worker.
# @param worker [\code{\link{Worker}}].
# Worker.
# @return [\code{integer(1)}].
getWorkerNumberOfCPUs = function(worker) {
  # S3 generic: the implementation is selected by the class of 'worker'.
  UseMethod("getWorkerNumberOfCPUs", worker)
}
# Return 4 numbers to describe worker status.
# - load average of last 1 min, as given by e.g. uptime
# - number of R processes by _all_ users
# - number of R processes by _all_ users which have a load of >= 50%
# - number of R processes by current user which match $FILEDIR/jobs in the cmd call of R
# @param worker [\code{\link{Worker}}].
# Worker.
# @param file.dir [\code{character(1)}].
# File dir of registry.
# @return [named \code{list} of \code{numeric(1)}].
getWorkerStatus = function(worker, file.dir) {
  # S3 generic: the implementation is selected by the class of 'worker';
  # 'file.dir' is handed on to the method unchanged.
  UseMethod("getWorkerStatus", worker)
}
# Start a job on worker, probably with R CMD BATCH.
# @param worker [\code{\link{Worker}}].
# Worker.
# @param rfile [\code{character(1)}].
# Path to R file to execute.
# @param outfile [\code{character(1)}].
# Path to log file for R process.
# @return [\code{character(1)}]. Relevant process id.
startWorkerJob = function(worker, rfile, outfile) {
  # S3 generic: the implementation is selected by the class of 'worker';
  # 'rfile' and 'outfile' are handed on to the method unchanged.
  UseMethod("startWorkerJob", worker)
}
# Kill a job on worker. Really do it.
# @param worker [\code{\link{Worker}}].
# Worker.
# @param pid [\code{character(1)}].
# Process id from DB/batch.job.id to kill.
# @return Nothing.
killWorkerJob = function(worker, pid) {
  # S3 generic: the implementation is selected by the class of 'worker';
  # 'pid' is handed on to the method unchanged.
  UseMethod("killWorkerJob", worker)
}
# List all jobs on worker belonging to the current registry.
# @param worker [\code{\link{Worker}}].
# Worker.
# @param file.dir [\code{character(1)}].
# File dir of registry.
# @return [\code{character}]. Vector of process ids.
listWorkerJobs = function(worker, file.dir) {
  # S3 generic: the implementation is selected by the class of 'worker';
  # 'file.dir' is handed on to the method unchanged.
  UseMethod("listWorkerJobs", worker)
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.