R/run.R

Defines functions run

Documented in run

#' Run a script once per element of `args_list` in parallel R processes
#'
#' Copies `orig_code_path` to `parent_dir/code.R`, saves `parent_env` to
#' `parent_dir/run_env.rds`, prepares one job per element of `args_list` with
#' `run_create_process_i()`, then launches the jobs in order as separate
#' `Rscript` processes (via `processx`), keeping at most `cluster_size` of
#' them running at once. Progress is reported with `message()`.
#'
#' @param args_list A list with one element per job; each element is passed
#'   as `args` to `run_create_process_i()`. Element names become job names
#'   (`job1`, `job2`, ... when the list is unnamed) after being made
#'   syntactically valid and unique with `make.names(unique = TRUE)`.
#' @param parent_env Object written with `saveRDS()` to
#'   `parent_dir/run_env.rds`; presumably an environment the child scripts
#'   load (confirm in `run_create_process_i()`).
#' @param parent_dir Directory holding all run files. Backslashes are
#'   converted to `/`. It is deleted and recreated unless `cache = TRUE`.
#' @param orig_code_path Path to an existing R script; it is copied to
#'   `parent_dir/code.R`.
#' @param cluster_size Maximum number of jobs running at the same time;
#'   must be a single number >= 1.
#' @param wait_between_checks Seconds to sleep when no new job can be started.
#' @param pause_to_space_io Seconds to sleep after launching each job.
#' @param cache If `FALSE`, `parent_dir` is wiped before the run. If `TRUE`,
#'   it is kept. In either case a job whose output file already exists is
#'   skipped rather than re-run.
#' @param seed Passed through to `run_create_process_i()`.
#' @param output_full_run_env Passed through to `run_create_process_i()` and
#'   `run_output()`.
#' @param kill_after_n_secs A job still running after this many seconds is
#'   killed.
#'
#' @return The value of `run_output(output_paths, output_full_run_env)`, where
#'   `output_paths` is a character vector of per-job output file paths named
#'   by job name.
#' @export
run <- function(args_list, parent_env, parent_dir, orig_code_path,
                cluster_size = 2, wait_between_checks = 10,
                pause_to_space_io = 5, cache = FALSE, seed = 1,
                output_full_run_env = FALSE, kill_after_n_secs = 30000) {
  if (!is.list(args_list)) {
    stop("args_list is not a list.")
  }
  # With cluster_size < 1 no job could ever start and the scheduling loop
  # below would never end; a non-integer value used to defeat the old
  # `== cluster_size` capacity test and allow unbounded concurrency.
  if (!is.numeric(cluster_size) || length(cluster_size) != 1 ||
    is.na(cluster_size) || cluster_size < 1) {
    stop("cluster_size must be a single number >= 1.")
  }

  if (is.null(names(args_list))) {
    names(args_list) <- paste0("job", seq_along(args_list))
  }
  # Job names index every per-job vector below, so they must be unique:
  # without `unique = TRUE`, blank names ("" -> "X") or names differing only
  # in invalid characters would collide and overwrite each other.
  names(args_list) <- make.names(names(args_list), unique = TRUE)

  parent_dir <- gsub("\\\\", "/", parent_dir)
  if (!file.exists(orig_code_path)) {
    stop("orig_code_path=", orig_code_path, " Does not exist.")
  }

  if (!cache && dir.exists(parent_dir)) {
    unlink(parent_dir, recursive = TRUE)
  }
  if (!dir.exists(parent_dir)) {
    dir.create(parent_dir, recursive = TRUE)
  }

  code_path <- file.path(parent_dir, "code.R")
  if (!file.copy(from = orig_code_path, to = code_path, overwrite = TRUE)) {
    stop("Could not copy orig_code_path=", orig_code_path, " to ", code_path)
  }

  parent_env_path <- file.path(parent_dir, "run_env.rds")
  saveRDS(parent_env, file = parent_env_path)

  job_names <- names(args_list)
  number_of_jobs <- length(job_names)

  # Per-job bookkeeping, all named by job name.
  process_paths <- rep(NA_character_, number_of_jobs)
  output_paths <- rep(NA_character_, number_of_jobs)
  finished_vec <- rep(FALSE, number_of_jobs)
  running_vec <- rep(FALSE, number_of_jobs)
  names(process_paths) <- job_names
  names(output_paths) <- job_names
  names(finished_vec) <- job_names
  names(running_vec) <- job_names
  process_list <- vector("list", number_of_jobs)
  names(process_list) <- job_names

  # If we leave early (error or user interrupt), kill any child that is
  # still alive. On normal completion every job has already exited or been
  # killed, so this is a no-op.
  on.exit({
    for (proc in process_list) {
      if (!is.null(proc) && proc$is_alive()) {
        proc$kill()
      }
    }
  }, add = TRUE)

  # Prepare every job's files up front; nothing is launched here and no
  # completion check is done yet.
  for (job_name in job_names) {
    temp_child <- run_create_process_i(
      i = job_name, args = args_list[[job_name]], parent_dir = parent_dir,
      seed = seed, output_full_run_env = output_full_run_env
    )
    process_paths[job_name] <- temp_child$process_path
    output_paths[job_name] <- temp_child$output_path
  }

  t1 <- Sys.time()
  # Linear extrapolation: average minutes per finished job times jobs left.
  # Only called right after a job finishes, so sum(finished_vec) >= 1.
  message_time_left <- function() {
    elapsed <- difftime(Sys.time(), t1, units = "mins")
    message(
      "Estimated Minutes left: ",
      (elapsed / sum(finished_vec)) * sum(!finished_vec)
    )
  }

  while (!all(finished_vec)) {
    # -- Try to launch the next job -------------------------------------------
    if (sum(running_vec) >= cluster_size || all(finished_vec | running_vec)) {
      # Every slot is busy, or every job has already been started.
      Sys.sleep(wait_between_checks)
    } else {
      # Jobs are started strictly in order, so the next candidate is the one
      # after the highest-indexed job that has been started or finished.
      started_or_finished <- which(finished_vec | running_vec)
      next_i <- if (length(started_or_finished) == 0) {
        1L
      } else {
        max(started_or_finished) + 1L
      }

      # Skip jobs whose output file already exists (e.g. kept by cache = TRUE).
      while (next_i <= number_of_jobs && file.exists(output_paths[[next_i]])) {
        message("Already exists: ", job_names[next_i])
        finished_vec[next_i] <- TRUE
        running_vec[next_i] <- FALSE
        next_i <- next_i + 1L
      }

      if (next_i <= number_of_jobs) {
        job_name <- job_names[next_i]
        message("Starting: ", job_name)
        process_list[[job_name]] <- processx::process$new(
          rscript_path(), process_paths[[job_name]]
        )
        running_vec[next_i] <- TRUE
        # Pause after each launch so job start-ups are spaced out.
        Sys.sleep(pause_to_space_io)
      }
    }

    # -- Reap finished jobs and kill overdue ones ------------------------------
    for (i in which(running_vec)) {
      job_name <- job_names[i]
      proc <- process_list[[job_name]]
      time_diff <- difftime(Sys.time(), proc$get_start_time(), units = "secs")

      if (!proc$is_alive()) {
        finished_vec[i] <- TRUE
        running_vec[i] <- FALSE
        if (file.exists(output_paths[[job_name]])) {
          run_env <- readRDS(output_paths[[job_name]])
          # isTRUE(): a missing or NA flag is reported as an error instead of
          # making `if` itself fail.
          if (isTRUE(run_env$u__run_success)) {
            message("Finished Success: ", job_name, " ", round(time_diff, 2))
          } else {
            message("Finished Error: ", job_name, " ", round(time_diff, 2))
            message(run_env$u__run_error)
          }
        } else {
          message("Finished No Output: ", job_name, " ", round(time_diff, 2))
        }
        message_time_left()
        # Handle at most one job per pass so the next launch is spaced out.
        break
      } else if (time_diff > kill_after_n_secs) {
        proc$kill()
        finished_vec[i] <- TRUE
        running_vec[i] <- FALSE
        message("Killed: ", job_name, " ", round(time_diff, 2))
        message_time_left()
        # Handle at most one job per pass so the next launch is spaced out.
        break
      }
    }
  }

  run_output(output_paths, output_full_run_env)
}
trevorlolsen/jobsR documentation built on Dec. 23, 2021, 12:56 p.m.