#' R6 class for defining, generating, and submitting an R batch job
#'
#' An \code{R_batch_job} object holds the resource requests, shell setup code, and R code for a single job.
#' Calling \code{$generate()} writes a shell batch script, an R compute script, and an R source file into
#' \code{batch_directory}; calling \code{$submit()} hands the batch script to
#' \code{fmri.pipeline::cluster_job_submit} for execution on a scheduler or the local machine.
#' @importFrom lubridate period_to_seconds
#' @export
R_batch_job <- R6::R6Class("batch_job",
  private = list(
    # whether the batch and compute files have already been created
    batch_generated = FALSE,

    # absolute path to batch file
    batch_file_name = NULL,

    # absolute path to compute file
    compute_file_name = NULL,

    # absolute path to R script
    r_file_name = NULL,

    # absolute path to the RData file that input_objects are saved to. Cached so that regenerating
    # the job re-uses the same file instead of re-deriving a name from an already-absolute path.
    rdata_file_name = NULL,

    # field job_id the process or scheduler job id that uniquely identifies this job
    job_id = NULL,

    # field child_job_ids a vector of job ids generated by the submission of this job. Only relevant if this job
    # submits additional jobs through the scheduler directly
    child_job_ids = NULL, # currently no simple way to populate these without a messaging interface with running batch job

    # Build an absolute path inside batch_directory for file_name. When job_name is set, "_<job_name>" is
    # inserted before the extension. If a file with the resulting name already exists, a random token is
    # appended before the extension so that the existing file is not overwritten.
    get_unique_file_name = function(file_name = "") {
      base <- tools::file_path_sans_ext(file_name)
      ext <- tools::file_ext(file_name)
      name_suffix <- if (is.null(self$job_name)) "" else paste0("_", self$job_name)
      file_name <- paste0(base, name_suffix, ".", ext)

      if (file.exists(normalizePath(file.path(self$batch_directory, file_name), mustWork = FALSE))) {
        # add unique random string to script name if the file already exists
        # (tempfile with an empty pattern and tmpdir yields "/<random>", so strip the leading slash)
        random_token <- sub("^/", "", tempfile(pattern = "", tmpdir = ""))
        file_name <- sub(
          paste0("\\.", ext, "$"),
          paste0("_", random_token, ".", ext),
          file_name
        )
      }

      return(normalizePath(file.path(self$batch_directory, file_name), mustWork = FALSE))
    },

    # helper to generate (once) and return the full path to the batch script
    get_batch_file_name = function() {
      if (is.null(private$batch_file_name)) {
        private$batch_file_name <- private$get_unique_file_name("submit_batch.sh")
      }
      return(private$batch_file_name)
    },

    # helper to generate (once) and return the full path to the compute script
    get_compute_file_name = function() {
      if (is.null(private$compute_file_name)) {
        private$compute_file_name <- private$get_unique_file_name("batch_run.R")
      }
      return(private$compute_file_name)
    },

    # helper to generate (once) and return the full path to the R script that is sourced
    get_r_file_name = function() {
      if (is.null(private$r_file_name)) {
        private$r_file_name <- private$get_unique_file_name("r_source.R")
      }
      return(private$r_file_name)
    },

    # write the sbatch, pbs, or local bash script
    write_batch_file = function() {
      syntax <- c("#!/bin/sh")

      if (self$scheduler %in% c("slurm", "sbatch")) {
        # mem_per_cpu takes precedence over mem_total when both are set
        mem_string <- NULL
        if (!is.null(self$mem_per_cpu)) {
          mem_string <- paste0("#SBATCH --mem-per-cpu=", self$mem_per_cpu)
        } else if (!is.null(self$mem_total)) {
          mem_string <- paste0("#SBATCH --mem=", self$mem_total)
        }

        job_string <- if (is.null(self$job_name)) NULL else paste("#SBATCH -J", self$job_name)
        sched_string <- if (is.null(self$scheduler_options)) NULL else paste("#SBATCH", self$scheduler_options)

        syntax <- c(
          syntax,
          paste("#SBATCH -N", self$n_nodes),
          paste("#SBATCH -n", self$n_cpus),
          paste("#SBATCH -t", self$wall_time),
          "#SBATCH -b now+10", # add 10-second delay to job allocation so that job id can be written to script
          mem_string,
          job_string,
          sched_string,
          "",
          "export JOBID=$SLURM_JOB_ID",
          "cd $SLURM_SUBMIT_DIR",
          self$batch_code
        )
      } else if (self$scheduler %in% c("torque", "qsub")) {
        # mem_per_cpu takes precedence over mem_total when both are set
        mem_string <- NULL
        if (!is.null(self$mem_per_cpu)) {
          mem_string <- paste0("#PBS -l pmem=", self$mem_per_cpu)
        } else if (!is.null(self$mem_total)) {
          mem_string <- paste0("#PBS -l mem=", self$mem_total)
        }

        job_string <- if (is.null(self$job_name)) NULL else paste("#PBS -N", substr(self$job_name, 1, 15)) # torque limits to 15 chars
        sched_string <- if (is.null(self$scheduler_options)) NULL else paste("#PBS", self$scheduler_options)

        syntax <- c(
          syntax,
          paste0("#PBS -l nodes=", self$n_nodes, ":ppn=", self$n_cpus),
          paste0("#PBS -l walltime=", self$wall_time),
          mem_string,
          job_string,
          sched_string,
          "",
          "export JOBID=$PBS_JOBID",
          "cd $PBS_O_WORKDIR",
          self$batch_code
        )
      } else {
        # only paste in batch_code, which may contain compute environment setup statements
        syntax <- c(
          syntax,
          "export JOBID=$$",
          self$batch_code
        )
      }

      # syntax <- c(syntax, paste("Rscript --vanilla", private$get_compute_file_name()))
      syntax <- c(syntax, paste("R CMD BATCH --no-save --no-restore", private$get_compute_file_name()))

      message("Writing batch script to: ", private$get_batch_file_name())
      writeLines(syntax, con = private$get_batch_file_name())
    },

    # Write the R compute script that the batch script runs. The script optionally records job status in the
    # tracking database, prints diagnostics, loads packages and the input RData file, sources the user's R code
    # (which is written to its own file here), optionally saves the workspace, and optionally waits on child jobs.
    write_compute_file = function() {
      tracking <- !is.null(self$sqlite_db)

      # job tracking
      syntax <- "cat(paste('Job start time:', Sys.time()), '\\n')"
      if (tracking) {
        syntax <- c(
          syntax,
          glue::glue("fmri.pipeline::update_tracked_job_status('{self$sqlite_db}', Sys.getenv('JOBID'), 'STARTED')")
        )
      }

      if (isTRUE(self$print_session_info)) {
        syntax <- c(
          syntax,
          "print(sessionInfo())",
          "print(Sys.info())"
        )
      }

      if (isTRUE(self$print_environment)) {
        syntax <- c(
          syntax,
          "print(Sys.getenv())"
        )
      }

      if (!is.null(self$r_packages)) {
        syntax <- c(
          syntax,
          "if (!require(pacman)) { install.packages('pacman'); library(pacman) }",
          paste0("pacman::p_load(", paste(self$r_packages, collapse = ", "), ")")
        )
      }

      if (!is.null(self$input_rdata_file)) {
        # The input file is not required to exist at script generation time; the generated code
        # checks for it when the job actually runs.
        syntax <- c(
          syntax,
          paste(glue::glue("if (file.exists('{self$input_rdata_file}'))"), "{"),
          glue::glue("  load('{self$input_rdata_file}')"),
          "} else {",
          glue::glue("  stop('Cannot load input environment object: {self$input_rdata_file}')"),
          "}"
        )
      }

      failed_str <- if (tracking) {
        glue::glue("    fmri.pipeline::update_tracked_job_status('{self$sqlite_db}', Sys.getenv('JOBID'), 'FAILED', cascade = TRUE)")
      } else {
        NULL
      }

      syntax <- c(
        syntax,
        glue::glue("tryCatch(source(\"{private$get_r_file_name()}\", echo=TRUE, max.deparse.length=2000),"),
        "  error=function(e) { ",
        "    print(as.character(e))",
        failed_str,
        "    stop(e)",
        "})"
      )

      # write R code to execute
      writeLines(self$r_code, con = private$get_r_file_name())

      if (!is.null(self$output_rdata_file)) {
        syntax <- c(syntax, paste0("save.image(file='", self$output_rdata_file, "')"))
      }

      if (isTRUE(self$wait_for_children)) {
        # Registering children with their parent only makes sense when a tracking database exists.
        # (glue() returns character(0) for NULL inputs, so these lines are built conditionally to make
        # it explicit which statements depend on tracking and which do not.)
        parent_line <- if (tracking) {
          glue::glue("  sapply(child_job_ids, function(id) fmri.pipeline::add_tracked_job_parent(sqlite_db = '{self$sqlite_db}', job_id = id, parent_job_id = Sys.getenv('JOBID')))")
        } else {
          NULL
        }

        # The job must stop when a child fails regardless of whether tracking is enabled;
        # only the status update is tracking-specific.
        child_fail_line <- NULL
        if (isTRUE(self$all_children_successful)) {
          status_call <- if (tracking) {
            paste0(
              "fmri.pipeline::update_tracked_job_status('", self$sqlite_db,
              "', Sys.getenv('JOBID'), 'FAILED', cascade = TRUE, exclude = child_job_ids); "
            )
          } else {
            ""
          }
          child_fail_line <- paste0("  if (isFALSE(success)) { ", status_call, "stop() }")
        }

        syntax <- c(
          syntax,
          "if (exists('child_job_ids') && inherits(child_job_ids, c('numeric', 'integer', 'character'))) {",
          parent_line,
          paste0(
            "  success <- fmri.pipeline::wait_for_job(child_job_ids, quiet=FALSE",
            ", repolling_interval=", self$repolling_interval,
            # NOTE(review): dhms() is not defined in this file; presumably a package helper that
            # converts a d-hh:mm:ss string to a lubridate period -- confirm
            ", max_wait=", lubridate::period_to_seconds(dhms(self$wall_time)),
            ", scheduler='", self$scheduler, "')"
          ),
          child_fail_line,
          "} else {",
          "  warning('Attempt to wait for child jobs failed due to non-existent or improper child_job_ids variable.')",
          "}"
        )

        # add any post-children R code to be executed (e.g., combining outputs from the child jobs)
        if (!is.null(self$post_children_r_code)) {
          syntax <- c(
            syntax,
            self$post_children_r_code
          )
        }
      }

      syntax <- c(
        syntax,
        "cat(paste('Job end time:', Sys.time()), '\\n')"
      )
      if (tracking) {
        syntax <- c(
          syntax,
          glue::glue("fmri.pipeline::update_tracked_job_status('{self$sqlite_db}', Sys.getenv('JOBID'), 'COMPLETED')")
        )
      }

      message("Writing R script to: ", private$get_compute_file_name())
      writeLines(syntax, con = private$get_compute_file_name())
    },

    # Save all objects in the input_objects environment to an RData file in batch_directory and point
    # input_rdata_file at it. Does nothing when input_objects is NULL.
    write_rdata_file = function() {
      if (is.null(self$input_objects)) {
        return(invisible(NULL)) # nothing to do
      }

      # Resolve the output path only once. After the first call input_rdata_file holds an absolute path,
      # and passing that back through get_unique_file_name would yield an invalid nested path.
      if (is.null(private$rdata_file_name)) {
        private$rdata_file_name <- private$get_unique_file_name(self$input_rdata_file)
      }
      self$input_rdata_file <- private$rdata_file_name

      save(
        list = ls(all.names = TRUE, envir = self$input_objects),
        envir = self$input_objects,
        file = self$input_rdata_file
      )
    },

    # helper to gather arguments for tracking, specifically for an R batch job
    gather_tracking_args = function() {
      if (is.null(self$sqlite_db)) return(NULL) # return NULL if not tracking

      job_obj <- if (isTRUE(self$sqlite_cache_obj)) serialize(self, NULL) else NULL
      scheduler_options <- if (is.null(self$scheduler_options)) NA else paste(self$scheduler_options, collapse = "\n")

      tracking_args <- list(
        job_name = self$job_name, batch_directory = self$batch_directory,
        compute_file = private$get_compute_file_name(),
        code_file = private$get_r_file_name(),
        n_nodes = self$n_nodes, n_cpus = self$n_cpus, wall_time = self$wall_time,
        mem_per_cpu = self$mem_per_cpu, mem_total = self$mem_total,
        scheduler_options = scheduler_options, job_obj = list(job_obj)
      )
      return(tracking_args)
    }
  ),
  public = list(
    #' @field parent_jobs a vector of parent_jobs that are upstream of this job and may influence its execution
    parent_jobs = NULL,

    #' @field depends_on_parents logical or character string indicating whether this job should wait until
    #'   \code{parent_jobs} complete. If a character string is passed, it indicates which named elements of
    #'   \code{parent_jobs} must complete before this job begins.
    depends_on_parents = FALSE,

    #' @field job_name a user-defined name for the job used for specifying job dependencies and informative job
    #'   status queries on a job scheduler
    job_name = NULL,

    #' @field wait_for_children if TRUE, code will be inserted to wait for all jobs in a vector called
    #'   \code{child_job_ids} to finish before the batch exits. It's up to your code to use this variable name
    wait_for_children = FALSE,

    #' @field all_children_successful if TRUE, all jobs in vector \code{child_job_ids}
    #'   must be successful for this job to finish (see \code{wait_for_children} field)
    all_children_successful = FALSE,

    #' @field wall_time The amount of time requested on the job scheduler, following d-hh:mm:ss format. Defaults to
    #'   "4:00:00", which is 4 hours.
    wall_time = "4:00:00",

    #' @field n_nodes The number of nodes to be requested on the job scheduler
    n_nodes = "1",

    #' @field n_cpus The number of cores (aka 'cpus', ignoring hyperthreading) to be requested on the job scheduler
    n_cpus = "4",

    #' @field mem_total The total amount of memory (RAM) requested by the job
    mem_total = "4G",

    #' @field mem_per_cpu The amount of memory (RAM) requested per cpu (total = mem_per_cpu * n_cpus)
    mem_per_cpu = NULL,

    #' @field input_objects An environment containing all objects to be written to an RData object and passed
    #'   to the batch job at execution
    input_objects = NULL,

    #' @field input_rdata_file The name of the environment to be loaded at the beginning of the R batch prior to executing
    #'   other code. Used to setup any local objects needed to begin computation.
    input_rdata_file = NULL,

    #' @field output_rdata_file The name of the environment to be saved at the end of the R batch execution, which can then
    #'   be loaded by subsequent jobs.
    output_rdata_file = NULL,

    #' @field sqlite_db File path to tracking SQLite database
    sqlite_db = NULL,

    #' @field batch_id Not currently used, but intended for job sequence tracking
    batch_id = NULL,

    #' @field batch_directory Location of batch scripts to be written
    batch_directory = NULL, # needs to be somewhere that both compute nodes and login nodes can access, so /tmp is not good.

    #' @field batch_code Shell code to be included in the batch script prior to the R code to be run. This can include
    #'   module load statements, environment variable exports, etc.
    batch_code = NULL,

    #' @field r_code The R code to be executed by the scheduler. This can be a character vector that includes multiple
    #'   R statements or an expression object containing the R code to be evaluated
    r_code = NULL,

    #' @field post_children_r_code The R code to be executed after child jobs have completed. This can be a character vector that includes
    #'   multiple R statements or an expression object containing the R code to be evaluated. Only relevant if wait_for_children = TRUE
    post_children_r_code = NULL,

    #' @field r_packages The R packages to be loaded into the environment before job execution. These are loaded by
    #'   pacman::p_load, which will install any missing packages before attempting to load
    r_packages = NULL,

    #' @field scheduler The job scheduler to be used for this batch. Options are: "slurm", "torque", or "local".
    scheduler = "slurm",

    #' @field scheduler_options An optional character vector of scheduler arguments to be included in the batch script header
    #'   that control additional features such as job emails or group permissions. These directives are added with #SBATCH
    #'   or #PBS headings, depending on the scheduler, and are ignored if the scheduler is "local".
    scheduler_options = NULL,

    #' @field repolling_interval The number of seconds to wait between successive checks on whether parent jobs have completed.
    #'   This is mostly relevant to the 'local' scheduler.
    repolling_interval = 300, # seconds

    #' @field print_session_info If TRUE, print the `sessionInfo()` and `Sys.info()` when the job starts. Useful for debugging
    #'   problems with the compute environment or R installation.
    print_session_info = TRUE,

    #' @field print_environment If `TRUE`, print the `Sys.getenv()` when the job starts. This can produce a lot of output,
    #'   but can be useful if certain environment variables are not being found when your job runs, leading it to fail.
    print_environment = FALSE,

    #' @field sqlite_cache_obj If `TRUE`, copy this object into the SQLite database at submission time. This is mostly useful for
    #'   detailed debugging and is not generally recommended because it can increase the size of the database considerably
    sqlite_cache_obj = FALSE,

    #' @description Create a new R_batch_job object for execution on an HPC cluster
    #' @param batch_directory The location of batch scripts to be generated
    #' @param parent_jobs A vector of one or more job ids that are parents to this job. This can be a named vector, to
    #'   be used in conjunction with \code{depends_on_parents} to specify which parent jobs must be completed before this
    #'   job begins.
    #' @param job_name The name of the job used in dependency specification and job scheduler naming
    #' @param n_nodes The number of compute nodes to be requested on the scheduler
    #' @param n_cpus The number of cpus to be requested on the scheduler
    #' @param wall_time The compute time requested on the cluster dd-HH:MM:SS
    #' @param mem_per_cpu The amount of memory to be requested per cpu
    #' @param mem_total The total amount of memory to requested by the job
    #' @param batch_id The batch id (not currently used)
    #' @param r_code A character vector or expression containing R code to be executed
    #' @param r_script The path to an R script to be executed by the batch (mutually exclusive with \code{r_code}).
    #' @param post_children_r_code A character vector of R code to be executed after waiting for child jobs finishes.
    #'   Requires \code{wait_for_children = TRUE}.
    #' @param batch_code A character vector of code to be included in the batch script for job scheduling
    #' @param r_packages A character vector of R packages to be loaded when compute script runs
    #' @param scheduler The scheduler to be used for this compute. Options are 'slurm', 'torque', or 'local'.
    #' @param wait_for_children If TRUE, do not end this job until all child jobs have completed
    #' @param all_children_successful If TRUE, don't count this job as successful unless all child jobs are successful
    #' @param input_rdata_file The name of the environment to be loaded at the beginning of the R batch prior to executing code
    #' @param input_objects A list object in the current execution environment to be cached and used as input to the R batch.
    #'   This is mutually exclusive with input_rdata_file at present.
    #' @param output_rdata_file The name of the environment to be saved at the end of the R batch execution
    #' @param sqlite_db The location of the SQLite database to be used for job tracking. If `NULL`, job tracking will be disabled.
    #' @param scheduler_options A character vector of scheduler options to be added to the header of the batch script
    #' @param repolling_interval The number of seconds to wait before rechecking whether parent jobs have completed
    #' @param print_session_info If TRUE (default), print information about the R environment `sessionInfo()` and
    #'   compute environment `Sys.info()` when the job starts.
    #' @param print_environment If TRUE, print the session environment via `Sys.getenv()` when the job starts. Default: FALSE.
    initialize = function(batch_directory = NULL, parent_jobs = NULL, job_name = NULL, n_nodes = NULL, n_cpus = NULL,
                          wall_time = NULL, mem_per_cpu = NULL, mem_total = NULL, batch_id = NULL, r_code = NULL, r_script = NULL,
                          post_children_r_code = NULL, batch_code = NULL, r_packages = NULL, scheduler = NULL, wait_for_children = NULL,
                          all_children_successful = NULL, input_rdata_file = NULL, input_objects = NULL, output_rdata_file = NULL, sqlite_db = NULL,
                          scheduler_options = NULL, repolling_interval = NULL, print_session_info = TRUE, print_environment = FALSE) {
      if (!is.null(batch_directory)) {
        self$batch_directory <- batch_directory
      } else {
        hostname <- Sys.info()["nodename"]
        user <- Sys.info()["user"]
        if (isTRUE(grepl("longleaf", hostname))) {
          self$batch_directory <- glue::glue("/work/users/{substr(user, 1, 1)}/{substr(user, 2, 2)}/{user}")
          message(glue::glue("On Longleaf, defaulting to batch_directory on the scratch system: {self$batch_directory}"))
        } else {
          self$batch_directory <- normalizePath(("~/"))
          message(glue::glue("Defaulting batch_directory to your home directory: {self$batch_directory}"))
        }
      }

      if (!is.null(parent_jobs)) self$parent_jobs <- parent_jobs
      if (!is.null(job_name)) self$job_name <- as.character(job_name)
      if (!is.null(n_nodes)) self$n_nodes <- as.character(n_nodes)
      if (!is.null(n_cpus)) self$n_cpus <- as.character(n_cpus)

      if (is.null(wall_time)) {
        message("Using default wall_time of: ", self$wall_time)
      } else {
        self$wall_time <- as.character(wall_time)
      }

      if (!is.null(mem_per_cpu)) {
        checkmate::assert_string(mem_per_cpu)
        self$mem_per_cpu <- mem_per_cpu
      }

      if (!is.null(mem_total)) {
        checkmate::assert_string(mem_total)
        self$mem_total <- mem_total
      }

      if (!is.null(batch_id)) self$batch_id <- as.character(batch_id)

      if (!is.null(r_script)) {
        if (!is.null(r_code)) {
          stop("Both r_script and r_code provided to R_batch_job. These are mutually exclusive!")
        }
        checkmate::assert_file_exists(r_script)
        r_code <- readLines(r_script) # read in r_code from external script
      }

      if (is.null(r_code)) {
        stop("Unable to initialize R_batch_job object without r_code")
      } else {
        checkmate::assert_multi_class(r_code, c("expression", "character"))
        if (is.expression(r_code)) { # expand r_code expression as character vector
          r_code <- capture.output(cat(as.character(r_code), sep = "\n"))
        }
        self$r_code <- r_code
      }

      if (!is.null(batch_code)) {
        checkmate::assert_character(batch_code)
        self$batch_code <- batch_code
      }

      if (!is.null(r_packages)) {
        checkmate::assert_character(r_packages)
        self$r_packages <- r_packages
      }

      # scheduler must be a single supported value: the script writers branch on it inside if()
      checkmate::assert_choice(scheduler, c("torque", "qsub", "slurm", "sbatch", "sh", "local"), null.ok = TRUE)
      if (!is.null(scheduler)) self$scheduler <- scheduler

      if (!is.null(input_objects)) {
        if (!is.null(input_rdata_file)) {
          stop("At present, you cannot specify both input_rdata_file and input_objects as inputs.")
        }
        checkmate::assert_multi_class(input_objects, c("list", "environment"))
        if (checkmate::test_list(input_objects)) {
          if (is.null(names(input_objects))) {
            stop("For list input_objects input, elements of the list must be named.")
          }
          input_objects <- as.environment(input_objects) # always convert to environment for type consistency
        }
        self$input_objects <- input_objects # store objects internally for output when write_compute_file is called
        self$input_rdata_file <- "R_batch_job_environment.RData"
      }

      if (!is.null(input_rdata_file)) self$input_rdata_file <- input_rdata_file
      if (!is.null(output_rdata_file)) self$output_rdata_file <- output_rdata_file
      if (!is.null(sqlite_db)) self$sqlite_db <- sqlite_db

      if (!is.null(scheduler_options)) {
        checkmate::assert_character(scheduler_options)
        self$scheduler_options <- scheduler_options
      }

      if (!is.null(repolling_interval)) {
        checkmate::assert_number(repolling_interval, lower = 0.1, upper = 2e5)
        self$repolling_interval <- repolling_interval
      }

      if (!is.null(wait_for_children)) {
        checkmate::assert_logical(wait_for_children, len = 1L)
        self$wait_for_children <- wait_for_children
      }

      if (!is.null(all_children_successful)) {
        checkmate::assert_logical(all_children_successful, len = 1L)
        self$all_children_successful <- all_children_successful
      }

      if (!is.null(post_children_r_code)) {
        # Post-children code is only written to the compute script when wait_for_children is TRUE.
        # Test the resolved field with isTRUE(): comparing a NULL argument with `== TRUE` yields
        # logical(0), which stopifnot() silently accepts.
        if (!isTRUE(self$wait_for_children)) {
          stop("post_children_r_code can only be used when wait_for_children = TRUE")
        }
        checkmate::assert_multi_class(post_children_r_code, c("expression", "character"))
        self$post_children_r_code <- as.character(post_children_r_code)
      }

      # invalid values for the print flags are ignored, leaving the field defaults in place
      if (checkmate::test_logical(print_session_info, len = 1L)) {
        self$print_session_info <- print_session_info
      }

      if (checkmate::test_logical(print_environment, len = 1L)) {
        self$print_environment <- print_environment
      }
    },

    #' @description Helper function that generates the batch and compute files for a job
    #' @param force if TRUE, the RData, batch, and compute will be regenerated/rewritten
    #' @details this is called by \code{$submit} when a job is submitted and is provided
    #'   here in case the user wants to generate the batch files without executing them
    #' @return The object itself, invisibly
    generate = function(force = FALSE) {
      if (isFALSE(force) && isTRUE(private$batch_generated)) {
        # skip out of generation if this has already completed
        return(invisible(self))
      }

      # create batch_directory, if missing
      if (!dir.exists(self$batch_directory)) dir.create(self$batch_directory, recursive = TRUE)

      private$write_rdata_file() # save input_objects to file if needed
      private$write_batch_file()
      private$write_compute_file()
      private$batch_generated <- TRUE

      return(invisible(self))
    },

    #' @description Submit job to scheduler or local compute
    #' @return The job id returned by \code{fmri.pipeline::cluster_job_submit}
    submit = function() {
      if (isFALSE(private$batch_generated)) self$generate()

      cd <- getwd()
      # reset working directory on exit, even if submission fails (don't attempt if that directory is absent)
      on.exit(if (!is.null(cd) && dir.exists(cd)) setwd(cd), add = TRUE)
      setwd(self$batch_directory)

      batch_script <- private$get_batch_file_name()

      # submit job for computation, waiting for parents if requested
      wait_jobs <- NULL
      if (checkmate::test_logical(self$depends_on_parents, max.len = 1) && isTRUE(self$depends_on_parents)) {
        # wait on any/all parent jobs
        wait_jobs <- self$parent_jobs
      } else if (checkmate::test_character(self$depends_on_parents)) {
        # enforce that all parent job dependencies need to be named elements of $parent_jobs
        if (checkmate::test_subset(self$depends_on_parents, names(self$parent_jobs))) {
          wait_jobs <- self$parent_jobs[self$depends_on_parents]
        } else {
          # could make this more informative
          warning("Could not add parent job dependency because one or more depend_on_parents elements were not in parent_jobs")
        }
      }

      message("Submitting job: ", self$job_name)

      # TODO: if a job_id already exists and submit is called again, do we check job status, insist a 'forced' submission?
      private$job_id <- fmri.pipeline::cluster_job_submit(batch_script,
        scheduler = self$scheduler,
        wait_jobs = wait_jobs, repolling_interval = self$repolling_interval,
        tracking_sqlite_db = self$sqlite_db,
        tracking_args = private$gather_tracking_args()
      )

      message("Job received job id: ", private$job_id)

      # return job id in case there are subsidiary scripts that depend on this
      return(private$job_id)
    },

    #' @description Function to create a deep copy of a batch job
    #'
    #' @details Note that this also resets the batch, compute, and R source file names so that the
    #'   copied object doesn't create files that collide with the original.
    #'   This method also exposes a few named parameters that can be used to override the copied fields
    #'   with new values to avoid needing to change these one-by-one using obj$<field> <- x syntax
    #' @param job_name The name of the job used in dependency specification and job scheduler naming
    #' @param n_nodes The number of compute nodes to be requested on the scheduler
    #' @param n_cpus The number of cpus to be requested on the scheduler
    #' @param wall_time The compute time requested on the cluster dd-HH:MM:SS
    #' @param mem_total The total amount of memory (RAM) requested by the job
    #' @param r_code A character vector or expression containing R code to be executed
    #' @return A new R_batch_job object
    copy = function(job_name = NULL, n_nodes = NULL, n_cpus = NULL, wall_time = NULL, mem_total = NULL, r_code = NULL) {
      cloned <- self$clone(deep = TRUE)
      cloned$reset_file_names()

      if (!is.null(job_name)) cloned$job_name <- as.character(job_name)
      if (!is.null(n_nodes)) cloned$n_nodes <- as.character(n_nodes)
      if (!is.null(n_cpus)) cloned$n_cpus <- as.character(n_cpus)
      if (!is.null(wall_time)) cloned$wall_time <- as.character(wall_time)
      if (!is.null(mem_total)) cloned$mem_total <- as.character(mem_total)

      if (!is.null(r_code)) {
        checkmate::assert_multi_class(r_code, c("expression", "character"))
        if (is.expression(r_code)) { # expand r_code expression as character vector
          r_code <- capture.output(cat(as.character(r_code), sep = "\n"))
        }
        cloned$r_code <- r_code
      }

      return(cloned)
    },

    #' @description helper function to reset names of the batch, compute, and R source files that will be
    #'   generated by this job.
    #' @details This needs to be exposed as a public method for copied objects to be able to reset these private fields.
    #'   The path of the RData file holding \code{input_objects} is deliberately left untouched.
    reset_file_names = function() {
      private$batch_generated <- FALSE
      private$batch_file_name <- NULL
      private$compute_file_name <- NULL
      # the R source path must be reset too; otherwise a copy overwrites the original job's R code file
      private$r_file_name <- NULL
    },

    #' @description Return the job id of this job (populated by job submission)
    get_job_id = function() {
      private$job_id
    },

    #' @description Return the ids of all child jobs launched by this job
    get_child_ids = function() {
      private$child_job_ids
    }
  )
)
#' R6 class for submitting a set of R_batch_job objects in order
#'
#' Jobs are submitted one after another. When a later job lists an earlier job's \code{job_name} in its
#' \code{depends_on_parents} field, the earlier job's id is added to the later job's \code{parent_jobs}.
#' @export
R_batch_sequence <- R6::R6Class("batch_sequence",
  private = list(
    # list of R_batch_job classes to be run sequentially
    sequence_jobs = list()
  ),
  public = list(
    #' @description create a new R_batch_sequence object
    #' @param ... One or more R_batch_job objects to be run in sequence
    #' @param joblist Optional list of jobs to be used instead of ...
    initialize = function(..., joblist = NULL) {
      others <- list(...)
      all_jobs <- if (!is.null(joblist) && is.list(joblist)) joblist else others

      # tolerate NULLs in inputs, which are ignored
      all_jobs <- all_jobs[!vapply(all_jobs, is.null, logical(1))]
      for (job in all_jobs) checkmate::assert_class(job, "batch_job")

      private$sequence_jobs <- all_jobs
    },

    #' @description add one or more R_batch_job objects to the sequence
    #' @param ... One or more R_batch_job objects to be added to sequence
    #' @return The sequence object itself, invisibly
    add = function(...) {
      others <- list(...)
      others <- others[!vapply(others, is.null, logical(1))] # remove NULLs

      # a single job is a valid addition, so act on any non-empty input
      if (length(others) > 0L) {
        for (job in others) checkmate::assert_class(job, "batch_job")
        private$sequence_jobs <- c(private$sequence_jobs, others)
      }

      invisible(self)
    },

    #' @description submit the job sequence to the scheduler or local compute
    #' @return The sequence object itself
    submit = function() {
      job_list <- private$sequence_jobs
      njobs <- length(job_list)

      for (ii in seq_len(njobs)) {
        this_job <- job_list[[ii]]
        this_job$submit()

        if (ii < njobs) {
          # only jobs later in the sequence can depend on the job that was just submitted
          for (jnum in seq(ii + 1L, njobs)) {
            downstream <- job_list[[jnum]]
            # isTRUE() guards against a NULL job_name, for which %in% returns logical(0)
            if (isTRUE(this_job$job_name %in% downstream$depends_on_parents)) {
              # Always add parent job id to any downstream jobs in sequence that depend on this job.
              # Jobs are R6 (reference) objects, so this updates the job stored in the sequence.
              downstream$parent_jobs[this_job$job_name] <- this_job$get_job_id()
            }
          }
        }
      }

      return(self)
    },

    #' @description Calls each job's $generate() method so that scripts can be examined without running the sequence
    #' @return The sequence object itself
    generate = function() {
      # call generation steps for each job (mostly for testing)
      for (job in private$sequence_jobs) job$generate()
      return(self)
    },

    #' @description Return a named vector of the job ids for all jobs in this sequence
    #' @details Jobs that have not yet been submitted have no id and are omitted from the result.
    get_job_ids = function() {
      ids <- unlist(lapply(private$sequence_jobs, function(x) setNames(x$get_job_id(), x$job_name)))

      dupe_names <- duplicated(names(ids))
      if (any(dupe_names)) {
        nm <- paste(unique(names(ids)[dupe_names]), collapse = ", ")
        warning("The following sequence objects have identical names: ", nm, ". Check that you find the right id!")
      }

      return(ids)
    }
  )
)
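# NOTE: the two lines below are leftover text from the web page this file was copied from; they are not R code.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.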