R/cluster_ssh_functions.R

Defines functions upload_cluster_scripts experiment_setup sbatch_experiments execute_next_setup check_jobs download_data check_jobs download_results

Documented in check_jobs download_data download_results execute_next_setup experiment_setup sbatch_experiments upload_cluster_scripts

#' @title Export cluster scripts
#'
#' @param project_name String containing project name as returned by
#'  \code{\link{get_project_name}}.
#' @param p_numbers a string with the user's p-number as returned by
#' \code{\link{get_available_accounts}}.
#' @export
#'
#' @author Giovanni Laudanno
#' @description Export cluster scripts to Peregrine HPC in folder
#' with \code{project_name}, as returned by \code{\link{get_project_name}}.
#' @return \code{NULL}
upload_cluster_scripts <- function(
  project_name = utilSIE::get_project_name(),
  p_numbers = utilSIE::get_available_accounts()
) {

  accounts <- p_numbers

  for (account in accounts) {
    cluster_address <- paste0(account, "@peregrine.hpc.rug.nl")
    connection <- ssh::ssh_connect(cluster_address)

    # folder structure
    remote_project_folder <- file.path(project_name)
    local_cluster_folder <- "bash_scripts/"
    testit::assert(dir.exists(local_cluster_folder))

    ssh::ssh_exec_wait(connection, command = paste0("mkdir -p ", project_name))

    ssh::scp_upload(
      session = connection,
      files = paste0(
        local_cluster_folder,
        "/",
        list.files(local_cluster_folder, pattern = ".bash")
      ),
      to = remote_project_folder
    )
    ssh::scp_upload(
      session = connection,
      files = paste0(
        local_cluster_folder,
        "/",
        list.files(local_cluster_folder, pattern = ".R")
      ),
      to = remote_project_folder
    )
    ssh::ssh_disconnect(connection); gc()
  }
  return()
}


#' @title Setup DAISIE experiment parameter combinations
#'
#' @description Returns a matrix containing all possible permutations of
#' parameters input by the user. Each line of the matrix can then be used
#' to initialize an experiment.
#' @param time Numeric vector containing length of the simulation in time units.
#'  For example, if an island is know to be 4 million years old,
#'  setting time = 4 will simulate entire life span of the island;
#'  setting time = 2 will stop the simulation at the mid-life of the island.
#' @param M Numeric vector containing the size of the mainland pool, i.e
#'  the number of species that can potentially colonize the island
#' @param lac Numeric vector containing the cladogenesis rate (lambda^c)
#' @param mu Numeric vector containing the extinction rate. Used only on
#' non ontogeny scenarios.
#' @param K Numeric vector containing the carrying capacity (clade level).
#' Set \code{K = Inf} for no diversity dependence.
#' @param gam Numeric vector containing the immigration rate (gamma)
#' @param laa Numeric vector containing the anagenesis rate (lambda^a)
#' @param island_ontogeny Vector of strings or \code{NULL} indicating type of
#'  island ontogeny process. Can be \code{"beta"}. For no ontogeny
#'  \code{NULL} is used.
#' @param replicates Numeric vector specifying how many replicates should
#'  be run for each experiment
#' @param mu_min Numeric vector containing the minimum extinction when area is
#'  at peak, used only on ontogeny scenarios
#' @param mu_max Numeric vector containing the extinction rate when current
#' area is 0.10 of maximum area, used only on ontogeny scenarios
#' @param Amax Numeric vector containing the maximum area, used only on ontogeny
#' scenarios
#' @param Apeak Numeric vector containing the value from 0 to 1 indicating where
#'  in the island's history the peak area is achieved,
#'  used only on ontogeny scenarios
#' @param Asharpness Numeric vector containing the sharpness of peak,
#'  used only on ontogeny scenarios
#' @param Atotalage Numeric vector containing the total island age, used
#'  only on ontogeny scenarios
#' @param divdepmodel Numeric vector specifying the type of diversity
#' dependence. \code{divdepmodel = "CS"} runs a model with clade-specific
#' carrying capacity, where diversity-dependence operates only within single
#'  clades, i.e. only among species originating from the same mainland colonist.
#'  Option divdepmodel = 'IW' runs a model with island-wide carrying capacity,
#'  where diversity-dependence operates within and among clades.
#' @export
#' @author Giovanni Laudanno, Pedro Neves
#' @return Matrix with all possible combinations of experiment setups
experiment_setup <- function(
  time = c(2, 3, 6),
  M = 1000, # nolint
  lac = c(7.48223 * 10 ^ - 6, 0.0000224467, 0.0000748223),
  mu = c(1),
  K = c(100 / 13500, 30 / 13500, 10 / 13500), # nolint
  gam = c(0.001),
  laa = c(1),
  island_ontogeny = "beta",
  replicates = 1000,
  mu_min = 0.1,
  mu_max = mu_min + mu_min * .1,
  Amax = c(13500), # nolint
  Apeak = c(0.1), # nolint
  Asharpness = c(1), # nolint
  Atotalage = c(9), # nolint
  divdepmodel = 1
) {
  if (island_ontogeny == "beta" || island_ontogeny == 2) {
    setups <- expand.grid(
      time,
      M,
      lac,
      mu,
      K,
      gam,
      laa,
      replicates,
      mu_min,
      mu_max,
      Amax,
      Apeak,
      Asharpness,
      Atotalage,
      divdepmodel
    )
    colnames(setups) <- c(
      "time",
      "M",
      "lac",
      "mu",
      "K",
      "gam",
      "laa",
      "replicates",
      "mu_min",
      "mu_max",
      "Amax",
      "Apeak",
      "Asharpness",
      "Atotalage",
      "divdepmodel"
    )
  } else {
    setups <- expand.grid(
      time,
      M,
      lac,
      mu,
      K,
      gam,
      laa,
      replicates,
      divdepmodel
    )
    colnames(setups) <- c(
      "time",
      "M",
      "lac",
      "mu",
      "K",
      "gam",
      "laa",
      "replicates",
      "divdepmodel"
    )
  }
  setups
}


#' Starts peregrine cluster runs
#'
#' @param right_setup Matrix with experiment setup as returned by
#' \code{\link{experiment_setup}}.
#' @param bash_file_sims String with path for bash file that starts simulation
#' in the cluster.
#' @param connection ssh connection object created within the main function.
#' @inheritParams execute_next_setup
#'
#' @return nothing.
#'
#' @export
#' @author Pedro Neves
sbatch_experiments <- function(project_name,
                               island_ontogeny,
                               right_setup,
                               bash_file_sims,
                               connection,
                               partition,
                               complete_analysis) {

  if (island_ontogeny == "beta") {
    for (setup_number in seq_along(right_setup[, 1])) {
      seed <- 1
      ssh::ssh_exec_wait(session = connection, command = "sleep 2")
      ssh::ssh_exec_wait(session = connection, command = paste(
        "sbatch",
        bash_file_sims,
        seed,
        right_setup[setup_number, 1],
        right_setup[setup_number, 2],
        right_setup[setup_number, 3],
        right_setup[setup_number, 4],
        right_setup[setup_number, 5],
        right_setup[setup_number, 6],
        right_setup[setup_number, 7],
        right_setup[setup_number, 8],
        right_setup[setup_number, 9],
        right_setup[setup_number, 10],
        right_setup[setup_number, 11],
        right_setup[setup_number, 12],
        right_setup[setup_number, 13],
        right_setup[setup_number, 14],
        right_setup[setup_number, 15],
        partition,
        sep = " "
      ))
    }
  } else {
    for (setup_number in seq_along(right_setup[, 1])) {
      seed <- 1
      ssh::ssh_exec_wait(session = connection, command = "sleep 5")
      ssh::ssh_exec_wait(session = connection, command = paste(
        "sbatch ",
        bash_file_sims,
        seed,
        right_setup[setup_number, 1],
        right_setup[setup_number, 2],
        right_setup[setup_number, 3],
        right_setup[setup_number, 4],
        right_setup[setup_number, 5],
        right_setup[setup_number, 6],
        right_setup[setup_number, 7],
        right_setup[setup_number, 8],
        right_setup[setup_number, 9],
        partition,
        sep = " "
      ))
    }
  }
  # Submit corresponding ML jobs
  # NO OTHER JOBS MUST RUN; TO FIX, add check by jobname
  if (complete_analysis) {
    if (right_setup[setup_number, 15] == "IW") {
      stop("Chained jobs are not available for the IW model")
    }
    job_ids <- sort(check_jobs()$job_ids) # nolint
    ssh::ssh_exec_wait(session = connection, command = "sleep 5")
    print(job_ids)
    if (island_ontogeny == "beta") {
      ssh::ssh_exec_wait(session = connection, command = "sleep 5")
      bash_file_ML <- file.path( # nolint
        project_name,
        paste0(project_name, "_ML.bash")
      )

      for (setup_number in seq_along(right_setup[, 1])) {
        seed <- 1
        ssh::ssh_exec_wait(session = connection, command = "sleep 5")
        print(paste(
          "sbatch",
          bash_file_ML,
          seed,
          right_setup[setup_number, 1],
          right_setup[setup_number, 2],
          right_setup[setup_number, 3],
          right_setup[setup_number, 4],
          right_setup[setup_number, 5],
          right_setup[setup_number, 6],
          right_setup[setup_number, 7],
          right_setup[setup_number, 8],
          right_setup[setup_number, 9],
          right_setup[setup_number, 10],
          right_setup[setup_number, 11],
          right_setup[setup_number, 12],
          right_setup[setup_number, 13],
          right_setup[setup_number, 14],
          right_setup[setup_number, 15],
          partition,
          job_ids[setup_number], # must match correct sim file
          sep = " "
        ))
        ssh::ssh_exec_wait(session = connection, command = paste(
          "sbatch",
          bash_file_ML,
          seed,
          right_setup[setup_number, 1],
          right_setup[setup_number, 2],
          right_setup[setup_number, 3],
          right_setup[setup_number, 4],
          right_setup[setup_number, 5],
          right_setup[setup_number, 6],
          right_setup[setup_number, 7],
          right_setup[setup_number, 8],
          right_setup[setup_number, 9],
          right_setup[setup_number, 10],
          right_setup[setup_number, 11],
          right_setup[setup_number, 12],
          right_setup[setup_number, 13],
          right_setup[setup_number, 14],
          right_setup[setup_number, 15],
          partition,
          job_ids[setup_number], # must match correct sim file
          sep = " "
        ))
      }
    } else if (island_ontogeny == "const") {

      ssh::ssh_exec_wait(session = connection, command = "sleep 5")
      bash_file_ML <- file.path( # nolint
        project_name,
        paste0(project_name, "_0_ML.bash"))

      for (setup_number in seq_along(right_setup[, 1])) {
        seed <- 1
        ssh::ssh_exec_wait(session = connection, command = "sleep 5")
        ssh::ssh_exec_wait(session = connection, command = paste(
          "sbatch",
          bash_file_ML,
          seed,
          right_setup[setup_number, 1],
          right_setup[setup_number, 2],
          right_setup[setup_number, 3],
          right_setup[setup_number, 4],
          right_setup[setup_number, 5],
          right_setup[setup_number, 6],
          right_setup[setup_number, 7],
          right_setup[setup_number, 8],
          right_setup[setup_number, 9],
          partition,
          job_ids[setup_number], # must match correct sim file
          sep = " "
        ))
      }
    }
  }
}


#' @title Execute experimental setup
#'
#' @description Prepares and executes the one or a set of experiments to be run
#' on the
#' \href{https://www.rug.nl/society-business/centre-for-information-technology/research/services/hpc/facilities/peregrine-hpc-cluster?lang=en}{Peregrine HPC} at the University of Groningen. # nolint
#'
#' @param project_name String containing project name as returned by
#'  \code{\link{get_project_name}}.
#' @param account String with p-number to be used for connecting with peregrine.
#' Use as returned by \code{\link{get_available_accounts}}.
#' @param download_files Boolean stating if files in cluster should be
#' downloaded locally before new analysis is run.
#' Default is \code{TRUE}. If downloaded, files are removed from peregrine.
#' @param partition String with peregrine partition name that shuold be used.
#' Defaults to \code{"gelifes"}. Can be \code{"regular"} or \code{"short"}.
#' @param complete_analysis Boolean stating if a ML run from simulation output
#' generated with current parameters should be started as soon
#' as simulations are over.
#' @param branch String with branch of github repository to be used for
#' installation. Default is \code{"@develop"}.
#' @param force Boolean stating if package should be force installed from
#' github. See \code{\link[devtools]{install_github}} for details.
#' @inheritParams experiment_setup
#'
#' @export
#' @author Giovanni Laudanno, Pedro Neves
#'
#' @return nothing
execute_next_setup <- function(
  project_name = utilSIE::get_project_name(),
  account = utilSIE::get_available_accounts(),
  download_files = TRUE,
  partition = "gelifes",
  time = c(2, 3, 6),
  M = 1000, # nolint
  lac = c(7.48223 * 10 ^ - 6, 0.0000224467, 0.0000748223),
  mu = c(1),
  K = c(100 / 13500, 30 / 13500, 10 / 13500), # nolint
  gam = c(0.001),
  laa = c(1),
  divdepmodel = 1,
  island_ontogeny = "beta",
  replicates = 1000,
  mu_min = 0.1,
  mu_max = mu_min + mu_min * .1,
  Amax = c(13500), # nolint
  Apeak = c(0.1), # nolint
  Asharpness = c(1), # nolint
  Atotalage = c(9), # nolint
  complete_analysis = FALSE,
  branch = "@develop",
  force = FALSE
) {

  assertthat::assert_that((partition == "gelifes" ||
                              partition == "regular" ||
                              partition == "short"))


  project_folder <- utilSIE::get_project_folder(project_name)

  # download files
  if (download_files == TRUE) {
    utilSIE::download_results(project_name = project_name)
  }

  # upload scripts
  upload_cluster_scripts(project_name = project_name)

  right_setup <- experiment_setup(
    time = time,
    M = M,
    lac = lac,
    mu = mu,
    K = K,
    gam = gam,
    laa = laa,
    island_ontogeny = island_ontogeny,
    replicates = replicates,
    mu_min = mu_min,
    mu_max = mu_max,
    Amax = Amax,
    Apeak = Apeak,
    Asharpness = Asharpness,
    Atotalage = Atotalage,
    divdepmodel = divdepmodel
  )

  if (is.null(project_folder)) {
    if (.Platform$OS.type == "windows") {
      project_folder <- system.file("extdata", package = project_name)
    }
  }
  assertthat::is.dir(project_folder)
  if (length(list.files(project_folder)) == 0) {
    stop(paste0(project_folder, " is empty."))
  }

  # Install packages
  cluster_address <- paste0(account, "@peregrine.hpc.rug.nl")

  connection <- ssh::ssh_connect(cluster_address)

  ssh::ssh_exec_wait(session = connection, command = paste0(
    "chmod +x ", file.path(project_name, "install_packages.bash")
  ))
  ssh::ssh_exec_wait(session = connection, command = paste(paste0(
    "./",
    file.path(project_name, "install_packages.bash"),
    " 'rsetienne/",
    project_name,
    branch,
    "'"
  ),
  force
  ))
  # Submit simulation jobs
  if (island_ontogeny == "beta" || island_ontogeny == 2) {
    bash_file_sims <- file.path(
      project_name,
      paste0(project_name, "_single_seed.bash")
    )
  } else if (island_ontogeny == "const" || island_ontogeny == 0) {
    bash_file_sims <- file.path(
      project_name,
      paste0(project_name, "_0_single_seed.bash")
    )
  }
  ssh::ssh_exec_wait(
    session = connection,
    command = paste0("cat ", bash_file_sims)
  )
  cat("\n")
  sbatch_experiments(
    project_name = project_name,
    island_ontogeny = island_ontogeny,
    right_setup = right_setup,
    bash_file_sims = bash_file_sims,
    connection = connection,
    partition = partition,
    complete_analysis = complete_analysis
  )
  # Close ssh connection
  ssh::ssh_disconnect(connection); gc()
}

#' @title Check jobs on cluster
#' @author Giovanni Laudanno, Pedro Neves
#' @description Check jobs on cluster
#' @export
#' @return list with job ids, job info and sshare
check_jobs <- function(account = utilSIE::get_available_accounts()) {

  # connection
  cluster_address <- paste0(account, "@peregrine.hpc.rug.nl")

  connection <- ssh::ssh_connect(cluster_address)

  jobs <- utils::capture.output(ssh::ssh_exec_wait(
    session = connection,
    command = "squeue -u $USER --long"
  ))

  job_ids <- job_names <- c()
  for (i in 3:(length(jobs) - 1)) {
    job_id_i <- substr(jobs[i], start = 12, stop = 18)
    job_ids <- c(job_ids, job_id_i)
    job_info <- utils::capture.output(ssh::ssh_exec_wait(
      session = connection,
      command = paste("jobinfo", job_id_i)
    ))
    job_name_i <- substr(job_info[1], start = 23, stop = nchar(job_info[1]))
    job_names <- c(job_names, job_name_i)
  }
  sshare_output <- utils::capture.output(ssh::ssh_exec_wait(
    session = connection,
    command = "sshare -u $USER"
  ))

  ssh::ssh_disconnect(connection); gc()
  out <- list(
    jobs = jobs,
    job_ids = as.numeric(job_ids),
    sshare_output = sshare_output
  )
  return(out)
}



#' @title Download the data to the data folder of the project
#'
#' @param project_name String containing project name as returned by
#'  \code{\link{get_project_name}}
#'
#' @author Giovanni Laudanno
#' @description Download the data to the results folder of the project
#' @export
#' @return nothing
download_data <- function(
  project_name = utilSIE::get_project_name()
) {

  project_folder <- utilSIE::get_project_folder(project_name)
  remote_data_folder <- file.path(utilSIE::get_project_name(), "data")
  local_data_folder <- file.path(project_folder, "data")
  testit::assert(dir.exists(local_data_folder))

  # download files

  accounts <- utilSIE::get_available_accounts()
  for (account in accounts) {
    cluster_address <- paste0(account, "@peregrine.hpc.rug.nl")
    connection <- ssh::ssh_connect(cluster_address)

    system.time(
      ssh::scp_download(
        session = connection,
        files = paste0(remote_data_folder, "/*"),
        to = local_data_folder
      )
    )
    ssh::ssh_disconnect(connection); gc()
  }
  return()
}

#' @param account String with p-number of user.
#' Defaults to function that returns it.
#'
#' @title Check jobs on cluster
#' @author Giovanni Laudanno, Pedro Neves
#' @description Check jobs on cluster
#' @return list with job ids, job info and sshare
check_jobs <- function(account = utilSIE::get_available_accounts()) {

  cluster_address <- paste0(account, "@peregrine.hpc.rug.nl")

  connection <- ssh::ssh_connect(cluster_address)

  jobs <- utils::capture.output(
    ssh::ssh_exec_wait(session = connection, command = "squeue -u $USER --long")
  )
  if ((length(jobs) - 1) >= 3) {
    job_ids <- job_names <- c()
    for (i in 3:(length(jobs) - 1)) {
      job_id_i <- substr(jobs[i], start = 12, stop = 18)
      job_ids <- c(job_ids, job_id_i)
      job_info <- utils::capture.output(ssh::ssh_exec_wait(
        session = connection,
        command = paste("jobinfo", job_id_i)
      ))
      job_name_i <- substr(job_info[1], start = 23, stop = nchar(job_info[1]))
      job_names <- c(job_names, job_name_i)
    }
    job_ids <- as.numeric(job_ids)
  } else {
    job_ids <- job_names <- NULL
  }
  sshare_output <- utils::capture.output(ssh::ssh_exec_wait(
    session = connection,
    command = "sshare -u $USER"
  ))
  ssh::ssh_disconnect(connection); gc()
  list(
    jobs = jobs,
    job_ids = job_ids,
    job_names = job_names,
    sshare_output = sshare_output
  )
}



#' @title Download the results to the results folder of the project
#'
#' @param project_name String containing project name as returned by
#'  \code{\link{get_project_name}}
#' @param remove_remote Boolean specifying if files on the server should be
#' deleted. Default is \code{TRUE}
#'
#' @export
#' @author Giovanni Laudanno
#' @description Download the results to the results folder of the project
#' @return nothing
download_results <- function(project_name = utilSIE::get_project_name(),
                             remove_remote = TRUE) {

  project_folder <- utilSIE::get_project_folder(project_name)
  remote_results_folder <- file.path(utilSIE::get_project_name(), "results")
  local_results_folder <- file.path(project_folder, "results")
  if (!(dir.exists(local_results_folder))) {
    dir.create(local_results_folder)
  }

  # download files

  accounts <- utilSIE::get_available_accounts()
  for (account in accounts) {

    n_running_jobs <- length(check_jobs(account = account)$job_ids)
    cluster_address <- paste0(account, "@peregrine.hpc.rug.nl")
    connection <- ssh::ssh_connect(cluster_address)

    system.time(
      ssh::scp_download(
        session = connection,
        files = file.path(remote_results_folder, "*"),
        to = local_results_folder
      )
    )

    if (remove_remote) {
      if (n_running_jobs == 0) {
        ssh::ssh_exec_wait(
          session = connection,
          command = paste0("rm -rf ", remote_results_folder)
        )
        ssh::ssh_exec_wait(
          session = connection,
          command = 'ls | find . -name "slurm*" | xargs rm'
        )
      }
    }
    ssh::ssh_disconnect(connection); gc()
  }
  return()
}
Neves-P/utilSIE documentation built on Nov. 20, 2019, 7 a.m.