R/workflow_steps.R

Defines functions do_sample_deplex do_gather_samples do_marker_deplex

#' @importFrom dplyr filter as_tibble arrange_all mutate
#' @importFrom tidyr gather spread
do_sample_deplex <- function(work_dir, sample_table, reads_fwd, reads_rev, barcodes_fwd, barcodes_rev,
                             dest_dir = work_dir) {

  message("---- demultiplexing samples ----")

  check_dir_creation(work_dir, overwrite = TRUE)
  stopifnot(is_string(dest_dir), dir.exists(dest_dir))

  res_1 <-
    HaplotypR::demultiplexReads(fastqFileFwd = reads_fwd,
                                fastqFileRev = reads_rev,
                                barcodeFileFwd = barcodes_fwd,
                                barcodeFileRev = barcodes_rev,
                                outputDir = work_dir)

  res_2 <-
    HaplotypR::renameDemultiplexedFiles(sampleTab = sample_table,
                                        resTab = res_1) %>%
    as_tibble() %>%
    arrange_all()

  if (work_dir != dest_dir) {
    # mv files to dest dir

    dest_work_dir <- file.path(dest_dir, basename(work_dir))
    check_dir_creation(dest_work_dir, overwrite = TRUE)

    res_3 <-
      res_2 %>%
      gather(FileR1, FileR2, key = 'dr', value='fn') %>%
      mutate(fn = if_else(!is.na(SampleID),
                          file.path(dest_dir, basename(fn)),
                          file.path(dest_work_dir, basename(fn)))) %>%
      spread(dr, fn) %>%
      arrange_all()

    fno <- c(res_2$FileR1, res_2$FileR2)
    fnn <- c(res_3$FileR1, res_3$FileR2)
    success <- file.rename(fno, fnn)
    if (! all(success)) {
      warning('Failed to move ', sum(!success), ' files, including: ', stringr::str_c(head(fno[!success]), collapse = ', '))
    } else {
      unlink(work_dir, recursive = TRUE)
    }

    return(res_3)
  }

  return(res_2)
}

#' @importFrom dplyr "%>%" select filter mutate summarise group_by ungroup
#' @importFrom tidyr gather spread
do_gather_samples <- function(out_dir, sample_deplex_table, nread_gather_target,
                              split_suffix = '_split_[0-9]+$') {

  message("---- gathering split sample fastqs ----")

  filter(sample_deplex_table, !is.na(SampleID), ReadNumbers > 0) %>%
    mutate(sample_id = str_remove(SampleID, split_suffix)) %>%
    group_by(sample_id) %>%
    arrange(ReadNumbers) %>%
    mutate(cs = cumsum(ReadNumbers),
           group = ceiling(cs / nread_gather_target)) %>%
    gather(FileR1, FileR2, key = 'direction', value = 'readfile') %>%
    group_by(sample_id, group, direction) %>%
    summarise(SampleID = str_c(sample_id[1], '_gather_', group[1]),
              SampleName = SampleName[1],
              BarcodePair = BarcodePair[1],
              ReadNumbers = sum(ReadNumbers),
              readfile = list(readfile)) %>%
    ungroup() %>%
    mutate(newreadfile = file.path(out_dir, str_c(SampleID,
                                                  if_else(direction == 'FileR1', '_R1', '_R2'),
                                                  '.fastq.gz')),
           readfile = future_map2_chr(readfile, newreadfile, function(rf, nrf) {
             `if`(length(rf) > 1,
                  HaplotypReportR::concatFastq(rf, nrf),
                  { file.copy(rf, nrf); nrf })
           })) %>%
    select(-newreadfile) %>%
    spread(direction, readfile) %>%
    select(colnames(sample_deplex_table))
}


#' @importFrom dplyr filter "%>%"
#' @importFrom furrr future_map_dfr
do_marker_deplex <- function(out_dir, marker_table, sample_deplex_table) {

  message("---- demultiplexing markers ----")

  sample_deplex_table %>%
    split.data.frame(seq_len(nrow(.))) %>%
    future_map_dfr(function(sm_tab) {
      HaplotypR::demultiplexByMarker(sampleTable = sm_tab,
                                     markerTable = marker_table,
                                     outputDir = out_dir)
    }) %>%
    filter(numReadOut > 0)
}

#' @importFrom dplyr "%>%" select filter mutate summarise group_by ungroup
#' @importFrom tidyr gather spread
do_gather_markers <- function(out_dir, marker_deplex_table) {

  message("---- gathering split marker fastqs ----")

  marker_deplex_table %>%
    mutate(SampleID = str_remove(SampleID, '_gather_[0-9]+$')) %>%
    gather(FileR1, FileR2, key = 'direction', value = 'readfile') %>%
    group_by(MarkerID, SampleID, direction) %>%
    summarise(SampleName = SampleName[1],
              BarcodePair = BarcodePair[1],
              numReadIn = sum(numReadIn, na.rm = T),
              numReadOut = sum(numReadOut, na.rm = T),
              readfile = list(readfile)) %>%
    ungroup() %>%
    mutate(newreadfile = file.path(out_dir, str_c(SampleID, '_', MarkerID,
                                                  if_else(direction == 'FileR1', '_F', '_R'),
                                                  '.fastq.gz')),
           readfile = future_map2_chr(readfile, newreadfile, function(rf, nrf) {
             `if`(length(rf) > 1,
                  HaplotypReportR::concatFastq(rf, nrf),
                  { file.copy(rf, nrf); nrf })
           })) %>%
    select(-newreadfile) %>%
    spread(direction, readfile) %>%
    select(colnames(marker_deplex_table))
}

#' @importFrom dplyr filter "%>%"
#' @importFrom furrr future_map
do_quality_inspection <- function(marker_deplex_table,
                                  trim_target_qual,
                                  trim_target_qual_percent,
                                  trim_target_len_percent) {

  message("---- inspecting read quality ----")

  qual_res <-
    marker_deplex_table %>%
    split.data.frame(.$MarkerID) %>%
    future_map(function(mkr_tab) {

      smry <-
        summarise_read_quality(fwd_fqs = mkr_tab$FileR1,
                               rev_fqs = mkr_tab$FileR2)

      trim_par <-
        get_quality_trim_pos(denisty_by_pos_and_qual = smry$denisty_by_pos_and_qual,
                             summary_by_pos = smry$summary_by_pos,
                             trim_target_qual=trim_target_qual,
                             trim_target_qual_percent=trim_target_qual_percent,
                             trim_target_len_percent=trim_target_len_percent)

      c(smry, list(trim_par = trim_par))
    })
}

#' @importFrom dplyr filter "%>%" tibble select anti_join bind_cols rowwise ungroup
#' @importFrom furrr future_pmap_dfr
#' @importFrom purrr map2_df pmap_dfr
#' @importFrom Biostrings xscat width
#' @importFrom ShortRead narrow
do_merge_or_trim <- function(out_dir, marker_deplex_table, min_merge_overlap, trim_par, ref_seq, read_join_strategy) {

  message("---- merging/trimming reads ----")

  merge_tbl <-
    trim_par %>%
    map2_df(names(.), function(x, n) { mutate(x$len, MarkerID = n) }) %>%
    mutate(width = width(ref_seq[MarkerID])) %>%
    filter(trim_fwd + trim_rev > width + min_merge_overlap) %>%
    mutate(postfix = 'merge', action = 'merge')

  trim_tbl <-
    tibble(MarkerID = names(ref_seq), width = width(ref_seq)) %>%
    left_join(trim_par %>%
                map2_df(names(.), function(x, n) { bind_rows(x) %>% mutate( MarkerID = n) }) %>%
                group_by(MarkerID) %>%
                summarise(trim_fwd = min(trim_fwd),
                          trim_rev = min(trim_rev)), 'MarkerID') %>%
    mutate(f_right = pmin(trim_fwd, width),
           r_left = pmax(1, width - trim_rev),
           mid = (f_right + r_left) / 2,
           trim_fwd = if_else(f_right >= r_left, as.integer(floor(mid)), trim_fwd),
           trim_rev = if_else(f_right >= r_left, as.integer(ceiling(width - mid)), trim_rev),
           postfix = str_c('bind', trim_fwd, trim_rev, sep = '_'),
           action = 'bind') %>%
    select(MarkerID, trim_fwd, trim_rev, width, postfix, action)

  if (read_join_strategy == 'bind') {
    merge_trim_tbl <- trim_tbl
  } else {
    merge_trim_tbl <- bind_rows(merge_tbl, anti_join(trim_tbl, merge_tbl, 'MarkerID'))
  }

  merge_set <- filter(merge_trim_tbl, action == 'merge') %>%  pull(MarkerID)

  ref_seq_trim <-
    with(trim_tbl, {
      xscat(narrow(ref_seq[MarkerID], start = 1, width = trim_fwd),
            narrow(ref_seq[MarkerID], end = width, width = trim_rev)) %>%
        magrittr::set_names(MarkerID)
    })

  ref_seq_trim[merge_set] <- ref_seq[merge_set]

  # merge or trim and fuse paired read
  processed_reads <-
    merge_trim_tbl %>%
    pmap_dfr(function(MarkerID, trim_fwd, trim_rev, action, ...) {
      filter(marker_deplex_table, MarkerID == !!MarkerID) %>% {
        if (action == 'bind') {

          bind_cols(
            select(., SampleID, SampleName, BarcodePair, MarkerID),
            future_pmap_dfr(., function(FileR1, FileR2, ...) {
              HaplotypR::bindAmpliconReads(fastqFileR1 = FileR1,
                                           fastqFileR2 = FileR2,
                                           outputDir = out_dir,
                                           read1Length = trim_fwd,
                                           read2Length = trim_rev)
            })) %>%
            mutate(action = 'bind')
        } else {

          bind_cols(
            select(., SampleID, SampleName, BarcodePair, MarkerID),
            future_pmap_dfr(., function(FileR1, FileR2, ...) {
              HaplotypR::mergeAmpliconReads(fastqFileR1 = FileR1,
                                            fastqFileR2 = FileR2,
                                            outputDir = out_dir)
            })) %>%
            mutate(action = 'merge')
        }
      }
    })

  list(trimmed_ref_seq = ref_seq_trim,
       merge_trim_tbl = merge_trim_tbl,
       processed_reads_table = processed_reads)
}

#' @importFrom dplyr filter "%>%" mutate bind_rows
#' @importFrom furrr future_map_chr
do_subsample_reads <- function(processed_reads_table,
                               max_proc_reads) {

  message("---- subsampling processed reads ----")

  bind_rows(
    filter(processed_reads_table, numRead <= max_proc_reads),
    filter(processed_reads_table, numRead > max_proc_reads) %>%
      mutate(ReadFile = future_map_chr(ReadFile, function(rf) { sampleFastq(rf, max_proc_reads) }),
             numRead = max_proc_reads))

}

#' @importFrom dplyr filter "%>%" tibble select mutate as_tibble n summarise group_by
#' @importFrom furrr future_pmap_dfr
#' @importFrom tidyr everything
do_snp_analysis <- function(processed_reads_table,
                            trimmed_ref_seq,
                            snp_min_mismatch_rate = 0.50,
                            snp_min_obs = 2,
                            snp_min_coverage = 100) {

  message("---- running SNP anlaysis ----")

  mismatch_rate <-
    processed_reads_table %>%
    future_pmap_dfr(function(SampleID, MarkerID, ReadFile, ...) {
      HaplotypR::calculateMismatchFrequencies(ReadFile,
                                              referenceSequence = trimmed_ref_seq[MarkerID],
                                              method = "pairwiseAlignment",
                                              minCoverage = 1L) %>%
        { .[[1]] } %>%
        as_tibble() %>%
        mutate(Pos = seq_len(n()),
               MarkerID = MarkerID,
               sample_id = SampleID)
    }) %>%
    select(MarkerID, sample_id, everything()) %>%
    mutate(rate = MisMatch / Coverage) %>%
    filter(MisMatch > 0)

  snp_set <-
    mismatch_rate %>%
    filter(Coverage >= snp_min_coverage,
           rate >= snp_min_mismatch_rate) %>%
    group_by(MarkerID, Pos) %>%
    summarise(count = n()) %>%
    filter(count >= snp_min_obs) %>%
    mutate(Ref = as.character(Biostrings::subseq(trimmed_ref_seq[MarkerID], start=Pos, width=1)),
           Alt = 'N') %>%
    ungroup() %>%
    select(-count)

  list(mismatch_rate = mismatch_rate,
       snp_set = snp_set)
}

#' @importFrom dplyr filter "%>%" mutate n summarise group_by semi_join left_join
#' @importFrom furrr future_map_dfr
#' @importFrom purrr map flatten
do_haplotype_analysis <- function(marker_table,
                                  processed_reads_table,
                                  trimmed_ref_seq,
                                  marker_trim_par,
                                  snp_set,
                                  out_dir,
                                  hap_sample_min_coverage = 300,
                                  hap_min_coverage = 3,
                                  hap_min_freq_detection = 1/100) {

  message("---- running haplotype anlaysis ----")

  snp_list <-
    split.data.frame(snp_set, snp_set$MarkerID) %>%
    map(~ select(., -MarkerID))

  haplotype_results <-
    marker_table %>%
    left_join(marker_trim_par, 'MarkerID') %>%
    # TODO - warn earlier if no snps are detected for a marker
    semi_join(snp_set, 'MarkerID') %>%
    split.data.frame(.$MarkerID) %>%
    future_map_dfr(function(marker_tab) {

      mid <- marker_tab$MarkerID
      out_marker_hap <- file.path(out_dir, mid)
      check_dir_creation(out_marker_hap, overwrite = TRUE)

      sample_marker_reads <-
        processed_reads_table %>%
        filter(MarkerID == mid)

      `if`(nrow(sample_marker_reads) > 0,
           HaplotypR::createFinalHaplotypTable(outputDir = out_marker_hap,
                                               sampleTable = sample_marker_reads,
                                               markerTable = marker_tab,
                                               referenceSequence = trimmed_ref_seq[mid],
                                               snpList = snp_list[mid],
                                               postfix = marker_tab$postfix,
                                               minHaplotypCoverage = hap_min_coverage,
                                               minReplicate = 2,
                                               detectability = hap_min_freq_detection,
                                               minSampleCoverage = hap_sample_min_coverage) %>%
             flatten() %>%
             as.data.frame() %>%
             mutate_if(is.factor, as.character) %>%
             group_by(SampleID, SampleName, MarkerID, Haplotype) %>%
             summarise(Reads = sum(Reads, na.rm = T)) %>%
             group_by(SampleID, SampleName, MarkerID) %>%
             mutate(Frequency = Reads / sum(Reads, na.rm = TRUE)) %>%
             ungroup(),
           NULL)

    })
}

#' @importFrom dplyr filter "%>%" pull
#' @importFrom purrr map
#' @importFrom Biostrings readDNAStringSet
do_read_haplotypes <- function(trimmed_ref_seq,
                               haplotypes_dir,
                               haplotype_res) {

  haplotypes <-
    names(trimmed_ref_seq) %>%
    map(function(marker) {

      haps <- readDNAStringSet(list.files(file.path(haplotypes_dir, marker), pattern = 'HaplotypeSeq', full.names = T))

      hap_set <-
        haplotype_res %>% filter(MarkerID == marker) %>% pull(Haplotype) %>%
        unique() %>% na.omit() %>% intersect(names(haps)) %>% sort()

      haps[hap_set]
    }) %>%
    magrittr::set_names(names(trimmed_ref_seq))
}

#' @importFrom dplyr left_join rename "%>%" select mutate ungroup group_by filter arrange_all
#' @importFrom purrr map2_df
do_rename_haplotypes <- function(haplotype_res,
                                 haplotype_seqs,
                                 marker_panel,
                                 novel_hap_name = 'novel') {

  map2_df(names(haplotype_seqs), haplotype_seqs, function(marker, seqs){
    tibble(MarkerID = marker,
           OldHaplotype = names(seqs),
           Sequence = as.character(seqs))
  }) %>%
    left_join(rename(marker_panel, NewHaplotype = Haplotype),
              by = c('MarkerID', 'Sequence')) %>%
    { bind_rows(filter(., !is.na(NewHaplotype)),
                filter(., is.na(NewHaplotype)) %>%
                  arrange(OldHaplotype) %>%
                  group_by(MarkerID) %>%
                  mutate(i = seq_len(n())) %>%
                  ungroup() %>%
                  mutate(NewHaplotype = str_c(MarkerID, '-', novel_hap_name, '-', format_int(i, min_width = 2L)))
    ) } %>%
    select(OldHaplotype, NewHaplotype) %>%
    arrange_all()
}

#' @importFrom purrr map
do_align_haplotypes <- function(trimmed_ref_seq,
                                haplotype_seqs) {

  alignments <-
    names(trimmed_ref_seq) %>%
    map(function(marker) {

      ref <- magrittr::set_names(trimmed_ref_seq[marker], str_c(marker, '-reference'))
      seq_set <- c(ref, haplotype_seqs[[marker]])

      if(length(seq_set) > 1){
        DECIPHER::AlignSeqs(seq_set, verbose = FALSE) %>%
          { c(.[1], .[sort(names(.[-1]))] ) }
      } else {
        ref
      }
    }) %>%
    magrittr::set_names(names(trimmed_ref_seq))
}
bahlolab/HaplotypReportR documentation built on Dec. 2, 2019, 7:36 p.m.