#' Run a list of jobs in parallel R subprocesses
#'
#' Prepares `parent_dir`, writes one child script per element of `args_list`
#' (via `run_create_process_i()`), then launches those scripts with
#' `processx`, keeping at most `cluster_size` of them alive at a time.
#' Jobs whose output file already exists when they come up for scheduling
#' are skipped, and jobs running longer than `kill_after_n_secs` are killed.
#' Progress is reported with `message()`.
#'
#' @param args_list A list with one element per job; each element is passed
#'   as `args` to `run_create_process_i()`. Element names are used as job
#'   names: if the list is unnamed they default to `"job1"`, `"job2"`, ...,
#'   and all names are normalised with `make.names(unique = TRUE)`.
#' @param parent_env Object saved with `saveRDS()` to `run_env.rds` inside
#'   `parent_dir` (presumably read back by the child scripts).
#' @param parent_dir Directory holding the run's files. Backslashes are
#'   converted to forward slashes. An existing directory is deleted and
#'   recreated unless `cache = TRUE`.
#' @param orig_code_path Path to an existing R script; it is copied to
#'   `code.R` inside `parent_dir`.
#' @param cluster_size Maximum number of child processes running at once;
#'   must be a single number >= 1.
#' @param wait_between_checks Seconds to sleep when no new job can be
#'   started (all slots busy or all jobs already started).
#' @param pause_to_space_io Seconds to sleep after each job is started.
#' @param cache If `TRUE`, an existing `parent_dir` is kept rather than
#'   deleted, so jobs whose output file already exists are not rerun.
#' @param seed Passed to `run_create_process_i()`.
#' @param output_full_run_env Passed to `run_create_process_i()` and
#'   `run_output()`.
#' @param kill_after_n_secs Running jobs older than this many seconds
#'   (measured from process start) are killed.
#'
#' @return The value of `run_output(output_paths, output_full_run_env)`,
#'   where `output_paths` is a vector of each job's output path, named by job.
#' @export
run <- function(args_list, parent_env, parent_dir, orig_code_path,
                cluster_size = 2, wait_between_checks = 10,
                pause_to_space_io = 5, cache = FALSE, seed = 1,
                output_full_run_env = FALSE, kill_after_n_secs = 30000) {
  # ---- Validate inputs ----
  if (!is.list(args_list)) {
    stop("args_list is not a list.", call. = FALSE)
  }
  # cluster_size < 1 would make the scheduler sleep forever without ever
  # starting a job.
  if (!is.numeric(cluster_size) || length(cluster_size) != 1 ||
      is.na(cluster_size) || cluster_size < 1) {
    stop("cluster_size must be a single number >= 1.", call. = FALSE)
  }
  if (is.null(names(args_list))) {
    names(args_list) <- paste0("job", seq_along(args_list))
  }
  # unique = TRUE: blank or clashing names (e.g. a partially named list)
  # would otherwise collide in the name-indexed bookkeeping below.
  names(args_list) <- make.names(names(args_list), unique = TRUE)
  parent_dir <- gsub("\\\\", "/", parent_dir)
  if (!file.exists(orig_code_path)) {
    stop("orig_code_path=", orig_code_path, " Does not exist.", call. = FALSE)
  }

  # ---- Prepare parent_dir ----
  if (dir.exists(parent_dir) && !cache) {
    unlink(parent_dir, recursive = TRUE)
  }
  if (!dir.exists(parent_dir)) {
    dir.create(parent_dir, recursive = TRUE)
  }
  code_path <- file.path(parent_dir, "code.R")
  if (!file.copy(from = orig_code_path, to = code_path, overwrite = TRUE)) {
    warning("Could not copy ", orig_code_path, " to ", code_path,
            call. = FALSE)
  }
  parent_env_path <- file.path(parent_dir, "run_env.rds")
  saveRDS(parent_env, file = parent_env_path)

  # ---- Per-job bookkeeping (every vector is named by job) ----
  job_names <- names(args_list)
  number_of_jobs <- length(args_list)
  process_paths <- rep(NA, number_of_jobs)
  output_paths <- rep(NA, number_of_jobs)
  finished_vec <- rep(FALSE, number_of_jobs)
  running_vec <- rep(FALSE, number_of_jobs)
  process_list <- vector("list", number_of_jobs)
  names(process_paths) <- job_names
  names(output_paths) <- job_names
  names(finished_vec) <- job_names
  names(running_vec) <- job_names
  names(process_list) <- job_names

  # Write every child script up front; whether a job's output already
  # exists is only checked later, when the job comes up for scheduling.
  for (job_name in job_names) {
    child <- run_create_process_i(
      i = job_name, args = args_list[[job_name]], parent_dir = parent_dir,
      seed = seed, output_full_run_env = output_full_run_env
    )
    process_paths[job_name] <- child$process_path
    output_paths[job_name] <- child$output_path
  }

  # If run() exits early (error or user interrupt), do not leave child
  # processes running in the background. On a normal exit none are alive.
  on.exit({
    for (p in process_list) {
      if (!is.null(p) && p$is_alive()) {
        p$kill()
      }
    }
  }, add = TRUE)

  # ---- Scheduler loop ----
  t1 <- Sys.time()
  while (!all(finished_vec)) {
    if (sum(running_vec) >= cluster_size ||
        all(finished_vec | running_vec)) {
      # Every slot is busy, or every job has already been started.
      Sys.sleep(wait_between_checks)
    } else {
      # Jobs are started in order, so the next candidate is the one after
      # the last job that has been started (or skipped) so far.
      started_or_finished <- which(finished_vec | running_vec)
      if (length(started_or_finished) == 0) {
        next_i <- 1L
      } else {
        next_i <- max(started_or_finished) + 1L
      }
      # Skip jobs whose output already exists.
      while (next_i <= number_of_jobs &&
             file.exists(output_paths[job_names[next_i]])) {
        message("Already exists: ", job_names[next_i])
        finished_vec[next_i] <- TRUE
        running_vec[next_i] <- FALSE
        next_i <- next_i + 1L
      }
      if (next_i <= number_of_jobs) {
        job_name <- job_names[next_i]
        message("Starting: ", job_name)
        process_list[[job_name]] <- processx::process$new(
          rscript_path(), process_paths[job_name]
        )
        running_vec[next_i] <- TRUE
        Sys.sleep(pause_to_space_io)
      }
    }

    # Handle at most one finished or overdue job per pass.
    for (i in which(running_vec)) {
      job_name <- job_names[i]
      p1 <- process_list[[job_name]]
      time_diff <- difftime(Sys.time(), p1$get_start_time(), units = "secs")
      if (!p1$is_alive()) {
        finished_vec[i] <- TRUE
        running_vec[i] <- FALSE
        output_path <- output_paths[job_name]
        if (file.exists(output_path)) {
          # A crashed child may leave a truncated file; don't let one bad
          # output abort scheduling of the remaining jobs.
          read <- tryCatch(
            list(value = readRDS(output_path), error = NULL),
            error = function(e) list(value = NULL, error = conditionMessage(e))
          )
          if (!is.null(read$error)) {
            message("Finished Unreadable Output: ", job_name, " ",
                    round(time_diff, 2))
            message(read$error)
          } else if (isTRUE(read$value$u__run_success)) {
            message("Finished Success: ", job_name, " ", round(time_diff, 2))
          } else {
            message("Finished Error: ", job_name, " ", round(time_diff, 2))
            run_error <- read$value$u__run_error
            # message() re-signals condition objects (which could trigger a
            # caller's error handler); print only their text instead.
            if (inherits(run_error, "condition")) {
              run_error <- conditionMessage(run_error)
            }
            message(run_error)
          }
        } else {
          message("Finished No Output: ", job_name, " ", round(time_diff, 2))
        }
      } else if (time_diff > kill_after_n_secs) {
        p1$kill()
        finished_vec[i] <- TRUE
        running_vec[i] <- FALSE
        message("Killed: ", job_name, " ", round(time_diff, 2))
      } else {
        next
      }
      # Rough estimate: average elapsed minutes per finished job times the
      # number of unfinished jobs.
      message(
        "Estimated Minutes left: ",
        (difftime(Sys.time(), t1, units = "mins") / sum(finished_vec)) *
          sum(!finished_vec)
      )
      # break so processes can be spaced out.
      break
    }
  }
  run_output(output_paths, output_full_run_env)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.