#' Submit a script as a job to the Slurm cluster
#'
#' Function contains internal defaults for R and Python shell scripts. Function
#' will build log paths automatically.
#'
#' @param script_path [chr] full path to submitted script
#' @param threads [int] cluster resource requirement
#' @param mem [chr] cluster resource requirement
#' @param runtime_min [num] cluster resource requirement
#' @param array_tasks_int [int] (default: NULL - if not NULL, array job is
#' assumed) - vector of integers for you array (e.g. c(1L:10L))
#' @param archiveTF [lgl] (default FALSE) do you need an archive node?
#' @param job_name [chr] Will be name of script if NULL
#' @param partition [chr] a.k.a. 'queue' - cluster resource requirement
#' @param account [chr] a.k.a. 'project' - cluster resource requirement
#' @param hold_for_JobIDs vector of jobids that must complete successfully
#' before running this job
#' (https://slurm.schedmd.com/sbatch.html#OPT_dependency)
#' @param language [chr] coding language for job (see valid_langs validation)
#' @param r_image [chr] (default "latest.img") e.g.
#' "/mnt/share/singularity-images/rstudio/ihme_rstudio_4214.img"
#' @param shell_script_path [path] path to shell script (language-specific)
#' @param std_err_root [chr] path for Slurm std_err logs
#' @param std_out_root [chr] path for Slurm std_out logs
#' @param console_style_log_tf [lgl] if TRUE, combine std_err and std_out into
#' one log in the std_out_root
#' @param args_list [list, chr] optional named list() of arguments, e.g.
#' list("arg1" = arg1, "arg2" = arg2)
#' @param arg_name_code_root [chr] name of an arg_list element with fullpath to
#' the repository root (if applicable) - allows submitted script to find
#' .Rprofile and .Renviron
#' @param args_include_script [lgl] if TRUE, include script_path in args_list
#' @param arg_vecs_to_comma_str [lgl] if TRUE, convert atomic elements of
#' args_list to comma-separated strings
#' @param verbose [lgl] print submission command and job_id
#' @param v_verbose [lgl] print log paths
#' @param send_email [lgl] send email on job completion?
#' @param email_address [chr] email address to send job completion notification
#' @param dry_runTF [lgl] (default FALSE) if TRUE, only message and return
#' submission command, no job submission
#'
#' @family job_submission
#' @return [int] job_id of submitted job, also messsage with job_id and job_name
#' @export
submit_job <- function(
script_path = NULL
, threads = 2L
, mem = "10G"
, runtime_min = 15L
, account = NULL
, console_style_log_tf = FALSE
, std_err_root = file.path("/mnt/share/temp/slurmoutput", Sys.info()[["user"]], "error")
, std_out_root = file.path("/mnt/share/temp/slurmoutput", Sys.info()[["user"]], "output")
, array_tasks_int = NULL
, archiveTF = TRUE
, job_name = NULL
, partition = "all.q"
, hold_for_JobIDs = NULL
, language = "R"
, r_image = NULL
, shell_script_path = NULL
, args_list = NULL
, arg_name_code_root = "code_root"
, args_include_script = TRUE
, arg_vecs_to_comma_str = TRUE
, verbose = TRUE
, v_verbose = FALSE
, send_email = FALSE
, email_address = paste0(Sys.info()[["user"]], "@uw.edu")
, dry_runTF = FALSE
) {
# Argument validation
## coding language
valid_langs <- c("r", "python")
valid_langs_msg <- paste0(valid_langs, collapse = ", ")
if(is.null(language)) stop("Input a valid language (case insensitive): ", valid_langs_msg)
language <- tolower(language)
if(!language %in% valid_langs) stop("Input a valid language (case insensitive): ", valid_langs_msg)
## others
if(is.null(script_path)) stop("Please define a valid script path to submit")
if(is.null(account)) stop("Please define a Slurm account e.g. proj_cov_vpd")
if(is.null(partition)) stop("Please define a Slurm partition e.g. all.q")
if(is.null(threads)) stop("Please define a number of threads")
if(is.null(mem)) stop("Please define a memory requirement e.g. '30G' or '300M'")
if(is.null(runtime_min)) stop("Please define a runtime requirement")
is_array_job <- ifelse(!is.null(array_tasks_int), TRUE, FALSE)
if(is_array_job) stopifnot(is.integer(array_tasks_int))
stopifnot(is.logical(console_style_log_tf))
stopifnot(is.logical(archiveTF))
stopifnot(is.logical(verbose))
stopifnot(is.logical(v_verbose))
stopifnot(is.logical(arg_vecs_to_comma_str))
stopifnot(is.logical(send_email))
stopifnot(is.logical(dry_runTF))
stopifnot(is.logical(args_include_script))
# build log folders silently (dir.create fails naturally if directory exists)
dir.create(std_err_root, recursive = TRUE, showWarnings = FALSE)
dir.create(std_out_root, recursive = TRUE, showWarnings = FALSE)
if(!is.null(hold_for_JobIDs)){
if(!is.vector(hold_for_JobIDs, mode = "integer")) stop("hold_for_JobIDs must be a simple integer vector")
}
# Define commands
if (is.null(job_name)) {
script_path_decon <- unlist(strsplit(script_path, "[/.]"))
job_name <- script_path_decon[length(script_path_decon) - 1]
}
# Code language
if(language == "r") {
if(is.null(r_image)) {
r_image_cmd <- "-i /mnt/share/singularity-images/rstudio/latest.img"
} else {
r_image_cmd <- paste0("-i ", r_image)
}
if(is.null(shell_script_path)) {
shell_script_path <- "/mnt/share/singularity-images/rstudio/shells/execRscript.sh"
}
} else if (language == "python") {
if(is.null(shell_script_path)) {
shell_script_path <- system.file("py/python_shell_slurm.sh", package = "SamsElves")
}
}
## format for scheduler
# https://slurm.schedmd.com/sbatch.html#SECTION_FILENAME-PATTERN
log_format <- ifelse(is_array_job, "%x_%A_%a", "%x_%j")
if(console_style_log_tf){
std_err_path <- std_out_path <- file.path(std_out_root, paste0(log_format, "_console.log"))
} else {
std_err_path <- file.path(std_err_root, paste0(log_format, "e.log"))
std_out_path <- file.path(std_out_root, paste0(log_format, "o.log"))
}
archive_cmd <- ifelse(archiveTF, " -C archive", "")
# keep this for downstream provenance
if (args_include_script == TRUE) {
args_list <- append(args_list, list(script_path = script_path))
}
# deal with args_list as a block
if(!is.null(args_list)){
assert_named_list(args_list)
# nulls come through as "", which the cli doesn't like
# - parse_all_named_cli_args deals with the "NULL" string
if(any(unlist(lapply(args_list, is_cli_null)))){
message(
"\nNULL and empty string ('') args will be converted to string literal 'NULL' for CLI compatibility.\n",
" - Ensure 'NULL' is parsed correctly by child scripts with SamsElves::parse_all_named_cli_args(), or by hand.\n"
)
args_list <- lapply(args_list, function(x) if (is_cli_null(x)) return("NULL") else return(x))
}
# auto-convert simple vectors to comma-separated strings
if(arg_vecs_to_comma_str) args_list <- apply_comma_string_to_list(args_list)
# don't break backward compatibility
names(args_list) <- gsub("^--", "", names(args_list))
# submit from code_root as working directory
# - ensures submitted job can find .Rprofile and .Renviron
if(isTRUE(arg_name_code_root %in% names(args_list))){
message("Found code_root in args_list, temporarily setting working directory to code_root: ", args_list[[arg_name_code_root]])
wd_current <- getwd()
on.exit(setwd(wd_current), add = TRUE)
code_root <- args_list[[arg_name_code_root]]
if(!dir.exists(code_root)) stop(sprintf("%s (%s) must be a valid directory", arg_name_code_root, code_root))
setwd(code_root)
}
# format for scheduler
names(args_list) <- paste0("--", names(args_list))
}
array_cmd_string <-
if(is_array_job){
# Build array string
array_tasks_string <-
paste0(
" --array=",
ifelse(
is_sequential_int_vec(array_tasks_int),
paste0(min(array_tasks_int), "-", max(array_tasks_int)), # concise if sequential
vec_to_comma_string(array_tasks_int) # specific integers otherwise
)
)
} else {
""
}
email_cmd_string <-
if(send_email){
paste0(" --mail-type=END --mail-user=", email_address)
} else {
""
}
# build system command string
command <- paste0(
"sbatch"
, " -J ", job_name
, "", archive_cmd
, " --mem=", mem
, " -c ", threads
, " -t ", runtime_min
, " -p ", partition
, " -A ", account
, array_cmd_string
, email_cmd_string
, " -e ", std_err_path
, " -o ", std_out_path
, " " , shell_script_path
, " " , r_image_cmd
, " -s ", script_path
)
# add hold_for_JobIDs if exists
if(!is.null(hold_for_JobIDs)){
hold_ids <- paste(hold_for_JobIDs, collapse = ":")
command <- paste0(command, " --dependency=afterok:", hold_ids)
}
# append extra arguments - handles NULL input by default
for(arg_name in names(args_list)) {
command <- paste(command, arg_name, args_list[arg_name])
}
if(dry_runTF) {
message("\n", command, "\n")
return(0L)
}
submission_return <- system(command, intern = TRUE)
job_id <- regmatches(submission_return, gregexpr("\\d+$", submission_return))
array_message <- ifelse(is_array_job, "array", "")
if(length(job_id) > 1) warning("job_id from submitted job '", job_name ,"' is longer than 1, inspect before use.")
job_id <- as.integer(unlist(job_id))
if(verbose) message(paste0("\n", submission_return, array_message, " : ", job_name, "\n"))
if(v_verbose) message("Logs saved to: \n", paste0(unique(c(std_out_path, std_err_path)), collapse = "\n"), "\n")
return(job_id)
}
# scoping
# language = "R"
# script_path = file.path(.bootstrap_root, "module_simmod/run_eppasm_for_location_by_draw_and_scenario.R")
# std_err_root = file.path(.output_root, "std_err")
# std_out_root = file.path(.output_root, "std_out")
# job_name = .job_name
# mem = "40G"
# shell_script_path = NULL
# archiveTF = T
# use_paths_file_r_image = T
# account = "proj_hiv"
# partition = "all.q"
# threads = as.character(PARAMS$n_cores_eppasm)
# runtime_min = "60"
# job_name = NULL
# r_image = NULL
# args_list =
# list("--bootstrap_root" = .bootstrap_root,
# "--output_root" = .output_root,
# "--n_cores" = as.integer(PARAMS$n_cores_eppasm),
# "--loc_id" = loc_id,
# "--available_draws" = available_draws)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.