## Load Data ===================================================================
## Code to load data for ROA
## Reads data files, parses them into data frames, and loads them into a sqlite DB
## =============================================================================

## Initiate DB -----------------------------------------------------------------
#' Initiates sqlite database for storing pepr pipeline output data.
#'
#' @param data_dir data directory generated by the pepr pipeline; a new
#' database is created in this directory when db_path is not provided.
#' @param db_path optional path to initiate the database
#' @param create logical; if TRUE (default) the database file is created at
#' db_path when one does not already exist.
#' @return a dplyr sqlite database source (src_sqlite)
#' @examples
#' init_peprDB("/path/to/PEPR-DATA")
#' init_peprDB(db_path = "/path/to/peprDB.sqlite", create = TRUE)
init_peprDB <- function(data_dir = NULL, db_path = NULL, create = TRUE){
    if(is.null(db_path) && is.null(data_dir)){
        stop("Either data_dir or db_path is required")
    }
    if(is.null(db_path)){
        db_path <- stringr::str_c(data_dir, "peprDB.sqlite", sep = "/")
    }
    dplyr::src_sqlite(db_path, create = create)
}

# Check whether a table is already in the database; if so, the loading step is
# skipped with a warning. (src_tbls returns a character vector, so the old
# is.null check was dead code; %in% on an empty vector is simply FALSE.)
.check_db_table <- function(table, db_con){
    db_tables <- dplyr::src_tbls(db_con)
    if(table %in% db_tables){
        warning(paste0(table, " table already in database; skipping load"))
        return(TRUE)
    }
    FALSE
}

## Metadata --------------------------------------------------------------------
#' Loads metadata file from pepr pipeline into database
#' @param metadata yaml file with pipeline metadata
#' @param db_con database connection
#' @return the exp_design table loaded into the database
#' @examples
#' load_peprMeta("/path/to/metadata.yaml", peprDB)
load_peprMeta <- function(metadata, db_con){
    if(.check_db_table("exp_design", db_con)){
        return()
    }
    yList <-  yaml::yaml.load_file(metadata)
    # need to figure out what to do with general metadata ....
    ydf <- dplyr::data_frame()
    for(i in seq_along(yList[["exp_design"]])){
        subList <- yList[["exp_design"]][[i]]
        for(j in seq_along(subList)){
            sdf <- dplyr::as_data_frame(subList[[j]])
            ydf <- dplyr::bind_rows(ydf, sdf)
        }
    }
    dplyr::copy_to(dest = db_con,df = ydf,
                   name = "exp_design",
                   temporary = FALSE)
}
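
## The exp_design section of the metadata yaml is assumed to be a list of
## lists of records, e.g. (a sketch; field names are hypothetical):
## exp_design:
##   - - accession: SRR1234567
##       platform: miseq
##   - - accession: SRR7654321
##       platform: pgm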

#### Load metrics --------------------------------------------------------------
# Parse metric file names
.parse_stat_name <- function(file_name){
    name_split <- stringr::str_split(file_name, pattern = "/")[[1]]
    stringr::str_split(name_split[length(name_split)],
                       pattern = "_stats")[[1]][1]
}
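## e.g. "qc_stats/SRR1234567_stats.txt" -> "SRR1234567" (hypothetical file name)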

# load an individual metrics file and tag rows with the dataset accession
.read_metrics <- function(metrics_file, nskip, n_rows){
    ## warn and skip empty files instead of throwing an error
    m_df <- try(read.table(metrics_file,
                           skip = nskip, sep = "\t", nrows = n_rows,
                           header = TRUE, stringsAsFactors = FALSE),
                silent = TRUE)
    if(inherits(m_df, "try-error")){
        warning(paste0(metrics_file, " is empty"))
        return(NULL)
    }
    m_df %>%
        dplyr::mutate(accession = .parse_stat_name(metrics_file))
}

# build a single data frame for one metric type across all metrics files
.met_df <- function(met_type, metrics_files){
    met_list <- grep(pattern = met_type[[1]], metrics_files,value = TRUE)
    plyr::ldply(.data = met_list,.fun = .read_metrics,
                nskip = met_type[[2]], n_rows = met_type[[3]])
}

#' Loads sequence dataset metrics files generated by Picard into the database
#' @param metrics_dir directory with picard metrics output
#' @param db_con database connection
#' @return NULL
#' @examples
#' load_metrics("/path/to/metrics_dir", peprDB)
load_metrics <- function(metrics_dir, db_con){
    # alignment summary metrics
    metrics_files <- list.files(metrics_dir,"_stat",full.names = TRUE)
    if(length(metrics_files) == 0){
        stop('No "*_stat" files in provided directory')
    }
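    # each metric type maps to a triple: (file name pattern, header lines to
    # skip, rows to read; -1 reads the whole table)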
    metric_types <- list("align" = list("alignment_summary_metrics", 6, -1),
                         "quality" = list("quality_distribution_metrics", 6, -1),
                         "cycle" = list("quality_by_cycle_metric", 6, -1),
                         "insert_size" = list("insert_size_metric", 6, 1),
                         "insert_hist" = list("insert_size_metric", 10, -1))
    met_df_list <- purrr::map(metric_types, .f = .met_df, metrics_files)
    for(i in names(met_df_list)){
        if(!.check_db_table(i, db_con)){
            dplyr::copy_to(dest = db_con, df = met_df_list[[i]],
                           name = i, temporary = FALSE)
        }
    }
}

## extract dataset name from a fastqc output file path
.get_dataset_name <- function(fastqc_data_file){
    split_dir_fqc <- stringr::str_split(fastqc_data_file, pattern = "/")[[1]]
    grep(pattern = "_fastqc", x = split_dir_fqc, value = TRUE) %>%
        stringr::str_replace(pattern = "_fastqc", replacement = "") %>%
        stringr::str_replace(pattern = "_1", replacement = "")
}
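## e.g. ".../SRR1234567_1_fastqc/fastqc_data.txt" -> "SRR1234567"
## (hypothetical path; the "_fastqc" and "_1" suffixes are stripped)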

#' Loads sequence dataset metric files generated by fastqc into database
#' @param metrics_dir directory with fastqc output
#' @param db_con database connection
#' @return NULL
#' @examples
#' load_fastqc("/path/to/metrics_dir", peprDB)
load_fastqc <- function(metrics_dir, db_con){
    metrics_files <- list.files(metrics_dir,"fastqc_data.txt",
                                full.names = TRUE,recursive = TRUE)
    read_fastqc <- plyr::llply(metrics_files, readFastQC)
    names(read_fastqc) <-  plyr::llply(metrics_files, .get_dataset_name)

    for(i in c("Per_base_sequence_quality","Per_sequence_quality_scores",
               "Sequence_Length_Distribution")){
        if(!.check_db_table(i, db_con)){
            df <- plyr::ldply(read_fastqc,i) %>% dplyr::rename(accession=.id)
            dplyr::copy_to(dest = db_con,df = df,name = i, temporary = FALSE)
        }
    }
}

#### Purity --------------------------------------------------------------------
.parse_sam_report <- function(inputfile){
    # Parse pathoscope sam-report.tsv output files
    if(file.info(inputfile)$size == 0){
        warning(paste0("Input file ", inputfile, " is empty"))
        return(NULL)
    }
    report <- read.table(file = inputfile,header = TRUE,
                         sep = "\t",skip = 1,
                         stringsAsFactors = FALSE)
    report$accession <- stringr::str_replace(string = inputfile,
                                             pattern = ".*/",
                                             replacement = "")  %>%
                        stringr::str_replace(pattern = "-sam-report.tsv",
                                             replacement = "")
    # adding metadata from the report header line
    meta <- stringr::str_split(string = readLines(inputfile, n = 1), pattern = "\t")[[1]]
    report$Aligned_Reads <- meta[2]
    report$Mapped_Genome <- meta[4]
    return(report)
}
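## The first line of a sam-report is tab separated; as read above, field 2
## holds the aligned read count and field 4 the mapped genome count (the
## exact header layout depends on the pathoscope version).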

#' Loads pathoscope output data into sqlite db
#' @param purity_dir directory with pathoscope output
#' @param db_con database connection
#' @return NULL
#' @examples
#' load_purity("/path/to/purity_dir", peprDB)
load_purity <- function(purity_dir, db_con){
    if(.check_db_table("purity", db_con)){
        return()
    }
    purity_df <- list.files(purity_dir, pattern = "sam-report\\.tsv$",
                            full.names = TRUE, recursive = TRUE)  %>%
                    plyr::ldply(.parse_sam_report)
    dplyr::copy_to(dest = db_con,df = purity_df,
                   name = "purity", temporary = FALSE)
    rm(purity_df)
}



#### Genome Assembly -----------------------------------------------------------
# convert a pilon-generated vcf to a data frame
.vcf_to_df <- function(vcf_file){
    vcf_loc <- stringr::str_split(vcf_file,pattern = "/") %>% unlist()
    pilon_loc <- grep(".*pilon",x = vcf_loc,value = TRUE)
    ref <- stringr::str_replace(vcf_file, pattern = pilon_loc, replacement = "ref") %>%
        stringr::str_replace(pattern = "vcf", replacement = "fasta")
    vcf <- VariantAnnotation::readVcf(file = vcf_file,genome = ref)
    vcf_tbl <- cbind(VariantAnnotation::fixed(vcf), VariantAnnotation::info(vcf))
    vcf_tbl$CHROM <- rownames(vcf_tbl)
    # rowData() is deprecated for VCF objects; rowRanges() is the current accessor
    vcf_tbl$POS <- BiocGenerics::start(SummarizedExperiment::rowRanges(vcf))
    return(as.data.frame(vcf_tbl))
}

#' Loads pilon output data into sqlite db
#' @param pilon_dir directory with pilon results
#' @param db_con database connection
#' @return NULL
#' @examples
#' load_pilon("/path/to/pilon_dir", peprDB)
load_pilon <- function(pilon_dir, db_con){
    if(.check_db_table("pilon_changes", db_con)){
        return()
    }
    # miseq is hard coded in the file search; TODO: read platforms from the
    # input metadata so multiple platforms are handled without hard coding
    changes_file <- list.files(pilon_dir, pattern = "miseq\\.changes$",
                                full.names = TRUE, recursive = TRUE)
    if(length(changes_file) > 1){
        warning("more than one pilon changes file in directory; skipping database load")
    }else if(length(changes_file) == 0){
        warning("no changes file in pilon directory")
    }else if(length(readLines(changes_file)) < 1){
        warning("no changes in pilon changes file")
    }else{
        changes_col_names <- c("chrom_ref", "chrom_pilon","seq_ref","seq_pilon")
        #try using fill = TRUE to fix issue with unequal number of columns in rows
        changes_df <- read.table(file = changes_file, header = FALSE, sep = " ",
                                 col.names = changes_col_names,
                                 stringsAsFactors = FALSE) %>%
            tidyr::separate(col = chrom_ref,
                            into = c("chrom_ref","coord_ref"), sep = ":") %>%
            tidyr::separate(col = chrom_pilon,
                            into = c("chrom_pilon","coord_pilon"), sep = ":")
        dplyr::copy_to(dest = db_con,df = changes_df,
                       name = "pilon_changes", temporary = FALSE)
    }
    # TODO: find a better way to do this than two if statements
    ## vcf loading commented out below; currently not working
#     vcf_file <- list.files(pilon_dir, pattern = "*vcf",
#                            full.names = TRUE, recursive = TRUE)
#     if(length(as.vector(vcf_file))> 1){
#         warning("more than one pilon vcf file in directory skipping loading into database")
#     }else if(length(as.vector(vcf_file))==0){
#         warning("no vcf file in pilon directory")
#     }else {
#         vcf_df <- .vcf_to_df(vcf_file)
#         dplyr::copy_to(dest = db_con,df = vcf_df,
#                        name = "pilon_vcf", temporary = FALSE)
#     }
}


##### Base level analysis ------------------------------------------------------
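## load_consensus chains helpers defined elsewhere in the package:
## <dataset>_<platform>.tsv -> cb_<platform> -> pur_<platform> ->
## pur_<platform>_pooled, then the pooled platform tables are joined into
## pur_pooled_join.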
#' Loads full genome vcf output data into sqlite db
#' @param consensus_dir directory with consensus results
#' @param db_con database connection
#' @param platforms sequencing platforms to load, default c("miseq", "pgm")
#' @return NULL
#' @examples
#' load_consensus("/path/to/consensus_dir", peprDB)
load_consensus <- function(consensus_dir, db_con, platforms = c("miseq", "pgm")){
    # TODO: resolve error with input file for fread
    for(i in platforms){
        # TODO: add pacbio; read platforms from the input metadata file
        # TODO: work out a method to load using less RAM
        tsv_file <- list.files(consensus_dir,
                               pattern = paste0(i, "\\.tsv$"),
                               full.names = TRUE)

        cb_table <- paste0("cb_",i)
        if(!.check_db_table(cb_table, db_con)){
            .cbtsv_tosql(tsv_file, db_con,cb_table)
        }
        # reducing memory requirements after loading
        rm(tsv_file)

        pur_table <- paste0("pur_",i)
        if(!.check_db_table(pur_table, db_con)){
            .pur_tbl(cb_table, db_con = db_con, pur_table)
        }

        pur_plat_table <- paste0("pur_",i, "_pooled")
        if(!.check_db_table(pur_plat_table, db_con)){
            .pur_plat(pur_table, db_con = db_con, pur_plat_table)
        }
    }
    if(!.check_db_table("pur_join", db_con)){
        .pur_plat_join(pur_plat_tbl1 = "pur_miseq_pooled",
                       pur_plat_tbl2 = "pur_pgm_pooled",
                       db_con,
                       tbl_name = "pur_pooled_join",
                       plat1_name = "miseq", plat2_name = "pgm")
    }
}

##### Depth ------------------------------------------------------
# Parse depth file names
.parse_depth_name <- function(file_name){
    name_split <- stringr::str_split(file_name, pattern = "_")[[1]]
    stringr::str_split(name_split[length(name_split)],
                       pattern = "\\.depth")[[1]][1]
}
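## e.g. "qc_stats/ds_SRR1234567.depth" -> "SRR1234567" (hypothetical file
## name; the accession is the last "_"-separated field, minus ".depth")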

.depth_to_df <- function(file_name){
    message("reading ", file_name)
    # three columns assumed from samtools depth output: reference, position, coverage
    depth_df <- data.table::fread(file_name) %>%
        dplyr::rename(REF = V1, POS = V2, COV = V3)
    depth_df$accession <- .parse_depth_name(file_name)
    depth_df
}
#' Loads full genome depth output data into sqlite db
#' @param depth_dir directory with depth results
#' @param db_con database connection
#' @return NULL
#' @examples
#' load_depth("/path/to/depth_dir", peprDB)
load_depth <- function(depth_dir, db_con){
    # TODO: resolve error with input file for fread
    if(!.check_db_table("seq_depth", db_con)){

        depth_df <- list.files(depth_dir,
                               pattern = "\\.depth$",
                               full.names = TRUE) %>%
            plyr::ldply(.depth_to_df)

#         depth_df_list <- list()
#         for(i in depth_files){
#             accession <- .parse_depth_name(i)
#             depth_df <- data.table::fread(i) %>%
#                 dplyr::rename(REF = V1, POS = V2, COV = V3) %>%
#                 dplyr::mutate(accession = accession) %>%
#                 dplyr::bind_rows(depth_df)
#         }

        dplyr::copy_to(dest = db_con,df = depth_df, name = "seq_depth",
                       temporary = FALSE)
    }
}

## Coverage
#' Per sample coverage
#'
#' @param db_con database connection
#'
#' @return adds coverage table to db
#' @export
#'
#' @examples coverage_table(peprDB)
coverage_table <- function (db_con){
    if(.check_db_table("coverage", db_con)){
        return()
    }
    dplyr::tbl(src = db_con, from = "seq_depth") %>%
        dplyr::group_by(accession) %>%
        dplyr::summarise(COV = median(COV)) %>%
        dplyr::compute(name = "coverage", temporary = FALSE)
}

#### Homogeneity ---------------------------------------------------------------
.parse_varscan_filename <- function(filename){
    dat_name <- stringr::str_split(filename,pattern = "_") %>% unlist()
    dat_name[length(dat_name)-1] %>%
        stringr::str_split(pattern = "-") %>% unlist()
}
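## e.g. "SRR123-SRR456_varscan-snp.txt" -> c("SRR123", "SRR456")
## (hypothetical file name; the second-to-last "_" field holds the
## "-"-separated normal and tumor accessions)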

.read_varscan <- function(file){
    df <- read.table(file, sep = "\t",
                     header = TRUE, stringsAsFactors = FALSE)
    if(nrow(df) == 0){
        # add empty normal/tumor columns so empty tables bind_rows cleanly
        df$normal <- character(0)
        df$tumor <- character(0)
        return(df)
    }
    df$ref <- as.character(df$ref)
    df$var <- as.character(df$var)
    df$normal_gt <- as.character(df$normal_gt)
    ds_names <- .parse_varscan_filename(file)
    df$normal <- ds_names[1]
    df$tumor <- ds_names[2]
    df
}

#' Loads varscan output data into sqlite db
#' @param homogeneity_dir directory with varscan results
#' @param db_con database connection
#' @return NULL
#' @examples
#' load_varscan("/path/to/homogeneity_dir", peprDB)
load_varscan <- function(homogeneity_dir, db_con){
    for(i in c("varscan-indel", "varscan-snp")){
        tbl_name <- stringr::str_replace(i, "-","_")
        if(!.check_db_table(tbl_name, db_con)){

            file_list <- list.files(homogeneity_dir,
                                    pattern = i,
                                    full.names = TRUE)
            varscan_df <- purrr::map(.x = file_list, .f = .read_varscan) %>%
                dplyr::bind_rows()
            if(nrow(varscan_df)== 0){
                message(paste0("No rows in ",i," files, not added to database"))
            } else {
                dplyr::copy_to(dest = db_con,df = varscan_df,
                               name = tbl_name,
                               temporary = FALSE)
            }
        }
    }
}
## Create db -------------------------------------------------------------------
#' Generates sqlite database from pipeline output
#' @param db_path path for database
#' @param param_yaml yaml pipeline configuration file
#' @param qc_stats_dir directory with qc metrics
#' @param homogeneity_dir directory with homogeneity results
#' @param consensus_dir directory with consensus results
#' @param purity_dir directory with purity results
#' @param pilon_dir directory with pilon results
#' @return NULL
createPeprDB <- function(db_path,
                         param_yaml,
                         qc_stats_dir,
                         homogeneity_dir,
                         consensus_dir,
                         purity_dir,
                         pilon_dir){
    peprDB <- init_peprDB(db_path = db_path,create = TRUE)
    load_peprMeta(param_yaml, db_con = peprDB)
    load_metrics(qc_stats_dir, db_con = peprDB)
    # fastqc loading commented out: data not used in the summary table and
    # there are issues with PacBio data
    #load_fastqc(qc_stats_dir, db_con = peprDB)
    load_varscan(homogeneity_dir, db_con = peprDB)
    load_depth(qc_stats_dir, db_con = peprDB)
    load_consensus(consensus_dir, db_con = peprDB, platforms = c("miseq","pgm"))
    coverage_table(db_con = peprDB)
    load_purity(purity_dir, db_con = peprDB)
    load_pilon(pilon_dir, db_con = peprDB)
}
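
## Example usage (a sketch; all paths are hypothetical):
# createPeprDB(db_path = "pepr-data/peprDB.sqlite",
#              param_yaml = "pepr-data/pepr_params.yaml",
#              qc_stats_dir = "pepr-data/qc_stats",
#              homogeneity_dir = "pepr-data/homogeneity",
#              consensus_dir = "pepr-data/consensus",
#              purity_dir = "pepr-data/purity",
#              pilon_dir = "pepr-data/pilon")
# peprDB <- init_peprDB(db_path = "pepr-data/peprDB.sqlite")  # reconnect later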