R/temp/parallel_sbatch.R

Defines functions get_sbatch par_sbatch

#' par_sbatch
#'
#' R parallel function, designed for the Linux SLURM `sbatch` model.
#'
#' The parameter set `X` is split into contiguous chunks, one per node; this
#' call only processes the chunk of the current node (0-based node ID taken
#' from `SLURM_ARRAY_TASK_ID`) with [parallel::mclapply()]. The node count is
#' read from `SLURM_JOB_NUM_NODES`. When these variables are unset (e.g. in
#' sinteractive mode) the node ID defaults to 0 and the node count to 1.
#'
#' @param X a vector (atomic or list) or an expressions vector. Other objects
#' (including classed objects) will be coerced by as.list.
#' @param FUN the function to be applied (via mclapply) to each element of X.
#' @param ... other parameters passed to mclapply (and from there to FUN).
#' @param return.res Boolean, return the result? Default is no return result.
#' @param Save Boolean, save the result in an RDS file or not?
#' @param outdir String, output directory for the RDS file if `Save = TRUE`;
#' created if it does not exist.
#' @param prefix File name prefix. The output file is
#' `<outdir>/<prefix><node ID>.RDS` when more than one node is used, and
#' `<outdir>/<prefix>.RDS` otherwise.
#' @param overwrite Boolean, overwrite existing files or not. If `FALSE`,
#' `Save = TRUE` and the output file already exists, nothing is computed.
#' @param cpus_per_node Number of cores passed to mclapply as `mc.cores`.
#' If `NULL`, it is read from `SLURM_CPUS_ON_NODE` (falling back to 1).
#'
#' @return If `return.res = TRUE`, the list returned by mclapply for this
#' node's chunk; otherwise `NULL`. `NULL` is also returned when the chunk is
#' empty, when the output file exists and is not overwritten, or when
#' mclapply raises an error (no file is written in that case).
#'
#' @importFrom parallel mclapply
#' @export
par_sbatch <- function(X, FUN, ...,
                       return.res = FALSE, Save = FALSE,
                       outdir = '.', prefix = 'results_', overwrite = TRUE,
                       cpus_per_node = NULL)
{
    nparams <- length(X)

    # NODES and CPUS_PER_NODE are defined in the sbatch file and read from the
    # environment; unset variables become NA and fall back to defaults below.
    I_node <- as.numeric(Sys.getenv('SLURM_ARRAY_TASK_ID'))  # node ID
    nodes  <- as.numeric(Sys.getenv('SLURM_JOB_NUM_NODES'))
    if (is.null(cpus_per_node)) {
        cpus_per_node <- as.numeric(Sys.getenv('SLURM_CPUS_ON_NODE'))
    }

    if (is.na(I_node)) I_node <- 0  # sinteractive mode
    if (is.na(nodes))  nodes  <- 1
    if (is.na(cpus_per_node)) cpus_per_node <- 1

    # Only tag the output file with the node ID when several nodes are used.
    str_node <- if (nodes > 1) I_node else ""
    outfile  <- file.path(outdir, paste0(prefix, str_node, '.RDS'))

    if (Save && file.exists(outfile) && !overwrite) return(NULL)

    # Small parameter sets: one task per CPU on each node, so fewer nodes do
    # work; otherwise split the parameters evenly over all nodes.
    if (nparams < cpus_per_node * nodes) {
        nchunk <- cpus_per_node
    } else {
        nchunk <- ceiling(nparams / nodes)
    }

    I_beg <- I_node * nchunk + 1
    I_end <- min((I_node + 1) * nchunk, nparams)
    if (I_beg > I_end) {
        message('It is empty in this node!')
        return(NULL)
    }
    X_node <- X[seq.int(I_beg, I_end)]
    print(names(X_node))

    # Capture the error object instead of letting it propagate, so that it can
    # be reported and the failed chunk left without an output file.
    res <- tryCatch(
        mclapply(X_node, FUN, mc.cores = cpus_per_node, ...),
        error = function(e) e
    )
    if (inherits(res, 'error')) {
        message(sprintf('[error]: %s', conditionMessage(res)))
        # Writing a placeholder file here would make a rerun with
        # overwrite = FALSE skip this failed chunk.
        return(NULL)
    }

    if (Save) {
        if (!dir.exists(outdir)) dir.create(outdir, recursive = TRUE)
        saveRDS(res, file = outfile)
    }

    if (return.res) return(res)
    NULL
}

#' get_sbatch
#'
#' Merge the slurm result.
#'
#' Every file in `indir` whose name matches `pattern` is read with
#' [readRDS()], and the results are concatenated with [c()]. If every file
#' name contains a number (e.g. the node ID in `results_12.RDS`), files are
#' merged in increasing order of the last number in their name; otherwise
#' the order returned by [dir()] is kept.
#'
#' @param indir directory to search RDS files
#' @param pattern an optional regular expression. Only file names which match
#' the regular expression will be read.
#' @param outfile If `Save = TRUE`, output rda file name (written with
#' [save()]; the saved object is named `RES`).
#' @param Save save the merged result in an rda file or not? Defaults to
#' `TRUE` when `outfile` is given.
#'
#' @return The concatenated results (`NULL` if no file matches).
#'
#' @export
get_sbatch <- function(indir = '.', pattern = 'result.*.RDS',
                          outfile = NULL, Save = !is.null(outfile)){
    files <- dir(indir, pattern, full.names = TRUE)

    # dir() sorts lexicographically, which puts "results_10" before
    # "results_2". When every name carries a number, order files by it so the
    # merged result follows node order.
    ids <- suppressWarnings(as.numeric(
        sub('^.*?([0-9]+)[^0-9]*$', '\\1', basename(files), perl = TRUE)
    ))
    if (!anyNA(ids)) files <- files[order(ids)]

    cat('OUTPUTs:', "\n")
    print(basename(files))
    RES <- do.call(c, lapply(files, readRDS))

    if (Save) {
        if (is.null(outfile)) {
            stop('`outfile` must be given when `Save = TRUE`.', call. = FALSE)
        }
        save(RES, file = outfile)
    }
    RES
}
kongdd/Ipaper documentation built on March 27, 2024, 5:34 a.m.