#' @title ClusterFunctions for Docker
#'
#' @description
#' Cluster functions for Docker/Docker Swarm (\url{https://docs.docker.com/engine/swarm/}).
#'
#' The \code{submitJob} function executes
#' \code{docker [docker.args] run --detach=true [image.args] [resources] [image] [cmd]}.
#' Arguments \code{docker.args}, \code{image.args} and \code{image} can be set on construction.
#' The \code{resources} part takes the named resources \code{ncpus} and \code{memory}
#' from \code{\link{submitJobs}} and maps them to the arguments \code{--cpu-shares} and \code{--memory}
#' (in Megabytes). The resource \code{threads} is mapped to the environment variables \dQuote{OMP_NUM_THREADS}
#' and \dQuote{OPENBLAS_NUM_THREADS}.
#' To reliably identify jobs in the swarm, jobs are labeled with \dQuote{batchtools=[job.hash]} and named
#' using the current login name (label \dQuote{user}) and the job hash (label \dQuote{batchtools}).
#'
#' \code{listJobsRunning} uses \code{docker [docker.args] ps --format=\{\{.ID\}\}} to filter for running jobs.
#'
#' \code{killJobs} uses \code{docker [docker.args] kill [batch.id]} to kill jobs.
#'
#' These cluster functions use a \link{Hook} to remove finished jobs before a new submit and every time the \link{Registry}
#' is synchronized (using \code{\link{syncRegistry}}).
#' This is currently required because docker does not remove terminated containers automatically.
#' Use \code{docker ps -a --filter 'label=batchtools' --filter 'status=exited'} to identify and remove terminated
#' containers manually (or use a cron job).
#'
#' @param image [\code{character(1)}]\cr
#' Name of the docker image to run.
#' @param docker.args [\code{character}]\cr
#' Additional arguments passed to \dQuote{docker} *before* the command (\dQuote{run}, \dQuote{ps} or \dQuote{kill}) to execute (e.g., the docker host).
#' @param image.args [\code{character}]\cr
#' Additional arguments passed to \dQuote{docker run} (e.g., to define mounts or environment variables).
#' @inheritParams makeClusterFunctions
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsDocker = function(image, docker.args = character(0L), image.args = character(0L), scheduler.latency = 1, fs.latency = 65) { # nocov start
  assertString(image)
  assertCharacter(docker.args, any.missing = FALSE)
  assertCharacter(image.args, any.missing = FALSE)
  # Login name is baked into container labels/names so listJobsRunning can
  # filter this user's containers only.
  user = Sys.info()["user"]

  # Submit a single JobCollection as a detached docker container.
  # Maps resources ncpus/memory to --cpu-shares (-c) / --memory (-m), and
  # threads to OMP/OPENBLAS/MKL environment variables.
  submitJob = function(reg, jc) {
    assertRegistry(reg, writeable = TRUE)
    assertClass(jc, "JobCollection")
    assertIntegerish(jc$resources$ncpus, lower = 1L, any.missing = FALSE, .var.name = "resources$ncpus")
    assertIntegerish(jc$resources$memory, lower = 1L, any.missing = FALSE, .var.name = "resources$memory")
    # Optional walltime is enforced inside the container via coreutils' timeout.
    timeout = if (is.null(jc$resources$walltime)) character(0L) else sprintf("timeout %i", asInt(jc$resources$walltime, lower = 0L))
    cmd = c("docker", docker.args, "run", "--detach=true", image.args,
      sprintf("-e DEBUGME='%s'", Sys.getenv("DEBUGME")),
      sprintf("-e OMP_NUM_THREADS=%i", jc$resources$omp.threads %??% jc$resources$threads),
      sprintf("-e OPENBLAS_NUM_THREADS=%i", jc$resources$blas.threads %??% jc$resources$threads),
      sprintf("-e MKL_NUM_THREADS=%i", jc$resources$blas.threads %??% jc$resources$threads),
      sprintf("-c %i", jc$resources$ncpus),
      sprintf("-m %im", jc$resources$memory),
      # disable swap: total (memory + swap) limit equals the memory limit
      sprintf("--memory-swap %im", jc$resources$memory),
      sprintf("--label batchtools=%s", jc$job.hash),
      sprintf("--label user=%s", user),
      sprintf("--name=%s_bt_%s", user, jc$job.hash),
      image, timeout, "Rscript", stri_join("-e", shQuote(sprintf("batchtools::doJobCollection('%s', '%s')", jc$uri, jc$log.file)), sep = " "))
    res = runOSCommand(cmd[1L], cmd[-1L])
    if (res$exit.code > 0L) {
      # Remove terminated containers before reporting the error: a full swarm
      # may simply be out of resources because of unreaped containers.
      housekeeping(reg)
      no.res.msg = "no resources available"
      if (res$exit.code == 1L && any(stri_detect_fixed(res$output, no.res.msg)))
        return(makeSubmitJobResult(status = 1L, batch.id = NA_character_, msg = no.res.msg))
      return(cfHandleUnknownSubmitError(stri_flatten(cmd, " "), res$exit.code, res$output))
    } else {
      # docker prints the full 64 char container id on success, but the
      # output may contain additional lines (e.g. swarm chatter). Extract
      # the single id line. FIX: was 'length(res$output != 1L)' which tested
      # the length of a logical vector instead of comparing the length to 1.
      if (length(res$output) != 1L) {
        matches = which(stri_detect_regex(res$output, "^[[:alnum:]]{64}$"))
        if (length(matches) != 1L)
          stopf("Command '%s' did not return a long UUID identifier", stri_flatten(cmd, " "))
        res$output = res$output[matches]
      }
      # store the short (12 char) id, consistent with 'docker ps'
      return(makeSubmitJobResult(status = 0L, batch.id = stri_sub(res$output, 1L, 12L)))
    }
  }

  # List container ids of batchtools jobs, optionally restricted by extra
  # 'docker ps' filters. Returns character(0) if nothing matches.
  listJobs = function(reg, filter = character(0L)) {
    assertRegistry(reg, writeable = FALSE)
    # use a workaround for DockerSwarm: docker ps does not list all jobs correctly, only
    # docker inspect reports the status correctly
    args = c(docker.args, "ps", "--format={{.ID}}", "--filter 'label=batchtools'", filter)
    res = runOSCommand("docker", args)
    if (res$exit.code > 0L)
      OSError("Listing of jobs failed", res)
    # FIX: drop empty lines vectorized; the previous scalar '||' on
    # '!nzchar(res$output)' errors for multi-line output on R >= 4.3
    res$output[nzchar(res$output)]
  }

  # Hook: remove exited containers belonging to this registry. Required
  # because docker does not reap terminated containers automatically.
  housekeeping = function(reg, ...) {
    batch.ids = chintersect(listJobs(reg, "--filter 'status=exited'"), reg$status$batch.id)
    if (length(batch.ids) > 0L)
      runOSCommand("docker", c(docker.args, "rm", batch.ids))
    invisible(TRUE)
  }

  # Kill a single container via 'docker kill'.
  killJob = function(reg, batch.id) {
    assertRegistry(reg, writeable = TRUE)
    assertString(batch.id)
    cfKillJob(reg, "docker", c(docker.args, "kill", batch.id))
  }

  # Running jobs of the current user (matched via the 'user' label).
  listJobsRunning = function(reg) {
    assertRegistry(reg, writeable = FALSE)
    listJobs(reg, sprintf("--filter 'user=%s'", user))
  }

  makeClusterFunctions(name = "Docker", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning,
    store.job.collection = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency,
    hooks = list(post.submit = housekeeping, post.sync = housekeeping))
} # nocov end