Collect: Collect and merge the computation results

Description

The final step of the startR workflow, performed after the data operation. It is used when the parameter 'wait' of Compute() is FALSE. Once the execution is done, it combines all the chunks of the results into one data array. See the practical guide for more details. Collect() calls Collect_ecflow() or Collect_autosubmit() according to the chosen workflow manager.

Usage

Collect(startr_exec, wait = TRUE, remove = TRUE, on_remote = FALSE)

Arguments

startr_exec

An R object returned by Compute() when the parameter 'wait' of Compute() is FALSE. It can come directly from a Compute() call or be read from an RDS file.

wait

A logical value deciding whether the R session waits for the Collect() call to finish (TRUE) or not (FALSE). If TRUE, the call is blocking: Collect() retrieves information from the HPC, including signals and outputs, every polling_period seconds, and does not return until the results of all the chunks have been received. The status can be monitored on the workflow manager GUI. If FALSE, Collect() returns an error if the execution has not finished; otherwise it returns the merged array. The default value is TRUE.

remove

A logical value deciding whether to remove all chunk results received from the HPC once the data have been collected, as well as the local job folder under 'ecflow_suite_dir' or 'autosubmit_suite_dir'. To preserve the data and Collect() them as many times as desired, set remove to FALSE. The default value is TRUE.

on_remote

A logical value deciding whether the function is run locally and syncs the outputs back from the HPC (FALSE, default), or is run on the HPC (TRUE).

Value

A list of merged data arrays.

Examples

 data_path <- system.file('extdata', package = 'startR')
 path_obs <- file.path(data_path, 'obs/monthly_mean/$var$/$var$_$sdate$.nc')
 sdates <- c('200011', '200012')
 data <- Start(dat = list(list(path = path_obs)),
               var = 'tos',
               sdate = sdates,
               time = 'all',
               latitude = 'all',
               longitude = 'all',
               return_vars = list(latitude = 'dat',
                                  longitude = 'dat',
                                  time = 'sdate'),
               retrieve = FALSE)
 fun <- function(x) {
           lat = attributes(x)$Variables$dat1$latitude
           weight = sqrt(cos(lat * pi / 180))
           corrected = Apply(list(x), target_dims = "latitude",
                             fun = function(x) {x * weight})
         }
 step <- Step(fun = fun,
              target_dims = 'latitude',
              output_dims = 'latitude',
              use_libraries = c('multiApply'),
              use_attributes = list(data = "Variables"))
 wf <- AddStep(data, step)
 ## Not run: 
 res <- Compute(wf, chunks = list(longitude = 2, sdate = 2),
                threads_load = 1,
                threads_compute = 4,
                cluster = list(queue_host = 'nord3',
                               queue_type = 'lsf',
                               temp_dir = '/on_hpc/tmp_dir/',
                               cores_per_job = 2,
                               job_wallclock = '05:00',
                               max_jobs = 4,
                               extra_queue_params = list('#BSUB -q bsc_es'),
                               bidirectional = FALSE,
                               polling_period = 10
                ),
                ecflow_suite_dir = '/on_local_machine/username/ecflow_dir/',
                wait = FALSE)
 saveRDS(res, file = 'test_collect.Rds')
 collect_info <- readRDS('test_collect.Rds')
 result <- Collect(collect_info, wait = TRUE)
 
## End(Not run)
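
As described for the 'wait' argument, Collect() with wait = FALSE returns an error when the execution has not finished. A caller who prefers to retry instead of blocking could wrap the call as sketched below. This is only an illustration: try_collect() and stub_collect() are hypothetical helpers, not part of startR, and the sketch assumes the error is an ordinary R condition that tryCatch() can catch. The stub stands in for the real call so the snippet runs without an HPC.

```r
# Hypothetical helper (not part of startR): retry a non-blocking collection.
# `collect_fun` is any zero-argument function that either returns the result
# or signals an error, e.g. function() Collect(startr_exec, wait = FALSE).
try_collect <- function(collect_fun, max_tries = 5, pause = 0) {
  for (i in seq_len(max_tries)) {
    # Turn an error (assumed to mean "not finished yet") into NULL
    result <- tryCatch(collect_fun(), error = function(e) NULL)
    if (!is.null(result)) return(result)
    Sys.sleep(pause)  # wait before the next attempt
  }
  stop("Results not available after ", max_tries, " attempts.")
}

# Stub that fails twice and then succeeds, mimicking an unfinished execution.
attempts <- 0
stub_collect <- function() {
  attempts <<- attempts + 1
  if (attempts < 3) stop("Computation in progress")
  list(output1 = array(1:4, dim = c(2, 2)))
}

res <- try_collect(stub_collect)
```

With a real execution object, the stub would be replaced by something like function() Collect(collect_info, wait = FALSE), with 'pause' set to a suitable number of seconds. Note that any error is treated as "not finished" here, so a genuine failure would only surface once max_tries is exhausted.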


startR documentation built on May 29, 2024, 5:33 a.m.
