#' Wrapper for MarkDuplicatesSpark from gatk
#'
#' This function removes duplicated reads (artifacts) found in aligned sequences and sorts the output bam.
#'
#' @param bam Path to the input file with the aligned sequence.
#' @param sif_gatk Path to gatk sif file. Default taken from build_default_sif_list().
#' @param output_dir Path to the output directory.
#' @param tmp_dir Path to tmp directory.
#' @param remove_duplicates Remove all sequencing duplicates from BAM file. Default TRUE.
#' @param threads Number of threads. Default 3
#' @param ram RAM memory in GB. Default 4
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "markdupsGATK"
#' @param task_name Task name. Default "markdupsGATK"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param verbose [OPTIONAL] Enables progress messages. Default FALSE.
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until job is finished. Job ID.
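#' @examples
#' \dontrun{
#' # Minimal local-mode sketch; the BAM and SIF paths below are illustrative
#' # placeholders, not package defaults.
#' md <- markdups_gatk(
#'   bam = "aligned/sample.sorted.bam",
#'   sif_gatk = "containers/gatk.sif",
#'   output_dir = "results",
#'   tmp_dir = "tmp",
#'   remove_duplicates = TRUE,
#'   threads = 3, ram = 4, mode = "local"
#' )
#' md$out_files$bam   # deduplicated, sorted BAM
#' }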
#' @import tidyverse
#' @export
markdups_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,bam="",output_dir=".",
verbose=FALSE,batch_config=build_default_preprocess_config(),
tmp_dir=".",threads=3,ram=4,remove_duplicates=TRUE,
executor_id=make_unique_id("markdupsGATK"),task_name="markdupsGATK",
mode="local",time="48:0:0",update_time=60,wait=FALSE,hold=NULL){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir,name="markdups_reports")
tmp=""
if (!tmp_dir==""){
tmp=paste0(" --tmp-dir ",tmp_dir)
}
dups=""
if(remove_duplicates){
dups=" --remove-all-duplicates"
}
out_file=paste0(out_file_dir,"/",get_file_name(bam),".sorted.rmdup.",
get_file_ext(bam))
out_file_md=paste0(out_file_dir,"/",get_file_name(bam),".gatk_rmdup.txt")
exec_code=paste0(" singularity exec -H ",getwd(),":/home ",sif_gatk," /gatk/gatk MarkDuplicatesSpark -I ",bam, " -O ", out_file,
" -M ",out_file_md," ",tmp," --conf \'spark.executor.cores=",threads,"\'", dups)
job=build_job(executor_id=executor_id,task_id=task_id)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,threads=threads,
output_dir=out_file_dir2,hold=hold)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("markdups failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
bam=out_file,
log=out_file_md
)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,
time=update_time,verbose=verbose,threads=threads)
}
return(job_report)
}
#' Function for base quality recalibration
#'
#' This function recalibrates the base quality of the reads in a two-step process based on GATK best practices guides.
#'
#' @param bin_samtools [REQUIRED] Path to samtools executable. Default path tools/samtools/samtools.
#' @param sif_gatk [REQUIRED] Path to gatk sif file. Default path tools/gatk/gatk.
#' @param bin_picard [REQUIRED] Path to picard executable. Default path tools/picard/build/libs/picard.jar.
#' @param bam [REQUIRED] Path to BAM file.
#' @param ref_genome [REQUIRED] Path to reference genome.
#' @param dbsnp [REQUIRED] Known variant database. Requires at least one.
#' @param threads [OPTIONAL] Number of threads to split the work.
#' @param ram [OPTIONAL] RAM memory per thread.
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param clean Clean input files. Default TRUE.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "recalGATK"
#' @param task_name Task name. Default "recalGATK"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param verbose [OPTIONAL] Enables progress messages. Default FALSE.
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
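#' @examples
#' \dontrun{
#' # Hypothetical two-step recalibration run; all paths are placeholders.
#' recal <- recal_gatk(
#'   bam = "results/sample.sorted.rmdup.bam",
#'   ref_genome = "ref/hg19.fasta",
#'   dbsnp = c("ref/dbsnp.vcf.gz", "ref/mills_indels.vcf.gz"),
#'   output_dir = "results", tmp_dir = "tmp",
#'   threads = 4, ram = 4, mode = "local"
#' )
#' names(recal$steps)
#' }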
#' @export
recal_gatk=function(
bin_samtools=build_default_tool_binary_list()$bin_samtools,
sif_gatk=build_default_sif_list()$sif_gatk,
bin_picard=build_default_tool_binary_list()$bin_picard,
bam="",ref_genome="",dbsnp="",ram=4,threads=4,output_dir=".",
tmp_dir=".",verbose=FALSE,batch_config=build_default_preprocess_config(),
executor_id=make_unique_id("recalGATK"),
task_name="recalGATK",clean=TRUE,mode="local",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir_before=set_dir(dir=output_dir,name="recal_reports/recal_before")
out_file_dir_after=set_dir(dir=output_dir,name="recal_reports/recal_after")
out_file_dir_main=set_dir(dir=output_dir,name="recal_reports")
job=build_job(executor_id=executor_id,task_id=task_id)
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=list(
main=out_file_dir_main,
after=out_file_dir_after,
before=out_file_dir_before
),
out_files=list(
)
)
job_report[["steps"]][["getChr"]] <- get_fai_reference_chr(
fasta=ref_genome,verbose=verbose,output_dir=tmp_dir,
batch_config=batch_config,executor_id=task_id,mode="local",
threads=threads,ram=ram,
time=time,update_time=update_time,wait=FALSE,hold=hold)
regions=read.table(job_report[["steps"]][["getChr"]]$out_files$ref,
sep="\t",header=TRUE)
job_report[["steps"]][["par_bqsr_before"]] <- parallel_generate_BQSR_gatk(
bin_samtools=bin_samtools,sif_gatk=sif_gatk,bam=bam,batch_config=batch_config,
ref_genome=ref_genome,dbsnp=dbsnp,regions=regions,
output_dir=out_file_dir_before,clean=clean,tmp_dir=tmp_dir,
verbose=verbose,executor_id=task_id,mode=mode,threads=threads,ram=ram,
time=time,update_time=update_time,wait=FALSE,
hold=hold)
job_report[["steps"]][["par_apply_bqsr"]] <- parallel_apply_BQSR_gatk(
bin_samtools=bin_samtools,sif_gatk=sif_gatk,bin_picard=bin_picard,
bam=bam,ref_genome=ref_genome,batch_config=batch_config,
rec_table=job_report[["steps"]][["par_bqsr_before"]][["steps"]][["gather_bqsr_report"]]$out_files$table,
output_dir=tmp_dir,regions=regions,tmp_dir=tmp_dir,
clean=clean,verbose=verbose,executor_id=task_id,
threads=threads,mode=mode,ram=ram,time=time,
update_time=update_time,wait=FALSE,
hold=unlist_lvl(job_report[["steps"]][["par_bqsr_before"]],var="job_id"))
job_report[["steps"]][["sort_and_index"]] <- sort_and_index_bam_samtools(
bin_samtools=bin_samtools,
bam=job_report[["steps"]][["par_apply_bqsr"]][["steps"]][["gather_bam"]]$out_files$bam,
output_dir=out_file_dir_main,batch_config=batch_config,
ram=ram,verbose=verbose,threads=threads,sort=TRUE,
stats="",index=TRUE,clean=clean,
mode=mode,executor_id=task_id,time=time,
update_time=update_time,wait=FALSE,
hold=unlist_lvl(job_report[["steps"]][["par_apply_bqsr"]],var="job_id"))
job_report[["steps"]][["par_bqsr_after"]] <- parallel_generate_BQSR_gatk(
bin_samtools=bin_samtools,sif_gatk=sif_gatk,tmp_dir=tmp_dir,batch_config=batch_config,
bam=job_report[["steps"]][["sort_and_index"]][["steps"]][["sort"]]$out_files$bam,
ref_genome=ref_genome,dbsnp=dbsnp,threads=threads,regions=regions,
output_dir=out_file_dir_after,verbose=verbose,executor_id=task_id,clean=clean,
mode=mode,ram=ram,time=time,update_time=update_time,
wait=FALSE,hold=unlist_lvl(job_report[["steps"]][["sort_and_index"]],var="job_id"))
job_report[["steps"]][["analyse_covariates"]] <- analyze_covariates_gatk(
sif_gatk=sif_gatk,tmp_dir=tmp_dir,batch_config=batch_config,
before=job_report[["steps"]][["par_bqsr_before"]][["steps"]][["gather_bqsr_report"]]$out_files$table,
after=job_report[["steps"]][["par_bqsr_after"]][["steps"]][["gather_bqsr_report"]]$out_files$table,
output_dir=out_file_dir_main,executor_id=task_id,mode=mode,threads=threads,
ram=ram,time=time,update_time=update_time,wait=FALSE,
hold=unlist_lvl(job_report[["steps"]][["par_bqsr_after"]],var="job_id"))
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,
time=update_time,verbose=verbose,threads=threads)
}
return(job_report)
}
#' Wrapper of BaseRecalibrator function from gatk
#'
#' Generates a recalibration table based on various covariates.
#' This function wraps around gatk BaseRecalibrator function.
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360036898312-BaseRecalibrator
#'
#' @param bam [REQUIRED] Path to the BAM file.
#' @param sif_gatk [REQUIRED] Path to gatk executable. Default tools/gatk/gatk.
#' @param ref_genome [REQUIRED] Path to reference genome
#' @param dbsnp [REQUIRED] Path to known snp positions in VCF format. Multiple vcf can be supplied as a vector.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "generateBQSR"
#' @param task_name Task name. Default "generateBQSR"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param threads [OPTIONAL] Number of threads to split the work.
#' @param ram [OPTIONAL] If batch mode. RAM memory in GB per job. Default 4
#' @param update_time [OPTIONAL] If batch mode. Show job updates every update time. Default 60
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
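#' @examples
#' \dontrun{
#' # Single-region sketch; the region string and paths are placeholders.
#' bqsr <- generate_BQSR_gatk(
#'   bam = "results/sample.sorted.rmdup.bam",
#'   ref_genome = "ref/hg19.fasta",
#'   dbsnp = "ref/dbsnp.vcf.gz",
#'   region = "chr1:1-248956422",
#'   output_dir = "results/recal_reports", tmp_dir = "tmp"
#' )
#' bqsr$out_files$recal_table
#' }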
#' @export
generate_BQSR_gatk=function(
region="",rdata=NULL,selected=NULL,
sif_gatk=build_default_sif_list()$sif_gatk,bam="",
ref_genome="",dbsnp="",output_dir=".",tmp_dir=".",verbose=FALSE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("generateBQSR"),task_name="generateBQSR",
time="48:0:0",update_time=60,wait=FALSE,hold=NULL
){
if(!is.null(rdata)){
load(rdata)
if(!is.null(selected)){
region=region_list[selected]
}
}
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
if(tmp_dir!=""){
tmp_dir=paste0(" --tmp-dir ",tmp_dir)
}
reg=""
if (region==""){
out_file=paste0(out_file_dir,get_file_name(bam),".recal.table")
}else{
reg=paste0(" -L ",region)
out_file=paste0(out_file_dir,get_file_name(bam),".",region,".recal.table")
}
## Multiple vcf with snps can be given
if (dbsnp!=""){
dbsnp=paste(" --known-sites ",dbsnp,collapse=" ")
}
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk," /gatk/gatk BaseRecalibrator -I ",bam, " -R ", ref_genome, dbsnp,
reg," -O ",out_file,tmp_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,threads=threads,
output_dir=out_file_dir2,hold=hold)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
recal_table=out_file
)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,
time=update_time,verbose=verbose,threads=threads)
}
return(job_report)
}
#' Multiregion parallelization of generate_BQSR function
#'
#' Generates a recalibration table based on various covariates.
#' This function wraps around gatk BaseRecalibrator function.
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360036898312-BaseRecalibrator
#'
#' @param bam [REQUIRED] Path to the BAM file.
#' @param bin_samtools [REQUIRED] Path to samtools executable. Default tools/samtools/samtools.
#' @param sif_gatk [REQUIRED] Path to gatk executable. Default tools/gatk/gatk.
#' @param ref_genome [REQUIRED] Path to reference genome
#' @param dbsnp [REQUIRED] Path to known snp positions in VCF format. Multiple vcf can be supplied as a vector.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param regions [OPTIONAL] Regions to parallelize through.
#' @param clean Remove intermediary files. Default TRUE
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "par_generateBQSR"
#' @param task_name Task name. Default "par_generateBQSR"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param threads Number of threads to split the work. Default 3
#' @param ram [OPTIONAL] If batch mode. RAM memory in GB per job. Default 4
#' @param update_time [OPTIONAL] If batch mode. Show job updates every update time. Default 60
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
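#' @examples
#' \dontrun{
#' # Parallel sketch over chromosomes inferred from the BAM header; all paths
#' # are illustrative placeholders.
#' par_bqsr <- parallel_generate_BQSR_gatk(
#'   bam = "results/sample.sorted.rmdup.bam",
#'   ref_genome = "ref/hg19.fasta",
#'   dbsnp = "ref/dbsnp.vcf.gz",
#'   output_dir = "results/recal_reports", tmp_dir = "tmp",
#'   threads = 3, mode = "local"
#' )
#' par_bqsr$steps$gather_bqsr_report$out_files$table
#' }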
#' @export
#' @import pbapply
parallel_generate_BQSR_gatk=function(
bin_samtools=build_default_tool_binary_list()$bin_samtools,
sif_gatk=build_default_sif_list()$sif_gatk,bam="",
regions=NULL,ref_genome="", clean=TRUE, dbsnp="",
tmp_dir=".",threads=3,ram=4,
executor_id=make_unique_id("par_generateBQSR"),
task_name="par_generateBQSR",output_dir=".",
verbose=FALSE,batch_config=build_default_preprocess_config(),
mode="local",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL){
options(scipen = 999)
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
if(is.null(regions)){
job_report[["steps"]][["getChr"]] <- get_bam_reference_chr(
bin_samtools=bin_samtools,
bam=bam,verbose=verbose,output_dir=tmp_dir,
executor_id=task_id,mode="local",threads=threads,ram=ram,
time=time,update_time=update_time,wait=FALSE,hold=hold)
regions=read.table(job_report[["steps"]][["getChr"]]$out_files$ref,
sep="\t",header=TRUE)
}
regions$start=regions$start+1
regions=regions %>% dplyr::mutate(region=paste0(chr,":",start,"-",end))
region_list=regions$region
names(region_list)=regions$region
if(mode=="local"){
job_report[["steps"]][["generate_bqsr_report"]]<-parallel::mclapply(
region_list,FUN=function(region){
job_report <- generate_BQSR_gatk(
region=region,tmp_dir=tmp_dir,batch_config=batch_config,
sif_gatk=sif_gatk,bam=bam,ref_genome=ref_genome,
dbsnp=dbsnp,output_dir=out_file_dir,verbose=verbose,
executor_id=task_id
)
},mc.cores=threads)
}else if(mode=="batch"){
rdata_file=paste0(tmp_dir,"/",job,".regions.RData")
executor_id=task_id
save(
region_list,
sif_gatk,bam,
ref_genome,dbsnp,
output_dir,
executor_id,
verbose,
tmp_dir,
file = rdata_file
)
exec_code=paste0("Rscript -e \"ULPwgs::generate_BQSR_gatk(rdata=\\\"",
rdata_file,"\\\",selected=$SGE_TASK_ID)\"")
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,
threads=2,output_dir=out_file_dir2,
hold=hold,array=length(region_list))
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report[["steps"]][["generate_bqsr_report"]]<- build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
recal_table=paste0(out_file_dir,get_file_name(bam),".",region_list,".recal.table")
)
)
}
job_report[["steps"]][["gather_bqsr_report"]]<- gather_BQSR_reports_gatk(
sif_gatk=sif_gatk,
report=unlist_lvl(named_list=job_report[["steps"]][["generate_bqsr_report"]],var="recal_table"),
executor_id=task_id,tmp_dir=tmp_dir,output_dir=out_file_dir,clean=clean,
output_name=get_file_name(bam),verbose=verbose,mode=mode,time=time,
threads=threads,ram=ram,update_time=update_time,wait=FALSE,
hold=unlist_lvl(named_list=job_report[["steps"]][["generate_bqsr_report"]],var="job_id",recursive=TRUE))
if(wait&&mode=="batch"){
job_validator(job=unlist_lvl(named_list=job_report[["steps"]],var="job_id"),
time=update_time,verbose=verbose,threads=threads)
}
return(job_report)
}
#' Wrapper around gatk GatherBQSRReports function
#'
#' This functions collects the Recalibration reports generated from scattered parallel_generate_BQSR output
#' This function wraps around gatk GatherBQSRReports function.
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360036829851-GatherBQSRReports
#'
#' @param sif_gatk [REQUIRED] Path to gatk executable. Default tools/gatk/gatk.
#' @param report [REQUIRED] Path to individual reports.
#' @param output_name [OPTIONAL] Name for the output report file.
#' @param clean Clean input files. Default FALSE.
#' @param executor_id Task EXECUTOR ID. Default "gatherBQSR"
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param task_name Task name. Default "gatherBQSR"
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param threads Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] If batch mode. RAM memory in GB per job. Default 4
#' @param update_time [OPTIONAL] If batch mode. Show job updates every update time. Default 60
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
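#' @examples
#' \dontrun{
#' # Gather per-region recalibration tables into a single report; the report
#' # paths and output name are placeholders.
#' gathered <- gather_BQSR_reports_gatk(
#'   report = c("results/sample.chr1.recal.table",
#'              "results/sample.chr2.recal.table"),
#'   output_name = "sample",
#'   output_dir = "results", tmp_dir = "tmp"
#' )
#' gathered$out_files$table
#' }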
#' @export
gather_BQSR_reports_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
report="",output_name="Report",clean=FALSE,
output_dir=".",tmp_dir=".",verbose=FALSE,
executor_id=make_unique_id("gatherBQSR"),
task_name="gatherBQSR",
batch_config=build_default_preprocess_config(),
mode="local",time="48:0:0",
threads=4,ram=4,update_time=60,wait=FALSE,hold=NULL){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
if(tmp_dir!=""){
tmp_dir=paste0(" --tmp-dir ",tmp_dir)
}
out_file=paste0(out_file_dir,output_name,".recal.table")
exec_code=paste0("singularity exec -H ",getwd(),":/home " ,sif_gatk,
" /gatk/gatk GatherBQSRReports ",paste(" -I ",report,collapse=" "),
" -O ",out_file,tmp_dir)
if(clean){
exec_code=paste(exec_code," && rm",paste(report,collapse=" "))
}
job=build_job(executor_id = executor_id,task_id=task_id)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,
time=time,ram=ram,threads=threads,
output_dir=out_file_dir2,hold=hold)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
table=out_file)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,verbose=verbose,
threads=threads)
}
return(job_report)
}
#' Wrapper of ApplyBQSR function from gatk
#'
#' Applies numerical corrections to each individual basecall based on the covariates analyzed before.
#' This function wraps around gatk applyBQSR function.
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360050814312-ApplyBQSR
#'
#' @param bam [REQUIRED] Path to the BAM file.
#' @param sif_gatk [REQUIRED] Path to gatk executable. Default tools/gatk/gatk.
#' @param ref_genome [REQUIRED] Path to reference genome
#' @param rec_table [REQUIRED] Path to covariates table generated by generate_BQSR.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param threads Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] If batch mode. RAM memory in GB per job. Default 4
#' @param executor_id [OPTIONAL] Task executor name. Default "applyBQSR"
#' @param task_name [OPTIONAL] Task name. Default "applyBQSR"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default TRUE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
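#' @examples
#' \dontrun{
#' # Apply a recalibration table to a BAM; paths are illustrative placeholders.
#' applied <- apply_BQSR_gatk(
#'   bam = "results/sample.sorted.rmdup.bam",
#'   ref_genome = "ref/hg19.fasta",
#'   rec_table = "results/sample.recal.table",
#'   output_dir = "results", tmp_dir = "tmp"
#' )
#' applied$out_files$recal_bam
#' }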
#' @export
apply_BQSR_gatk=function(
region="",rdata=NULL,selected=NULL,
sif_gatk=build_default_sif_list()$sif_gatk,bam="",ref_genome="",
rec_table="",output_dir=".",verbose=FALSE,tmp_dir=".",
batch_config=build_default_preprocess_config(),mode="local", threads=4,ram=4,
executor_id=make_unique_id("applyBQSR"),task_name="applyBQSR",time="48:0:0",
update_time=60,wait=TRUE,hold=NULL){
if(!is.null(rdata)){
load(rdata)
if(!is.null(selected)){
region=region_list[selected]
}
}
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
if(tmp_dir!=""){
tmp_dir=paste0(" --tmp-dir ",tmp_dir)
}
reg=""
if (region==""){
out_file=paste0(" ", out_file_dir,"/", get_file_name(bam),".recal.",get_file_ext(bam))
}else{
reg=paste0(" -L ",strsplit(region,"__")[[1]][2], " ")
out_file=paste0(out_file_dir,"/", get_file_name(bam),".",region,".recal.",get_file_ext(bam))
}
exec_code=paste("singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk ApplyBQSR -I ",bam, " -R ", ref_genome,
" --bqsr-recal-file ",rec_table, " -O ",out_file,reg)
job=build_job(executor_id=executor_id,task_id=task_id)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,threads=threads,
output_dir=out_file_dir2,hold=hold)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
recal_bam=out_file,
recal_bai=sub(".bam",".bai",out_file)
)
)
if(wait&&mode=="batch"){
job_validator(job=job,
time=update_time,verbose=verbose,threads=threads)
}
return(job_report)
}
#' Multiregion parallelization of apply_BQSR function
#'
#' Applies numerical corrections to each individual basecall based on the covariates analyzed before.
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360050814312-ApplyBQSR
#'
#' @param bam [REQUIRED] Path to the BAM file.
#' @param bin_samtools [REQUIRED] Path to samtools executable. Default tools/samtools/samtools.
#' @param sif_gatk [REQUIRED] Path to gatk executable. Default tools/gatk/gatk.
#' @param bin_picard [REQUIRED] Path to picard executable. Default tools/picard/build/libs/picard.jar
#' @param ref_genome [REQUIRED] Path to reference genome
#' @param rec_table [REQUIRED] Path to the recalibration table.
#' @param clean Clean intermediary files. Default TRUE
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param regions [OPTIONAL] Regions to parallelize through.
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id [OPTIONAL] Task executor name. Default "par_applyBQSR"
#' @param task_name [OPTIONAL] Task name. Default "par_applyBQSR"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param threads [OPTIONAL] Number of threads for the main job. Default 4
#' @param ram [OPTIONAL] If batch mode. RAM memory in GB per job. Default 4
#' @param update_time [OPTIONAL] If batch mode. Show job updates every update time. Default 60
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
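#' @examples
#' \dontrun{
#' # Region-parallel ApplyBQSR followed by BAM gathering; paths are placeholders.
#' par_apply <- parallel_apply_BQSR_gatk(
#'   bam = "results/sample.sorted.rmdup.bam",
#'   ref_genome = "ref/hg19.fasta",
#'   rec_table = "results/sample.recal.table",
#'   output_dir = "results", tmp_dir = "tmp",
#'   threads = 4, mode = "local"
#' )
#' par_apply$steps$gather_bam$out_files$bam
#' }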
#' @export
#' @import pbapply
parallel_apply_BQSR_gatk=function(
bin_samtools=build_default_tool_binary_list()$bin_samtools,
sif_gatk=build_default_sif_list()$sif_gatk,
bin_picard=build_default_tool_binary_list()$bin_picard,
bam="",regions=NULL,ref_genome="",rec_table="",clean=TRUE,
output_dir=".",verbose=FALSE,tmp_dir=".",
batch_config=build_default_preprocess_config(),mode="local",
executor_id=make_unique_id("par_applyBQSR"),
task_name="par_applyBQSR",
time="48:0:0",threads=4,ram=4,
update_time=60,wait=FALSE, hold=NULL){
options(scipen = 999)
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
## Build the job and job report before the region lookup so that the optional
## getChr step can be stored in it.
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
if(is.null(regions)){
job_report[["steps"]][["getChr"]] <- get_bam_reference_chr(
bin_samtools=bin_samtools,
bam=bam,verbose=verbose,output_dir=tmp_dir,
executor_id=task_id,mode="local",threads=threads,ram=ram,
time=time,update_time=update_time,wait=FALSE,hold=hold)
regions=read.table(job_report[["steps"]][["getChr"]]$out_files$ref,
sep="\t",header=TRUE)
}
regions$start=regions$start+1
regions$pos=1:nrow(regions)
regions=regions %>% dplyr::mutate(region=paste0(pos,"__",chr,":",start,"-",end))
region_list=regions$region
names(region_list)=regions$region
if(mode=="local"){
job_report[["steps"]][["apply_bqsr"]]=parallel::mclapply(
region_list,FUN=function(region){
job_report<-apply_BQSR_gatk(
region=region,batch_config=batch_config,
sif_gatk=sif_gatk,bam=bam,ref_genome=ref_genome,
executor_id=executor_id,rec_table=rec_table,tmp_dir=tmp_dir,
output_dir=out_file_dir,verbose=verbose)},
mc.cores=threads)
}else if(mode=="batch"){
rdata_file=paste0(tmp_dir,"/",job,".regions.RData")
executor_id=task_id
save(
region_list,
sif_gatk,
bam,
ref_genome,
rec_table,
output_dir,
executor_id,
verbose,
tmp_dir,
file = rdata_file
)
exec_code=paste0("Rscript -e \"ULPwgs::apply_BQSR_gatk(rdata=\\\"",
rdata_file,"\\\",selected=$SGE_TASK_ID)\"")
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,
threads=2,output_dir=out_file_dir2,
hold=hold,array=length(region_list))
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report[["steps"]][["apply_bqsr"]] <- build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
recal_bam=paste0(out_file_dir,"/", get_file_name(bam),".",region_list,
".recal.",get_file_ext(bam))
)
)
}
job_report[["steps"]][["gather_bam"]]=gather_bam_files_picard(
bin_picard=bin_picard,
bam=unlist_lvl(job_report[["steps"]][["apply_bqsr"]],var="recal_bam"),
output_dir=out_file_dir,
output_name=paste0(get_file_name(bam),".recal.sorted.rmdup.sorted"),
executor_id=task_id,mode=mode,time=time,threads=threads,ram=ram,
update_time=update_time,wait=FALSE,
clean=clean,
hold=unlist_lvl(job_report[["steps"]][["apply_bqsr"]],var="job_id",recursive=TRUE))
if(wait&&mode=="batch"){
job_validator(job=unlist_lvl(named_list=job_report[["steps"]][["apply_bqsr"]],
var="job_id"),time=update_time,verbose=verbose,threads=threads)
}
return(job_report)
}
#' Wrapper of AnalyzeCovariates function in gatk
#'
#' Generates a report of the recalibrated values.
#' This function wraps around gatk AnalyzeCovariates function.
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360037066912-AnalyzeCovariates
#'
#' @param sif_gatk [REQUIRED] Path to gatk executable. Default tools/gatk/gatk.
#' @param before [REQUIRED] Recalibration table produced by generate_BQSR function.
#' @param after [OPTIONAL] Recalibration table produced by generate_BQSR function.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "recalCovariates"
#' @param task_name Task name. Default "recalCovariates"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
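#' @examples
#' \dontrun{
#' # Compare before/after recalibration tables; table paths are placeholders.
#' cov <- analyze_covariates_gatk(
#'   before = "results/recal_reports/recal_before/sample.recal.table",
#'   after = "results/recal_reports/recal_after/sample.recal.table",
#'   output_dir = "results/recal_reports", tmp_dir = "tmp"
#' )
#' cov$out_files$plot
#' }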
#' @export
analyze_covariates_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,before="",after="",
output_dir=".",verbose=FALSE,tmp_dir=".",
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("recalCovariates"),
task_name="recalCovariates",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
if(tmp_dir!=""){
tmp_dir=paste0(" --tmp-dir ",tmp_dir)
}
if (before!="" & after==""){
out_file=paste0(out_file_dir,"/",get_file_name(before),
"_covariates_analysis_before.pdf")
out_file_csv=paste0(out_file_dir,"/",get_file_name(before),
"_covariates_analysis_before.csv")
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk AnalyzeCovariates -bqsr ",before, " -plots ",out_file,tmp_dir," -csv ",out_file_csv)
}else if(before=="" & after!=""){
out_file=paste0(out_file_dir,"/",get_file_name(after),
"_covariates_analysis_after.pdf")
out_file_csv=paste0(out_file_dir,"/",get_file_name(after),
"_covariates_analysis_after.csv")
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk AnalyzeCovariates -bqsr ",after, " -plots ",out_file,tmp_dir," -csv ",out_file_csv)
}else{
out_file=paste0(out_file_dir,"/",get_file_name(before),"_covariates_analysis.pdf")
out_file_csv=paste0(out_file_dir,"/",get_file_name(before),"_covariates_analysis.csv")
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk AnalyzeCovariates -before ",before," -after ",after,
" -plots ",out_file,tmp_dir," -csv ",out_file_csv)
}
job=build_job(executor_id=executor_id,task=task_id)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
plot=out_file)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
#' Variant Calling using Mutect2
#'
#' This function calls Mutect2 for variant calling.
#' If a vector of tumour samples is provided, they will be processed in multi-sample mode.
#' To run in tumour-normal mode supply a single tumour and a normal sample.
#' If no normal is supplied, Mutect2 runs in tumour-only mode.
#' TODO: Implement mitochondrial mode feature.
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037593851-Mutect2
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param tumour [REQUIRED] Path to tumour BAM file.
#' @param normal [OPTIONAL] Path to normal BAM file.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param rdata [OPTIONAL] Import R data information with list of BAM.
#' @param selected [OPTIONAL] Select BAM from list.
#' @param germ_resource [REQUIRED] Path to germline resource vcf file.
#' @param output_name [OPTIONAL] Name for the output. If not given, the name of the first tumour sample will be used.
#' @param pon [OPTIONAL] Path to panel of normal.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param mnps [OPTIONAL] Report MNPs in vcf file.
#' @param contamination [OPTIONAL] Produce sample cross-contamination reports. Default TRUE.
#' @param orientation [OPTIONAL] Produce read orientation information. Default FALSE
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "Mutect2"
#' @param task_name Task name. Default "Mutect2"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
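#' @examples
#' \dontrun{
#' # Tumour-normal sketch for a single region; all paths and the region are
#' # illustrative placeholders.
#' m2 <- mutect2_gatk(
#'   tumour = "bams/tumour.bam",
#'   normal = "bams/normal.bam",
#'   ref_genome = "ref/hg19.fasta",
#'   germ_resource = "ref/af-only-gnomad.vcf.gz",
#'   region = "17:7565097-7590856",
#'   output_dir = "results", tmp_dir = "tmp",
#'   orientation = TRUE
#' )
#' m2$out_files$vcf
#' }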
#' @export
mutect2_gatk=function(region="",
rdata=NULL,selected=NULL,
sif_gatk=build_default_sif_list()$sif_gatk,
tumour="",normal=NA,output_name="",
ref_genome=build_default_reference_list()$HG19$reference$genome,
germ_resource=build_default_reference_list()$HG19$variant$germ_reference,
pon="",output_dir=".",tmp_dir=".",
verbose=FALSE,orientation=FALSE,mnps=FALSE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("Mutect2"),
task_name="Mutect2",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
if(!is.null(rdata)){
load(rdata)
if(!is.null(selected)){
region=region_list[selected]
}
}
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir,name="vcf")
job=build_job(executor_id=executor_id,task=task_id)
if(tmp_dir!=""){
tmp_dir=paste0(" --tmp-dir ",tmp_dir)
}
id=""
if(output_name!=""){
id=output_name
}else{
id=get_file_name(tumour[1])
}
reg=""
if (region==""){
out_file=paste0(out_file_dir,"/",id,".unfilt.vcf")
}else{
reg=paste0(" -L ",region)
id=paste0(id,".",region)
out_file=paste0(out_file_dir,"/",id,".unfilt.vcf")
}
if (is.vector(tumour)){
tumour=paste0(" -I ",paste(normalizePath(tumour),collapse=" -I "))
}else{
tumour=paste0(" -I ",normalizePath(tumour))
}
norm=" "
if (!is.null(normal)&&!all(is.na(normal))){
if (is.vector(normal)){
norm=paste0(" -I ",paste(normalizePath(normal),collapse=" -I ")," -normal ",
paste(Vectorize(get_file_name)(normal),collapse=" -normal "))
}else{
norm=paste0(" -I ",normalizePath(normal)," -normal ",get_file_name(normal))
}
}
if (pon!=""){
pon=paste0(" --panel-of-normals ",pon)
}
f1r2=""
out_file2=""
if (orientation){
out_file_dir_ort=set_dir(dir=output_dir,name="orientation")
out_file2=paste0(out_file_dir_ort,"/",id,".f1r2.tar.gz")
f1r2=paste0(" --f1r2-tar-gz ",out_file2)
}
filter_mnps=""
if (mnps){
filter_mnps=" -max-mnp-distance 0 "
}
exec_code=paste("singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk Mutect2 -R ",ref_genome,tumour, norm,
" --germline-resource ",germ_resource, pon, " -O ",out_file, reg,f1r2,filter_mnps)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
vcf=out_file,
stats=paste0(out_file,".stats"),
idx=paste0(out_file,".idx"),
f1r2=out_file2)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
#' Multiregion parallelization across Mutect2 Gatk Variant Calling
#'
#' This function calls Mutect2 across multiple regions in parallel.
#' If a vector of tumour samples is provided, they will be processed in multi-sample mode.
#' To run in tumour-normal mode supply a single tumour and a normal sample.
#' If no normal is supplied, Mutect2 runs in tumour-only mode.
#' TODO: Implement mitochondrial mode feature.
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037593851-Mutect2
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param bin_bcftools [REQUIRED] Path to bcftools binary file.
#' @param bin_bgzip [REQUIRED] Path to bgzip binary file.
#' @param bin_vep [REQUIRED] Path to VEP binary.
#' @param bin_tabix [REQUIRED] Path to tabix binary file.
#' @param bin_samtools [REQUIRED] Path to samtools binary file.
#' @param tumour [REQUIRED] Path to tumour BAM file.
#' @param normal [OPTIONAL] Path to normal BAM file.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param germ_resource [REQUIRED] Path to germline resource vcf file.
#' @param regions [OPTIONAL] Regions to analyze. If regions for parallelization are not provided, they will be inferred from the BAM file.
#' @param output_name [OPTIONAL] Name for the output. If not given, the name of one of the samples will be used.
#' @param pon [OPTIONAL] Path to panel of normal.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param mnps [OPTIONAL] Report MNPs in vcf file.
#' @param extract_pass [OPTIONAL] Extract variants with a PASS filter. Default TRUE
#' @param annotate [OPTIONAL] Annotate variants. Default TRUE
#' @param contamination [OPTIONAL] Produce sample cross-contamination reports. Default TRUE.
#' @param orientation [OPTIONAL] Produce read orientation information. Default TRUE
#' @param filter [OPTIONAL] Filter Mutect2. Default TRUE.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "parRegionMutect2"
#' @param task_name Task name. Default "parRegionMutect2"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
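#' @examples
#' \dontrun{
#' # Region-parallel calling with downstream filtering and annotation; paths
#' # are illustrative placeholders.
#' calls <- parallel_regions_mutect2_gatk(
#'   tumour = "bams/tumour.bam",
#'   normal = "bams/normal.bam",
#'   ref_genome = "ref/hg19.fasta",
#'   germ_resource = "ref/af-only-gnomad.vcf.gz",
#'   pon = "ref/pon.vcf.gz",
#'   chr = c(1:22, "X"),
#'   output_dir = "results",
#'   threads = 4, mode = "local"
#' )
#' names(calls$steps)
#' }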
#' @export
parallel_regions_mutect2_gatk=function(
rdata=NULL,selected=NULL,
sif_gatk=build_default_sif_list()$sif_gatk,
bin_bcftools=build_default_tool_binary_list()$bin_bcftools,
bin_vep=build_default_tool_binary_list()$bin_vep,
bin_samtools=build_default_tool_binary_list()$bin_samtools,
bin_bgzip=build_default_tool_binary_list()$bin_bgzip,
bin_tabix=build_default_tool_binary_list()$bin_tabix,
tumour="",normal="",output_name="",
chr=c(1:22,"X","Y","MT"),
ref_genome=build_default_reference_list()$HG19$reference$genome,
germ_resource=build_default_reference_list()$HG19$variant$germ_reference,
biallelic_db=build_default_reference_list()$HG19$variant$biallelic_reference,
db_interval=build_default_reference_list()$HG19$variant$biallelic_reference,
regions=NULL,pon="",output_dir=".",
verbose=FALSE,filter=TRUE,
extract_pass=TRUE,annotate=TRUE,
orientation=TRUE,mnps=FALSE,
contamination=TRUE,clean=FALSE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("parRegionMutect2"),
task_name="parRegionMutect2",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
if(!is.null(rdata)){
load(rdata)
if(!is.null(selected)){
tumour=tumour_list[selected]
}
}
id=""
if(output_name!=""){
id=output_name
}else{
id=get_file_name(tumour)
}
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir,name=paste0("mutect2_reports/",id))
tmp_dir=set_dir(dir=out_file_dir,name="tmp")
job=build_job(executor_id=executor_id,task_id=task_id)
jobs_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
if(is.null(regions)){
jobs_report[["steps"]][["getChr"]] <- get_bam_reference_chr(
bin_samtools=bin_samtools,
bam=normalizePath(tumour[1]),verbose=verbose,output_dir=tmp_dir,
output_name=paste0(get_file_name(tumour[1]),"_Ref"),
executor_id=task_id,mode="local",threads=threads,ram=ram,
time=time,update_time=update_time,wait=FALSE,hold=hold)
regions=read.table(jobs_report[["steps"]][["getChr"]]$out_files$ref,
sep="\t",header=TRUE)
hold=jobs_report[["steps"]][["getChr"]]$job_id
}
regions$start=regions$start+1
regions=regions %>% dplyr::mutate(region=paste0(chr,":",start,"-",end))
##### Ignore regions that fall in chromosomes we are not interested in
if(!is.null(chr)){
regions=regions[regions$chr %in% chr,]
}
region_list=regions$region
names(region_list)=regions$region
if(mode=="local"){
jobs_report[["steps"]][["par_region_call_variants"]]<-
parallel::mclapply(region_list,FUN=function(region){
job_report <- mutect2_gatk(
sif_gatk=sif_gatk,
region=region,
tumour=normalizePath(tumour),
normal=normalizePath(normal),
output_name=id,
ref_genome=normalizePath(ref_genome),
germ_resource = normalizePath(germ_resource),
pon=normalizePath(pon),output_dir=tmp_dir,tmp_dir=tmp_dir,
verbose=verbose,orientation=orientation,mnps=mnps,
executor_id=task_id)
},mc.cores=threads)
}else if(mode=="batch"){
rdata_file=paste0(tmp_dir,"/",job,".regions.RData")
output_dir=tmp_dir
executor_id=task_id
output_name=id
## save() requires object names, so normalize the paths before saving.
tumour=normalizePath(tumour)
normal=normalizePath(normal)
ref_genome=normalizePath(ref_genome)
germ_resource=normalizePath(germ_resource)
pon=normalizePath(pon)
save(
region_list,
tumour,
normal,
sif_gatk,
ref_genome,
output_name,
germ_resource,
pon,
orientation,
mnps,
executor_id,
output_dir,
verbose,tmp_dir,
file = rdata_file)
exec_code=paste0("Rscript -e \"ULPwgs::mutect2_gatk(rdata=\\\"",
rdata_file,"\\\",selected=$SGE_TASK_ID)\"")
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,
threads=2,output_dir=out_file_dir2,
hold=hold,array=length(region_list))
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
jobs_report[["steps"]][["par_region_call_variants"]]<- build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args=argg,
out_file_dir=tmp_dir,
out_files=list(
vcf=paste0(tmp_dir,"/vcf/",id,".",region_list,".unfilt.vcf"),
stats=paste0(tmp_dir,"/vcf/",id,".",region_list,".unfilt.vcf.stats"),
idx=paste0(tmp_dir,"/vcf/",id,".",region_list,".unfilt.vcf.idx"),
f1r2=paste0(tmp_dir,"/orientation/",id,".",region_list,".f1r2.tar.gz")
)
)
}
vcfs=unlist_lvl(jobs_report[["steps"]][["par_region_call_variants"]],var="vcf")
stats=unlist_lvl(jobs_report[["steps"]][["par_region_call_variants"]],var="stats")
f1r2=unlist_lvl(jobs_report[["steps"]][["par_region_call_variants"]],var="f1r2")
hold=unlist_lvl(jobs_report,var="job_id")
jobs_report[["steps"]][["gatherFilesGatk"]]<-gather_mutect2_gatk(
sif_gatk=sif_gatk,
bin_samtools=bin_samtools,
bin_bcftools=bin_bcftools,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
vcfs=normalizePath(vcfs),
stats=normalizePath(stats),
f1r2=normalizePath(f1r2),
clean=clean,
output_name=output_name,
output_dir=out_file_dir,tmp_dir=tmp_dir,
verbose=verbose,orientation=orientation,
batch_config=batch_config,
threads=threads,ram=ram,mode=mode,
executor_id=task_id,
time=time,
hold=hold
)
vcf=unlist_lvl(jobs_report[["steps"]][["gatherFilesGatk"]],var="sorted_vcf")
stats=unlist_lvl(jobs_report[["steps"]][["gatherFilesGatk"]],var="merged_stats")
if(orientation){
orientation_model=unlist_lvl(jobs_report[["steps"]][["gatherFilesGatk"]],var="orientation_model")
}else{
orientation_model=""
}
if(contamination){
jobs_report[["steps"]][["estimateContaminationGatk"]]<- parallel_estimate_contamination_gatk(
sif_gatk=sif_gatk,
tumours=normalizePath(tumour),
normal=normalizePath(normal),
output_dir=out_file_dir,
verbose=verbose,tmp_dir=tmp_dir,
batch_config=batch_config,
threads=threads,ram=ram,mode=mode,
executor_id=task_id,
time=time,
hold=hold
)
contamination_table=unlist_lvl(jobs_report[["steps"]][["estimateContaminationGatk"]],var="contamination_table")
segmentation_table=unlist_lvl(jobs_report[["steps"]][["estimateContaminationGatk"]],var="segmentation_table")
}else{
contamination_table=""
segmentation_table=""
}
if(filter){
jobs_report[["steps"]][["filterMutectGatk"]]<- mutect_filter_gatk(
sif_gatk=sif_gatk,
bin_bcftools=bin_bcftools,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
vcf=normalizePath(vcf),stats=normalizePath(stats),contamination_table=normalizePath(contamination_table),
segmentation_table=normalizePath(segmentation_table),
orientation_model=normalizePath(orientation_model),output_name=output_name,
ref_genome=normalizePath(ref_genome),
extract_pass=extract_pass,
output_dir=out_file_dir,
verbose=verbose,clean=clean,
batch_config=batch_config,
threads=1,ram=ram,mode=mode,
executor_id=task_id,
time=time,
hold=unlist_lvl(jobs_report[["steps"]],var="job_id")
)
if(extract_pass){
vcf=unlist_lvl(jobs_report[["steps"]][["filterMutectGatk"]],var="extract_vcf")
}else{
vcf=unlist_lvl(jobs_report[["steps"]][["filterMutectGatk"]],var="filtered_vcf")
}
}
if(annotate){
jobs_report[["steps"]][["filterMutectGatk"]]<- annotate_vep(
bin_vep=bin_vep,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
vcf=normalizePath(vcf),
output_name=paste0(get_file_name(vcf),ifelse(extract_pass,".PASS","")),
output_dir=out_file_dir,
verbose=verbose,
batch_config=batch_config,
threads=threads,
ram=ram,mode=mode,
executor_id=task_id,
time=time,
hold=unlist_lvl(jobs_report[["steps"]],var="job_id")
)
}
if(wait&&mode=="batch"){
job_validator(job=unlist_lvl(jobs_report[["steps"]],var="job_id"),time=update_time,
verbose=verbose,threads=threads)
}
return(jobs_report)
}
#' Multisample parallelization of Mutect2 Gatk Variant Calling
#'
#' This function calls Mutect2 for one or more tumour samples, parallelizing each across regions.
#' If a vector of tumour samples is provided, they will be processed in multi-sample mode.
#' To run in tumour-normal mode supply a single tumour and a normal sample.
#' If no normal is supplied, Mutect2 runs in tumour-only mode.
#' TODO: Implement mitochondrial mode feature.
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037593851-Mutect2
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param bin_bcftools [REQUIRED] Path to bcftools binary file.
#' @param bin_vep [REQUIRED] Path to VEP binary.
#' @param bin_bgzip [REQUIRED] Path to bgzip binary file.
#' @param bin_tabix [REQUIRED] Path to tabix binary file.
#' @param bin_samtools [REQUIRED] Path to samtools binary file.
#' @param tumour [REQUIRED] Path to tumour BAM file.
#' @param normal [OPTIONAL] Path to normal BAM file.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param chr [OPTIONAL] Chromosomes to analyze. Default c(1:22,"X","Y","MT")
#' @param germ_resource [REQUIRED] Path to germline resource vcf file.
#' @param regions [OPTIONAL] Regions to analyze. If regions for parallelization are not provided, they will be inferred from the BAM file.
#' @param output_name [OPTIONAL] Name for the output. If not given, the name of one of the samples will be used.
#' @param pon [OPTIONAL] Path to panel of normal.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param mnps [OPTIONAL] Report MNPs in vcf file.
#' @param extract_pass [OPTIONAL] Extract variants with a PASS filter. Default TRUE
#' @param annotate [OPTIONAL] Annotate variants. Default TRUE
#' @param method [OPTIONAL] Variant calling method. Default "single". Options ["single","multi"]
#' @param contamination [OPTIONAL] Produce sample cross-contamination reports. Default TRUE.
#' @param orientation [OPTIONAL] Produce read orientation information. Default TRUE
#' @param filter [OPTIONAL] Filter Mutect2. Default TRUE.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "parSamplesMutect2"
#' @param task_name Task name. Default "parSamplesMutect2"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job ID is finished.
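#' @examples
#' \dontrun{
#' # Per-sample parallelization in single-sample mode; paths and IDs are
#' # illustrative placeholders.
#' samples <- parallel_samples_mutect2_gatk(
#'   tumour = c("bams/tumour_A.bam", "bams/tumour_B.bam"),
#'   normal = "bams/normal.bam",
#'   patient_id = "PATIENT01",
#'   ref_genome = "ref/hg19.fasta",
#'   germ_resource = "ref/af-only-gnomad.vcf.gz",
#'   method = "single",
#'   output_dir = "results", threads = 4, mode = "local"
#' )
#' names(samples$steps)
#' }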
#' @export
parallel_samples_mutect2_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bin_vep=build_default_tool_binary_list()$bin_vep,
bin_bcftools=build_default_tool_binary_list()$bin_bcftools,
bin_samtools=build_default_tool_binary_list()$bin_samtools,
bin_bgzip=build_default_tool_binary_list()$bin_bgzip,
bin_tabix=build_default_tool_binary_list()$bin_tabix,
tumour="",normal="",patient_id="",
chr=c(1:22,"X","Y","MT"),
ref_genome=build_default_reference_list()$HG19$reference$genome,
germ_resource=build_default_reference_list()$HG19$variant$germ_reference,
biallelic_db=build_default_reference_list()$HG19$variant$biallelic_reference,
db_interval=build_default_reference_list()$HG19$variant$biallelic_reference,
regions=NULL,pon="",output_dir=".",
method="single",
verbose=FALSE,filter=TRUE,
orientation=TRUE,mnps=FALSE,
annotate=TRUE,extract_pass=TRUE,
contamination=TRUE,clean=FALSE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("parSamplesMutect2"),
task_name="parSamplesMutect2",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir,name=patient_id)
tmp_dir=set_dir(dir=out_file_dir,name="tmp")
job=build_job(executor_id=executor_id,task_id=task_id)
jobs_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
if(method=="single"){
tumour_list=tumour
names(tumour_list)=Vectorize(get_file_name)(tumour)
if(mode=="local"){
jobs_report[["steps"]][["par_sample_call_variants"]]<-
parallel::mclapply(tumour_list,FUN=function(tumour){
job_report <- parallel_regions_mutect2_gatk(
sif_gatk=sif_gatk,
bin_bcftools=bin_bcftools,
bin_samtools=bin_samtools,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
regions=regions,
output_name=get_file_name(tumour),
chr=chr,
ref_genome=normalizePath(ref_genome),
germ_resource=normalizePath(germ_resource),
biallelic_db=normalizePath(biallelic_db),
db_interval=normalizePath(db_interval),
pon=normalizePath(pon),
filter=filter,
orientation=orientation,
mnps=mnps,
contamination=contamination,
clean=clean,
tumour=tumour,
normal=normal,
output_dir=out_file_dir,
verbose=verbose,
threads=threads,
executor_id=task_id)
},mc.cores=threads)
}else if(mode=="batch"){
rdata_file=paste0(tmp_dir,"/",job,".samples.RData")
output_dir=out_file_dir
executor_id=task_id
save(tumour_list,normal,
sif_gatk,
bin_bcftools,
bin_samtools,
bin_bgzip,
bin_tabix,
chr,
germ_resource,
biallelic_db,db_interval,pon,ref_genome,
filter,orientation,mnps,
mode,ram,
executor_id,
output_dir,
contamination,clean,
verbose,file = rdata_file)
exec_code=paste0("Rscript -e \"ULPwgs::parallel_regions_mutect2_gatk(rdata=\\\"",
rdata_file,"\\\",selected=$SGE_TASK_ID)\"")
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,
threads=4,output_dir=out_file_dir2,
hold=hold,array=length(tumour_list))
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
jobs_report[["steps"]][["par_sample_call_variants"]]<- build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
filtered_vcf=ifelse(filter,paste0(out_file_dir,"/mutect2_reports/",names(tumour_list),"/",
names(tumour_list),".filtered.vcf"),""),
sorted_vcf=paste0(out_file_dir,"/mutect2_reports/",names(tumour_list),"/",
names(tumour_list),".sorted.vcf"),
compressed_vcf=paste0(out_file_dir,"/mutect2_reports/",names(tumour_list),"/",
names(tumour_list),".sorted.vcf.gz")
)
)
}
}else if (method=="multi"){
jobs_report[["steps"]][["par_sample_call_variants"]]<-
parallel_regions_mutect2_gatk(
sif_gatk=sif_gatk,
bin_bcftools=bin_bcftools,
bin_samtools=bin_samtools,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
regions=regions,
output_name=patient_id,
chr=chr,
ref_genome=normalizePath(ref_genome),
germ_resource=normalizePath(germ_resource),
biallelic_db=normalizePath(biallelic_db),
db_interval=normalizePath(db_interval),
pon=normalizePath(pon),
filter=filter,
orientation=orientation,
mnps=mnps,
contamination=contamination,
clean=clean,
tumour=tumour,
normal=normal,
output_dir=out_file_dir,
verbose=verbose,
threads=threads,
executor_id=task_id,
mode=mode
)
}else{
stop("Wrong method supplied. Only single or multi methods available.")
}
if(wait&&mode=="batch"){
job_validator(job=unlist_lvl(jobs_report[["steps"]],var="job_id"),time=update_time,
verbose=verbose,threads=threads)
}
return(jobs_report)
}
#' Multiregion parallelization across Mutect2 Gatk Variant Calling for multiple samples
#'
#' This function calls Mutect2 across multiple regions in parallel for multiple samples.
#' If a vector of tumour samples is provided, they will be processed in multi-sample mode.
#' To run in tumour-normal mode supply a single tumour and a normal sample.
#' If no normal is supplied, Mutect2 runs in tumour-only mode.
#' TODO: Implement mitochondrial mode feature.
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037593851-Mutect2
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param bin_bcftools [REQUIRED] Path to bcftools binary file.
#' @param bin_vep [REQUIRED] Path to VEP binary.
#' @param bin_bgzip [REQUIRED] Path to bgzip binary file.
#' @param bin_tabix [REQUIRED] Path to tabix binary file.
#' @param bin_samtools [REQUIRED] Path to samtools binary file.
#' @param sample_sheet [OPTIONAL] Path to sheet with sample information.
#' @param bam_dir [OPTIONAL] Path to directory with BAM files.
#' @param normal_id [OPTIONAL] Identifier of the normal sample among the BAM files.
#' @param chr [OPTIONAL] Chromosomes to analyze. Default c(1:22,"X","Y","MT")
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param germ_resource [REQUIRED] Path to germline resources vcf file.
#' @param regions [OPTIONAL] Regions to analyze. If regions for parallelization are not provided then these will be inferred from the BAM file.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param pon [OPTIONAL] Path to panel of normal.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param mnps [OPTIONAL] Report MNPs in vcf file.
#' @param method [OPTIONAL] Default variant calling method. Default single. Options ["single","multi"]
#' @param contamination [OPTIONAL] Produce sample cross-contamination reports. Default TRUE.
#' @param orientation [OPTIONAL] Produce read orientation information. Default FALSE
#' @param filter [OPTIONAL] Filter Mutect2 calls. Default TRUE.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "multiSampleMutect2"
#' @param task_name Task name. Default "multiSampleMutect2"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
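#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the BAM directory, normal identifier and output
#' ## directory below are hypothetical placeholders, not files shipped with
#' ## the package. Default references and binaries are assumed.
#' report <- multisample_mutect2_gatk(
#'   bam_dir = "BAMS/patient1",   # directory holding the patient BAM files
#'   normal_id = "_BC",           # substring identifying the normal BAM
#'   patient_id = "patient1",
#'   output_dir = "mutect2_results",
#'   mode = "local",
#'   verbose = TRUE
#' )
#' }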
#' @export
multisample_mutect2_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bin_vep=build_default_tool_binary_list()$bin_vep,
bin_bcftools=build_default_tool_binary_list()$bin_bcftools,
bin_samtools=build_default_tool_binary_list()$bin_samtools,
bin_bgzip=build_default_tool_binary_list()$bin_bgzip,
bin_tabix=build_default_tool_binary_list()$bin_tabix,
sample_sheet=NULL,
bam_dir="",
normal_id="",
patient_id="",
pattern="bam$",
ref_genome=build_default_reference_list()$HG19$reference$genome,
germ_resource=build_default_reference_list()$HG19$variant$germ_reference,
biallelic_db=build_default_reference_list()$HG19$variant$biallelic_reference,
db_interval=build_default_reference_list()$HG19$variant$biallelic_reference,
pon=build_default_reference_list()$HG19$panel$PCF_V3$variant$pon_muts,
chr=c(1:22,"X","Y","MT"),
method="single",
regions=NULL,output_dir=".",
verbose=FALSE,filter=TRUE,
annotate=TRUE,extract_pass=TRUE,
orientation=TRUE,mnps=FALSE,
contamination=TRUE,clean=FALSE,
header=TRUE,
sep="\t",
batch_config=build_default_preprocess_config(),
threads=4,ram=8,mode="local",
executor_id=make_unique_id("multiSampleMutect2"),
task_name="multiSampleMutect2",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
columns=c(
"sif_gatk",
"bin_bcftools",
"bin_samtools",
"bin_bgzip",
"bin_vep",
"bin_tabix",
"patient_id",
"tumour",
"normal",
"chr",
"ref_genome",
"germ_resource",
"biallelic_db",
"db_interval",
"pon",
"regions",
"filter",
"orientation",
"annotate",
"extract_pass",
"mnps",
"method",
"contamination",
"output_dir",
"clean",
"verbose",
"batch_config",
"threads",
"ram",
"time",
"mode",
"hold")
if(!is.null(sample_sheet)){
if(!is.data.frame(sample_sheet)){
file_info=read.csv(sample_sheet,header=header,sep=sep,stringsAsFactors=FALSE)
if(!header){
names(file_info)=columns
}
}else{
file_info=sample_sheet
}
file_info=file_info %>% dplyr::group_by(dplyr::across(-tumour)) %>% dplyr::summarise(tumour=list(tumour))
job_report[["steps"]][["multisample_gatk"]]=parallel::mclapply(seq(1,nrow(file_info)),FUN=function(x){
lapply(columns,FUN=function(col){
if(is.null(file_info[[col]])){
file_info[[col]]<<-get(col)
}
})
job_report<- parallel_samples_mutect2_gatk(
sif_gatk=sif_gatk,
bin_vep=bin_vep,
bin_bcftools=bin_bcftools,
bin_samtools=bin_samtools,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
tumour=normalizePath(unlist(file_info[x,]$tumour)),
normal=normalizePath(file_info[x,]$normal),
ref_genome=normalizePath(file_info[x,]$ref_genome),
germ_resource=normalizePath(file_info[x,]$germ_resource),
biallelic_db=normalizePath(file_info[x,]$biallelic_db),
db_interval=normalizePath(file_info[x,]$biallelic_db),
patient_id=file_info[x,]$patient_id,
pon=normalizePath(file_info[x,]$pon),
chr=eval(parse(text=file_info[x,]$chr)),
regions=file_info[[x,"regions"]],
method=file_info[x,]$method,
output_dir=file_info[x,]$output_dir,
extract_pass=file_info[x,]$extract_pass,
annotate=file_info[x,]$annotate,
verbose=file_info[x,]$verbose,
filter=file_info[x,]$filter,
orientation=file_info[x,]$orientation,mnps=file_info[x,]$mnps,
contamination=file_info[x,]$contamination,
clean=file_info[x,]$clean,
batch_config=file_info[x,]$batch_config,
threads=file_info[x,]$threads,
ram=file_info[x,]$ram,
mode=file_info[x,]$mode,
executor_id=task_id,
time=file_info[x,]$time,
hold=file_info[[x,"hold"]])
},mc.cores=ifelse(mode=="local",1,3))
}else{
bam_dir_path=system(paste("realpath",bam_dir),intern=TRUE)
bam_files=system(paste0("find ",bam_dir_path,"| grep ",pattern),intern=TRUE)
tumour=bam_files[!grepl(normal_id,bam_files)]
normal=bam_files[grepl(normal_id,bam_files)]
job_report<-parallel_samples_mutect2_gatk(
sif_gatk=sif_gatk,
bin_vep=bin_vep,
bin_bcftools=bin_bcftools,
bin_samtools=bin_samtools,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
tumour=normalizePath(tumour),
normal=normalizePath(normal),
patient_id=patient_id,
ref_genome=normalizePath(ref_genome),
germ_resource=normalizePath(germ_resource),
biallelic_db=normalizePath(biallelic_db),
db_interval=normalizePath(biallelic_db),
extract_pass=extract_pass,
chr=chr,
annotate=annotate,
regions=regions,
method=method,
pon=pon,
output_dir=output_dir,
verbose=verbose,
filter=filter,
orientation=orientation,mnps=mnps,
contamination=contamination,
clean=clean,
batch_config=batch_config,
threads=threads,ram=ram,
mode=mode,
executor_id=task_id,
time=time,
hold=hold)
}
if(wait&&mode=="batch"){
job_validator(job=unlist_lvl(job_report[["steps"]][["multisample_gatk"]],var="job_id"),
time=update_time,verbose=verbose,threads=threads)
}
return(job_report)
}
#' Filter Mutect2 Calls wrapper for R
#'
#' This function filters Mutect2 call sets with FilterMutectCalls, optionally using stats, contamination, segmentation and read-orientation information.
#'
#' For more information:
#'
#' https://gatk.broadinstitute.org/hc/en-us/articles/360036856831-FilterMutectCalls
#'
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param bin_bcftools [REQUIRED] Path to bcftools binary file.
#' @param bin_bgzip [REQUIRED] Path to bgzip binary file.
#' @param bin_tabix [REQUIRED] Path to tabix binary file.
#' @param vcf [REQUIRED] Path to VCF file.
#' @param stats [OPTIONAL] Path to stats information generated by Mutect2 for supplied VCF.
#' @param contamination_table [OPTIONAL] Path to contamination tables for each tumour sample in the VCF.
#' @param segmentation_table [OPTIONAL] Path to segmentation tables for each tumour sample in the VCF.
#' @param orientation_model [OPTIONAL] Path to orientation model generated F1R2 read information.
#' @param clean [OPTIONAL] Remove unfiltered VCF after completion. Default FALSE.
#' @param extract_pass [OPTIONAL] Extract PASSing variants. Default TRUE.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [OPTIONAL] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id [OPTIONAL] Task EXECUTOR ID. Default "filterMutect2Gatk"
#' @param task_name [OPTIONAL] Task name. Default "filterMutect2Gatk"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
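#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the VCF, stats and model paths below are
#' ## hypothetical placeholders produced by a previous Mutect2 run.
#' report <- mutect_filter_gatk(
#'   vcf = "mutect2/patient1.sorted.vcf",
#'   stats = "mutect2/patient1.merged.vcf.stats",
#'   orientation_model = "mutect2/patient1.ROM.tar.gz",
#'   output_dir = "mutect2/filtered",
#'   extract_pass = TRUE,
#'   verbose = TRUE
#' )
#' }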
#' @export
mutect_filter_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bin_bcftools=build_default_tool_binary_list()$bin_bcftools,
bin_bgzip=build_default_tool_binary_list()$bin_bgzip,
bin_tabix=build_default_tool_binary_list()$bin_tabix,
vcf="",stats=NULL,contamination_table=NULL,
extract_pass=TRUE,
segmentation_table=NULL,orientation_model=NULL,output_name="",
ref_genome=build_default_reference_list()$HG19$reference$genome,
output_dir=".",verbose=FALSE,clean=FALSE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("filterMutect2Gatk"),
task_name="filterMutect2Gatk",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
id=""
if(output_name!=""){
id=output_name
}else{
id=get_file_name(vcf)
}
if(!is.null(stats)){
stats=paste0(" -stats ",normalizePath(stats))
}
if (!is.null(contamination_table)){
contamination_table=paste0(" --contamination-table ", paste0(normalizePath(contamination_table),
collapse=" --contamination-table "))
}
if(!is.null(segmentation_table)){
segmentation_table=paste0(" --tumor-segmentation ",paste0(normalizePath(segmentation_table),
collapse=" --tumor-segmentation "))
}
if(!is.null(orientation_model)){
orientation_model=paste0(" --ob-priors ",normalizePath(orientation_model))
}
out_file=paste0(out_file_dir,"/",id,".filtered.vcf")
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk FilterMutectCalls -R ",ref_genome," -O ",out_file," -V ",normalizePath(vcf), stats,
contamination_table,segmentation_table,orientation_model
)
if(clean){
exec_code=paste(exec_code," && rm",vcf)
}
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
filtered_vcf=out_file)
)
if(extract_pass){
job_report[["steps"]][["extractPASSvcf"]]<-
parallel_vcfs_variants_by_filters_vcf(
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
vcf=out_file,filters="PASS",
exclusive=TRUE,
compress=FALSE,
output_dir=out_file_dir,
verbose=verbose,
batch_config=batch_config,
threads=threads,ram=ram,mode=mode,
executor_id=task_id,
time=time,
hold=job
)
}
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
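#' Gather per-region Mutect2 outputs
#'
#' This function concatenates, sorts, compresses and indexes per-region Mutect2
#' VCFs, merges their stats files and, when orientation is TRUE, learns a read
#' orientation model from the supplied f1r2 counts.
#'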
gather_mutect2_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bin_samtools=build_default_tool_binary_list()$bin_samtools,
bin_bcftools=build_default_tool_binary_list()$bin_bcftools,
bin_bgzip=build_default_tool_binary_list()$bin_bgzip,
bin_tabix=build_default_tool_binary_list()$bin_tabix,
vcfs="",stats=NULL,f1r2=NULL,output_dir=".",tmp_dir=".",
output_name="",verbose=FALSE,orientation=FALSE,clean=TRUE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("gatherMutect"),
task_name="gatherMutect",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
jobs_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
jobs_report[["steps"]][["concatVCF"]] <- concat_vcf(
bin_bcftools=bin_bcftools,
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
vcfs=normalizePath(vcfs),clean=clean,sort=TRUE,
compress=FALSE,format="vcf",
verbose=verbose,output_name=output_name,
output_dir=out_file_dir,
tmp_dir=tmp_dir,batch_config=batch_config,
threads=1,ram=ram,mode=mode,
executor_id=task_id,
time=time,hold=hold
)
jobs_report[["steps"]][["compressAndIndex"]]<-compress_and_index_vcf_htslib(
bin_bgzip=bin_bgzip,
bin_tabix=bin_tabix,
vcf=unlist_lvl(jobs_report[["steps"]][["concatVCF"]],var="sorted_vcf"),
output_dir=out_file_dir,output_name=output_name,
clean=clean,verbose=verbose,
batch_config=batch_config,
threads=1,ram=ram,mode=mode,
executor_id=task_id,
time=time,
hold=unlist_lvl(jobs_report[["steps"]][["concatVCF"]],var="job_id")
)
jobs_report[["steps"]][["mergeStatMutect"]] <- merge_mutect_stats_gatk(
sif_gatk=sif_gatk,
stats=stats,output_name=output_name,
output_dir=out_file_dir,clean=clean,
verbose=verbose,batch_config=batch_config,
threads=1,ram=ram,mode=mode,
executor_id=task_id,
time=time,hold=hold
)
if(orientation){
jobs_report[["steps"]][["orientationModel"]] <- learn_orientation_gatk(
sif_gatk=sif_gatk,
f1r2=f1r2,output_name=output_name,
output_dir=out_file_dir,clean=clean,
verbose=verbose,batch_config=batch_config,
threads=1,ram=4,mode=mode,
executor_id=task_id,
time=time,hold=hold
)
}
if(wait&&mode=="batch"){
job_validator(job=jobs_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(jobs_report)
}
#' Create Read Orientation Model for Mutect2 Filter
#'
#' This function creates a read orientation model for Mutect2 filtering.
#'
#' @param f1r2 Path to f1r2 count files.
#' @param sif_gatk Path to gatk sif file.
#' @param clean [OPTIONAL] Remove the input f1r2 files after completion. Default TRUE.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param output_dir Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "learnOrientationMutect2"
#' @param task_name Task name. Default "learnOrientationMutect2"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
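#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the f1r2 files below are hypothetical outputs of
#' ## per-region Mutect2 calls.
#' report <- learn_orientation_gatk(
#'   f1r2 = c("mutect2/patient1.1.f1r2.tar.gz",
#'            "mutect2/patient1.2.f1r2.tar.gz"),
#'   output_name = "patient1",
#'   output_dir = "mutect2/orientation",
#'   clean = FALSE
#' )
#' }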
#' @export
learn_orientation_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
f1r2="",output_name="",output_dir=".",clean=TRUE,
verbose=FALSE,batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("learnOrientationMutect2"),
task_name="learnOrientationMutect2",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
id=""
if(output_name!=""){
id=output_name
}else{
id=get_file_name(f1r2[1])
}
if (!is.null(f1r2)){
f1r2_list=paste0(" -I ",paste0(normalizePath(f1r2),collapse=" -I "))
}
out_file=paste0(out_file_dir,"/",id,".ROM.tar.gz")
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk LearnReadOrientationModel ",f1r2_list," -O ",out_file)
if(clean){
exec_code=paste(exec_code," && rm",paste(f1r2,collapse=" "))
}
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
orientation_model=out_file)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
#' Estimate sample contamination Gatk
#'
#' This function estimates sample cross-contamination from tumour (and, optionally, matched normal) pileup summaries.
#'
#' @param tumour Path to tumour BAM file. Used to generate the tumour pileup summary when tumour_pileup is not given.
#' @param normal Path to matched normal BAM file. Used to generate the normal pileup summary when normal_pileup is not given.
#' @param sif_gatk Path to gatk sif file.
#' @param biallelic_db Path to biallelic variant resource vcf file.
#' @param db_interval Path to intervals for the biallelic variant resource.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param output_dir Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "estimateContaminationGatk"
#' @param task_name Task name. Default "estimateContaminationGatk"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
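#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; BAM paths are hypothetical placeholders and the
#' ## default HG19 biallelic resource is assumed.
#' report <- estimate_contamination_gatk(
#'   tumour = "BAMS/patient1_T.bam",
#'   normal = "BAMS/patient1_N.bam",
#'   output_dir = "contamination",
#'   verbose = TRUE
#' )
#' }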
#' @export
estimate_contamination_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
rdata=NULL,selected=NULL,
tumour=NA,normal=NA,tumour_pileup="",
normal_pileup="",output_name="",output_dir=".",tmp_dir=".",
biallelic_db=build_default_reference_list()$HG19$variant$biallelic_reference,
db_interval=build_default_reference_list()$HG19$variant$biallelic_reference,
verbose=FALSE,batch_config=build_default_preprocess_config(),
threads=1,ram=4,mode="local",
executor_id=make_unique_id("estimateContaminationGatk"),
task_name="estimateContaminationGatk",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL){
if(!is.null(rdata)){
load(rdata)
if(!is.null(selected)){
tumour=tumour_list[selected]
}
}
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir,name="contamination_reports")
out_file_dir_tab=set_dir(dir=out_file_dir,name="contamination")
out_file_dir_seg=set_dir(dir=out_file_dir,name="segmentation")
job=build_job(executor_id=executor_id,task_id=task_id)
if(output_name!=""){
id=output_name
}else if(tumour!=""){
id=get_file_name(tumour)
}else if(tumour_pileup!=""){
id=get_file_name(tumour_pileup)
}
out_file=paste0(out_file_dir_tab,"/",id,".contamination.table")
out_file2=paste0(out_file_dir_seg,"/",id,".segmentation.table")
jobs_report=build_job_report(
job_id=job,
executor_id=executor_id,
task_id=task_id,
input_args = argg,
out_file_dir=list(
contamination=out_file_dir_tab,
segmentation=out_file_dir_seg
),
out_files=list(
contamination_table=out_file,
segmentation_table=out_file2
)
)
if(!is.na(normal)){
jobs_report[["steps"]][["nPileupGatk"]]<-pileup_summary_gatk(
sif_gatk=sif_gatk,
bam=normalizePath(normal),
output_name=get_file_name(normal),
output_dir=paste0(out_file_dir,"/pileup_reports/normal"),
verbose=verbose,batch_config=batch_config,
biallelic_db=normalizePath(biallelic_db),
db_interval=normalizePath(db_interval),
threads=1,ram=ram,mode=mode,
executor_id=task_id,hold=hold
)
normal_pileup<-jobs_report[["steps"]][["nPileupGatk"]]$out_files$pileup_table
}
if(!is.na(tumour)){
jobs_report[["steps"]][["tPileupGatk"]]<-pileup_summary_gatk(
sif_gatk=sif_gatk,
bam=normalizePath(tumour),
output_name=get_file_name(tumour),
output_dir=paste0(out_file_dir,"/pileup_reports/tumour"),
verbose=verbose,batch_config=batch_config,
biallelic_db=normalizePath(biallelic_db),
db_interval=normalizePath(db_interval),
threads=1,ram=ram,mode=mode,
executor_id=task_id,hold=hold
)
tumour_pileup<-jobs_report[["steps"]][["tPileupGatk"]]$out_files$pileup_table
}
if (normal_pileup!=""){
normal_pileup=paste0(" -matched ",normalizePath(normal_pileup))
}
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk CalculateContamination -O ",out_file, " -tumor-segmentation ", out_file2,
" -I ",normalizePath(tumour_pileup),normal_pileup)
if(mode=="batch"){
hold=unlist_lvl(jobs_report[["steps"]],var="job_id")
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
jobs_report$exec_code=exec_code
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
if(wait&&mode=="batch"){
job_validator(job=jobs_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(jobs_report)
}
#' Estimate multi-sample contamination GATK
#'
#' This function estimates sample cross-contamination for multiple tumour samples in parallel.
#'
#' @param tumours Path to tumour BAM files.
#' @param normal Path to matched normal BAM file.
#' @param sif_gatk Path to gatk sif file.
#' @param biallelic_db Path to biallelic variant resource vcf file.
#' @param db_interval Path to intervals for the biallelic variant resource.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param output_dir Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "parEstimateContaminationGatk"
#' @param task_name Task name. Default "parEstimateContaminationGatk"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
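#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; BAM paths are hypothetical placeholders.
#' report <- parallel_estimate_contamination_gatk(
#'   tumours = c("BAMS/patient1_T1.bam", "BAMS/patient1_T2.bam"),
#'   normal = "BAMS/patient1_N.bam",
#'   output_dir = "contamination",
#'   threads = 2,
#'   mode = "local"
#' )
#' }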
#' @export
parallel_estimate_contamination_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
tumours="",normal="",output_dir=".",tmp_dir=".",
biallelic_db=build_default_reference_list()$HG19$variant$biallelic_reference,
db_interval=build_default_reference_list()$HG19$variant$biallelic_reference,
verbose=FALSE,batch_config=build_default_preprocess_config(),
threads=1,ram=4,mode="local",
executor_id=make_unique_id("parEstimateContaminationGatk"),
task_name="parEstimateContaminationGatk",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
jobs_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
tumour_list=tumours
names(tumour_list)=Vectorize(get_file_name)(tumours)
if(normal!=""){
jobs_report[["steps"]][["nPileupGatk"]]<-pileup_summary_gatk(
sif_gatk=sif_gatk,
bam=normal,output_name=get_file_name(normal),
output_dir=paste0(out_file_dir,"/contamination_reports/pileup_reports/normal"),
verbose=verbose,batch_config=batch_config,
biallelic_db=biallelic_db,
db_interval=db_interval,
threads=1,ram=ram,mode=mode,
executor_id=task_id,hold=hold
)
normal_pileup<-jobs_report[["steps"]][["nPileupGatk"]]$out_files$pileup_table
hold=jobs_report[["steps"]][["nPileupGatk"]]$job_id
}
if(mode=="local"){
jobs_report[["steps"]][["parEstimateContaminationGatk"]]<-
parallel::mclapply(tumour_list,FUN=function(tumour){
job_report <- estimate_contamination_gatk(
sif_gatk= sif_gatk,
tumour = normalizePath(tumour),
normal="",
normal_pileup= normal_pileup,
output_name=get_file_name(tumour),
output_dir=out_file_dir,
tmp_dir=tmp_dir,
verbose=verbose,
executor_id=task_id)
},mc.cores=threads)
}else if(mode=="batch"){
rdata_file=paste0(tmp_dir,"/",job,".pileups.RData")
save(tumour_list,normal_pileup,sif_gatk,biallelic_db,db_interval,
output_dir,verbose,tmp_dir,file = rdata_file)
exec_code=paste0("Rscript -e \"ULPwgs::estimate_contamination_gatk(rdata=\\\"",
rdata_file,"\\\",selected=$SGE_TASK_ID)\"")
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,
threads=1,output_dir=out_file_dir2,
hold=hold,array=length(tumour_list))
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
jobs_report[["steps"]][["parEstimateContaminationGatk"]]<- build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
contamination_table=paste0(out_file_dir,"/contamination_reports/contamination/",names(tumour_list),".contamination.table"),
segmentation_table=paste0(out_file_dir,"/contamination_reports/segmentation/",names(tumour_list),".segmentation.table")
)
)
}
if(wait&&mode=="batch"){
job_validator(job=jobs_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(jobs_report)
}
#' Generate a pileup summary for BAM file
#'
#' This function generates a pileup summary for a BAM file.
#'
#' @param bam Path to a BAM file
#' @param sif_gatk Path to gatk sif file.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param output_dir Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "pileupSummaryGatk"
#' @param task_name Task name. Default "pileupSummaryGatk"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
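#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the BAM path is a hypothetical placeholder and the
#' ## default HG19 biallelic resource is assumed.
#' report <- pileup_summary_gatk(
#'   bam = "BAMS/patient1_T.bam",
#'   output_dir = "pileup_reports",
#'   verbose = TRUE
#' )
#' }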
#' @export
pileup_summary_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bam="",output_name="",output_dir=".",
rdata=NULL,selected=NULL,
verbose=FALSE,batch_config=build_default_preprocess_config(),
biallelic_db=build_default_reference_list()$HG19$variant$biallelic_reference,
db_interval=build_default_reference_list()$HG19$variant$biallelic_reference,
threads=1,ram=4,mode="local",
executor_id=make_unique_id("pileupSummaryGatk"),
task_name="pileupSummaryGatk",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
if(!is.null(rdata)){
load(rdata)
if(!is.null(selected)){
bam=bam_list[selected]
}
}
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
id=""
if(output_name!=""){
id=output_name
}else{
id=get_file_name(bam)
}
out_file=paste0(out_file_dir,"/",id,".pileup.table")
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk GetPileupSummaries -O ",out_file," -V ",normalizePath(biallelic_db)," -L ",
normalizePath(db_interval), " -I ",normalizePath(bam))
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",
exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
pileup_table=out_file)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
#' Generate pileup summary for BAM files
#'
#' This function generates pileup summaries for multiple BAM files.
#'
#' @param bams Path to BAM files.
#' @param sif_gatk Path to gatk sif file.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param output_dir Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "parPileupSummaryGatk"
#' @param task_name Task name. Default "parPileupSummaryGatk"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
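#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; BAM paths are hypothetical placeholders.
#' report <- parallel_pileup_summary_gatk(
#'   bams = c("BAMS/patient1_T.bam", "BAMS/patient1_N.bam"),
#'   output_dir = "pileup_reports",
#'   threads = 2,
#'   mode = "local"
#' )
#' }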
#' @export
parallel_pileup_summary_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bams="",output_dir=".",tmp_dir=".",verbose=FALSE,
batch_config=build_default_preprocess_config(),
biallelic_db=build_default_reference_list()$HG19$variant$biallelic_reference,
db_interval=build_default_reference_list()$HG19$variant$biallelic_reference,
threads=1,ram=4,mode="local",
executor_id=make_unique_id("parPileupSummaryGatk"),
task_name="parPileupSummaryGatk",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,task_id=task_id)
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
)
)
bam_list=bams
names(bam_list)=Vectorize(get_file_name)(bams)
if(mode=="local"){
job_report[["steps"]][["parPileupSummaryGatk"]]<-
parallel::mclapply(bam_list,FUN=function(bam){
job_report <- pileup_summary_gatk(
sif_gatk=sif_gatk,
bam=normalizePath(bam),output_name=get_file_name(bam),output_dir=out_file_dir,
verbose=verbose,batch_config=batch_config,
biallelic_db=normalizePath(biallelic_db),
db_interval=normalizePath(db_interval),
executor_id=task_id,
time=time,hold=hold)
},mc.cores=threads)
}else if(mode=="batch"){
rdata_file=paste0(tmp_dir,"/",job,".tumours.RData")
output_dir=out_file_dir
biallelic_db=normalizePath(biallelic_db)
db_interval=normalizePath(db_interval)
save(bam_list,sif_gatk,output_dir,biallelic_db,db_interval,verbose,tmp_dir,file = rdata_file)
exec_code=paste0("Rscript -e \"ULPwgs::pileup_summary_gatk(rdata=\\\"",rdata_file,"\\\",selected=$SGE_TASK_ID)\"")
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,time=time,ram=ram,
threads=1,output_dir=out_file_dir2,
hold=hold,array=length(bam_list))
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report[["steps"]][["parPileupSummaryGatk"]]<- build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args=argg,
out_file_dir=out_file_dir,
out_files=list(
pileup_table=paste0(out_file_dir,"/",names(bam_list),".pileup.table")
)
)
}
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
#' Merge Mutect2 stats files
#'
#' This function merges per-region Mutect2 stats files into a single stats file using MergeMutectStats.
#'
#' @param sif_gatk Path to gatk sif file.
#' @param stats Path to Mutect2 stats files.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param output_dir Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "mergeMutectStats"
#' @param task_name Task name. Default "mergeMutectStats"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
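#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the stats files below are hypothetical outputs of
#' ## per-region Mutect2 calls.
#' report <- merge_mutect_stats_gatk(
#'   stats = c("mutect2/patient1.1.vcf.stats",
#'             "mutect2/patient1.2.vcf.stats"),
#'   output_name = "patient1",
#'   output_dir = "mutect2"
#' )
#' }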
#' @export
merge_mutect_stats_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
stats=NA,output_name="",output_dir=".",clean=FALSE,
verbose=TRUE, batch_config=build_default_preprocess_config(),
threads=1,ram=4,mode="local",
executor_id=make_unique_id("mergeMutectStats"),
task_name="mergeMutectStats",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir)
job=build_job(executor_id=executor_id,
task_id=task_id)
id=""
if(output_name!=""){
id=output_name
}else{
id=get_file_name(stats[1])
}
if (!is.null(stats)){
stat=paste0(" -stats ",paste0(normalizePath(stats),collapse=" -stats "))
}
out_file=paste0(out_file_dir,"/",id,".merged.vcf.stats")
exec_code=paste0("singularity exec -H ",getwd(),":/home ",sif_gatk,
" /gatk/gatk MergeMutectStats -O ", out_file ,stat)
if(clean){
exec_code=paste(exec_code," && rm",paste(normalizePath(stats),collapse=" "))
}
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
merged_stats=out_file)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
#' Wrapper around CreateSomaticPanelOfNormals from GATK package
#'
#' This function creates a somatic panel of normals from normal samples bam files.
#' Recommended minimum number of normal samples is around 40.
#'
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360037058172-CreateSomaticPanelOfNormals-BETA-
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param bin_bcftools [REQUIRED] Path to bcftools binary file.
#' @param bin_bgzip [REQUIRED] Path to bgzip binary file.
#' @param bin_tabix [REQUIRED] Path to tabix binary file.
#' @param bin_samtools [REQUIRED] Path to samtools binary file.
#' @param vcfs [REQUIRED] Path to VCF files generated from normal samples.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param germ_resource [REQUIRED] Path to germline resources vcf file.
#' @param regions [OPTIONAL] Regions to analyze. If regions for parallelization are not provided then these will be inferred from the reference genome index.
#' @param output_name [OPTIONAL] Name for the output. If not given the name of one of the samples will be used.
#' @param pon [OPTIONAL] Path to panel of normal.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param mnps [OPTIONAL] Report MNPs in vcf file.
#' @param contamination [OPTIONAL] Produce sample cross-contamination reports. Default TRUE.
#' @param orientation [OPTIONAL] Produce read orientation information. Default FALSE
#' @param filter [OPTIONAL] Filter Mutect2 calls. Default TRUE.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "createPoNGatk"
#' @param task_name Task name. Default "createPoNGATK"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
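#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the normal sample VCFs below are hypothetical
#' ## placeholders. At least ~40 normals are recommended for a usable panel.
#' report <- create_pon_gatk(
#'   vcfs = c("normals/normal1.vcf.gz",
#'            "normals/normal2.vcf.gz"),
#'   output_name = "PoN",
#'   output_dir = "panel_of_normals",
#'   mode = "local"
#' )
#' }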
#' @export
create_pon_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bin_bcftools=build_default_tool_binary_list()$bin_bcftools,
bin_samtools=build_default_tool_binary_list()$bin_samtools,
bin_bgzip=build_default_tool_binary_list()$bin_bgzip,
bin_tabix=build_default_tool_binary_list()$bin_tabix,
output_name="PoN",
vcfs="",
regions=NULL,output_dir=".",
ref_genome=build_default_reference_list()$HG19$reference$genome,
germ_resource=build_default_reference_list()$HG19$variant$germ_reference,
verbose=FALSE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("createPoNGatk"),
task_name="createPoNGATK",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir,name="pon_gatk")
job=build_job(executor_id=executor_id,task_id=task_id)
out_file=paste0(out_file_dir,"/",output_name,".vcf.gz")
jobs_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code="",
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
pon=out_file
)
)
jobs_report[["steps"]][["createGenomicDbGatk"]]<-create_genomic_db_gatk(
sif_gatk=sif_gatk,
vcfs=normalizePath(vcfs),output_dir=out_file_dir,
ref_genome=ref_genome,
verbose=verbose,
batch_config=batch_config,
threads=threads,ram=ram,mode=mode,
executor_id=task_id,
time=time,
hold=hold
)
hold=unlist_lvl(jobs_report[["steps"]][["createGenomicDbGatk"]],var="job_id")
exec_code=paste("singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk CreateSomaticPanelOfNormals -R ",ref_genome,paste0(" -V gendb://",
normalizePath(jobs_report[["steps"]][["createGenomicDbGatk"]]$out_file_dir))," -O ",out_file)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
jobs_report$exec_code=exec_code
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
if(wait&&mode=="batch"){
job_validator(job=jobs_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(jobs_report)
}
#' Wrapper around GenomicsDBImport from GATK package
#'
#' This function creates a GenomicsDB workspace from a set of VCF files.
#'
#' For more information about this function: https://gatk.broadinstitute.org/hc/en-us/articles/360037058172-CreateSomaticPanelOfNormals-BETA-
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param vcfs [REQUIRED] Path to VCF files.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param size [OPTIONAL] Number of VCFs to import per batch. Default 10.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param executor_id Task EXECUTOR ID. Default "createGenomicDB"
#' @param task_name Task name. Default "createGenomicDB"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
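#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; VCF paths are hypothetical placeholders. The
#' ## workspace is written to <output_dir>/db.
#' report <- create_genomic_db_gatk(
#'   vcfs = c("normals/normal1.vcf.gz",
#'            "normals/normal2.vcf.gz"),
#'   output_dir = "panel_of_normals",
#'   size = 10,
#'   threads = 4
#' )
#' }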
#' @export
create_genomic_db_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bin_samtools=build_default_tool_binary_list()$bin_samtools,
bin_bgzip=build_default_tool_binary_list()$bin_bgzip,
bin_tabix=build_default_tool_binary_list()$bin_tabix,
vcfs="",regions=NULL,output_dir=".",
ref_genome=build_default_reference_list()$HG19$reference$genome,
verbose=FALSE,size=10,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("createGenomicDB"),
task_name="createGenomicDB",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
argg <- as.list(environment())
task_id=make_unique_id(task_name)
tmp_dir=set_dir(dir=output_dir,name="tmp")
job=build_job(executor_id=executor_id,task_id=task_id)
map_file=paste0(tmp_dir,"/vcfs.map")
map=data.frame(id=Vectorize(get_file_name)(vcfs),file=vcfs)
write.table(x=map,file=map_file,quote=FALSE,sep="\t",
row.names=FALSE,col.names=FALSE)
jobs_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=list(),
task_id=task_id,
input_args = argg,
out_file_dir=paste0(output_dir,"/db"),
out_files=list(
map_file=map_file
)
)
if(is.null(regions)){
jobs_report[["steps"]][["getChr"]] <- get_fai_reference_chr(
fasta=ref_genome,verbose=verbose,output_dir=tmp_dir,
output_name="refChr",header=FALSE,
executor_id=task_id,mode="local",threads=threads,ram=ram,
time=time,update_time=update_time,wait=FALSE,hold=hold)
regions=jobs_report[["steps"]][["getChr"]]$out_files$ref
}
exec_code=paste("singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk GenomicsDBImport -R ",normalizePath(ref_genome),
" --genomicsdb-workspace-path ",paste0(output_dir,"/db"),
" --batch-size ",size,
" --max-num-intervals-to-import-in-parallel ",threads,
" --tmp-dir ",normalizePath(tmp_dir)," --reader-threads ",threads,
" --overwrite-existing-genomicsdb-workspace TRUE",
" --sample-name-map ",normalizePath(map_file), " -L ",regions)
if(mode=="batch"){
out_file_dir2=set_dir(dir=output_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
jobs_report$exec_code=exec_code
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
if(wait&&mode=="batch"){
job_validator(job=jobs_report$job_id,
time=update_time,verbose=verbose,threads=threads)
}
return(jobs_report)
}
#' Variant Calling using HaplotypeCaller
#'
#' This function calls HaplotypeCaller for variant calling.
#' If a vector of normal samples is provided, these will be processed in multi-sample mode.
#' To run in single-sample mode supply a single normal sample.
#' TO DO// Implement mitochondrial mode feature
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param normal [OPTIONAL] Path to normal BAM file.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param rdata [OPTIONAL] Import R data information with list of BAM.
#' @param selected [OPTIONAL] Select BAM from list.
#' @param output_name [OPTIONAL] Name for the output. If not given, the name of the first sample will be used.
#' @param output_dir [OPTIONAL] Path to the output directory.
#' @param tmp_dir [OPTIONAL] Path to the temporary directory.
#' @param threads [OPTIONAL] Number of threads to split the work. Default 4
#' @param ram [OPTIONAL] RAM memory to assign to each thread. Default 4
#' @param verbose [OPTIONAL] Enables progress messages. Default False.
#' @param mode [REQUIRED] Where to parallelize. Default local. Options ["local","batch"]
#' @param batch_config [REQUIRED] Additional batch configuration if batch mode selected.
#' @param executor_id Task EXECUTOR ID. Default "parallelCallHaplotypecallerGatk"
#' @param task_name Task name. Default "parallelCallHaplotypecallerGatk"
#' @param time [OPTIONAL] If batch mode. Max run time per job. Default "48:0:0"
#' @param update_time [OPTIONAL] If batch mode. Job update time in seconds. Default 60.
#' @param wait [OPTIONAL] If batch mode wait for batch to finish. Default FALSE
#' @param hold [OPTIONAL] Hold job until the given job is finished. Job ID.
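#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the BAM path and region are hypothetical
#' ## placeholders and the default HG19 reference is assumed.
#' report <- haplotypecaller_gatk(
#'   normal = "BAMS/patient1_N.bam",
#'   region = "21",
#'   output_dir = "haplotypecaller",
#'   verbose = TRUE
#' )
#' }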
#' @export
haplotypecaller_gatk=function(
rdata=NULL,selected=NULL,
sif_gatk=build_default_sif_list()$sif_gatk,
normal="",
region="",
ref_genome=build_default_reference_list()$HG19$reference$genome,
output_dir=".",
tmp_dir=".",
output_name="",
verbose=FALSE,
batch_config=build_default_preprocess_config(),
threads=4,ram=4,mode="local",
executor_id=make_unique_id("parallelCallHaplotypecallerGatk"),
task_name="parallelCallHaplotypecallerGatk",time="48:0:0",
update_time=60,wait=FALSE,hold=NULL
){
if(!is.null(rdata)){
load(rdata)
if(!is.null(selected)){
region=region_list[selected]
}
}
argg <- as.list(environment())
task_id=make_unique_id(task_name)
out_file_dir=set_dir(dir=output_dir,name="vcf")
job=build_job(executor_id=executor_id,task_id=task_id)
if(tmp_dir!=""){
tmp_dir=paste0(" --tmp-dir ",normalizePath(tmp_dir))
}
id=""
if(output_name!=""){
id=output_name
}else{
id=get_file_name(normal[1])
}
reg=""
if (region==""){
out_file=paste0(out_file_dir,"/",id,".unfilt.vcf.gz")
}else{
reg=paste0(" -L ",region)
id=paste0(id,".",region)
out_file=paste0(out_file_dir,"/",id,".unfilt.vcf.gz")
}
if (is.vector(normal)){
normal=paste0(" -I ",paste(normalizePath(normal),collapse=" -I "))
}else{
normal=paste0(" -I ",normalizePath(normal))
}
exec_code=paste("singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk HaplotypeCaller -R ",normalizePath(ref_genome), normal," -O ",out_file,reg)
if(mode=="batch"){
out_file_dir2=set_dir(dir=out_file_dir,name="batch")
batch_code=build_job_exec(job=job,hold=hold,time=time,ram=ram,
threads=threads,output_dir=out_file_dir2)
exec_code=paste0("echo '. $HOME/.bashrc;",batch_config,";",exec_code,"'|",batch_code)
}
if(verbose){
print_verbose(job=job,arg=argg,exec_code=exec_code)
}
error=execute_job(exec_code=exec_code)
if(error!=0){
stop("gatk failed to run due to unknown error.
Check std error for more information.")
}
job_report=build_job_report(
job_id=job,
executor_id=executor_id,
exec_code=exec_code,
task_id=task_id,
input_args = argg,
out_file_dir=out_file_dir,
out_files=list(
vcf=out_file,
idx=paste0(out_file,".idx")
)
)
if(wait&&mode=="batch"){
job_validator(job=job_report$job_id,time=update_time,
verbose=verbose,threads=threads)
}
return(job_report)
}
#' Variant Calling using HaplotypeCaller
#'
#' This function calls HaplotypeCaller for germline variant calling on a BAM file,
#' optionally restricted to a given region. The resulting calls can be scored with
#' CNNScoreVariants and filtered by tranche.
#' TO DO// Implement mitochondrial mode feature
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param bam [REQUIRED] Path to BAM file.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param region [OPTIONAL] Genomic region in samtools format chr:start-end.
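#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the BAM path is a hypothetical placeholder and
#' ## default HG19 resources are assumed. Additional arguments such as
#' ## output_dir and verbose are assumed to be consumed via `...` by the
#' ## environment set-up, as in the internal calls to this function.
#' report <- new_haplotypecaller_gatk(
#'   bam = "BAMS/patient1_N.bam",
#'   region = "21",
#'   score = TRUE,
#'   filter = TRUE,
#'   output_dir = "haplotypecaller"
#' )
#' }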
#' @export
new_haplotypecaller_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
ref_genome=build_default_reference_list()$HG19$reference$genome,
indel_db=build_default_reference_list()$HG19$variant$mills_reference,
haplotype_db=build_default_reference_list()$HG19$variant$hapmap_reference,
bam=NULL,
region=NULL,
score=TRUE,
score_CNN="CNN_1D",
filter=TRUE,
snp_tranche=99.95,
indel_tranche=99.4,
keep_previous_filters=FALSE,
...
){
run_main=function(
.env
){
.this.env=environment()
append_env(to=.this.env,from=.env)
set_main(.env=.this.env)
tmp_dir=paste0(" --tmp-dir ",normalizePath(tmp_dir))
if(mode=="local_parallel"){
threads=1
}
if (is.null(region)){
.main$out_files$unfiltered_vcf=paste0(
out_file_dir,"/",input_id,".unfilt.vcf.gz"
)
}else{
.main$out_files$unfiltered_vcf=paste0(
out_file_dir,"/",paste0(input_id,".",input),".unfilt.vcf.gz"
)
.main$out_files$unfiltered_vcf_index=paste0(
out_file_dir,"/",paste0(input_id,".",input),".unfilt.vcf.gz.tbi"
)
region=paste0(" -L ",input)
}
.main$exec_code=paste(
"singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk HaplotypeCaller --native-pair-hmm-threads ",threads," -R ",
normalizePath(ref_genome),
paste0(" -I ",normalizePath(bam)),
" -O ",.main$out_files$unfiltered_vcf,
region
)
run_job(.env=.this.env)
.main.step=.main$steps[[fn_id]]
if(score){
.main.step$steps<-append(
.main.step$steps,
cnn_score_variants_gatk(
sif_gatk=sif_gatk,
ref_genome=normalizePath(ref_genome),
haplotype_db=normalizePath(haplotype_db),
indel_db=normalizePath(indel_db),
bam=bam,
vcf=.main.step$out_files$unfiltered_vcf,
filter=filter,
info_key=score_CNN,
snp_tranche=snp_tranche,
indel_tranche=indel_tranche,
keep_previous_filters=keep_previous_filters,
output_name=paste0(input_id,".",input),
output_dir=paste0(out_file_dir,"/scored"),
tmp_dir=tmp_dir,
batch_dir=batch_dir,
env_dir=env_dir,
err_msg=err_msg,
verbose=verbose,
threads=threads,
ram=ram,
executor_id=task_id
)
)
.this.step=.main.step$steps$cnn_score_variants_gatk
.main.step$out_files=append(.main.step$out_files,.this.step$out_files)
}
.env$.main<-.main
}
.base.env=environment()
list2env(list(...),envir=.base.env)
set_env_vars(
.env= .base.env,
output_name=get_file_name(bam),
vars="region"
)
launch(.env=.base.env)
}
#' Variant Calling using HaplotypeCaller
#'
#' This function calls HaplotypeCaller for germline variant calling on one or more BAM files.
#' If no region is given, the BAM file is split per chromosome and the regions are
#' processed in parallel.
#' TO DO// Implement mitochondrial mode feature
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param bam [REQUIRED] Path to BAM file(s).
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param region [OPTIONAL] Genomic region in samtools format chr:start-end.
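#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the BAM path and patient_id are hypothetical
#' ## placeholders and default HG19 resources are assumed. Without a region
#' ## the BAM is split per chromosome before calling. Extra arguments such as
#' ## output_dir are assumed to be consumed via `...`.
#' report <- call_haplotypecaller_gatk(
#'   bam = "BAMS/patient1_N.bam",
#'   patient_id = "patient1",
#'   chromosomes = c(1:22, "X"),
#'   score = TRUE,
#'   filter = TRUE,
#'   output_dir = "haplotypecaller"
#' )
#' }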
#' @export
call_haplotypecaller_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
bin_samtools=build_default_tool_binary_list()$bin_samtools,
ref_genome=build_default_reference_list()$HG19$reference$genome,
indel_db=build_default_reference_list()$HG19$variant$mills_reference,
haplotype_db=build_default_reference_list()$HG19$variant$hapmap_reference,
bam=NULL,
patient_id=NULL,
chromosomes=c(1:22,"X","Y"),
region=NULL,
score=TRUE,
filter=TRUE,
score_CNN="CNN_1D",
snp_tranche=99.95,
indel_tranche=99.4,
keep_previous_filters=FALSE,
...
){
run_main=function(
.env
){
.this.env=environment()
append_env(to=.this.env,from=.env)
out_file_dir=set_dir(
out_file_dir,
name=paste0(patient_id,"/haplotypecaller_reports/",input_id)
)
set_main(.env=.this.env)
.main$steps[[fn_id]]<-.this.env
.main.step=.main$steps[[fn_id]]
### IF NO REGION IS GIVEN SPLIT BAM PER CHROMOSOME
if(is.null(region)){
.main.step$steps <-append(
.main.step$steps,
get_sq_bam(
bin_samtools=bin_samtools,
bam=input,
output_name=input_id,
output_dir=tmp_dir,
header=TRUE,
tmp_dir=tmp_dir,
env_dir=env_dir,
batch_dir=batch_dir,
err_msg=err_msg,
verbose=verbose,
threads=threads,
ram=ram,
executor_id=task_id
)
)
.this.step=.main.step$steps$get_sq_bam
.main.step$out_files$region=.this.step$out_files$index_bed
region=.main.step$out_files$region
}
### ASCERTAIN REGION INPUT IF GIVEN
if(length(region)==1){
### IF PATH READ AS BED
if (file.exists(region)){
region=read_bed(
bed=region
)$body
}
### IF DATA.FRAME GENERATE GID
if (is.data.frame(region)){
region=region[region$chrom %in% chromosomes,]
### CHANGE TO BASE 1 SYSTEM
region$gid=paste0(region$chrom,":",as.numeric(region$chromStart)+1,"-",as.numeric(region$chromEnd))
region=unlist(region$gid)
}
}
### ONLY RUN LOCALLY PARALLEL JOBS AT REGION LEVEL IF SAMPLE LEVEL LOCAL/BATCH MODE
### LIMITATION OF MCLAPPLY
run_mode="local_parallel"
if(mode=="local_parallel"){
run_mode="local"
}
.main.step$steps <-append(
.main.step$steps,
new_haplotypecaller_gatk(
sif_gatk=sif_gatk,
ref_genome=normalizePath(ref_genome),
haplotype_db=normalizePath(haplotype_db),
indel_db=normalizePath(indel_db),
bam=input,
output_name=input_id,
region=region,
score=score,
filter=filter,
score_CNN=score_CNN,
snp_tranche=snp_tranche,
indel_tranche=indel_tranche,
keep_previous_filters=keep_previous_filters,
mode=run_mode,
output_dir=paste0(out_file_dir,"/unfiltered"),
tmp_dir=tmp_dir,
env_dir=env_dir,
batch_dir=batch_dir,
err_msg=err_msg,
verbose=verbose,
threads=threads,
ram=ram,
executor_id=task_id
))
.this.step=.main.step$steps
.main.step$out_files$region_vcfs=get_variable_env(env=.this.step)
.env$.main<-.main
}
.base.env=environment()
list2env(list(...),envir=.base.env)
set_env_vars(
.env= .base.env,
vars="bam"
)
launch(.env=.base.env)
}
#' Germline Variant Scoring Using CNN
#'
#' This function calls CNNScoreVariants to annotate a VCF with a CNN-based variant score.
#' By default a 1D model (variant annotations only) is used; when info_key is CNN_2D and a
#' BAM file is supplied, a 2D model that also uses read-level information is applied.
#' TO DO// Implement mitochondrial mode feature
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360037225632-HaplotypeCaller
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param vcf [REQUIRED] Path to VCF file.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param bam [OPTIONAL] Path to BAM file. Required when info_key is CNN_2D, in which case the 2D model is applied.
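#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the VCF path is a hypothetical placeholder and
#' ## default HG19 resources are assumed. Filtering is delegated to
#' ## filter_variant_tranches_gatk when filter = TRUE.
#' report <- cnn_score_variants_gatk(
#'   vcf = "haplotypecaller/patient1_N.unfilt.vcf.gz",
#'   info_key = "CNN_1D",
#'   filter = TRUE,
#'   output_dir = "haplotypecaller/scored"
#' )
#' }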
#' @export
#'
cnn_score_variants_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
ref_genome=build_default_reference_list()$HG19$reference$genome,
indel_db=build_default_reference_list()$HG19$variant$mills_reference,
haplotype_db=build_default_reference_list()$HG19$variant$hapmap_reference,
vcf=NULL,
bam=NULL,
filter=TRUE,
info_key="CNN_1D",
snp_tranche=99.95,
indel_tranche=99.4,
keep_previous_filters=FALSE,
...
){
run_main=function(
.env
){
.this.env=environment()
append_env(to=.this.env,from=.env)
set_main(.env=.this.env)
opt=""
.main$out_files$scored_vcf=paste0(out_file_dir,"/",input_id,".CNNscored.1D.vcf.gz")
.main$out_files$scored_vcf_index=paste0(out_file_dir,"/",input_id,".CNNscored.1D.vcf.gz.tbi")
if (info_key=="CNN_2D"){
bam=paste0(" -I ",bam)
opt=" -tensor-type read_tensor "
.main$out_files$scored_vcf=paste0(out_file_dir,"/",input_id,".CNNscored.2D.vcf.gz")
.main$out_files$scored_vcf_index=paste0(out_file_dir,"/",input_id,".CNNscored.2D.vcf.gz.tbi")
}else{
bam=NULL
}
.main$exec_code=paste(
"singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk CNNScoreVariants -R ",
normalizePath(ref_genome),
paste0(" -V ",normalizePath(vcf)),
" -O ",.main$out_files$scored_vcf,
bam,opt
)
run_job(.env=.this.env)
.main.step=.main$steps[[fn_id]]
if(filter){
.main.step$steps <-append(
.main.step$steps,
filter_variant_tranches_gatk(
sif_gatk=build_default_sif_list()$sif_gatk,
ref_genome=normalizePath(ref_genome),
indel_db=normalizePath(indel_db),
haplotype_db=normalizePath(haplotype_db),
vcf=.main$out_files$scored_vcf,
info_key=info_key,
snp_tranche=snp_tranche,
indel_tranche=indel_tranche,
keep_previous_filters=keep_previous_filters,
output_dir=paste0(out_file_dir,"/filtered"),
output_name=input_id,
tmp_dir=tmp_dir,
env_dir=env_dir,
batch_dir=batch_dir,
err_msg=err_msg,
verbose=verbose,
threads=threads,
ram=ram,
executor_id=task_id
)
)
.this.step=.main.step$steps$filter_variant_tranches_gatk
.main.step$out_files=append(.main.step$out_files,.this.step$out_files)
}
.env$.main<-.main
}
.base.env=environment()
list2env(list(...),envir=.base.env)
set_env_vars(
.env= .base.env,
vars="vcf"
)
launch(.env=.base.env)
}
#' Variant Tranche Filtering using GATK
#'
#' This function applies tranche filtering to variant calls scored with CNNScoreVariants.
#'
#' For more information read:
#' https://gatk.broadinstitute.org/hc/en-us/articles/360051308071-FilterVariantTranches
#'
#' @param sif_gatk [REQUIRED] Path to gatk sif file.
#' @param vcf [REQUIRED] Path to VCF file.
#' @param ref_genome [REQUIRED] Path to reference genome fasta file.
#' @param indel_db [OPTIONAL] Path to database with indel info.
#' @param haplotype_db [OPTIONAL] Path to database with haplotype info.
#' @param info_key [OPTIONAL] Info Key for CNN model training. Default CNN_1D. Options ["CNN_1D","CNN_2D"]
#' @param snp_tranche [OPTIONAL] SNP tranche cut-off.
#' @param indel_tranche [OPTIONAL] INDEL tranche cut-off.
#' @param keep_previous_filters [OPTIONAL] Keep previously applied filters instead of invalidating them. Default FALSE.
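#'
#' @examples
#' \dontrun{
#' ## Minimal usage sketch; the scored VCF path is a hypothetical placeholder
#' ## and the default Mills indel and HapMap resources are assumed.
#' report <- filter_variant_tranches_gatk(
#'   vcf = "haplotypecaller/scored/patient1_N.CNNscored.1D.vcf.gz",
#'   info_key = "CNN_1D",
#'   snp_tranche = 99.95,
#'   indel_tranche = 99.4,
#'   output_dir = "haplotypecaller/filtered"
#' )
#' }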
#' @export
filter_variant_tranches_gatk=function(
sif_gatk=build_default_sif_list()$sif_gatk,
ref_genome=build_default_reference_list()$HG19$reference$genome,
indel_db=build_default_reference_list()$HG19$variant$mills_reference,
haplotype_db=build_default_reference_list()$HG19$variant$hapmap_reference,
vcf=NULL,
info_key="CNN_1D",
snp_tranche=99.95,
indel_tranche=99.4,
keep_previous_filters=FALSE,
...
){
run_main=function(
.env
){
.this.env=environment()
append_env(to=.this.env,from=.env)
set_main(.env=.this.env)
prev_filters=" --invalidate-previous-filters "
if (keep_previous_filters){
prev_filters=" "
}
if(!is.null(indel_db)){
indel_db=paste0(" --resource ",normalizePath(indel_db))
}
if(!is.null(haplotype_db)){
haplotype_db=paste0(" --resource ",normalizePath(haplotype_db))
}
.main$out_files$filtered_vcf=paste0(out_file_dir,"/",input_id,".filteredTranches.vcf.gz")
.main$out_files$filtered_vcf_index=paste0(out_file_dir,"/",input_id,".filteredTranches.vcf.gz.tbi")
.main$exec_code=paste(
"singularity exec -H ",paste0(getwd(),":/home "),sif_gatk,
" /gatk/gatk FilterVariantTranches -R ",normalizePath(ref_genome),
" -O ", .main$out_files$filtered_vcf,
" -V ",normalizePath(input), " --info-key ",info_key,
" --snp-tranche ",snp_tranche,
" --indel-tranche ",indel_tranche,
indel_db,
haplotype_db,prev_filters
)
run_job(.env=.this.env)
.env$.main<-.main
}
.base.env=environment()
list2env(list(...),envir=.base.env)
set_env_vars(
.env=.base.env,
vars="vcf"
)
launch(.env=.base.env)
}