#' R6 class for running 3dFWHMx on a group of input files using a scheduler/cluster
#'
#' @importFrom R6 R6Class
#' @export
afni_3dfwhmx_list <- R6::R6Class("afni_3dfwhmx_list",
private = list(
fwhmx_batch = NULL, # batch object
wall_time = "10:00:00", # 10 hours default for all jobs
fwhmx_objs = list(),
fwhmx_df = data.frame(),
fwhmx_acf_avg = setNames(rep(NA_real_, 4), c("a", "b", "c")),
fwhmx_effective_fwhm = NA_real_,
all_fwhmx_complete = FALSE,
scheduler = "slurm",
input_files = NULL,
mask_files = NULL,
populate_acf_df = function() {
res <- lapply(private$fwhmx_objs, function(x) x$get_acf_params())
inp_files <- sapply(private$fwhmx_objs, function(x) x$get_input_file())
mask_files <- sapply(private$fwhmx_objs, function(x) x$get_mask_file())
lens <- sapply(res, length)
if (!all(lens == 4)) {
stop("The length of some ACF outputs from $get_acf_params is not 4. Don't know how to proceed!")
}
acf_mat <- do.call(rbind, res)
colnames(acf_mat) <- c("a", "b", "c", "effective_fwhm")
if (is.null(mask_files)) {
private$fwhmx_df <- data.frame(input = private$input_files, acf_mat)
} else {
private$fwhmx_df <- data.frame(input = private$input_files, mask = private$mask_files, acf_mat)
}
private$fwhmx_effective_fwhm <- mean(acf_mat[, "effective_fwhm"])
# always return self for side-effect methods
return(invisible(self))
},
get_fwhmx_calls = function(include_complete = FALSE) {
if (isTRUE(include_complete)) {
fwhmx_return <- rep(TRUE, length(private$fwhmx_objs))
} else {
fwhmx_return <- sapply(private$fwhmx_objs, function(x) !x$is_complete())
}
return(sapply(private$fwhmx_objs[fwhmx_return], function(x) x$get_call()))
},
generate_batch = function(include_complete = FALSE) {
# this will return all 3dfwhmx calls
fwhmx_calls <- private$get_fwhmx_calls(include_complete = include_complete)
if (length(fwhmx_calls) == 0L) {
message("All 3dFWHMx runs have already completed. If you want to force a re-run, use $submit(force=TRUE)")
private$fwhmx_batch <- NULL # reset to NULL for consistency
return(invisible(NULL))
}
# create parent batch job to run all 3dFWHMx scripts
private$fwhmx_batch <- R_batch_job$new(
job_name = "run_3dfwhmx", n_cpus = 1,
wall_time = private$wall_time, scheduler = private$scheduler,
input_objects = list(fwhmx_calls = fwhmx_calls), # export this object to the job
wait_for_children = TRUE, r_packages = "fmri.pipeline",
r_code = glue(
"child_job_ids <- cluster_submit_shell_jobs(fwhmx_calls, memgb_per_command=8, fork_jobs=TRUE, scheduler='{private$scheduler}', job_script_prefix='job_3dfwhmx')"
)
# cleanup_r_code = glue(
# "batch_obj$refresh()" #update afni_3dfwhmx objs/params,
# )
)
}
),
public = list(
#' @description create a new afni_3dfwhmx_list object
#' @param input_files A vector of input files that should each be passed to 3dFWHMx (usually first-level residuals)
#' @param mask_files A vector of mask files corresponding to \code{input_files} (i.e., these should align),
#' use to set the volume over which smoothness is estimated by 3dFWHMx
#' @param scheduler The HPC scheduler to be used: 'local', 'slurm', or 'torque'. Default: 'slurm'
#' @param wall_time The total time required to run all 3dFWHMx objects on the job scheduler/HPC
initialize = function(input_files = NULL, mask_files = NULL, scheduler = NULL, wall_time="10:00:00") {
if (is.null(input_files)) {
stop("afni_3dfwhmx_list requires at least one input file")
}
checkmate::assert_character(input_files)
checkmate::assert_file_exists(input_files)
private$input_files <- input_files
if (!is.null(mask_files)) {
checkmate::assert_file_exists(mask_files)
stopifnot(length(mask_files) == length(input_files)) # enforce length match for inputs and masks (otherwise, how do they line up?)
private$mask_files <- mask_files
}
self$refresh() # populates fwhmx_objs and all_fwhmx_complete based on inputs
if (!is.null(scheduler)) {
checkmate::assert_string(scheduler)
checkmate::assert_subset(scheduler, c("torque", "slurm", "local"))
private$scheduler <- scheduler
}
if (!is.null(wall_time)) {
private$wall_time <- validate_dhms(wall_time)
}
},
#' @description recreate afni_3dfwhmx objects and completion status
#' @details useful for updating job status and ACF params after a run completes
#' @param ... Passes through any arguments to afni_3dfwhmx$new()
refresh = function(...) {
# N.B. This should refresh from the object generated by the batch... basically the batch runs, then saves the modified copy
# in an RData object. Then we pull this in from that object and refresh the parent/calling object.
# so if output_rdata_file is /tmp/test.RData, we need to pull cobj out of that, then get its results into this object.
# basically, the parent object should always know where to look for the output RData of the batch job
# create 3dFWHMx object for each input file
private$fwhmx_objs <- lapply(seq_along(private$input_files), function(ii) {
afni_3dfwhmx$new(input_file = private$input_files[ii], mask_file = private$mask_files[ii], ...)
})
private$all_fwhmx_complete <- all(sapply(private$fwhmx_objs, function(x) x$is_complete()))
},
#' @description Submit the 3dFWHMx batch for all inputs to the scheduler
#' @param force If \code{TRUE}, completed 3dFWHMx runs will be included in the batch. Default: FALSE
submit = function(force = FALSE) {
private$generate_batch(include_complete = force)
if (!is.null(private$fwhmx_batch)) {
private$fwhmx_batch$submit()
} else {
message("Nothing to submit")
}
},
#' @description Return R_batch_job objects used to submit 3dFWHMx for all inputs
#' @param include_complete If TRUE, complete 3dFWHMx jobs will be included in the batch, leading these to be re-run
get_batch = function(include_complete = FALSE) {
private$generate_batch(include_complete = include_complete)
return(private$fwhmx_batch)
},
#' @description method to calculate the overall ACF average across all input datasets
#' @param allow_incomplete_fwhmx If \code{TRUE}, the average will be calculated even if some datasets
#' do not have 3dFWHMx parameter estimates (e.g., if they crashed or were never run). Default. \code{FALSE}.
#' @return a three-element vector containing the ACF estimates averaged across all datasets
get_acf_average = function(allow_incomplete_fwhmx=FALSE) {
private$populate_acf_df()
if (any(is.na(private$fwhmx_df)) && isFALSE(allow_incomplete_fwhmx)) {
stop("Unable to get ACF average because parameters are missing for some datasets. Make sure you've run 3dFWHMx for all inputs provided!")
}
private$fwhmx_acf_avg <- colMeans(private$fwhmx_df[, c("a", "b", "c")], na.rm = TRUE)
return(private$fwhmx_acf_avg)
},
#' @description return the 3dFWHMx results as a data.frame
get_acf_df = function() {
private$populate_acf_df()
private$fwhmx_df
},
#' @description return the effective FWHM estimated by 3dFWHMx -ACF
get_effective_fwhm = function() {
if (is.na(private$fwhmx_effective_fwhm)) {
private$populate_acf_df() # make sure that the FWHM is calculated
}
return(private$fwhmx_effective_fwhm)
},
#' @description Simple method to return whether 3dFWHMx is complete for all input files
#' @return returns \code{TRUE} if 3dFWHMx has completed for all runs
is_complete = function() {
private$all_fwhmx_complete
}
)
)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.