workflows/TypeSeq2.R

#### A. load packages ####
library(TypeSeq2)
library(drake)
library(tidyverse)
library(parallel)
library(rmarkdown)
library(furrr)
library(future)
library(fs)
library(jsonlite)
library(optigrab)
library(magrittr)

drake::clean("variants_final_table")

command_line_args = tibble(
    manifest = optigrab::opt_get('manifest'),
    control_definitions = optigrab::opt_get('control_definitions'),
    grouping_defs = optigrab::opt_get('grouping_defs'),
    barcode_file = optigrab::opt_get('barcode_file'),
    tvc_parameters = optigrab::opt_get('tvc_parameters'),
    reference = optigrab::opt_get('reference'),
    region_bed = optigrab::opt_get('region_bed'),
    hotspot_vcf = optigrab::opt_get('hotspot_vcf'),
    is_torrent_server = optigrab::opt_get('is_torrent_server'),
    is_clinical = optigrab::opt_get('is_clinical'),
    offload = optigrab::opt_get('offload'),
    debug = optigrab::opt_get('debug'),
    start_plugin = optigrab::opt_get('start_plugin'),
    config_file = optigrab::opt_get('config_file'),
    config_set = optigrab::opt_get('config_set'), # this option is ignored in the server mode
    lineage_defs = optigrab::opt_get('lineage_defs'),
    scaling_table = optigrab::opt_get('scaling_table'),
    pn_filters = optigrab::opt_get('pn_filters'),
    internal_control_defs = optigrab::opt_get('internal_control_defs'),
    ram = optigrab::opt_get('ram'),
    cores = optigrab::opt_get('cores'),
    tvc_cores = optigrab::opt_get('tvc_cores'),
    filteringTable = optigrab::opt_get('filteringTable')) %>%
    glimpse()

#### Define workers
future::plan(multiprocess)

# num_cores = availableCores() - 1
num_cores = availableCores(methods="system") - 1

tvc_cores <- as.numeric(command_line_args$tvc_cores)

if(is.null(tvc_cores)){
	tvc_scores <- 4
}

# workers <- floor(num_cores/tvc_cores)
workers <- num_cores - 1


vcf_file_func <- function(sorted_bam, args_df){
    tmp_workers <- floor(num_cores/tvc_cores)
        plan_workers(tmp_workers, num_cores)
 
        rv <- sorted_bam %T>%
        map_df(~ system(paste0("cp ", args_df$reference, " ./"))) %T>%
        map_df(~ system(paste0("samtools faidx ", basename(args_df$reference)))) %>%
        split(.$sample) %>%
        future_map_dfr(tvc_cli, args_df) %>%
        glimpse()

        plan_workers(workers, num_cores)
        rv      
}

plan_workers <- function(workers, num_cores){
    if(is.null(workers) || workers<1){
	    workers <- 1
    }
    cat(sprintf("Number of cores: %d and workers: %d \n", num_cores, workers))
    future::plan(multicore, workers = workers)
}




#### B. create workflow plan ####
pkgconfig::set_config("drake::strings_in_dots" = "literals")
ion_plan <- drake::drake_plan(

    #### 1. adjust command line arguments ####
    args_df = get_command_line_args(command_line_args) %>%
        glimpse(),

    #### 2. parse plugin data ####
    user_files = startplugin_parse(args_df) %>%
        glimpse(),

    #### 3. demux bams ####
    demux_bam = adam_demux(user_files, args_df$ram, args_df$cores) %>%
        glimpse(),

    #### 4. split, sort, and index bams ####
    sorted_bam = demux_bam %>%
        split(.$sample) %>%
        future_map_dfr(samtools_sort) %>%
        glimpse(),

    #### 5. run tvc on demux bams ####
    vcf_files = vcf_file_func(sorted_bam, args_df), 

    #### 6. merge vcf files in to 1 table ####
    variant_table = vcf_files %>%
        filter(file_exists(vcf_out)) %>%
        split(.$vcf_out) %>%
        future_map_dfr(vcf_to_dataframe) %>%
        glimpse() %>%
        mutate_if(numCheck, ~ as.numeric(.)) %>%
        mutate(barcode = str_sub(filename, 5, 10)) %>%
        glimpse() %>%
        write_csv("variant_table.csv"),

    #### 7. joining variant table with sample sheet and write to file ####
    # variants_final_table = typing_variant_filter(variants = variant_table,
    #                                              lineage_defs = args_df$lineage_defs,
    #                                              manifest = user_files$manifest,
    #                                              specimen_control_defs = user_files$control_definitions,
    #                                              internal_control_defs = args_df$internal_control_defs,
    #                                              pn_filters = args_df$pn_filters,
    #                                              scaling_table = args_df$scaling_table, args_df$is_clinical == "yes" ) ,
    
    variants_final_table = typing_variant_filter2(variants=variant_table, args_df, user_files),
    #### 8. generate qc report ####
    ion_qc_report = render_ion_qc_report(variants_final_table = variants_final_table,
                                         args_df = args_df,
                                         manifest = user_files$manifest,
                                         control_for_report = read.csv("control_for_report"),
                                         samples_only_for_report = read.csv("samples_only_for_report"),
                                         detailed_pn_matrix_for_report = read.csv("detailed_pn_matrix_report"),
                                         read_count_matrix_report = read.csv("read_count_matrix_report"),
                                         pn_filters = read.csv("pn_filters_report"),
                                         specimen_control_defs = user_files$control_definitions,
                                         lineage_for_report = read.csv("lineage_for_report") ) ,

    #### 9. render_batch_qc_report
    ion_batch_report = render_batch_qc_report(variants_final_table = variants_final_table,
                                         args_df = args_df,
                                         manifest = user_files$manifest,
                                         control_for_report = read.csv("control_for_report"),
                                         samples_only_for_report = read.csv("samples_only_for_report"),
                                         detailed_pn_matrix_for_report = read.csv("detailed_pn_matrix_report"),
                                         read_count_matrix_report = read.csv("read_count_matrix_report"),
                                         pn_filters = read.csv("pn_filters_report"),
                                         specimen_control_defs = user_files$control_definitions,
                                         lineage_for_report = read.csv("lineage_for_report") ) ,

    #### 10. generate grouped pn_matrix           
    grouped_outputs = get_grouped_df(simple_pn_matrix_final = read.csv("pn_matrix_for_groupings"),
                                     groups_defs = user_files$grouping_defs,
                                     ion_qc_report = ion_qc_report),

    ### 11. collect run matrics
    run_metrics = collect_metrics(user_files, variants_final_table)


)  

#### C. execute workflow plan ####
system("mkdir vcf")


# future::plan(multicore, workers = workers)
plan_workers(workers, num_cores)

drake::make(ion_plan)


### Rename read_summary.csv
new_fn <- renaming_read_summary(readd(user_files))

### load run_metrics here 
run_metrics <- readd(run_metrics)


### Compress file here
# include more files here
system(sprintf("zip -r TypeSeq2_outputs.zip *.read_summary.csv *results.csv *QC_report.pdf *.batch_metrics_summary.csv *.run_metrics.csv Scaled_min-filters.csv control_definitions barcode_file grouping_file typing_manifest.csv *.full.csv %s.Table*.csv %s.*_plot_data.csv", get_output_prefix(), get_output_prefix() ))

is_debug <- (! is.na(command_line_args$debug)) && command_line_args$debug == "yes"
if(is_debug){
    cat ("Debug mode!\n\n")
}

if( ! is.na(command_line_args$is_clinical) ){
    # clinical modes

    # zip file for the lab
    system("zip -r TypeSeq2_outputs.laboratory.zip *.read_summary.csv *control_results.csv  *failed_samples_pn_matrix_results.csv *-pn_matrix_results.laboratory.csv *laboratory_report.pdf  control_definitions barcode_file grouping_file typing_manifest.csv *.laboratory.csv ")

    # encrypt the zip file
    gpg_status <- system(sprintf("gpg2 -e -R %s --batch --yes -o TypeSeq2_outputs.zip.pgp TypeSeq2_outputs.zip", command_line_args$is_clinical ))

    if(gpg_status == 0 || gpg_status == 2 ){
        # remove the unencrypted files containing clinical outcomes
        system(" ls *.Table*.csv *.full.csv *QC_report.pdf *_plot_data.csv *results.csv TypeSeq2_outputs.zip | grep -v -e control_results.csv -e failed_samples_pn_matrix_results.csv | xargs rm -f ")
    }
    # # hide all files by default
    # system("chmod -R  go-rxw *")
    # system("chmod go+r drmaa_stdout.txt") # allow to view log file via web portal

    # # allow the selected files to view
    # system("chmod go+r TypeSeq2_outputs.zip.pgp TypeSeq2_outputs.laboratory.zip *laboratory_report.pdf")
    if (! is.na(command_line_args$offload)){
        # Running at CGR lab as TypeSeq2_IMS
        cat("Running at the CGR lab in clinical mode!\n")

        folders <- unlist(strsplit(command_line_args$offload, split=','))
        
        print(folders)

        ### Transfer files to the target folders
        # cp *-control_results.csv *-pn_matrix_results.laboratory.csv to $folder1
        system(sprintf("cp *-control_results.csv *-pn_matrix_results.laboratory.csv %s", folders[1]))
        
        # make new folder $folder2/<run_id>/plugin_out/ and copy TypeSeq2_outputs.laboratory.zip file there
        # TS2B0000219.run_metrics.csv:"Analysis_Name","Auto_user_S5XL-0040-271-T00062845_PC_NP0626-TS9_TS2B0000219_720"
        new_dir <- sprintf("%s/%s/plugin_out", folders[2], run_metrics$Analysis_Name)
        system(sprintf("mkdir -p %s && cp TypeSeq2_outputs.laboratory.zip %s", new_dir, new_dir))
         

        # copy encrypted file to $folder3 /CGF/Sequencing/Analysis/ion_projects/TypeSeq2_IMS_Encrypted_Results
        # rename TypeSeq2_outputs.zip.pgp to TypeSeq2_outputs_TS2B0000238_TS2B0000248.zip.pgp
        # gsub(",", "_", metrics$Assay_Batch_Code)
        system(sprintf("cp TypeSeq2_outputs.zip.pgp %s/TypeSeq2_outputs_%s.zip.pgp", folders[3], gsub(",", "_", run_metrics$Assay_Batch_Code)))
    }
}else{
    # non-clinical mode: 
    if (! is.na(command_line_args$offload)){
        # Running at CGR lab as TypeSeq2
        cat("Running at the CGR lab in non-clinical mode!\n")
        folders <- unlist(strsplit(command_line_args$offload, split=','))
        
        ### Transfer files to the target folders
        # cp *-control_results.csv *-pn_matrix_results.csv to $folder1
        # system(sprintf("cp *-control_results.csv *-pn_matrix_results.csv %s", folders[1]))

        # revise: <batchid>-samples_only_matrix_results.csv <batchid>-control_results.csv
        system(sprintf("cp *-control_results.csv *-samples_only_matrix_results.csv %s", folders[1]))

        # make new folder $folder2/<run_id>/plugin_out/ and copy TypeSeq2_outputs.zip file there
        new_dir <- sprintf("%s/%s/plugin_out", folders[2], run_metrics$Analysis_Name)
        system(sprintf("mkdir -p %s && cp TypeSeq2_outputs.zip %s", new_dir, new_dir))
    }
}

#### E. make html block for torrent server ####
html_block = if ( command_line_args$is_torrent_server == "yes") {
    # system("cp /TypeSeq2/inst/typeseq2/torrent_server_html_block.R ./")
    system(paste0("cp ", system.file("typeseq2", "torrent_server_html_block.R",  package = "TypeSeq2"), " ./"))

    render("./torrent_server_html_block.R", output_dir = "./", params = list(is_clinical = !is.na(command_line_args$is_clinical)))
}


### Clean .drake if the drake workflow is completed successfully
if (length(failed()) == 0 && !is_debug){
    cat("Destroy .drake ...\n")
    cache <- get_cache()
    cache$destroy()
}
NCI-CGR/TypeSeq2 documentation built on Nov. 4, 2024, 2:04 p.m.