# R/R_batch_job.R

#' R6 class describing a single R batch job that can be generated and submitted to a job scheduler (slurm or torque) or run locally
#' @importFrom lubridate period_to_seconds
#' @export
R_batch_job <- R6::R6Class("batch_job",
  private = list(
    # TRUE once $generate() has written the RData, batch, and compute files; reset by $reset_file_names()
    batch_generated = FALSE,

    # cached absolute path to the batch (shell) script; filled in lazily by get_batch_file_name()
    batch_file_name = NULL,

    # cached absolute path to the compute (R driver) script; filled in lazily by get_compute_file_name()
    compute_file_name = NULL,

    # cached absolute path to the R script holding r_code; filled in lazily by get_r_file_name()
    r_file_name = NULL,

    # the process or scheduler job id that uniquely identifies this job; set by $submit()
    job_id = NULL,

    # a vector of job ids generated by the submission of this job. Only relevant if this job
    #    submits additional jobs through the scheduler directly. Nothing in this class assigns it.
    child_job_ids = NULL, # currently no simple way to populate these without a messaging interface with running batch job

    # Build an absolute path inside self$batch_directory for a file derived from file_name.
    #
    # The job name (if any) is appended to the base name, e.g. "batch_run.R" -> "batch_run_<job_name>.R".
    # If a file with that name already exists, a random tag is appended as well so that an
    # existing file is never reused.
    #
    # @param file_name bare file name (with or without extension) to be made unique
    # @return normalized absolute path (the file itself is not created)
    get_unique_file_name = function(file_name = "") {
      base <- tools::file_path_sans_ext(file_name)
      ext <- tools::file_ext(file_name)

      # only emit the "." separator when there is an extension (avoids names ending in a bare dot)
      ext_suffix <- if (nzchar(ext)) paste0(".", ext) else ""
      name_tag <- if (is.null(self$job_name)) "" else paste0("_", self$job_name)

      candidate <- paste0(base, name_tag, ext_suffix)

      if (file.exists(normalizePath(file.path(self$batch_directory, candidate), mustWork = FALSE))) {
        # add unique random string to the name if the file already exists. basename(tempfile()) yields
        # only the random component, regardless of the platform's path separator.
        random_tag <- basename(tempfile(pattern = ""))
        candidate <- paste0(base, name_tag, "_", random_tag, ext_suffix)
      }

      return(normalizePath(file.path(self$batch_directory, candidate), mustWork = FALSE))
    },

    # Lazily resolve the full path of the batch (shell) script and cache it in private$batch_file_name
    get_batch_file_name = function() {
      path <- private$batch_file_name
      if (is.null(path)) {
        # first request: derive a unique name and remember it for later calls
        path <- private$get_unique_file_name("submit_batch.sh")
        private$batch_file_name <- path
      }
      path
    },

    # Lazily resolve the full path of the compute (R driver) script and cache it in private$compute_file_name
    get_compute_file_name = function() {
      path <- private$compute_file_name
      if (is.null(path)) {
        # first request: derive a unique name and remember it for later calls
        path <- private$get_unique_file_name("batch_run.R")
        private$compute_file_name <- path
      }
      path
    },

    # Lazily resolve the full path of the R script that the compute script sources, caching it in private$r_file_name
    get_r_file_name = function() {
      path <- private$r_file_name
      if (is.null(path)) {
        # first request: derive a unique name and remember it for later calls
        path <- private$get_unique_file_name("r_source.R")
        private$r_file_name <- path
      }
      path
    },

    # Write the sbatch, pbs, or local shell script for this job.
    #
    # The script contains: a shebang, scheduler directives (slurm and torque only), an exported JOBID
    # variable, any user-supplied batch_code, and finally an `R CMD BATCH` call on the compute script.
    # The result is written to the path returned by get_batch_file_name().
    write_batch_file = function() {
      # memory directive: mem_per_cpu takes precedence over mem_total; NULL if neither is set
      mem_directive <- function(per_cpu_prefix, total_prefix) {
        if (!is.null(self$mem_per_cpu)) {
          paste0(per_cpu_prefix, self$mem_per_cpu)
        } else if (!is.null(self$mem_total)) {
          paste0(total_prefix, self$mem_total)
        } else {
          NULL
        }
      }

      syntax <- "#!/bin/sh"

      # NULL elements vanish inside c(), so optional directives are simply omitted
      if (self$scheduler %in% c("slurm", "sbatch")) {
        syntax <- c(
          syntax,
          paste("#SBATCH -N", self$n_nodes),
          paste("#SBATCH -n", self$n_cpus),
          paste("#SBATCH -t", self$wall_time),
          "#SBATCH -b now+10", # add 10-second delay to job allocation so that job id can be written to script
          mem_directive("#SBATCH --mem-per-cpu=", "#SBATCH --mem="),
          if (is.null(self$job_name)) NULL else paste("#SBATCH -J", self$job_name),
          if (is.null(self$scheduler_options)) NULL else paste("#SBATCH", self$scheduler_options),
          "",
          "export JOBID=$SLURM_JOB_ID",
          "cd $SLURM_SUBMIT_DIR",
          self$batch_code
        )
      } else if (self$scheduler %in% c("torque", "qsub")) {
        syntax <- c(
          syntax,
          paste0("#PBS -l nodes=", self$n_nodes, ":ppn=", self$n_cpus),
          paste0("#PBS -l walltime=", self$wall_time),
          mem_directive("#PBS -l pmem=", "#PBS -l mem="),
          if (is.null(self$job_name)) NULL else paste("#PBS -N", substr(self$job_name, 1, 15)), # torque limits to 15 chars
          if (is.null(self$scheduler_options)) NULL else paste("#PBS", self$scheduler_options),
          "",
          "export JOBID=$PBS_JOBID",
          "cd $PBS_O_WORKDIR",
          self$batch_code
        )
      } else {
        # only paste in batch_code, which may contain compute environment setup statements
        syntax <- c(
          syntax,
          "export JOBID=$$", # local execution: use the shell's process id as the job id
          self$batch_code
        )
      }

      # syntax <- c(syntax, paste("Rscript --vanilla", private$get_compute_file_name()))
      # quote the path so that batch directories or job names containing spaces do not split the argument
      syntax <- c(
        syntax,
        paste("R CMD BATCH --no-save --no-restore", shQuote(private$get_compute_file_name(), type = "sh"))
      )

      message("Writing batch script to: ", private$get_batch_file_name())
      writeLines(syntax, con = private$get_batch_file_name())
    },

    # Write the R code to be executed by the batch script.
    #
    # Two files are written:
    #   - the R source file (get_r_file_name()), containing self$r_code verbatim
    #   - the compute file (get_compute_file_name()), a driver script that optionally prints session
    #     information, loads packages and the input RData file, sources the R source file inside a
    #     tryCatch, optionally saves the workspace, optionally waits for child jobs, and reports
    #     status to the tracking database when self$sqlite_db is set.
    write_compute_file = function() {
      tracking <- !is.null(self$sqlite_db)

      # generated call that records a status for this job in the tracking database
      status_call <- function(status, extra_args = "") {
        paste0(
          "fmri.pipeline::update_tracked_job_status('", self$sqlite_db,
          "', Sys.getenv('JOBID'), '", status, "'", extra_args, ")"
        )
      }

      # job tracking
      syntax <- "cat(paste('Job start time:', Sys.time()), '\\n')"
      if (tracking) syntax <- c(syntax, status_call("STARTED"))

      if (isTRUE(self$print_session_info)) {
        syntax <- c(syntax, "print(sessionInfo())", "print(Sys.info())")
      }

      if (isTRUE(self$print_environment)) {
        syntax <- c(syntax, "print(Sys.getenv())")
      }

      if (!is.null(self$r_packages)) {
        syntax <- c(
          syntax,
          "if (!require(pacman)) { install.packages('pacman'); library(pacman) }",
          paste0("pacman::p_load(", paste(self$r_packages, collapse = ", "), ")")
        )
      }

      if (!is.null(self$input_rdata_file)) {
        # The file is not required to exist at script-generation time; the generated code checks
        # for it when the job actually runs and stops if it is missing.
        syntax <- c(
          syntax,
          paste0("if (file.exists('", self$input_rdata_file, "')) {"),
          paste0("  load('", self$input_rdata_file, "')"),
          "} else {",
          paste0("  stop('Cannot load input environment object: ", self$input_rdata_file, "')"),
          "}"
        )
      }

      # source the user code; on error, print it, mark the job as failed (if tracking), and re-raise
      syntax <- c(
        syntax,
        paste0("tryCatch(source(\"", private$get_r_file_name(), "\", echo=TRUE, max.deparse.length=2000),"),
        "  error=function(e) { ",
        "    print(as.character(e))",
        if (tracking) paste0("    ", status_call("FAILED", ", cascade = TRUE")) else NULL,
        "    stop(e)",
        "})"
      )

      # write R code to execute
      writeLines(self$r_code, con = private$get_r_file_name())

      if (!is.null(self$output_rdata_file)) {
        syntax <- c(syntax, paste0("save.image(file='", self$output_rdata_file, "')"))
      }

      if (isTRUE(self$wait_for_children)) {
        # registering children with the tracking database only makes sense when tracking is enabled
        register_children <- if (tracking) {
          paste0(
            "  sapply(child_job_ids, function(id) fmri.pipeline::add_tracked_job_parent(sqlite_db = '",
            self$sqlite_db, "', job_id = id, parent_job_id = Sys.getenv('JOBID')))"
          )
        } else {
          NULL
        }

        # NOTE(review): dhms() is not defined in this file -- presumably a package helper that parses
        # the d-hh:mm:ss wall_time string into a lubridate period; confirm.
        wait_call <- paste0(
          "  success <- fmri.pipeline::wait_for_job(child_job_ids, quiet=FALSE",
          ", repolling_interval=", self$repolling_interval,
          ", max_wait=", lubridate::period_to_seconds(dhms(self$wall_time)),
          ", scheduler='", self$scheduler, "')"
        )

        # The failure check must be emitted whether or not tracking is enabled. Previously the whole
        # line was built by glue() around sqlite_db, which yields zero-length output for a NULL value,
        # so the check silently disappeared when no tracking database was configured.
        failure_check <- NULL
        if (isTRUE(self$all_children_successful)) {
          mark_failed <- if (tracking) {
            paste0(status_call("FAILED", ", cascade = TRUE, exclude = child_job_ids"), "; ")
          } else {
            ""
          }
          failure_check <- paste0(
            "  if (isFALSE(success)) { ", mark_failed,
            "stop('One or more child jobs did not complete successfully') }"
          )
        }

        syntax <- c(
          syntax,
          "if (exists('child_job_ids') && inherits(child_job_ids, c('numeric', 'integer', 'character'))) {",
          register_children,
          wait_call,
          failure_check,
          "} else {",
          "  warning('Attempt to wait for child jobs failed due to non-existent or improper child_job_ids variable.')",
          "}"
        )

        # add any post-children R code to be executed (e.g., combining outputs from the child jobs)
        if (!is.null(self$post_children_r_code)) {
          syntax <- c(syntax, self$post_children_r_code)
        }
      }

      syntax <- c(syntax, "cat(paste('Job end time:', Sys.time()), '\\n')")

      if (tracking) syntax <- c(syntax, status_call("COMPLETED"))

      message("Writing R script to: ", private$get_compute_file_name())
      writeLines(syntax, con = private$get_compute_file_name())
    },

    # Save the objects in self$input_objects to an RData file inside batch_directory and point
    # self$input_rdata_file at the resulting absolute path. Does nothing when there are no input objects.
    write_rdata_file = function() {
      if (is.null(self$input_objects)) {
        return(invisible(NULL)) # nothing to do
      }

      rdata_name <- self$input_rdata_file
      # fall back to the default name that initialize() assigns when input_objects are supplied
      if (is.null(rdata_name)) rdata_name <- "R_batch_job_environment.RData"

      # After a previous generation (forced regeneration, or a copy of a generated job),
      # input_rdata_file already holds an absolute path. Joining that onto batch_directory again
      # would produce a nested, non-existent path, so reduce it back to a bare file name first.
      if (grepl("^([/\\\\~]|[A-Za-z]:)", rdata_name)) rdata_name <- basename(rdata_name)

      self$input_rdata_file <- private$get_unique_file_name(rdata_name)
      save(list = ls(all.names = TRUE, envir = self$input_objects), envir = self$input_objects, file = self$input_rdata_file)
    },

    # Assemble the list of job attributes that is handed to the job tracking system at submission.
    # Returns NULL when no tracking database (self$sqlite_db) has been configured.
    gather_tracking_args = function() {
      if (is.null(self$sqlite_db)) {
        return(NULL) # return NULL if not tracking
      }

      # optionally store a serialized copy of this object alongside the tracking record
      serialized_job <- NULL
      if (isTRUE(self$sqlite_cache_obj)) serialized_job <- serialize(self, NULL)

      # collapse multiple scheduler options into one newline-delimited string; NA when there are none
      sched_opts <- NA
      if (!is.null(self$scheduler_options)) sched_opts <- paste(self$scheduler_options, collapse="\n")

      list(
        job_name = self$job_name,
        batch_directory = self$batch_directory,
        compute_file = private$get_compute_file_name(),
        code_file = private$get_r_file_name(),
        n_nodes = self$n_nodes,
        n_cpus = self$n_cpus,
        wall_time = self$wall_time,
        mem_per_cpu = self$mem_per_cpu,
        mem_total = self$mem_total,
        scheduler_options = sched_opts,
        job_obj = list(serialized_job)
      )
    }

  ),
  public = list(
    #' @field parent_jobs a vector of parent job ids that are upstream of this job and may influence its execution.
    #'    This can be a named vector so that individual parents can be referenced by \code{depends_on_parents}.
    parent_jobs = NULL,

    #' @field depends_on_parents logical or character vector indicating whether this job should wait until
    #'    \code{parent_jobs} complete. If \code{TRUE}, the job waits on all \code{parent_jobs}. If a character
    #'    vector is passed, it indicates which named elements of \code{parent_jobs} must complete before this job begins.
    depends_on_parents = FALSE,

    #' @field job_name a user-defined name for the job used for specifying job dependencies and informative job
    #'    status queries on a job scheduler. It is also appended to the names of the generated script files.
    job_name = NULL,

    #' @field wait_for_children if TRUE, code will be inserted to wait for all jobs in a vector called
    #'    \code{child_job_ids} to finish before the batch exits. It's up to your code to use this variable name
    wait_for_children = FALSE,

    #' @field all_children_successful if TRUE, all jobs in vector \code{child_job_ids}
    #'    must be successful for this job to finish (see \code{wait_for_children} field)
    all_children_successful = FALSE,

    #' @field wall_time The amount of time requested on the job scheduler, following d-hh:mm:ss format. Defaults to
    #'    "4:00:00", which is 4 hours.
    wall_time = "4:00:00",

    #' @field n_nodes The number of nodes to be requested on the job scheduler (stored as a character string)
    n_nodes = "1",

    #' @field n_cpus The number of cores (aka 'cpus', ignoring hyperthreading) to be requested on the job scheduler
    #'    (stored as a character string)
    n_cpus = "4",

    #' @field mem_total The total amount of memory (RAM) requested by the job. Only used in the batch script
    #'    when \code{mem_per_cpu} is \code{NULL}.
    mem_total = "4G",

    #' @field mem_per_cpu The amount of memory (RAM) requested per cpu (total = mem_per_cpu * n_cpus). When set,
    #'    this takes precedence over \code{mem_total} in the batch script.
    mem_per_cpu = NULL,

    #' @field input_objects An environment containing all objects to be written to an RData object and passed
    #'   to the batch job at execution
    input_objects = NULL,

    #' @field input_rdata_file The name of the environment to be loaded at the beginning of the R batch prior to executing
    #'    other code. Used to setup any local objects needed to begin computation.
    input_rdata_file = NULL,

    #' @field output_rdata_file The name of the environment to be saved at the end of the R batch execution, which can then
    #'    be loaded by subsequent jobs.
    output_rdata_file = NULL,

    #' @field sqlite_db File path to tracking SQLite database. If \code{NULL}, no job tracking code is generated.
    sqlite_db = NULL,

    #' @field batch_id Not currently used, but intended for job sequence tracking
    batch_id = NULL,

    #' @field batch_directory Location of batch scripts to be written
    batch_directory = NULL, # needs to be somewhere that both compute nodes and login nodes can access, so /tmp is not good.

    #' @field batch_code Shell code to be included in the batch script prior to the R code to be run. This can include
    #'    module load statements, environment variable exports, etc.
    batch_code = NULL,

    #' @field r_code The R code to be executed by the scheduler, stored as a character vector. An expression
    #'    object passed at initialization is converted to a character vector.
    r_code = NULL,

    #' @field post_children_r_code The R code to be executed after child jobs have completed. This can be a character vector that includes
    #'   multiple R statements or an expression object containing the R code to be evaluated. Only relevant if wait_for_children = TRUE
    post_children_r_code = NULL,

    #' @field r_packages The R packages to be loaded into the environment before job execution. These are loaded by
    #'    pacman::p_load, which will install any missing packages before attempting to load
    r_packages = NULL,

    #' @field scheduler The job scheduler to be used for this batch. Options are: "slurm" (or "sbatch"),
    #'    "torque" (or "qsub"), or "local" (or "sh").
    scheduler = "slurm",

    #' @field scheduler_options An optional character vector of scheduler arguments to be included in the batch script header
    #'    that control additional features such as job emails or group permissions. These directives are added with #SBATCH
    #'    or #PBS headings, depending on the scheduler, and are ignored if the scheduler is "local".
    scheduler_options = NULL,

    #' @field repolling_interval The number of seconds to wait between successive checks on whether parent jobs have completed.
    #'    This is mostly relevant to the 'local' scheduler.
    repolling_interval = 300, # seconds

    #' @field print_session_info If TRUE, print the `sessionInfo()` and `Sys.info()` when the job starts. Useful for debugging
    #'   problems with the compute environment or R installation.
    print_session_info = TRUE,

    #' @field print_environment If `TRUE`, print the `Sys.getenv()` when the job starts. This can produce a lot of output,
    #'   but can be useful if certain environment variables are not being found when your job runs, leading it to fail.
    print_environment = FALSE,

    #' @field sqlite_cache_obj If `TRUE`, copy this object into the SQLite database at submission time. This is mostly useful for
    #'   detailed debugging and is not generally recommended because it can increase the size of the database considerably
    sqlite_cache_obj = FALSE,

    #' @description Create a new R_batch_job object for execution on an HPC cluster
    #' @param batch_directory The location of batch scripts to be generated. If `NULL`, a default is chosen based
    #'    on the host name (a per-user directory under /work/users on Longleaf, otherwise the home directory).
    #' @param parent_jobs A vector of one or more job ids that are parents to this job. This can be a named vector, to
    #'    be used in conjunction with \code{depends_on_parents} to specify which parent jobs must be completed before this
    #'    job begins.
    #' @param job_name The name of the job used in dependency specification and job scheduler naming
    #' @param n_nodes The number of compute nodes to be requested on the scheduler
    #' @param n_cpus The number of cpus to be requested on the scheduler
    #' @param wall_time The compute time requested on the cluster dd-HH:MM:SS
    #' @param mem_per_cpu The amount of memory to be requested per cpu
    #' @param mem_total The total amount of memory to requested by the job
    #' @param batch_id The batch id (not currently used)
    #' @param r_code A character vector or expression containing R code to be executed
    #' @param r_script The path to an R script to be executed by the batch (mutually exclusive with \code{r_code}).
    #' @param post_children_r_code A character vector of R code to be executed after waiting for child jobs finishes.
    #'   Requires \code{wait_for_children = TRUE}; an error is raised otherwise.
    #' @param batch_code A character vector of code to be included in the batch script for job scheduling
    #' @param r_packages A character vector of R packages to be loaded when compute script runs
    #' @param scheduler The scheduler to be used for this compute. Options are 'slurm' (or 'sbatch'),
    #'   'torque' (or 'qsub'), or 'local' (or 'sh').
    #' @param wait_for_children If TRUE, do not end this job until all child jobs have completed
    #' @param all_children_successful If TRUE, don't count this job as successful unless all child jobs are successful
    #' @param input_rdata_file The name of the environment to be loaded at the beginning of the R batch prior to executing code
    #' @param input_objects A list object in the current execution environment to be cached and used as input to the R batch.
    #'   This is mutually exclusive with input_rdata_file at present.
    #' @param output_rdata_file The name of the environment to be saved at the end of the R batch execution
    #' @param sqlite_db The location of the SQLite database to be used for job tracking. If `NULL`, job tracking will be disabled.
    #' @param scheduler_options A character vector of scheduler options to be added to the header of the batch script
    #' @param repolling_interval The number of seconds to wait before rechecking whether parent jobs have completed
    #' @param print_session_info If TRUE (default), print information about the R environment `sessionInfo()` and
    #'   compute environment `Sys.info()` when the job starts.
    #' @param print_environment If TRUE, print the session environment via `Sys.getenv()` when the job starts. Default: FALSE.
    initialize = function(batch_directory = NULL, parent_jobs = NULL, job_name = NULL, n_nodes = NULL, n_cpus = NULL,
                          wall_time = NULL, mem_per_cpu = NULL, mem_total = NULL, batch_id = NULL, r_code = NULL, r_script = NULL,
                          post_children_r_code = NULL, batch_code = NULL, r_packages = NULL, scheduler = NULL, wait_for_children = NULL,
                          all_children_successful = NULL, input_rdata_file = NULL, input_objects = NULL, output_rdata_file = NULL, sqlite_db = NULL,
                          scheduler_options = NULL, repolling_interval = NULL, print_session_info = TRUE, print_environment = FALSE) {

      if (!is.null(batch_directory)) {
        self$batch_directory <- batch_directory
      } else {
        hostname <- Sys.info()["nodename"]
        user <- Sys.info()["user"]
        if (isTRUE(grepl("longleaf", hostname))) {
          self$batch_directory <- glue::glue("/work/users/{substr(user, 1, 1)}/{substr(user, 2, 2)}/{user}")
          message(glue::glue("On Longleaf, defaulting to batch_directory on the scratch system: {self$batch_directory}"))
        } else {
          self$batch_directory <- normalizePath("~/")
          message(glue::glue("Defaulting batch_directory to your home directory: {self$batch_directory}"))
        }
      }

      if (!is.null(parent_jobs)) self$parent_jobs <- parent_jobs
      if (!is.null(job_name)) self$job_name <- as.character(job_name)
      if (!is.null(n_nodes)) self$n_nodes <- as.character(n_nodes)
      if (!is.null(n_cpus)) self$n_cpus <- as.character(n_cpus)
      if (is.null(wall_time)) {
        message("Using default wall_time of: ", self$wall_time)
      } else {
        self$wall_time <- as.character(wall_time)
      }

      if (!is.null(mem_per_cpu)) {
        checkmate::assert_string(mem_per_cpu)
        self$mem_per_cpu <- mem_per_cpu
      }

      if (!is.null(mem_total)) {
        checkmate::assert_string(mem_total)
        self$mem_total <- mem_total
      }

      if (!is.null(batch_id)) self$batch_id <- as.character(batch_id)

      if (!is.null(r_script)) {
        if (!is.null(r_code)) {
          stop("Both r_script and r_code provided to R_batch_job. These are mutually exclusive!")
        }

        checkmate::assert_file_exists(r_script)
        r_code <- readLines(r_script) # read in r_code from external script
      }

      if (is.null(r_code)) {
        stop("Unable to initialize R_batch_job object without r_code")
      } else {
        checkmate::assert_multi_class(r_code, c("expression", "character"))
        if (is.expression(r_code)) { # expand r_code expression as character vector
          r_code <- capture.output(cat(as.character(r_code), sep = "\n"))
        }
        self$r_code <- r_code
      }

      if (!is.null(batch_code)) {
        checkmate::assert_character(batch_code)
        self$batch_code <- batch_code
      }

      if (!is.null(r_packages)) {
        checkmate::assert_character(r_packages)
        self$r_packages <- r_packages
      }

      # exactly one scheduler name is allowed: downstream code tests it inside scalar if() conditions
      checkmate::assert_choice(scheduler, c("torque", "qsub", "slurm", "sbatch", "sh", "local"), null.ok = TRUE)
      if (!is.null(scheduler)) self$scheduler <- scheduler

      if (!is.null(input_objects)) {
        if (!is.null(input_rdata_file)) {
          stop("At present, you cannot specify both input_rdata_file and input_objects as inputs.")
        }

        checkmate::assert_multi_class(input_objects, c("list", "environment"))
        if (checkmate::test_list(input_objects)) {
          if (is.null(names(input_objects))) {
            stop("For list input_objects input, elements of the list must be named.")
          }
          input_objects <- as.environment(input_objects) # always convert to environment for type consistency
        }
        self$input_objects <- input_objects # store objects internally for output when write_compute_file is called
        self$input_rdata_file <- "R_batch_job_environment.RData"
      }

      if (!is.null(input_rdata_file)) self$input_rdata_file <- input_rdata_file
      if (!is.null(output_rdata_file)) self$output_rdata_file <- output_rdata_file
      if (!is.null(sqlite_db)) self$sqlite_db <- sqlite_db

      if (!is.null(scheduler_options)) {
        checkmate::assert_character(scheduler_options)
        self$scheduler_options <- scheduler_options
      }

      if (!is.null(repolling_interval)) {
        checkmate::assert_number(repolling_interval, lower = 0.1, upper = 2e5)
        self$repolling_interval <- repolling_interval
      }

      if (!is.null(wait_for_children)) {
        checkmate::assert_logical(wait_for_children, len = 1L)
        self$wait_for_children <- wait_for_children
      }

      if (!is.null(all_children_successful)) {
        checkmate::assert_logical(all_children_successful, len = 1L)
        self$all_children_successful <- all_children_successful
      }

      if (!is.null(post_children_r_code)) {
        # don't let the user try it without waiting for children. Check the stored field rather than
        # the argument: comparing a NULL argument with `== TRUE` yields logical(0), which stopifnot()
        # treats as passing, so the old guard never fired when wait_for_children was omitted.
        if (!isTRUE(self$wait_for_children)) {
          stop("post_children_r_code can only be used when wait_for_children = TRUE")
        }
        checkmate::assert_multi_class(post_children_r_code, c("expression", "character"))
        self$post_children_r_code <- as.character(post_children_r_code)
      }

      # invalid values for the print_* flags are ignored, leaving the field defaults in place
      if (checkmate::test_logical(print_session_info, len = 1L)) {
        self$print_session_info <- print_session_info
      }

      if (checkmate::test_logical(print_environment, len = 1L)) {
        self$print_environment <- print_environment
      }
    },

    #' @description Write the RData (if any), batch, and compute files for this job to \code{batch_directory}
    #' @param force if TRUE, the RData, batch, and compute will be regenerated/rewritten even if they
    #'   were already generated
    #' @details this is called by \code{$submit} when a job is submitted and is provided
    #'   here in case the user wants to generate the batch files without executing them.
    #'   Returns the object invisibly.
    generate = function(force = FALSE) {
      already_generated <- isTRUE(private$batch_generated)
      if (already_generated && isFALSE(force)) {
        # files are in place and no rewrite was requested
        return(invisible(self))
      }

      # create batch_directory, if missing
      if (!dir.exists(self$batch_directory)) {
        dir.create(self$batch_directory, recursive = TRUE)
      }

      private$write_rdata_file() # save input_objects to file if needed
      private$write_batch_file()
      private$write_compute_file()

      private$batch_generated <- TRUE
      invisible(self)
    },

    #' @description Submit job to scheduler or local compute
    #' @details Generates the job files if needed, temporarily changes the working directory to
    #'   \code{batch_directory}, and submits the batch script. The original working directory is
    #'   restored when the method exits, including when submission fails with an error.
    #'   Returns the job id assigned to the submitted job.
    submit = function() {
      if (isFALSE(private$batch_generated)) self$generate()

      cd <- getwd()
      # reset working directory on exit, even if submission errors (don't attempt if that directory is absent)
      on.exit(if (!is.null(cd) && dir.exists(cd)) setwd(cd), add = TRUE)
      setwd(self$batch_directory)

      batch_script <- private$get_batch_file_name()

      # submit job for computation, waiting for parents if requested
      wait_jobs <- NULL
      if (isTRUE(self$depends_on_parents)) {
        # wait on any/all parent jobs
        wait_jobs <- self$parent_jobs
      } else if (checkmate::test_character(self$depends_on_parents)) {
        # enforce that all parent job dependencies need to be named elements of $parent_jobs
        if (checkmate::test_subset(self$depends_on_parents, names(self$parent_jobs))) {
          wait_jobs <- self$parent_jobs[self$depends_on_parents]
        } else {
          # could make this more informative
          warning("Could not add parent job dependency because one or more depend_on_parents elements were not in parent_jobs")
        }
      }

      message("Submitting job: ", self$job_name)

      # TODO: if a job_id already exists and submit is called again, do we check job status, insist a 'forced' submission?
      private$job_id <- fmri.pipeline::cluster_job_submit(batch_script,
        scheduler = self$scheduler,
        wait_jobs = wait_jobs, repolling_interval = self$repolling_interval,
        tracking_sqlite_db = self$sqlite_db,
        tracking_args = private$gather_tracking_args()
      )

      message("Job received job id: ", private$job_id)

      # return job id in case there are subsidiary scripts that depend on this
      return(private$job_id)
    },

    #' @description Create a deep copy of a batch job, optionally overriding selected fields
    #'
    #' @details The copy has \code{$reset_file_names()} called on it so that its cached file names
    #'    are cleared and it doesn't create files that collide with the original.
    #'    The named parameters are a shortcut for overriding fields on the copy, avoiding the need to
    #'    change these one-by-one using obj$<field> <- x syntax. Parameters left as NULL keep the
    #'    value from the original object.
    #' @param job_name The name of the job used in dependency specification and job scheduler naming
    #' @param n_nodes The number of compute nodes to be requested on the scheduler
    #' @param n_cpus The number of cpus to be requested on the scheduler
    #' @param wall_time The compute time requested on the cluster dd-HH:MM:SS
    #' @param mem_total The total amount of memory (RAM) requested by the job
    #' @param r_code A character vector or expression containing R code to be executed
    copy = function(job_name=NULL, n_nodes=NULL, n_cpus=NULL, wall_time=NULL, mem_total=NULL, r_code=NULL) {
      duplicate <- self$clone(deep = TRUE)
      duplicate$reset_file_names()

      # scalar overrides are all stored as character strings on the object
      overrides <- list(
        job_name = job_name, n_nodes = n_nodes, n_cpus = n_cpus,
        wall_time = wall_time, mem_total = mem_total
      )
      for (field in names(overrides)) {
        value <- overrides[[field]]
        if (!is.null(value)) duplicate[[field]] <- as.character(value)
      }

      if (!is.null(r_code)) {
        checkmate::assert_multi_class(r_code, c("expression", "character"))
        # expand r_code expression as character vector; character input is used as-is
        duplicate$r_code <- if (is.expression(r_code)) {
          capture.output(cat(as.character(r_code), sep = "\n"))
        } else {
          r_code
        }
      }

      return(duplicate)
    },

    #' @description helper function to reset the names of the batch, compute, and R source files that will be
    #'   generated by this job, and to mark the job as not yet generated.
    #' @details This needs to be exposed as a public method for copied objects to be able to reset these private fields.
    reset_file_names = function() {
      private$batch_generated <- FALSE
      private$batch_file_name <- NULL
      private$compute_file_name <- NULL
      # also clear the cached R source file name; otherwise a copied job would write its r_code
      # over the file belonging to the job it was copied from
      private$r_file_name <- NULL
    },

    #' @description Return the job id of this job (populated by job submission; NULL before then)
    get_job_id = function() {
      return(private$job_id)
    },

    #' @description Return the ids of all child jobs launched by this job, as stored in the
    #'   private \code{child_job_ids} field
    get_child_ids = function() {
      return(private$child_job_ids)
    }
  )
)

#' R6 class holding an ordered list of R_batch_job objects that are submitted one after another,
#' passing each submitted job's id to later jobs in the sequence that depend on it by name
#' @export
R_batch_sequence <- R6::R6Class("batch_sequence",
  private = list(
    # list of R_batch_job objects to be run sequentially
    sequence_jobs = list(),

    # Drop NULL entries from a list of jobs and check that every remaining element is a batch_job.
    # vapply() is used so that an empty list yields logical(0) rather than an empty list, which
    # cannot be negated.
    validate_jobs = function(jobs) {
      jobs <- jobs[!vapply(jobs, is.null, logical(1))]
      for (job in jobs) checkmate::assert_class(job, "batch_job")
      jobs
    }
  ),
  public = list(
    #' @description create a new R_batch_sequence object
    #' @param ... One or more R_batch_job objects to be run in sequence
    #' @param joblist Optional list of jobs to be used instead of ... A single R_batch_job object is
    #'   also accepted and treated as a list of one job.
    initialize = function(..., joblist = NULL) {
      if (inherits(joblist, "batch_job")) joblist <- list(joblist)

      all_jobs <- if (!is.null(joblist) && is.list(joblist)) joblist else list(...)

      # tolerate NULLs in inputs, which are ignored; validate before storing anything
      private$sequence_jobs <- private$validate_jobs(all_jobs)
    },

    #' @description add one or more R_batch_job objects to the sequence
    #' @param ... One or more R_batch_job objects to be added to sequence. NULLs are ignored.
    add = function(...) {
      new_jobs <- private$validate_jobs(list(...))
      # append whenever at least one job was supplied (a single job used to be silently ignored)
      if (length(new_jobs) > 0L) {
        private$sequence_jobs <- c(private$sequence_jobs, new_jobs)
      }
      invisible(self)
    },

    #' @description submit the job sequence to the scheduler or local compute
    #' @details Jobs are submitted in order. After each submission, the new job id is added to the
    #'   \code{parent_jobs} of every later job whose \code{depends_on_parents} contains the submitted
    #'   job's name.
    submit = function() {
      job_list <- private$sequence_jobs
      njobs <- length(job_list)
      for (ii in seq_len(njobs)) {
        this_job <- job_list[[ii]]
        this_job$submit()

        parent_name <- this_job$job_name
        # an unnamed job cannot be referenced by name in depends_on_parents
        if (is.null(parent_name)) next

        # only jobs later in the sequence can depend on this one
        for (jj in seq_len(njobs)[-seq_len(ii)]) {
          downstream <- job_list[[jj]]
          if (isTRUE(parent_name %in% downstream$depends_on_parents)) {
            # Always add parent job id to any downstream jobs in sequence that depend on this job.
            # Jobs are R6 objects, so this updates the stored job in place.
            downstream$parent_jobs[parent_name] <- this_job$get_job_id()
          }
        }
      }
      return(self)
    },

    #' @description Calls each job's $generate() method so that scripts can be examined without running the sequence
    generate = function() {
      # call generation steps for each job (mostly for testing)
      for (job in private$sequence_jobs) job$generate()
      return(self)
    },

    #' @description Return a named vector of the job ids for all jobs in this sequence
    #' @details Names are taken from each job's \code{job_name}. Jobs that have not been submitted
    #'   yet have no id and are reported as \code{NA}.
    get_job_ids = function() {
      ids <- unlist(lapply(private$sequence_jobs, function(job) {
        id <- job$get_job_id()
        if (is.null(id)) id <- NA # not submitted yet; names cannot be set on NULL
        setNames(id, job$job_name)
      }))

      id_names <- names(ids)
      # unnamed jobs have empty names and are not reported as duplicates of each other
      dupe_names <- duplicated(id_names) & nzchar(id_names)
      if (any(dupe_names)) {
        nm <- paste(unique(id_names[dupe_names]), collapse=", ")
        warning("The following sequence objects have identical names: ", nm, ". Check that you find the right id!")
      }

      return(ids)
    }
  )
)
# UNCDEPENdLab/fmri.pipeline documentation built on April 3, 2025, 3:21 p.m.