
# Defines functions message_collector wait_and_check

# Waits until a logical expression is true or the time limit runs out
# Doesn't work for implicit futures (ie `resolved(futureOf(x))`)
wait_and_check <- function(logical_expr, total_sleep_time = 10, checks = 100) {
  expr <- rlang::enquo(logical_expr)
  counter <- 0
  while (!is.null(rlang::eval_tidy(expr)) && !rlang::eval_tidy(expr) == TRUE && counter < checks) {
    counter <- counter + 1

# Used for gathering up lists of errors from nodes, basically
message_collector <- function(l, print_function, title,
                              collect_all_name = NULL,
                              ...) {
  if (!is.null(collect_all_name)) {
    l <- purrr::map(l, ~.[[collect_all_name]]) %>%
      keep(~length(.) > 0)
  if (length(l) > 0) {
        title, ":",
          l, .init = "",
          ~ paste0(..1, "\n", ..2, ": ", paste(..3, collapse=" | ")))),

#' Assign futures from different levels of the topology
#' Let's say you want to run a job on a node \emph{behind} a gateway server, which
#' would require doing two nested future plans. Instead of having to
#' type out something like: \code{x \%<-\% { y \%<-\% { 1 + 1 }; y}}, you can
#' now just type \code{x \%2\% { 1 + 1 }}, which will run \code{1 + 1} on the
#' second layer of the future topology.  Likewise, \code{\%3\%} will assign
#' the expression when evaluated on the third level.
#' @rdname nested_pipe
#' @export
`%2%` <- function(x, value) {
  target <- substitute(x)
  inner_expr <- substitute(value)
  expr <- substitute({.zach %<-% inner_expr; .zach })
  envir <- parent.frame(1)
  future:::futureAssignInternal(target, expr, envir = envir, substitute = FALSE)

#' @rdname nested_pipe
#' @export
`%3%` <- function(x, value) {
  target <- substitute(x)
  inner_expr <- substitute(value)
  expr <- substitute({.zach %<-% {.is_da_best %<-% inner_expr; .is_da_best}; .zach })
  envir <- parent.frame(1)
  future:::futureAssignInternal(target, expr, envir = envir, substitute = FALSE)

#' The default way to start a cluster plan
#' Runs `future`'s plan, in a way that works well with the Rochester cluster nodes
#' @param login_node a string for the gateway login, e.g., \"zacharyburchill@gateway.rochester.edu\" (not real)
#' @param n the number of clusters to use
#' @param \dots additional bare arguments for `filter()`
#' @param use_abbreviations because the way ssh adds known hosts, if you've only sshed into a node via `ssh nodeN`, you'll want to set this to `TRUE`
#' @return By default, nothing, but possibly an expression if `goahead=FALSE`
#' @export
default_planner <- function(login_node, n,
                            ...) {
  get_n_best_nodes(login_node, n, ...) %>%
    # Turn the nodes into the plan
    nodes_to_plan(login_node, use_abbreviations = TRUE)

#' Returns the n best nodes
#' To do: add documentation
#' @export
get_n_best_nodes <- function(login_node, n,
                             use_abbreviations = TRUE,
                             timeout_sec = 10) {
  filterers <- rlang::enquos(...)

  node_df <- get_nodes_info(login_node, timeout_sec = 10)
  node_df %>%
    # Make the data frame clean
    default_cleanup() %>%
    # Filter out the ones that are down/overused
    default_filter() %>%
    # Custom filters
    filter(!!! filterers) %>%
    # Pick the n best

#' Test nodes' connectivity
#' Test and see if a node is reachable by actually trying to connect to them.
#' `test_node` is for individual nodes. `test_nodes` takes a list of node names, tests each of them, and returns a list of the ones that work.
#' @param login_node a string for the gateway login, e.g., \"zacharyburchill@gateway.rochester.edu\" (not real)
#' @param timeout_sec the number of seconds to wait after trying to connect to a node before declaring it dead
#' @param .connection_timer the number of additional seconds to wait before declaring that the \emph{attempt} to connect to the node was unsuccessful
#' @param nodename a string of the node to try to connect to
#' @param node_list a list/vector of strings of node names to try to connect to
#' @param verbose a boolean on whether to print the errors, messages, and warnings that happend while testing the nodes, if any

#' @rdname testing_nodes
#' @export
test_node <- function(nodename, login_node,
                      timeout_sec, .connection_timer = 3) {

    # if you're already on the login node, this should be NA
    if (is.na(login_node))
      da_plan <- list(tweak(remote, workers = nodename))
    # If you're not logged in already from a server:
    else {
      da_plan <- list(tweak(remote, workers = login_node), tweak(remote, workers = nodename))
      oplan <- plan("list")
      on.exit(plan(oplan), add = TRUE)

    # This makes sure that there's never a delay *trying* to connect (ie in `plan(da_plan)`)
    outer_test %<-% {
      if (is.na(login_node))
        tester %<-% { "a" }
        tester %2% { "a" }
      tester_fut <- futureOf(tester)
      # This waits until tester is resolved or total_sleep_time elapses
      its_resolved <- wait_and_check(resolved(tester_fut), total_sleep_time = timeout_sec)
      if (its_resolved) tester
      else FALSE

    fut <- futureOf(outer_test)
    its_resolved <- wait_and_check(resolved(fut), total_sleep_time = .connection_timer + timeout_sec)
    if (its_resolved && outer_test=="a") return(TRUE)
    else return(FALSE)
  error = function(err) {
    warning("Error raised in testing node '", nodename, "': ", err$message,
            immediate. = TRUE, call.=FALSE)
# # The older design
# test_node <- function(nodename, login_node, timeout_sec) {
#   stopifnot(!is.null(login_node))
#   # Sometimes you can get time-out errors
#   tryCatch(
#     {
#       plan(list(
#         tweak(remote, workers=login_node),
#         tweak(remote, workers=nodename)
#       ))
#       tester %2% { "a" }
#       Sys.sleep(timeout_sec)
#       if (resolved(futureOf(tester)) && tester=="a") return(TRUE)
#       else return(FALSE)
#     },
#     error = function(e) {
#       warning(paste0("Timeout error in node ", nodename))
#       return(FALSE)
#     }
#   )
# }

#' @rdname testing_nodes
#' @export
test_nodes <- function(node_list, login_node,
                       timeout_sec = 5,
                       verbose = FALSE,
                       .connection_timer = 3,
                       .debug = FALSE) {
  # Make sure there's a login node

  # After we're done getting the information, revert to the original plan
  oplan <- plan("list")
  on.exit(plan(oplan), add = TRUE)

  # This checks to see that we can login
  plan(remote, workers = login_node)
  login_tester <- future({ "yo" })
  isitresolved <- wait_and_check(resolved(login_tester), total_sleep_time = 5)
  if ( !(isitresolved) || value(login_tester) != "yo")
    stop("Can't even log in to login node, yo")
  message("Successful login to gateway node")

  if (.debug==T) {
    print("Debug mode on. Printing out more and going slower")
    good_nodes <- c()
    for (n in node_list) {
      print(paste0("Testing node `", n, "`"))
      test_node_value %<-% {test_node(nodename = n,
                                      login_node = NA,
                                      timeout_sec = timeout_sec,
                                      .connection_timer = .connection_timer)}
      test_node_value <- ifelse(is.null(test_node_value) | is.na(test_node_value) |  test_node_value == FALSE,
                                FALSE, TRUE)
      good_nodes <- c(good_nodes, test_node_value)
    good_nodes <- purrr::set_names(good_nodes, node_list)
    print("Node values:")
  } else {
    # Test the nodes
    good_nodes <- future_map(
      function(n) {
          test_node(nodename = n,
                    login_node = NA,
                    timeout_sec = timeout_sec,
                    .connection_timer = .connection_timer)
        }, catchErrors = TRUE)
      }) %>% purrr::set_names(node_list)

    # If verbose
    if (verbose==TRUE) {
      good_nodes %>% message_collector(message, "Messages", "messages")
      good_nodes %>% message_collector(warning, "Warnings", "warnings", call.=FALSE)
      good_nodes %>% message_collector(warning, "Errors", "errors", call.=FALSE)

    good_nodes <- map_lgl(
      function(n) {
        if (is.null(n$value) | is.na(n$value)) FALSE
        else n$value == TRUE })

  message(paste0("Bad nodes: ", paste0(node_list[!good_nodes], collapse=", ")))

#' Get the status of all the nodes
#' Finds the statuses of all the nodes in the cluster via logging in and
#' running `pbsnodes` on one of the nodes, and then parsing that information
#' into a data frame. \cr \cr **NOTE: This only works with a cluster that has `pbsnodes` set up and
#' working in a particular format.** This is completely untested with other clusters.
#' @param login_node a string for the gateway login, e.g., \"zacharyburchill@gateway.rochester.edu\" (not real)
#' @param check_node the name of any "nodeN" hostnames, to get the data from. Will iterate through until it finds one that works.
#' @return A data frame with all the node information
#' @export
get_nodes_info <- function(login_node,
                           check_node=c("node33", "node34", "node64"),
                           timeout_sec = 10) {
  # After we're done getting the information, revert to the original plan
  oplan <- plan("list")
  on.exit(plan(oplan), add = TRUE)
  # Generally not needed, but will iterate through check nodes!
  i <- 1
  while (i <= length(check_node) && ! test_node(check_node[[i]], login_node, timeout_sec)) {
    i <- i + 1

  if (i > 1) {
    if (i > length(check_node))
      stop(paste0("All 'check' nodes failed to be accessed (",
                  paste0(check_node, collapse=", "), ")"))
    message(paste0("Check nodes failed: ", paste0(check_node[1:(i-1)], collapse=", ")))
    message(paste0("Using ", check_node[[i]]))
  # Change the plan
    tweak(remote, workers = login_node),
    tweak(remote, workers = check_node[[i]])

  node_text %2% { system("pbsnodes", intern = TRUE) %>% paste0(collapse="\n") }

  if (node_text=="")
    stop("Seems like the pbsnodes system is down (i.e., node64). Try it manually!")


#' Convert the output of `pbsnodes` into a list/data frame
#' Runs `future`'s plan, in a way that works well with the Rochester cluster nodes
#' @param info_string a single string of all the output of `pbsnodes`. Each line should be separated by a newline character
#' @param as_df if `TRUE`, it returns it as a data frame, otherwise it returns it as a list
#' @return A data frame / list with all the node information
#' @export
get_node_data <- function(info_string, as_df=TRUE) {
  node_start <- "(?<=^|\\n)node[0-9]{2}\\.cs\\.rochester\\.edu"

  node_names <- stringr::str_match_all(info_string, node_start) %>%
  node_info <- stringr::str_split(info_string, node_start) %>% purrr::simplify()
  # Get rid of the first empty string
  node_info <- node_info[2:length(node_info)]
  node_list <- node_info %>%
    # give each list a
    purrr::set_names(node_names) %>%
    purrr::map(function(info) {
      l <- list(state = stringr::str_match(info, "(?<=state = ).+")[1])
      nps <- list(np = stringr::str_match(info, "(?<=np = )[0-9]+")[1])
      status <- stringr::str_match(info, "(?<=status = ).+") %>%
        stringr::str_split(",") %>% {
          val_pairs <- purrr::map(., function(x) {
            stringr::str_split(x, "=")}) %>%
          purrr::map(val_pairs, 2) %>%
            purrr::set_names(purrr::map(val_pairs, 1))

      if (!is.null(status) & length(status) > 1) { append(append(l, nps), status) }
      else {l}
  if (as_df==TRUE) {
    # Turn it into a data frame
    imap_dfr(node_list, function(x, name) {
      as.data.frame(x, stringsAsFactors=FALSE) %>%
        mutate(node = name)
    }) %>%
      mutate(number = as.numeric(stringr::str_extract(node,"[0-9]+")))
  } else {

#' Default functions to help determine which cluster node to use
#' These functions just apply certain default assumptions to the node information
#' data frames generated from `get_node_data`:
#' * `default_cleanup` turns the important numeric values in the data frame (from `get_node_data`) from
#'   character columns to actual numeric columns, removing the "kb" from the columns pertaining to memory,
#'   and adding a column called `percent_free` which is the proportion of available memory out of the total memory.
#' * `default_filter` simply removes nodes that aren't "free," have numbers greater than 64,
#'   or have a proportion of free memory (`percent_free`) less than a certain threshold.
#' * `pick_n_best_nodes` returns `n` remaining nodes that have the most available memory.
#' @param df a dataframe of node information, as produced by `get_node_data`
#' @param threshold the threshold for the proportion of memory currently available by which to exclude nodes (e.g., 0.7 excludes nodes with less than 70\% of their memory currently available)
#' @param n the number of nodes to select
#' @export
default_cleanup <- function(df) {
  df %>%
              ~sub("kb", "", .)) %>%
    mutate_at(vars(loadave:nsessions), as.numeric) %>%
    {if (length(.$np) > 0)  mutate(., np = as.numeric(np))
     else . } %>%
    mutate(percent_free = availmem/totmem)
#' @rdname default_cleanup
#' @export
default_filter <- function(df,
                           threshold = 0.7) {
  df %>%
           percent_free >= threshold)
#' @rdname default_cleanup
#' @export
pick_n_best_nodes <- function(df, n) {
  nodes <- df %>%
    arrange(-availmem) %>%
  if (nrow(nodes) < n) rlang::warn(paste0("Less than ", n, " nodes available (", nrow(nodes),")"))

name_if_possible <- function(x, nm = x, ...) {
  len_diff <- length(x) - length(nm)
  if (len_diff < 0) stop("`nm` cannot be LONGER than `x`")
  nm <- append(nm, rep("", len_diff))
  purrr::set_names(x, nm, ...)

.name_expander <- function(named_cores, ntabs=2) {
  if (rlang::is_callable(named_cores)) return ("`function()`")
  checker <- function(x) {
    ifelse(rlang::is_character(x), paste0("\"", x, "\""),
           ifelse(rlang::is_callable(x), "`function()`",
  tabs <- function(n) paste0(rep("  ", n), collapse="")

  head <- paste0("function() {\n", tabs(ntabs), "switch(Sys.info()[[\"nodename\"]],\n")
  body <- map(named_cores, checker) %>%
    imap(function(x, i) {
      if (i == "") return(paste0(tabs(ntabs+1), x))
      else paste0(tabs(ntabs+1), "\"", i, "\" = ", x, ",")
    }) %>%
    reduce(~paste0(.x, "\n", .y))
  tail <- paste0("\n", tabs(ntabs), ")\n", tabs(ntabs-1), "}")

#' Make a plan, given a set of nodes
#' This is relatively complicated, because I made it needlessly flexible.
#' Basically, it will either execute `future::plan` in a topology that
#' logs into the login node, has a cluster set up with the nodes specified
#' where on each of these nodes, it uses a `multiprocess`. The needlessly flexible
#' bit is where I let the user specify the number of cores used for each cluster
#' node either by a custom function or a list corresponding to the list of cluster
#' hostnames (which can be numerics or functions).
#' The bloat in this code comes from my desire to show the user what
#' command might be run. Unfortunately, it's too much of a pain in the
#'  butt to give users the proper amount of information about the
#' functions they're calling while at the same time letting them
#' customize those functions. Therefore, I represent all the arbitrary
#' custom core-setting functions with \`function()\`.
#' @param df a vector/list of node hostnames or a data frame with a column called "nodes" that has the same information
#' @param login_node a string for the gateway login, e.g., \"zacharyburchill@gateway.rochester.edu\" (not real)
#' @param core_mapper if `NULL`, the `multiprocess` plan uses its default arguments. If a function, `core_mapper` determines the number of cores used for each machine. If a list/vector, it will be assumed to be the number of cores for each particular node, in the same order as the nodes were given. `nodes_to_plan` will then convert this into a function automatically.
#' @param goahead indicates whether this function should execute the plan or just return an expression representing the plan
#' @param use_abbreviations because the way ssh adds known hosts, if you've only sshed into a node via `ssh nodeN`, you'll want to set this to `TRUE`
#' @return Either nothing or an expression representing the structure of the plan.
#' @export
nodes_to_plan <- function(nodes, # the node df
                          login_node, # the string for the remote worker
                          core_mapper = NULL, # Not currently functional
                          goahead = TRUE, # If true, goes ahead and runs the command.
                          # If false, returns the expression
                          use_abbreviations = TRUE # will attempt to ssh into "node33" rather than "node33.cs.rochester.edu"
) {
  if (is.data.frame(nodes)) nodes <- nodes$node

  if (!is.null(core_mapper)) {
    # If core_mapper is a function
    if (rlang::is_callable(core_mapper)) {
      customWorkers <- core_mapper
      named_cores <- core_mapper
      # If core_mapper is a (named) list of core numbers/core functions
    } else if (rlang::is_list(core_mapper)) {
      named_cores <- purrr::set_names(core_mapper, nodes)
      customWorkers <- function() {
        numcores <- do.call(switch, append(Sys.info()[["nodename"]], named_cores))
        if (rlang::is_callable(numcores)) numcores()
    } else {
      stop(paste0("`core_mapper` must be a function or list, not ", typeof(core_mapper)))

  if (use_abbreviations) nodes <- stringr::str_extract(nodes, "node[0-9]+")

  head_s <- paste0("plan(list(
                   tweak(remote, workers = \"", login_node, "\"),
                   tweak(cluster, workers = c(\"", paste0(nodes, collapse="\", \""), "\")),
  if (is.null(core_mapper)) {
    full_s <- paste0(head_s, "multiprocess\n))")
  } else {
    full_s <-paste0(head_s, "tweak(multiprocess, workers = ", .name_expander(named_cores), ")\n))")

  message(paste0("Plan being executed (minus user-defined functions):\n", full_s))

  if (goahead==TRUE) {
      tweak(remote, workers = login_node),
      tweak(cluster, workers = nodes),
    return_null <- NULL
  } else {
