R/cali_ensemble.R

#' Calibrate models
#'
#' Use one of three methods (LHC, MCMC, or modFit) to calibrate the specified models.
#'
#' @param config_file file path; to LakeEnsemblr yaml master config file
#' @param num integer; the number of random parameter sets to generate. If param_file is
#' provided, num is set to the number of parameter sets in that file.
#' @param param_file file path; to a previously created parameter set file. If NULL, a new
#' parameter set is generated. Defaults to NULL
#' @param cmethod character; method used for calibration. Can be "LHC", "LHC_old",
#'  "MCMC" or "modFit". Defaults to "LHC". "LHC" and "LHC_old" only differ in the
#'  way the parallelisation is set up; the new and default "LHC" version is
#'  more efficient, and "LHC_old" is only kept for backwards compatibility.
#' @param model vector; models to calibrate. Options include c("GOTM", "GLM", "Simstrat",
#' "FLake", "MyLake")
#' @param folder file path; to folder which contains the model folders generated by export_config()
#' @param spin_up numeric; Number of days to disregard as spin-up for analysis.
#' @param out_f character; name of the folder to store results into
#' @param qualfun function; function that calculates measures of fit from observed and simulated
#'    values; takes the two arguments Observed and Simulated (see the custom qualfun sketch in
#'    the examples)
#' @param parallel logical; should the model calibration be run in parallel. Defaults to FALSE
#' @param job_name character; optional name to use as an RStudio job and as output variable
#'  name. It has to be a syntactically valid name. Check out this webpage for more info on jobs:
#'  https://blog.rstudio.com/2019/03/14/rstudio-1-2-jobs/
#' @param ncores numeric; number of cores to be used. If NULL, will default to 
#'    `parallel::detectCores() - 1`.
#' @param tmp_dir file path; location where the temporary files for the parallel LHC
#'    calibration are stored
#' @param ... additional arguments passed to FME::modFit or FME::modMCMC. Only used when method is
#'    modFit or MCMC
#' @details When `parallel = TRUE`, parallelisation is done using the `parallel` package and
#'    `parLapply()`. The number of cores used is set to the value specified in `ncores`. For
#'    `cmethod = "LHC"`, the parameter sets are split across the cores and each core runs the
#'    models in its own temporary directory (see `tmp_dir`).
#'
#' @examples
#' \dontrun{
#'
#' config_file <- 'LakeEnsemblR.yaml'
#'
#' # LHC method
#' cali_ensemble(config_file = config_file, num = 200, cmethod = "LHC",
#'              model = c("FLake", "GLM", "GOTM", "Simstrat", "MyLake"))
#'
#' # MCMC method
#' resMCMC <- cali_ensemble(config_file = config_file, num = 200, cmethod = "MCMC",
#'                          model = c("FLake", "GLM", "GOTM", "Simstrat", "MyLake"))
#'
#' # modFit method using the Nelder-Mead algorithm
#' resModFit <- cali_ensemble(config_file = config_file, num = 200, cmethod = "modFit",
#'                            model = c("FLake", "GLM", "GOTM", "Simstrat", "MyLake"),
#'                            method = "Nelder-Mead")
#'
#' # LHC method using multiple cores
#' cali_ensemble(config_file = config_file, num = 200, cmethod = "LHC",
#'              model = c("FLake", "GLM", "GOTM", "Simstrat", "MyLake"),
#'              parallel = TRUE)
#'
#' # LHC method deployed as a job
#' cali_ensemble(config_file = config_file, num = 200, cmethod = "LHC",
#'              model = c("FLake", "GLM", "GOTM", "Simstrat", "MyLake"),
#'              job_name = "test")
#' test # View output from job
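#'
#' # LHC method with a user-supplied quality function. A minimal sketch: it assumes
#' # qualfun accepts vectors of observed and simulated values and returns a one-row
#' # data.frame of named fit measures, which is consistent with how cali_ensemble
#' # probes qualfun internally; check the default qual_fun for the exact interface.
#' my_qualfun <- function(O, P) {
#'   data.frame(rmse = sqrt(mean((O - P)^2, na.rm = TRUE)),
#'              bias = mean(P - O, na.rm = TRUE))
#' }
#' cali_ensemble(config_file = config_file, num = 200, cmethod = "LHC",
#'               model = c("FLake", "GLM", "GOTM", "Simstrat", "MyLake"),
#'               qualfun = my_qualfun)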
#'
#' }
#' @importFrom reshape2 dcast
#' @importFrom parallel detectCores parLapply clusterExport makeCluster stopCluster clusterEvalQ
#' @importFrom FME Latinhyper modMCMC
#' @importFrom gotmtools get_yaml_value calc_cc input_nml sum_stat input_yaml get_vari
#' @importFrom glmtools get_nml_value
#' @importFrom lubridate round_date seconds_to_period
#' @importFrom configr read.config write.config
#'
#' @export

cali_ensemble <- function(config_file, num = NULL, param_file = NULL, cmethod = "LHC",
                          qualfun = qual_fun, parallel = FALSE, job_name,
                          model = c("FLake", "GLM", "GOTM", "Simstrat", "MyLake"),
                          folder = ".", spin_up = NULL, out_f = "cali",
                          ncores = NULL, tmp_dir = NULL, ...) {


  # ---- Send to RStudio Jobs -----
  if (!missing(job_name)) {
    if (make.names(job_name) != job_name) {
      stop("job_name '",
           job_name,
           "' is not a syntactically valid variable name.")
    }

    # Evaluates all arguments.
    call <- match.call()
    call$config_file <- config_file
    call$num <- num
    call$param_file <- param_file
    call$cmethod <- cmethod
    # call$qualfun <- qualfun
    call$parallel <- parallel
    call$model <- model
    call$folder <- folder
    call$spin_up <- spin_up
    call$out_f <- out_f
    
    # get number of output arguments for qualfun
    nout_fun <-  length(qualfun(c(1, 1, 1, 1), c(1.1, 0.9, 1, 1.2)))


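    # Substitute the evaluated argument values back into the call so the generated
    # job script does not depend on objects in the calling environment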
    call_list <- lapply(call, eval)
    call[names(call_list)[-1]] <- call_list[-1]

    script <- make_script(call = call, name = job_name)
    if (!requireNamespace("rstudioapi", quietly = TRUE)) {
      stop("Jobs are only supported in RStudio.")
    }

    if (!rstudioapi::isAvailable("1.2")) {
      stop(
        "Need at least version 1.2 of RStudio to use jobs. Currently running ",
        rstudioapi::versionInfo()$version,
        "."
      )
    }

    job <-
      rstudioapi::jobRunScript(path = script,
                               name = job_name,
                               exportEnv = "R_GlobalEnv")
    return(invisible(job))
  }

##----------------- check inputs and set things up -------------------------------------------------

  # check if method is one of the allowed
  if(!cmethod %in% c("modFit", "LHC", "LHC_old", "MCMC")) {
    stop(paste0("Method ", cmethod, " not allowed. Use one of: modFit, LHC, ",
                "LHC_old, or MCMC"))
  }

  # check model input
  model <- check_models(model, check_package_install = TRUE)
  # check the master config file
  check_master_config(config_file, model)

  # Set the timezone to UTC to avoid errors when reading times
  original_tz <- Sys.getenv("TZ")
  Sys.setenv(TZ = "UTC")
  tz <- "UTC"

  # get number of output arguments for qualfun
  nout_fun <-  length(qualfun(c(1, 1, 1, 1), c(1.1, 0.9, 1, 1.2)))
  # Set working directory
  oldwd <- getwd()

  # this way if the function exits for any reason, success or failure, these are reset:
  on.exit({
    setwd(oldwd)
    Sys.setenv(TZ = original_tz)
    # Remove temporary files
    if(file.exists(file.path(folder, "LER_CNFG_TMP.yaml"))){
      file.remove(file.path(folder, "LER_CNFG_TMP.yaml"))
    }
  })


  # path to master config file
  yaml <- file.path(folder, config_file)
  # get setup parameter
  start <- gotmtools::get_yaml_value(yaml, label = "time", key = "start")
  stop <- gotmtools::get_yaml_value(yaml, label = "time", key = "stop")
  obs_file <- gotmtools::get_yaml_value(file = yaml, label = "temperature", key = "file")
  time_unit <- gotmtools::get_yaml_value(yaml, "output", "time_unit")
  if(time_unit == "second"){
    # Needed to create out_time vector
    time_unit <- "sec"
  }
  time_step <- gotmtools::get_yaml_value(yaml, "output", "time_step")
  cnfg_l <- lapply(model, function(m) gotmtools::get_yaml_value(yaml, "config_files", m))
  names(cnfg_l) <- model
  met_timestep <- get_meteo_time_step(file.path(folder,
                                                gotmtools::get_yaml_value(yaml, "meteo", "file")))

##----------------- read in observed data  ---------------------------------------------------------

  # Create output time vector
  if(is.null(spin_up)){
    out_time <- seq.POSIXt(as.POSIXct(start, tz = tz), as.POSIXct(stop, tz = tz), by =
                             paste(time_step, time_unit))
  }else{
    start <- as.POSIXct(start, tz = tz) + spin_up * 24 * 60 * 60
    stop <- as.POSIXct(stop, tz = tz)
    out_time <- seq.POSIXt(as.POSIXct(start, tz = tz), as.POSIXct(stop, tz = tz), by =
                             paste(time_step, time_unit))
  }
  out_time <- data.frame(datetime = out_time)

  if(met_timestep == 86400){
    out_hour <- lubridate::hour(start)
  }else{
    out_hour <- 0
  }

  # read in Observed data
  message("Loading observed wtemp data...")
  obs <- read.csv(file.path(folder, obs_file), stringsAsFactors = FALSE)
  obs$datetime <- as.POSIXct(obs$datetime, tz = tz)

  # Subset to out_time
  obs <- obs[obs$datetime %in% out_time$datetime, ]

  obs_deps <- sort(unique(obs$Depth_meter))
  
  # check if all entries are unique
  if(any(duplicated(paste0(obs$datetime, obs$Depth_meter)))){
    warning(paste0("There are non-unique observations in the observed",
                   " water temperature file ", obs_file, "! Non-unique ",
                   "observations are averaged."))
  }
  
  # change data format from long to wide
  obs_out <- reshape2::dcast(obs, datetime ~ Depth_meter, value.var = "Water_Temperature_celsius",
                   fun.aggregate = mean, na.rm = TRUE)
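  # rename columns: datetime first, then one wtr_<depth> column per observed depth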
  str_depths <- colnames(obs_out)[2:ncol(obs_out)]
  colnames(obs_out) <- c("datetime", paste("wtr_", str_depths, sep = ""))
  obs_out$datetime <- as.POSIXct(obs_out$datetime)
  message("Finished!")

##---------------- read in  parameter initial values or create parameter sets ----------------------

  # if not existing create output file
  dir.create(file.path(folder, out_f), showWarnings = FALSE)

  ## use initial values from master config file as starting values
  # load master config file
  configr_master_config <- configr::read.config(yaml)
  # meteo parameter
  cal_section <- configr_master_config[["calibration"]][["met"]]
  params_met <- sapply(names(cal_section), function(n) cal_section[[n]]$initial)
  p_lower_met <- sapply(names(cal_section), function(n) cal_section[[n]]$lower)
  p_upper_met <- sapply(names(cal_section), function(n) cal_section[[n]]$upper)
  p_log_met <- sapply(names(cal_section), function(n) cal_section[[n]]$log)
  # Kw parameter
  cal_section <- configr_master_config[["calibration"]][["Kw"]]
  params_kw <- c(Kw = cal_section$initial)
  p_lower_kw <- c(Kw = cal_section$lower)
  p_upper_kw <- c(Kw = cal_section$upper)
  p_log_kw <- c(Kw = cal_section$log)
  # get names of models for which parameter are given
  model_p <- model[model %in% names(configr_master_config[["calibration"]])]
  # model specific parameters
  cal_section <- lapply(model_p, function(m) configr_master_config[["calibration"]][[m]])
  names(cal_section) <- model_p
  # get parameters
  params_mod <- lapply(model_p, function(m) {
                  sapply(names(cal_section[[m]]),
                         function(n) as.numeric(cal_section[[m]][[n]]$initial))})
  names(params_mod) <- model_p
  # get lower bound
  p_lower_mod <- lapply(model_p, function(m) {
    sapply(names(cal_section[[m]]),
           function(n) as.numeric(cal_section[[m]][[n]]$lower))})
  names(p_lower_mod) <- model_p
  # get upper bound
  p_upper_mod <- lapply(model_p, function(m) {
    sapply(names(cal_section[[m]]),
           function(n) as.numeric(cal_section[[m]][[n]]$upper))})
  names(p_upper_mod) <- model_p
  # log transform for LHC?
  log_mod <- lapply(model_p, function(m) {
    sapply(names(cal_section[[m]]),
           function(n) as.logical(cal_section[[m]][[n]]$log))})
  names(log_mod) <- model_p

  # create a list with parameters for every model
  pars_l <- lapply(model, function(m){
    df <- data.frame(pars = c(params_met, params_kw, params_mod[[m]], recursive = TRUE),
                     name = c(names(params_met), names(params_kw), names(params_mod[[m]]), recursive = TRUE),
                     upper = c(p_upper_met, p_upper_kw, p_upper_mod[[m]], recursive = TRUE),
                     lower = c(p_lower_met, p_lower_kw, p_lower_mod[[m]], recursive = TRUE),
                     type = c(rep("met", length(params_met)),
                              rep("kw", length(params_kw)),
                              rep("model", length(params_mod[[m]])), recursive = TRUE),
                     log = c(p_log_met, p_log_kw, log_mod[[m]], recursive = TRUE),
                     stringsAsFactors = FALSE)
    colnames(df) <- c("pars", "name", "upper", "lower", "type", "log")
    return(df)
  })
  names(pars_l) <- model

  # count number of different sets for LHC
  par_sets <- setNames(sapply(model, function(m) length(pars_l[[m]]$pars)), model)
  # output name
  outf_n <- paste0(cmethod, "_", format(Sys.time(), "%Y%m%d%H%M"))

  # if cmethod == LHC sample parameter or read from provided file
  if(cmethod %in% c("LHC_old", "LHC")) {

    # name for the output files
    if(!is.null(param_file)){
      # if the models have different numbers of parameters to calibrate, a file cannot be supplied
      if(length(unique(par_sets)) > 1) {
        stop(paste0("The calibration configuration in the master config file ",
                    config_file, " results in ", length(unique(par_sets)),
                    " different numbers of parameters per model. In this case",
                    " providing your own parameter file is not supported."))
      }
      # set name to name of supplied file
      outf_n <- gsub("_params_", "", basename(param_file))
    }

    if(is.null(param_file)) {
      pars_lhc <- list()
      for (m in model) {
        # range of parameters
        prange <- matrix(c(pars_l[[m]]$lower, pars_l[[m]]$upper), ncol = 2)
        # calculate log if wanted
        prange[pars_l[[m]]$log, ] <- log10(prange[pars_l[[m]]$log, ])
        # sample parameter sets
        pars_lhc[[m]] <- FME::Latinhyper(parRange = prange, num = num)
        # retransform log parameter
        pars_lhc[[m]][, pars_l[[m]]$log] <- 10^pars_lhc[[m]][, pars_l[[m]]$log]
        # only use 5 significant digits
        pars_lhc[[m]] <- signif(pars_lhc[[m]], 5)
        # set colnames
        colnames(pars_lhc[[m]]) <- pars_l[[m]]$name
        pars_lhc[[m]] <- as.data.frame(pars_lhc[[m]])
        # add identifier for every set
        pars_lhc[[m]]$par_id <- paste0("p", formatC(seq_len(num), width = round(log10(num)) + 1,
                                                    format = "d", flag = "0"))
        # write parameter sets to file
        write.table(pars_lhc[[m]], file = file.path(folder, out_f,
                                                    paste0("params_", m, "_", outf_n, ".csv")),
                    quote = FALSE, row.names = FALSE, sep = ",")

      }

    } else {
      # if file is supplied read it in
      pars_lhc <- lapply(model, function(m) read.csv(param_file, stringsAsFactors = FALSE))
      names(pars_lhc) <- model
      # check if the number of columns in the file fit the number of parameters to be calibrated
      if((ncol(pars_lhc[[1]]) - 1) != unique(par_sets)) {
        stop(paste0("Number of parameters in file ", param_file, " (", (ncol(pars_lhc[[1]]) - 1),
                    ") ", "and number of parameters to calibrate in master config file (",
                    unique(par_sets), ") do not match!"))
      }
      num <- nrow(pars_lhc[[1]])
    }
  } else {
    # we just need an empty variable to export to parallel clusters (not a good fix, but works)
    pars_lhc <- NULL
  }

  # 2020-08-05: Call to export_config removed on, users need to run export_config before
  #             calling function.
  # 2021-07-02: Call to export_meteo was re-added, to make sure that calibrated variables are
  #             always calibrated between the specified boundaries, and not affected by pre-set
  #             scaling factors. 

# ##--------- Reset scaling factors that are to be calibrated to 1.0 --------------------------
  
  # Do not modify the original config file; work on a temporary copy instead
  if(file.exists(file.path(folder, "LER_CNFG_TMP.yaml"))){
    
    warning(paste0("The file 'LER_CNFG_TMP.yaml' already exists in your folder. This is a ",
                   "reserved file name and the file will be overwritten."))
    unlink(file.path(folder, "LER_CNFG_TMP.yaml"))
  }
  file.copy(yaml, file.path(folder, "LER_CNFG_TMP.yaml"))

  # If scaling factors are in the calibration section, set them to 1.0 in the TMP file
  lst_config_tmp <- configr::read.config(file.path(folder, "LER_CNFG_TMP.yaml"))
  scfctrs_to_calibrate <- names(lst_config_tmp[["calibration"]][["met"]])
  
  # Set these factors to 1 in the "all" section, and also in model-specific sub-sections
  names_scale_section <- names(lst_config_tmp[["scaling_factors"]])
  
  for(i in names_scale_section){
    if(i == "all"){
      for(j in scfctrs_to_calibrate){
        lst_config_tmp[["scaling_factors"]][["all"]][[j]] <- 1.0
      }
    }else{
      for(j in scfctrs_to_calibrate){
        if(!is.null(lst_config_tmp[["scaling_factors"]][[i]][[j]])){
          lst_config_tmp[["scaling_factors"]][[i]][[j]] <- 1.0
        }
      }
    }
  }
  
  configr::write.config(lst_config_tmp, file.path = file.path(folder, "LER_CNFG_TMP.yaml"),
                        write.type = "yaml", indent = 3)
  
  export_meteo(config_file = "LER_CNFG_TMP.yaml", model = model, folder = folder)
  
##----------------- read in model meteo files ----------------------------------------------------

  ## read in meteo
  met_l <- lapply(model, function(m){
    met_name <- get_model_met_name(m, cnfg_l[[m]])
    ## list with long standard names
    l_names <- as.list(met_var_dic$standard_name)
    names(l_names) <- met_var_dic$short_name

    if(m == "MyLake") {
      met_m <- read.table(file.path(folder, m, met_name), sep = "\t",
                        header = FALSE)
      colnames(met_m) <- c(l_names$time, l_names$swr, l_names$cc, l_names$airt, l_names$relh,
                           l_names$p_surf, l_names$wind_speed, l_names$precip)
    } else if (m == "GLM") {
      met_m <- read.table(file.path(folder, m, met_name), sep = ",", header = TRUE)
    } else if (m == "FLake") {
      # read in meteo file
      met_m <- read.table(file.path(folder, m, met_name), sep = "\t",
                              header = FALSE)
      colnames(met_m) <- c("!Shortwave_Radiation_Downwelling_wattPerMeterSquared",
                           "Air_Temperature_celsius", "Vapour_Pressure_milliBar",
                           "Ten_Meter_Elevation_Wind_Speed_meterPerSecond",
                           "Cloud_Cover_decimalFraction", "datetime")
    } else if (m == "GOTM") {
      # read in meteo file
      met_m <- read.table(file.path(folder, m, met_name), sep = "\t", header = TRUE)
      colnames(met_m)[1] <- "!datetime"
    } else if(m == "Simstrat") {
      # read in meteo file
      met_m <- read.table(file.path(folder, m, met_name), sep = "\t",
                                 header = TRUE)

    }
    return(met_m)
  })

  names(met_l) <- model

##------------------------- parallel LHC calibration -----------------------------------------------

  if(parallel){
    
    if (is.null(ncores)) {
      ncores <- parallel::detectCores() - 1
    }
    
    cl <- parallel::makeCluster(ncores)
    on.exit(parallel::stopCluster(cl), add = TRUE)
    parallel::clusterExport(cl = cl, 
                            unclass(lsf.str(envir = asNamespace("LakeEnsemblR"), 
                                            all = T)),
                            envir = as.environment(asNamespace("LakeEnsemblR"))
    )
    
    if (cmethod == "LHC_old") {
      parallel::clusterExport(cl, varlist = list("pars_lhc", "pars_l", "model", "config_file", "met_l",
                                                 "folder", "out_f", "cnfg_l", "obs_deps",
                                                 "obs_out", "out_hour", "qualfun",
                                                 "outf_n"),
                              envir = environment())
      message("\nStarted parallel LHC [", Sys.time(), "]\n")
      model_out <- setNames(
        parLapply(cl, model, function(m) LHC_model(pars = pars_lhc[[m]],
                                                      type = pars_l[[m]]$type,
                                                      model = m, var = "temp",
                                                      config_file = config_file,
                                                      met = met_l[[m]], folder = folder,
                                                      out_f = out_f, config_f = cnfg_l[[m]],
                                                      obs_deps = obs_deps, obs_out = obs_out,
                                                      out_hour = out_hour, qualfun = qualfun,
                                                      nout_fun = nout_fun, outf_n = outf_n
        )),
        model
      )
      message("\nFinished parallel LHC [", Sys.time(), "]\n")
    }
    
    if (cmethod == "LHC") {
      
      model_out <- setNames(
        lapply(model, \(m) {
        
          temp_dirs <- make_temp_dir(model = m, folder = folder, n = ncores,
                                     tmp_dir = tmp_dir)
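          # Split the LHC parameter sets into ncores chunks, one chunk per temporary directory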
          param_list <- split(pars_lhc[[m]], rep(1:ncores))
          type <- pars_l[[m]]$type
          met <- met_l[[m]]
          config_f <- cnfg_l[[m]]
          
          varlist = list("config_file", "m", "temp_dirs", "type", "met",
                         "obs_out", "out_hour","config_f", "nout_fun",
                         "qualfun", "folder", "out_f", "obs_deps",
                         "outf_n")
          
          parallel::clusterExport(cl, varlist = varlist,
                                  envir = environment())
          message(m, ": Starting LHC calibration with ", num, " parameter sets using ", 
                  ncores, " cores. [", Sys.time(), "]")
    
          # model_out <- lapply(seq_along(param_list), \(pars, i) {
          model_out <- parallel::parLapply(cl, seq_along(param_list), \(pars, i) {
            
            # Select the temporary directory assigned to this worker
            temp_dir <- temp_dirs[i]
    
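            # Names of the fit measures returned by qualfun (used as column names in the results)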
            names_out_qfun <- colnames(qualfun(c(1, 1), c(0.9, 0.8)))
            # Create dataframe for storing the results
            out_i <- as.data.frame(matrix(NA, nrow = nrow(pars[[i]]), 
                                          ncol = nout_fun + 1))
            names(out_i) <- c(names_out_qfun, "par_id")
            out_i$par_id <- pars[[i]]$par_id
            # loop over all parameter sets
            for (p in seq_len(nrow(pars[[i]]))) {
              # apply this parameter set (model parameters and meteo scaling factors)
              change_pars(config_file = config_file, model = m,
                          pars = pars[[i]][p, -ncol(pars[[i]]), drop = FALSE],
                          type = type, met = met, folder = temp_dir)
              # calculate quality measure
              qual_i <- cost_model(config_file = config_file, model = m, var = "temp", folder = temp_dir,
                                   obs_deps = obs_deps, obs_out = obs_out, out_hour = out_hour,
                                   qualfun = qualfun, config_f = config_f)
              if(any(is.na(qual_i))) {
                qual_i <- setNames(rep(NA, nout_fun), names_out_qfun)
              } 
              out_i[p, -ncol(out_i)] <- qual_i
            }
            return(out_i)
          }, pars = param_list)
          
          message(m, ": Finished LHC calibration. [", Sys.time(), "]")
          
          # Bind all the results together and write to file
          g1 <- do.call(rbind, model_out)
          g1 <- g1[order(g1$par_id), ]
          out_name <- paste0(m, "_", outf_n, ".csv")
          # switch if file is existing
          flsw <- file.exists(file.path(oldwd, out_f, out_name))
          write.table(x = g1, file = file.path(oldwd, out_f, out_name),
                      append = ifelse(flsw, TRUE, FALSE), sep = ",", row.names = FALSE,
                      col.names = ifelse(flsw, FALSE, TRUE), quote = FALSE)
          return(g1)
          }),
        model)
    }

    # if a custom file path for the temporary directories was provided, delete those files on exit
    on.exit({
      if(!is.null(tmp_dir)) {
        unlink(tmp_dir, recursive = TRUE, force = TRUE)
      }
    }, add = TRUE)
##------------------------- parallel MCMC calibration ----------------------------------------------

    if(cmethod == "MCMC") {
      parallel::clusterExport(cl, varlist = list("pars_lhc", "pars_l", "model", "config_file", "met_l",
                                                 "folder", "out_f", "cnfg_l", "obs_deps",
                                                 "obs_out", "out_hour", "qualfun",
                                                 "outf_n"),
                              envir = environment())
      message("\nStarted parallel MCMC\n")
      model_out <- setNames(
        parLapply(cl, model, function(m){
          FME::modMCMC(f = wrap_model,
                       p = setNames(pars_l[[m]]$pars,
                                    pars_l[[m]]$name),
                       type = pars_l[[m]]$type,
                       model = m,
                       var = "temp",
                       config_file = config_file,
                       met = met_l[[m]],
                       folder = folder,
                       config_f = cnfg_l[[m]],
                       out_f = out_f,  obs_deps = obs_deps, obs_out = obs_out,
                       out_hour = out_hour,
                       qualfun = function(O, P){
                         ssr = sum((as.matrix(O[, -1]) - as.matrix(P[, -1]))^2, na.rm = TRUE)},
                       outf_n = outf_n,
                       niter = num,
                       lower = setNames(pars_l[[m]]$lower,
                                        pars_l[[m]]$name),
                       upper = setNames(pars_l[[m]]$upper,
                                        pars_l[[m]]$name),
                       ...)}),
        model
      )
      message("\nFinished parallel MCMC\n")
    }

##------------------------- parallel modFit calibration ------------------------------------------
    if(cmethod == "modFit") {
      parallel::clusterExport(cl, varlist = list("pars_lhc", "pars_l", "model", "config_file", "met_l",
                                                 "folder", "out_f", "cnfg_l", "obs_deps",
                                                 "obs_out", "out_hour", "qualfun",
                                                 "outf_n"),
                              envir = environment())
      message("\nStarted parallel modFit\n")
      model_out <- setNames(
        parLapply(cl, model, function(m){
          FME::modFit(f = wrap_model,
                      p = setNames(pars_l[[m]]$pars,
                                   pars_l[[m]]$name),
                      type = pars_l[[m]]$type,
                      model = m,
                      var = "temp",
                      config_file = config_file,
                      met = met_l[[m]],
                      folder = folder,
                      config_f = cnfg_l[[m]],
                      out_f = out_f,  obs_deps = obs_deps, obs_out = obs_out,
                      out_hour = out_hour,
                      qualfun = function(O, P){
                        res = na.exclude(as.vector(as.matrix(O[, -1]) - as.matrix(P[, -1])))},
                      outf_n = "",
                      write = FALSE,
                      lower = setNames(pars_l[[m]]$lower,
                                       pars_l[[m]]$name),
                      upper = setNames(pars_l[[m]]$upper,
                                       pars_l[[m]]$name),
                      ...)}),
        model
      )
      message("\nFinished parallel modFit\n")
    }
  } else {

##------------------------- LHC calibration --------------------------------------------------------

    if(cmethod == "LHC") {
      model_out <- setNames(
        lapply(model, function(m) LHC_model(pars = pars_lhc[[m]],
                                            type = pars_l[[m]]$type,
                                            model = m, var = "temp",
                                            config_file = config_file,
                                            met = met_l[[m]], folder = folder,
                                            out_f = out_f, config_f = cnfg_l[[m]],
                                            obs_deps = obs_deps, obs_out = obs_out,
                                            out_hour = out_hour, qualfun = qualfun,
                                            nout_fun = nout_fun, outf_n = outf_n
                                                  )),
        model
      )
    }

##------------------------- MCMC calibration -------------------------------------------------------

    if(cmethod == "MCMC") {
      model_out <- setNames(
                  lapply(model, function(m){
                    message(paste0("\nStarted MCMC for model ", m, "\n"))
                      res <- FME::modMCMC(f = wrap_model,
                                          p = setNames(pars_l[[m]]$pars,
                                                       pars_l[[m]]$name),
                                          type = pars_l[[m]]$type,
                                          model = m,
                                          var = "temp",
                                          config_file = config_file,
                                          met = met_l[[m]],
                                          folder = folder,
                                          config_f = cnfg_l[[m]],
                                          out_f = out_f,  obs_deps = obs_deps, obs_out = obs_out,
                                          out_hour = out_hour,
                                          qualfun = function(O, P){
                                           ssr = sum((as.matrix(O[, -1]) - as.matrix(P[, -1]))^2,
                                                     na.rm = TRUE)},
                                          outf_n = outf_n,
                                          niter = num,
                                          lower = setNames(pars_l[[m]]$lower,
                                                           pars_l[[m]]$name),
                                          upper = setNames(pars_l[[m]]$upper,
                                                           pars_l[[m]]$name), ...)
                      message(paste0("\nFinished MCMC for model ", m, "\n"))
                      return(res)}),
                  model
      )
    }

##------------------------- modFit calibration ---------------------------------------------------

    if(cmethod == "modFit") {
      model_out <- setNames(
        lapply(model, function(m){
          message(paste0("\nStarted fitting of model ", m, "\n"))
          res <- FME::modFit(f = wrap_model,
                             p = setNames(pars_l[[m]]$pars,
                                   pars_l[[m]]$name),
                             type = pars_l[[m]]$type,
                             model = m,
                             var = "temp",
                             config_file = config_file,
                             met = met_l[[m]],
                             folder = folder,
                             config_f = cnfg_l[[m]],
                             out_f = out_f,  obs_deps = obs_deps, obs_out = obs_out,
                             out_hour = out_hour,
                             qualfun = function(O, P){
                             res = na.exclude(as.vector(as.matrix(O[, -1]) - as.matrix(P[, -1])))},
                             outf_n = "",
                             write = FALSE,
                             lower = setNames(pars_l[[m]]$lower,
                                              pars_l[[m]]$name),
                             upper = setNames(pars_l[[m]]$upper,
                                              pars_l[[m]]$name),
                             ...)
          message(paste0("\nFinished fitting of model ", m, "\n"))
          return(res)}),
        model
      )
    }
  }
  # return calibration results
  return(model_out)
}