R/pipeline-tools.R

Defines functions stages_run stage_args_check blast_setup parameters_setup

Documented in blast_setup parameters_setup stage_args_check stages_run

#' @name parameters_setup
#' @title Set Up Parameters
#' @description Initiates cache of parameters. Builds the parameter list from
#' \code{parameters(...)}, records the working directory and NCBI executable
#' paths, pads or truncates the retry wait times to \code{mxrtry} entries,
#' logs every parameter, then writes the cache and initialises progress.
#' @param wd Working directory, must already exist.
#' @param ncbi_execs File directories for NCBI tools, see \code{blast_setup()}.
#' Must contain elements named \code{mkblstdb} and \code{blstn}.
#' @param overwrite Overwrite existing cache?
#' @param ... Set parameters, see parameters()
#' @return Value of \code{progress_init(wd = wd)}; called for its side
#' effects.
#' @family run-private
parameters_setup <- function(wd, ncbi_execs, overwrite = FALSE, ...) {
  if (!file.exists(wd)) {
    stop(paste0("Invalid `wd`. [", wd, "] does not exist."))
  }
  # get parameters
  ps <- parameters(...)
  ps[["wd"]] <- wd
  # calc wait times: exactly one wait time per retry. When too few are
  # supplied the last one is repeated; when too many, the extras are dropped.
  wt_tms <- ps[["wt_tms"]]
  mxrtry <- ps[["mxrtry"]]
  dff <- mxrtry - length(wt_tms)
  if (dff > 0) {
    ps[["wt_tms"]] <- c(wt_tms, rep(wt_tms[length(wt_tms)], dff))
  } else {
    # seq_len() avoids the 1:0 trap when mxrtry is 0
    ps[["wt_tms"]] <- wt_tms[seq_len(mxrtry)]
  }
  if (all(c("mkblstdb", "blstn") %in% names(ncbi_execs))) {
    ps[["mkblstdb"]] <- ncbi_execs[["mkblstdb"]]
    ps[["blstn"]] <- ncbi_execs[["blstn"]]
  } else {
    error(ps = ps, "Invalid NCBI execs provided.
          Must be named list as generated by `blast_setup`")
  }
  # check BLAST and get outfmt
  ps[["outfmt"]] <- outfmt_get(ps = ps)
  # log parameters, one per line, values aligned in a column
  info(lvl = 1, ps = ps, "Setting up pipeline with the following parameters:")
  pnms <- sort(names(ps))
  mxnchrs <- max(nchar(pnms)) + 3
  for (pnm in pnms) {
    spcr <- strrep(" ", mxnchrs - nchar(pnm))
    val <- ps[[pnm]]
    if (length(val) > 1) {
      # preview at most the first three elements; indexing with
      # seq_len(min(...)) avoids NA padding for two-element values
      val <- paste0(
        paste0(val[seq_len(min(3, length(val)))], collapse = ", "),
        " ..."
      )
    }
    info(lvl = 2, ps = ps, paste0(pnm, spcr, "[", val, "]"))
  }
  cache_setup(ps = ps, ovrwrt = overwrite)
  progress_init(wd = wd)
}

#' @name blast_setup
#' @title Ensures NCBI BLAST tools are installed
#' @description Ensures NCBI BLAST executables are installed on the system.
#' Runs each tool with \code{-version}. Unless run through \code{outsider},
#' the reported version is appended to \code{blast_versions.txt} in \code{wd}
#' and its major version is checked.
#' @param d Directory to NCBI BLAST tools (not used when \code{otsdr} is TRUE)
#' @param wd Working directory
#' @param v Verbose? TRUE/FALSE, passed as \code{v} to \code{.log()}
#' @param otsdr Run through \code{outsider}?
#' @details BLAST tools must be version >= 2.0. A version string that cannot
#' be parsed into numbers is treated as an incorrect version.
#' @family run-private
#' @return list with elements \code{mkblstdb} and \code{blstn} holding the
#' command used for each tool. Raises an error if either tool fails to run or
#' has an incorrect version.
blast_setup <- function(d, v, wd, otsdr) {
  .log(v = v, wd = wd, "Checking for valid NCBI BLAST+ Tools ...\n")
  sccdd <- TRUE
  if (otsdr) {
    # bare command names when running through outsider
    mkblstdb <- "makeblastdb"
    blstn <- "blastn"
  } else {
    mkblstdb <- file.path(d, "makeblastdb")
    blstn <- file.path(d, "blastn")
  }
  for (ech in c(mkblstdb, blstn)) {
    res <- cmdln(
      cmd = ech, args = "-version",
      ps = list("outsider" = otsdr)
    )
    if (res[["status"]] != 0) {
      err_txt <- rawToChar(res[["stderr"]])
      .log(v = v, wd = wd, paste0(
        "Failed to run: [", ech, "]. Reason:\n",
        "[", err_txt, "]"
      ))
      sccdd <- FALSE
    } else if (otsdr) {
      .log(v = v, wd = wd, paste0("Using otsdr BLAST.\n"))
    } else {
      # test version: only the first line of the -version output is used
      out_lines <- strsplit(x = rawToChar(res[["stdout"]]), split = "\n")[[1]]
      frst <- if (length(out_lines) > 0) out_lines[[1]] else ""
      # remove contraction symbol created on Windows(?), issue 39
      frst <- sub("~1:", "", frst)
      vrsn <- gsub("[a-zA-Z:+]", "", frst)
      vrsn <- gsub("\\s", "", vrsn)
      write(
        x = paste0("Filepath: ", ech, ", version: ", vrsn, "\n"),
        append = TRUE, file = file.path(wd, "blast_versions.txt")
      )
      vrsn_nums <- suppressWarnings(as.numeric(strsplit(vrsn, "\\.")[[1]]))
      # isTRUE() turns an unparseable (NA) version into a failed check
      # instead of an error from if(NA)
      tst <- isTRUE(vrsn_nums[1] >= 2 && vrsn_nums[2] >= 0)
      if (tst) {
        .log(v = v, wd = wd, paste0("Found: [", ech, "]\n"))
      } else {
        .log(v = v, wd = wd, paste0("Incorrect version: [", ech, "]\n"))
        sccdd <- FALSE
      }
    }
  }
  if (!sccdd) {
    msg <- "Unable to find correct versions of NCBI BLAST+ tools\n"
    .log(v = v, wd = wd, paste0("Error:", msg))
    stop(msg)
  }
  list("mkblstdb" = mkblstdb, "blstn" = blstn)
}

#' @name stage_args_check
#' @title Check stage arguments
#' @description Ensures stage arguments are valid, raises an error if not.
#' Stages are numbered 1 to 4: taxise, download, cluster, cluster2.
#' @param to ending stage, a single whole number between 1 and 4
#' @param frm starting stage, a single whole number between 1 and 4 that is
#' not greater than \code{to}
#' @family run-private
#' @return character, stage message naming the stages from \code{frm} to
#' \code{to}
stage_args_check <- function(to, frm) {
  # a valid stage number is one non-missing whole number
  is_stage_num <- function(x) {
    is.numeric(x) && length(x) == 1 && !is.na(x) && x == round(x)
  }
  if (!is_stage_num(to) || !is_stage_num(frm)) {
    stop("`to` and `frm` must each be a single whole number.")
  }
  if (to < 1 || frm < 1) {
    stop("Total stages to run cannot be less than 1.")
  }
  if (to > 4 || frm > 4) {
    stop("Total stages to run cannot be more than 4.")
  }
  if (frm > to) {
    stop("Starting stage must always come before ending stage.")
  }
  stgs <- c("taxise", "download", "cluster", "cluster2")
  stgs_msg <- paste0(stgs[frm:to], collapse = ", ")
  paste0("Running stages: ", stgs_msg)
}

#' @name stages_run
#' @title Sequentially run each stage
#' @description Runs stages from \code{frm} to \code{to} (1 = taxise,
#' 2 = download, 3 = cluster, 4 = cluster2). Records stage progress
#' in cache via \code{progress_save()} after each stage completes. Any error
#' raised by a stage is logged and re-raised with a summary message.
#' @param wd Working directory
#' @param to Total number of stages to run
#' @param frm Starting stage to run from
#' @param stgs_msg Printout stage message for log
#' @param rstrt Restarting, TRUE/FALSE
#' @return Value of the final \code{info()} call; called for its side effects.
#' @family run-private
stages_run <- function(wd, to, frm, stgs_msg, rstrt = FALSE) {
  ps <- parameters_load(wd)
  # Surround a message with dashed borders as wide as the message itself.
  .banner <- function(txt) {
    brdr <- strrep("-", nchar(txt))
    paste0(brdr, "\n", txt, "\n", brdr)
  }
  # Run, in order, every stage whose number lies within [frm, to]. Stage
  # functions are wrapped in closures so each is looked up only when run.
  .run <- function() {
    stg_nms <- c("taxise", "download", "cluster", "cluster2")
    stg_lbls <- c(
      "... Taxise\n", "... Download\n", "... Cluster\n", "... Cluster2\n"
    )
    stg_fns <- list(
      function() taxise_run(wd),
      function() download_run(wd),
      function() clusters_run(wd),
      function() clusters2_run(wd)
    )
    for (i in seq_along(stg_nms)) {
      if (frm <= i && to >= i) {
        if (!ps[["v"]]) {
          cat(stg_lbls[[i]])
        }
        stg_fns[[i]]()
        progress_save(wd, stg_nms[[i]])
      }
    }
  }
  # header log
  verb <- if (rstrt) "Restarting" else "Running"
  msg <- paste0(
    verb, " pipeline on [", .Platform$OS.type,
    "] at [", Sys.time(), "]"
  )
  info(ps = ps, lvl = 1, .banner(msg))
  info(ps = ps, lvl = 1, stgs_msg)
  errmsg <- try(.run(), silent = TRUE)
  if (inherits(errmsg, "try-error")) {
    if (grepl(
      "Operation was aborted by an application callback",
      errmsg[[1]]
    )) {
      # ctrl+c
      msg <- paste0("---- Halted by user [", Sys.time(), "] ----")
    } else {
      # unexpected pipeline error
      msg <- paste0(
        "Unexpected ", errmsg[[1]], "\n", "Occurred [", Sys.time(),
        "]\n", "Contact package maintainer for help.\n"
      )
    }
    .log(v = ps[["v"]], wd = ps[["wd"]], msg)
    stop(msg)
  }
  # footer log
  info(
    ps = ps, lvl = 1,
    .banner(paste0("Completed pipeline at [", Sys.time(), "]"))
  )
}
ropensci/phylotaR documentation built on July 9, 2023, 3:17 p.m.