# R/pipeline-tools.R

#' @name parameters_setup
#' @title Set Up Parameters
#' @description Builds the parameter list, records the working directory and
#' the NCBI executables in it, logs every parameter and then initiates the
#' cache of parameters and the progress record.
#' @param wd Working directory, must already exist
#' @param ncbi_execs Named list with elements \code{mkblstdb} and
#' \code{blstn}, see \code{blast_setup()}
#' @param ... Set parameters, see parameters()
#' @details The wait times (\code{wt_tms}) are made exactly \code{mxrtry}
#' long: padded by repeating the last wait time if too short, truncated if too
#' long.
#' @return Value of \code{progress_init()}; called for its side effects.
#' @family run-private
parameters_setup <- function(wd, ncbi_execs, ...) {
  if (!file.exists(wd)) {
    stop(paste0('Invalid `wd`. [', wd, '] does not exist.'))
  }
  # get parameters
  ps <- parameters(...)
  ps[['wd']] <- wd
  # calc wait times, one per retry
  wt_tms <- ps[['wt_tms']]
  mxrtry <- ps[['mxrtry']]
  dff <- mxrtry - length(wt_tms)
  if (dff > 0) {
    # too few: repeat the last wait time for the remaining retries
    wt_tms <- c(wt_tms, rep(wt_tms[length(wt_tms)], dff))
  } else {
    # seq_len() so that zero retries gives zero wait times; `1:0` would
    # index c(1, 0) and wrongly keep the first wait time
    wt_tms <- wt_tms[seq_len(mxrtry)]
  }
  ps[['wt_tms']] <- wt_tms
  # both executables must be present by name; counting matches would accept a
  # duplicated name in place of a missing one
  if (all(c('mkblstdb', 'blstn') %in% names(ncbi_execs))) {
    ps[['mkblstdb']] <- ncbi_execs[['mkblstdb']]
    ps[['blstn']] <- ncbi_execs[['blstn']]
  } else {
    error(ps = ps, 'Invalid NCBI execs provided.
          Must be named list as generated by `blast_setup`')
  }
  # log parameters
  msg <- paste0('Setting up pipeline with the following parameters:')
  info(lvl = 1, ps = ps, msg)
  # column width: longest parameter name plus padding
  mxnchrs <- max(nchar(names(ps))) + 3
  pnms <- names(ps)[names(ps) != 'wt_tms'] # too big to print
  pnms <- sort(pnms)
  for (pnm in pnms) {
    spcr <- paste0(rep(' ', mxnchrs - nchar(pnm)), collapse = '')
    prmtr_msg <- paste0(pnm, spcr, '[', ps[[pnm]], ']')
    info(lvl = 2, ps = ps, prmtr_msg)
  }
  cache_setup(ps = ps)
  progress_init(wd = wd)
}

#' @name blast_setup
#' @title Ensures NCBI BLAST tools are installed
#' @description Ensures NCBI BLAST executables are installed on the system.
#' Runs \code{makeblastdb} and \code{blastn} from \code{d} with
#' \code{-version}, records the reported versions in
#' \code{blast_versions.txt} in \code{wd} and tests the version numbers.
#' @param d Directory to NCBI BLAST tools
#' @param wd Working directory
#' @param v Logical passed on to the logger (presumably verbose), TRUE/FALSE
#' @details BLAST tools must be version >= 2.0. A tool that cannot be run, or
#' whose version cannot be read from the first line of its output, counts as
#' an incorrect version. An error is raised if either tool fails.
#' @family run-private
#' @return list, file paths of the executables named \code{mkblstdb} and
#' \code{blstn}
blast_setup <- function(d, v, wd) {
  .log(v = v, wd = wd, 'Checking for valid NCBI BLAST+ Tools ...\n')
  sccdd <- TRUE
  mkblstdb <- file.path(d, 'makeblastdb')
  blstn <- file.path(d, 'blastn')
  for (ech in c(mkblstdb, blstn)) {
    res <- cmdln(cmd = ech, args = '-version')
    if (res[['status']] != 0) {
      err_txt <- rawToChar(res[['stderr']])
      .log(v = v, wd = wd, paste0('Failed to run: [', ech, ']. Reason:\n',
                                  '[', err_txt, ']'))
      sccdd <- FALSE
      next
    }
    # test version, read from the first line of output
    out_lns <- strsplit(x = rawToChar(res[['stdout']]), split = '\n')[[1]]
    frst_ln <- if (length(out_lns) > 0) out_lns[[1]] else ''
    # strip letters, ':', '+' and whitespace to leave e.g. '2.7.1'
    vrsn <- gsub('[a-zA-Z:+]', '', frst_ln)
    vrsn <- gsub('\\s', '', vrsn)
    write(x = paste0('Filepath: ', ech, ', version: ', vrsn, '\n'),
          append = TRUE, file = file.path(wd, 'blast_versions.txt'))
    # unparseable parts become NA, which is handled explicitly below
    vrsn <- suppressWarnings(as.numeric(strsplit(vrsn, '\\.')[[1]]))
    # isTRUE() so a missing or NA major/minor number fails the test rather
    # than crashing the `if`
    tst <- isTRUE(vrsn[1] >= 2) && isTRUE(vrsn[2] >= 0)
    if (tst) {
      .log(v = v, wd = wd, paste0('Found: [', ech, ']\n'))
    } else {
      .log(v = v, wd = wd, paste0('Incorrect version: [', ech, ']\n'))
      sccdd <- FALSE
    }
  }
  if (!sccdd) {
    msg <- 'Unable to find correct versions of NCBI BLAST+ tools\n'
    .log(v = v, wd = wd, paste0('Error:', msg))
    stop(msg)
  }
  list('mkblstdb' = mkblstdb, 'blstn' = blstn)
}

#' @name stage_args_check
#' @title Check stage arguments
#' @description Validates the starting and ending stage numbers and builds the
#' message naming the stages to be run. Raises an error if either number is
#' outside 1 to 4, or if the starting stage comes after the ending stage.
#' @param to ending stage, number between 1 and 4
#' @param frm starting stage, number between 1 and 4
#' @family run-private
#' @return character, stage message
stage_args_check <- function(to, frm) {
  stage_names <- c('taxise', 'download', 'cluster', 'cluster2')
  n_stages <- length(stage_names)
  # lower bound
  below_first <- to < 1 || frm < 1
  if (below_first) {
    stop('Total stages to run cannot be less than 1.')
  }
  # upper bound
  beyond_last <- to > n_stages || frm > n_stages
  if (beyond_last) {
    stop('Total stages to run cannot be more than 4.')
  }
  # ordering
  if (frm > to) {
    stop('Starting stage must always come before ending stage.')
  }
  selected <- stage_names[seq.int(frm, to)]
  paste0('Running stages: ', paste0(selected, collapse = ', '))
}

#' @name stages_run
#' @title Sequentially run each stage
#' @description Runs stages from \code{frm} to \code{to}. Records stage progress
#' in cache after each stage completes. Writes a header and a footer to the
#' log; if a stage fails, the failure is logged and an error is raised.
#' @param wd Working directory
#' @param to Ending stage, the last stage to run (1 to 4)
#' @param frm Starting stage to run from (1 to 4)
#' @param stgs_msg Printout stage message for log
#' @param rstrt Restarting, TRUE/FALSE
#' @details Stages are, in order: taxise, download, cluster and cluster2.
#' @return Called for its side effects; the value is that of the final
#' \code{info()} call.
#' @family run-private
stages_run <- function(wd, to, frm, stgs_msg, rstrt=FALSE) {
  # stages in pipeline order; each runner is wrapped so that it is only looked
  # up when its stage runs and errors still report the original call
  stgs <- list(
    list(nm = 'taxise', lbl = 'Taxise', run = function() taxise_run(wd)),
    list(nm = 'download', lbl = 'Download',
         run = function() download_run(wd)),
    list(nm = 'cluster', lbl = 'Cluster', run = function() clusters_run(wd)),
    list(nm = 'cluster2', lbl = 'Cluster2',
         run = function() clusters2_run(wd))
  )
  .run <- function() {
    for (i in seq_along(stgs)) {
      if (frm <= i && to >= i) {
        stg <- stgs[[i]]
        # short progress line on the console when `v` is off
        if (!ps[['v']]) {
          cat(paste0('... ', stg[['lbl']], '\n'))
        }
        stg[['run']]()
        progress_save(wd, stg[['nm']])
      }
    }
  }
  ps <- parameters_load(wd)
  # header log
  if (rstrt) {
    msg <- paste0('Restarting pipeline on [', .Platform$OS.type,
                  '] at [', Sys.time(), ']')
  } else {
    msg <- paste0('Running pipeline on [', .Platform$OS.type,
                  '] at [', Sys.time(), ']')
  }
  brdr <- paste0(rep('-', nchar(msg)), collapse = '')
  msg <- paste0(brdr, '\n', msg, '\n', brdr)
  info(ps = ps, lvl = 1, msg)
  info(ps = ps, lvl = 1, stgs_msg)
  # try() keeps the formatted 'Error in <call> : <message>' text for the log
  errmsg <- try(.run(), silent = TRUE)
  if (inherits(errmsg, 'try-error')) {
    # ctrl+c
    if (grepl('Operation was aborted by an application callback',
              errmsg[[1]])) {
      msg <- paste0('---- Halted by user [', Sys.time(), '] ----')
      .log(v = ps[['v']], wd = ps[['wd']], msg)
      stop(msg)
    }
    # unexpected pipeline error
    msg <- paste0('Unexpected ', errmsg[[1]], '\n', 'Occurred [', Sys.time(),
                  ']\n', 'Contact package maintainer for help.\n')
    .log(v = ps[['v']], wd = ps[['wd']], msg)
    stop(msg)
  }
  # footer log
  msg <- paste0('Completed pipeline at [', Sys.time(), ']')
  brdr <- paste0(rep('-', nchar(msg)), collapse = '')
  msg <- paste0(brdr, '\n', msg, '\n', brdr)
  info(ps = ps, lvl = 1, msg)
}

# Try the phylotaR package in your browser
#
# Any scripts or data that you put into this service are public.
#
# phylotaR documentation built on May 1, 2019, 9:26 p.m.