R/normalize_reads.R

Defines functions check_fq write_normalize stackr_normalize normalize_reads

Documented in check_fq normalize_reads stackr_normalize write_normalize

#normalize_reads
#' @name normalize_reads
#' @title Rarefaction of reads samples
#' @description Rarefaction of fasq files by sub-sampling the reads
#' before de novo assembly or alignment.
#' The normalization/standardization/sample size correction step allows to check
#' if some statistics are increasing with read numbers (e.g. heterozygous markers).
#' It's a very easy way to disentangle artifact from biological signal caused by
#' varying read numbers across samples.

#' @param project.info (character, path, optional) When using the stackr pipeline,
#' a project info file is created. This file provides all the info and stats
#' generated by stacks and stackr.
#' The project info file will be updated with the new samples.
#' The project info filename will be appended \code{_normalize}.
#' The file should end with \code{.tsv}.
#' If no \code{project.info} file is provided, the function will have to look
#' at the number of reads in the fastq files and this will take longer.
#' Default: \code{project.info = NULL}.

#' @param fq.files (character, path) Path of folder containing the
#' samples to normalize.

#' @param sample.reads (integer) The number of reads to pick randomly.
#' Default: \code{sample.reads = 1000000}.

#' @param number.replicates (interger) The number of samples to generate.
#' With default, if 20 samples are in the folder, 100 new samples will be generated.
#' Default: \code{number.replicates = 5}.

#' @param random.seed (integer, optional) For reproducibility, set an integer
#' that will be used inside function that requires randomness. With default,
#' a random number is generated and printed in the appropriate output.
#' Default: \code{random.seed = NULL}.

#' @param parallel.core (optional) The number of core for parallel
#' programming. Each samples to normalize is sequentially treated and replicates
#' are generated in parallel.
#' By default, \code{parallel.core = parallel::detectCores() - 1}.
#' This number is adjusted automatically to the number of replicates.

#' @rdname normalize_reads
#' @export
#' @return fastq files with "-1", "-2", "..." appended to the original name.
#' If a project info file was provided, the new replicate samples info is integrated
#' to the file. The modified project info file will have \code{_normalize} appended
#' to the original filename.
#'

#' @examples
#' \dontrun{
#' library(stackr)
#' # To run this function, bioconductor \code{ShortRead} package is necessary:
#' source("http://bioconductor.org/biocLite.R")
#' biocLite("ShortRead")
#' # Using OpenMP threads
#' nthreads <- .Call(ShortRead:::.set_omp_threads, 1L)
#' on.exit(.Call(ShortRead:::.set_omp_threads, nthreads))
#' # using defaults:
#' stackr::normalize_reads(fq.files = "~/corals")
#'
#' # customizing the function:
#' stackr::normalize_reads(
#'    project.info = "project.info.corals.tsv",
#'    fq.files = "~/corals",
#'    sample.reads = 2000000,
#'    number.replicates = 5,
#'    random.seed = 3,
#'    parallel.core = 5)
#'
#' # You then need to run stackr: run_ustacks, run_sstacks, run_tsv2bam, run_gstacks, run_populations
#' # or equivalent if a reference genome.
#' }


# @seealso
# \href{http://catchenlab.life.illinois.edu/stacks/comp/process_radtags.php}{process_radtags}.

# @references todo

normalize_reads <- function(
  project.info = NULL,
  fq.files,
  sample.reads = 1000000,
  number.replicates = 3,
  random.seed = NULL,
  parallel.core = parallel::detectCores() - 1
) {
  opt.change <- getOption("width")
  options(width = 70)
  cat("#######################################################################\n")
  cat("###################### stackr::normalize_reads ########################\n")
  cat("#######################################################################\n")
  timing <- proc.time()

  # Missing argument -----------------------------------------------------------
  # folder is given
  if (missing(fq.files)) stop("fq.files argument is required")

  # Check for required package -------------------------------------------------
  if (!requireNamespace("ShortRead", quietly = TRUE)) {
    stop("ShortRead needed for this function to work.
           Please follow the example for install instructions", call. = FALSE)
  }

  # Check parallel.core and number.replicates -------------------------------------
  if (number.replicates < parallel.core) parallel.core <- number.replicates
  message("Setting parallel.core: ", parallel.core)

  # Set seed for random sampling -----------------------------------------------
  if (is.null(random.seed)) random.seed <- sample(x = 1:1000000, size = 1)
  set.seed(random.seed)

  # Change UPPER CASE file ending ----------------------------------------------
  # check_fq(list_sample_file(f = fq.files, full.path = TRUE))

  # FQ FILES  ------------------------------------------------------------------
  fq.files <- list_sample_file(f = fq.files, full.path = TRUE)
  names(fq.files) <- fq.files
  message("Number of samples: ", length(fq.files))
  fq.type <- stackr::fq_file_type(fq.files)
  path.fq <- unique(dirname(fq.files))
  if (length(path.fq) == 0) path.fq <- getwd()

  # import project info file if present ----------------------------------------
  count.reads <-  TRUE
  fq.to.normalize <- fq.files
  if (!is.null(project.info)) {
    project.info.filename <- stringi::stri_replace_all_fixed(
      str = project.info,
      pattern = ".tsv",
      replacement = "_normalize.tsv",
      vectorize_all = FALSE
    )
    project.info <- suppressMessages(readr::read_tsv(file = project.info))

    # Check FQ to normalize
    want <- c("RETAINED", "NUMBER_READS")
    read.column.name <- purrr::keep(.x = want, .p = want %in% colnames(project.info))
    if (length(read.column.name) == 0) {
      count.reads <- TRUE
      message("Impossible to extract RETAINED or NUMBER_READS in project.info")
      message("Number of reads will be extracted from fq files")
    } else {
      fq.to.normalize <- project.info %>%
        dplyr::filter(INDIVIDUALS %in% stackr::clean_fq_filename(basename(fq.files))) %>%
        dplyr::filter(.data[[read.column.name]] > sample.reads) %$%
        INDIVIDUALS
      if (length(fq.to.normalize) == 0) {
        rlang::abort("Problem between project.info file, fq.files and threshold of sample.reads to use")
      }
      fq.to.normalize <- file.path(path.fq, paste0(fq.to.normalize, fq.type))
      names(fq.to.normalize) <- fq.to.normalize
      message("Number of samples to normalize: ", length(fq.to.normalize))
      count.reads <- FALSE
    }
  }

  rep.name <- suppressWarnings(
    purrr::map(
      .x = fq.to.normalize,
      .f = stackr_normalize,
      count.reads = count.reads,
      number.replicates = number.replicates,
      sample.reads = sample.reads,
      parallel.core = parallel.core
    )
  )

  # Update project info --------------------------------------------------------
  # new.rep.names <- purrr::flatten_chr(new.names)
  if (!is.null(project.info)) {
    message("\nUpdating project info file")
    replicates <- list_sample_file(f = fq.files, full.path = FALSE)
    path.fq
    replicates <- list_sample_file(f = path.fq, full.path = FALSE)


    replicates <- purrr::keep(.x = replicates, .p = !replicates %in% fq.files.short)
    n.rep <- length(replicates)
    fq.file.type <- suppressWarnings(unique(fq_file_type(fq.files.short)))
    new.rep.id <- stringi::stri_replace_all_fixed(
      str = replicates,
      pattern = fq.file.type,
      replacement = "",
      vectorize_all = FALSE)


    rep <- stringi::stri_sub(str = new.rep.id, -(stringi::stri_count_regex(number.replicates, "[0-9]")))
    project.info.normalize <- tibble::add_row(
      .data = project.info,
      INDIVIDUALS_REP = new.rep.id,
      REPLICATES = rep,
      FQ_FILES = replicates,
      TOTAL = rep(sample.reads, n.rep),
      NO_RADTAG = rep(0, n.rep),
      LOW_QUALITY = rep(0, n.rep),
      RETAINED = rep(sample.reads, n.rep)
    )
    readr::write_tsv(x = project.info.normalize, path = stringi::stri_join(project.info.filename, "_normalize.tsv"))
  } else {
    project.info.normalize <- "check folder for normalized samples"
  }

  timing <- proc.time() - timing
  message("\nComputation time: ", round(timing[[3]]), " sec")
  cat("############################## completed ##############################\n")
  options(width = opt.change)
  return(project.info.normalize)
}#normalize_reads


# Internal function ------------------------------------------------------------
#' @title stackr_normalize
#' @description main normalize function
#' @rdname stackr_normalize
#' @export
#' @keywords internal
stackr_normalize <- function(
  fq.to.normalize,
  count.reads = FALSE,
  number.replicates,
  sample.reads,
  parallel.core = parallel::detectCores() - 1
) {
  # fq.to.normalize <- fq.to.normalize[1]
  clean.names <- stackr::clean_fq_filename(basename(fq.to.normalize))

  # check number of reads
  if (count.reads) {
    n.reads <- read_count_one(fastq.files = fq.to.normalize) %$% NUMBER_READS
    if (n.reads < sample.reads) {
      message("Number of reads lower than threshold: no normalization")
      return(rep.name = NULL)
    }
  }
  message("\nNormalizing sample: ", clean.names)

  suppressWarnings(subsample.random <- ShortRead::FastqSampler(con = fq.to.normalize, n = sample.reads, verbose = TRUE, ordered = TRUE))
  message("Generating ", number.replicates, " normalized replicates")
  message("Number of reads sampled for each replicates: ", sample.reads)
  rep.name <- suppressWarnings(
    .stackr_parallel_mc(
      X = 1:number.replicates,
      FUN = write_normalize,
      mc.cores = parallel.core,
      subsample.random = subsample.random,
      fq.to.normalize = fq.to.normalize,
      sample.reads = sample.reads
      )
    )
  return(rep.name)
}#End stackr_normalize


#' @title write_normalize
#' @description Write the normalize fastq file
#' @rdname write_normalize
#' @export
#' @keywords internal
write_normalize <- function(number.replicates, subsample.random, fq.to.normalize, sample.reads) {
  message("Writing normalized samples: ", number.replicates)
  new.sample <- ShortRead::yield(subsample.random)
  fq.type <- stackr::fq_file_type(fq.to.normalize)
  new.name <- stringi::stri_replace_all_fixed(
    str = fq.to.normalize,
    pattern = fq.type,
    replacement = stringi::stri_join("-", sample.reads, "-", number.replicates, fq.type),
    vectorize_all = FALSE)

  suppressWarnings(ShortRead::writeFastq(new.sample, new.name))
  return(new.name)
}#End write_normalize


#' @title check_fq
#' @description Check if weird fq UPPER CASE ending is used
#' @rdname check_fq
#' @export
#' @keywords internal

check_fq <- function(fq.files, parallel.core = parallel::detectCores() - 1) {
  change.fq <- tibble::as_tibble(list(OLD_FQ = c(fq.files))) %>%
    dplyr::mutate(NEW_FQ = TRUE %in% stringi::stri_detect_fixed(str = OLD_FQ, pattern = c(".FASTQ.GZ", ".FASTQ.gz"))) %>%
    dplyr::filter(NEW_FQ)
  fq.check <- nrow(change.fq)
  if (fq.check > 0) {
    message(fq.check," UPPER CASE fq file(s) ending found in the folder")
    message("    Renaming to lower case...")
    change.fq <- change.fq %>%
      dplyr::mutate(
        NEW_FQ = stringi::stri_replace_all_regex(
          str = OLD_FQ,
          pattern = fq_file_type(OLD_FQ),
          replacement = stringi::stri_trans_tolower(fq_file_type(OLD_FQ)),
          vectorize_all = FALSE))


    if (fq.check < parallel.core) parallel.core <- fq.check

    # purrr::walk(.x = list(change.fq$OLD_FQ), .f = rename_fq, change.fq = change.fq)
    rename_fq(change.fq, parallel.core = parallel.core)
  }
  change.fq <- fq.check <- NULL
}
thierrygosselin/stackr documentation built on Nov. 11, 2020, 11 a.m.