R/afni_3dfwhmx_list.R

#' R6 class for running 3dFWHMx on a group of input files using a scheduler/cluster
#'
#' @importFrom R6 R6Class
#' @export
afni_3dfwhmx_list <- R6::R6Class("afni_3dfwhmx_list",
  private = list(
    fwhmx_batch = NULL, # batch object
    wall_time = "10:00:00", # 10 hours default for all jobs
    fwhmx_objs = list(),
    fwhmx_df = data.frame(),
    fwhmx_acf_avg = setNames(rep(NA_real_, 4), c("a", "b", "c")),
    fwhmx_effective_fwhm = NA_real_,
    all_fwhmx_complete = FALSE,
    scheduler = "slurm",
    input_files = NULL,
    mask_files = NULL,

    populate_acf_df = function() {
      res <- lapply(private$fwhmx_objs, function(x) x$get_acf_params())
      inp_files <- sapply(private$fwhmx_objs, function(x) x$get_input_file())
      mask_files <- sapply(private$fwhmx_objs, function(x) x$get_mask_file())
      lens <- sapply(res, length)
      if (!all(lens == 4)) {
        stop("The length of some ACF outputs from $get_acf_params is not 4. Don't know how to proceed!")
      }
      acf_mat <- do.call(rbind, res)
      colnames(acf_mat) <- c("a", "b", "c", "effective_fwhm")

      if (is.null(mask_files)) {
        private$fwhmx_df <- data.frame(input = private$input_files, acf_mat)
      } else {
        private$fwhmx_df <- data.frame(input = private$input_files, mask = private$mask_files, acf_mat)
      }

      private$fwhmx_effective_fwhm <- mean(acf_mat[, "effective_fwhm"])

      # always return self for side-effect methods
      return(invisible(self))
    },
    get_fwhmx_calls = function(include_complete = FALSE) {
      if (isTRUE(include_complete)) {
        fwhmx_return <- rep(TRUE, length(private$fwhmx_objs))
      } else {
        fwhmx_return <- sapply(private$fwhmx_objs, function(x) !x$is_complete())
      }

      return(sapply(private$fwhmx_objs[fwhmx_return], function(x) x$get_call()))
    },
    generate_batch = function(include_complete = FALSE) {
      # this will return all 3dfwhmx calls

      fwhmx_calls <- private$get_fwhmx_calls(include_complete = include_complete)
      if (length(fwhmx_calls) == 0L) {
        message("All 3dFWHMx runs have already completed. If you want to force a re-run, use $submit(force=TRUE)")
        private$fwhmx_batch <- NULL # reset to NULL for consistency
        return(invisible(NULL))
      }

      # create parent batch job to run all 3dFWHMx scripts
      private$fwhmx_batch <- R_batch_job$new(
        job_name = "run_3dfwhmx", n_cpus = 1,
        wall_time = private$wall_time, scheduler = private$scheduler,
        input_objects = list(fwhmx_calls = fwhmx_calls), # export this object to the job
        wait_for_children = TRUE, r_packages = "fmri.pipeline",
        r_code = glue(
          "child_job_ids <- cluster_submit_shell_jobs(fwhmx_calls, memgb_per_command=8, fork_jobs=TRUE, scheduler='{private$scheduler}', job_script_prefix='job_3dfwhmx')"
        )
        # cleanup_r_code = glue(
        #   "batch_obj$refresh()" #update afni_3dfwhmx objs/params, 
        # )
      )
    }
  ),
  public = list(
    #' @description create a new afni_3dfwhmx_list object
    #' @param input_files A vector of input files that should each be passed to 3dFWHMx (usually first-level residuals)
    #' @param mask_files A vector of mask files corresponding to \code{input_files} (i.e., these should align),
    #'   use to set the volume over which smoothness is estimated by 3dFWHMx
    #' @param scheduler The HPC scheduler to be used: 'local', 'slurm', or 'torque'. Default: 'slurm'
    #' @param wall_time The total time required to run all 3dFWHMx objects on the job scheduler/HPC
    initialize = function(input_files = NULL, mask_files = NULL, scheduler = NULL, wall_time="10:00:00") {
      if (is.null(input_files)) {
        stop("afni_3dfwhmx_list requires at least one input file")
      }

      checkmate::assert_character(input_files)
      checkmate::assert_file_exists(input_files)
      private$input_files <- input_files

      if (!is.null(mask_files)) {
        checkmate::assert_file_exists(mask_files)
        stopifnot(length(mask_files) == length(input_files)) # enforce length match for inputs and masks (otherwise, how do they line up?)
        private$mask_files <- mask_files
      }

      self$refresh() # populates fwhmx_objs and all_fwhmx_complete based on inputs

      if (!is.null(scheduler)) {
        checkmate::assert_string(scheduler)
        checkmate::assert_subset(scheduler, c("torque", "slurm", "local"))
        private$scheduler <- scheduler
      }

      if (!is.null(wall_time)) {
        private$wall_time <- validate_dhms(wall_time)
      }
    },

    #' @description recreate afni_3dfwhmx objects and completion status
    #' @details useful for updating job status and ACF params after a run completes
    #' @param ... Passes through any arguments to afni_3dfwhmx$new()
    refresh = function(...) {
      # N.B. This should refresh from the object generated by the batch... basically the batch runs, then saves the modified copy
      # in an RData object. Then we pull this in from that object and refresh the parent/calling object.
      # so if output_rdata_file is /tmp/test.RData, we need to pull cobj out of that, then get its results into this object.
      # basically, the parent object should always know where to look for the output RData of the batch job

      # create 3dFWHMx object for each input file
      private$fwhmx_objs <- lapply(seq_along(private$input_files), function(ii) {
        afni_3dfwhmx$new(input_file = private$input_files[ii], mask_file = private$mask_files[ii], ...)
      })

      private$all_fwhmx_complete <- all(sapply(private$fwhmx_objs, function(x) x$is_complete()))
    },

    #' @description Submit the 3dFWHMx batch for all inputs to the scheduler
    #' @param force If \code{TRUE}, completed 3dFWHMx runs will be included in the batch. Default: FALSE
    submit = function(force = FALSE) {
      private$generate_batch(include_complete = force)
      if (!is.null(private$fwhmx_batch)) {
        private$fwhmx_batch$submit()
      } else {
        message("Nothing to submit")
      }
    },

    #' @description Return R_batch_job objects used to submit 3dFWHMx for all inputs
    #' @param include_complete If TRUE, complete 3dFWHMx jobs will be included in the batch, leading these to be re-run
    get_batch = function(include_complete = FALSE) {
      private$generate_batch(include_complete = include_complete)
      return(private$fwhmx_batch)
    },

    #' @description method to calculate the overall ACF average across all input datasets
    #' @param allow_incomplete_fwhmx If \code{TRUE}, the average will be calculated even if some datasets
    #'   do not have 3dFWHMx parameter estimates (e.g., if they crashed or were never run). Default. \code{FALSE}.
    #' @return a three-element vector containing the ACF estimates averaged across all datasets
    get_acf_average = function(allow_incomplete_fwhmx=FALSE) {
      private$populate_acf_df()
      if (any(is.na(private$fwhmx_df)) && isFALSE(allow_incomplete_fwhmx)) {
        stop("Unable to get ACF average because parameters are missing for some datasets. Make sure you've run 3dFWHMx for all inputs provided!")
      }

      private$fwhmx_acf_avg <- colMeans(private$fwhmx_df[, c("a", "b", "c")], na.rm = TRUE)
      return(private$fwhmx_acf_avg)
    },

    #' @description return the 3dFWHMx results as a data.frame
    get_acf_df = function() {
      private$populate_acf_df()
      private$fwhmx_df
    },

    #' @description return the effective FWHM estimated by 3dFWHMx -ACF
    get_effective_fwhm = function() {
      if (is.na(private$fwhmx_effective_fwhm)) {
        private$populate_acf_df() # make sure that the FWHM is calculated
      }
      return(private$fwhmx_effective_fwhm)
    },

    #' @description Simple method to return whether 3dFWHMx is complete for all input files
    #' @return returns \code{TRUE} if 3dFWHMx has completed for all runs
    is_complete = function() {
      private$all_fwhmx_complete
    }
  )
)
UNCDEPENdLab/fmri.pipeline documentation built on April 3, 2025, 3:21 p.m.