R/run.varitas.pipeline.R

Defines functions run.varitas.pipeline

Documented in run.varitas.pipeline

#' Run VariTAS pipeline in full.
#'
#' @description
#' 	Run all steps in VariTAS processing pipeline, with appropriate dependencies.
#' 
#' @param file.details
#' 	Data frame containing details of files to be used during first processing step.
#' 	Depending on what you want to be the first step in the pipeline, this can either be 
#'  FASTQ files, BAM files, VCF files, or variant (txt) files.
#' @param output.directory 
#'	Main directory where all files should be saved
#' @param run.name
#'	Name of pipeline run. Will be added as a prefix to all LSF jobs.
#' @param start.stage 
#' 	String indicating which stage pipeline should start at. If starting at a later stage
#' 	of the pipeline, appropriate input files must be provided. For example, if starting with annotation, 
#'	VCF files with variant calls must be provided.
#' @param variant.callers
#' 	Vector specifying which variant callers should be run.
#' @param proton
#' 	Logical indicating if data was generated by proton sequencing. Used to set base quality 
#'	thresholds in variant calling steps.
#' @param quiet
#' 	Logical indicating whether to print commands to screen rather than submit jobs. Defaults to FALSE,
#'	can be useful to set to TRUE for testing.
#' @param email 
#' 	Email address that should be notified when pipeline finishes. If NULL or FALSE, no email is sent. 
#' @param verify.options
#'	Logical indicating whether to run verify.varitas.options
#' @param save.specification.files
#'	Logical indicating if specification files should be saved to project directory
#'
#' @return None
#' @examples
#' run.varitas.pipeline(
#'        file.details = data.frame(
#'         sample.id = c('1', '2'),
#'         reads = c('1-R1.fastq.gz', '2-R1.fastq.gz'),
#'         mates = c('1-R2.fastq.gz', '2-R2.fastq.gz'),
#'          patient.id = c('P1', 'P1'),
#'          tissue = c('tumour', 'normal')
#'        ),
#'        output.directory = '.',
#'        quiet = TRUE,
#'        run.name = "Test", 
#'        variant.callers = c('mutect', 'varscan')
#'      )
#'
#' @export
run.varitas.pipeline <- function(
	file.details,
	output.directory, 
	run.name = NULL,
	start.stage = c('alignment', 'qc', 'calling', 'annotation', 'merging'), 
	variant.callers = NULL, 
	proton = FALSE, 
	quiet = FALSE,
	email = NULL,
	verify.options = !quiet,
	save.specification.files = !quiet
	) { 
  
  logo <- c("====================================================================",
            "`7MMF'   `7MF'                 db MMP\"\"MM\"\"YMM   db       .M\"\"\"bgd ", 
            "  `MA     ,V                      P'   MM   `7  ;MM:     ,MI    \"Y ", 
            "   VM:   ,V ,6\"Yb.  `7Mb,od8 `7MM      MM      ,V^MM.    `MMb.     ", 
            "    MM.  M'8)   MM    MM' \"'   MM      MM     ,M  `MM      `YMMNq. ", 
            "    `MM A'  ,pm9MM    MM       MM      MM     AbmmmqMA   .     `MM ", 
            "     :MM;  8M   MM    MM       MM      MM    A'     VML  Mb     dM ", 
            "      VF   `Moo9^Yo..JMML.   .JMML.  .JMML..AMA.   .AMMA.P\"Ybmmd\"  ",
            "===================================================================="
  )
  
  cat(logo, sep = '\n')
  
	start.stage <- match.arg(start.stage);
	if( !is.null(variant.callers) ) variant.callers <- tolower(variant.callers);

	### INPUT TESTS ###########################################################

	if( ( !quiet || save.specification.files) && !dir.exists(output.directory) ) {
		error.message <- paste('Directory', output.directory, 'does not exist');
		stop(error.message);
	}

	if( start.stage %in% c('alignment', 'qc', 'calling') && is.null(variant.callers) ) { 
		stop('Variant callers must be provided when starting pipeline at variant calling or earlier.');
	}

	# check that all variants are supported
	supported.callers <- c('mutect', 'vardict', 'varscan', 'lofreq', 'muse');
	if( !all(variant.callers %in% supported.callers) ) {

		unrecognized.callers <- variant.callers[ !(variant.callers %in% supported.callers ) ];
		error.message <- paste(
			'Unrecognzied variant callers:', 
			paste(unrecognized.callers, collapse = ' ')
			);

		stop(error.message);
	}

	# warn if doing alignment on proton data - should be using BAMs from machine
	if( 'alignment' == start.stage && proton ) {
		warning('Running alignment on proton sequencing data. This is probably not a good idea - using BAMs from the machine would be better');
	}

	### MAIN ##################################################################


	### SORT OUT STAGES

	# which stages should be run
	stage.order <- c('alignment', 'qc', 'calling', 'annotation', 'merging');
	start.stage.index <- match(start.stage, stage.order);

	stages.to.run <- stage.order[ seq_along(stage.order) >= start.stage.index];

	# Make sure options contain all required fields
	if( verify.options ) {
		verify.varitas.options(
			stages.to.run = stages.to.run, 
			variant.callers = variant.callers
			);
	}
	
	# No need to run QC if running alignment stage
	if ( 'alignment' %in% stages.to.run ) {
	  stages.to.run <- stages.to.run[c(1,3,4,5)]
	}

	cat('RUNNING STAGES:', paste(stages.to.run, collapse = ' '), '\n');


	# what kind of input was provided
	#	- might have to update this if adding support for multiple start points
	if( 'alignment' == start.stage ) { 
		fastq.specification <- file.details;
	} else if( 'qc' == start.stage ) { 
	  bam.specification <- file.details;
	} else if( 'calling' == start.stage ) { 
		bam.specification <- file.details;
	} else if( 'annotation' == start.stage ) { 
		vcf.specification <- file.details;
	} else if( 'merging' == start.stage ) { 
		variant.specification <- file.details;
	}
	
	# Determine if running on HPC
	HPC <- TRUE
	config <- read.yaml(save.config())
	if (config[['cluster_scheduler']] == 'none') {
	  HPC <- FALSE
	}

	### RUN PIPELINE

	# STAGE 1: ALIGNMENT
	if( 'alignment' %in% stages.to.run ) { 

		# is this the best way to run it? 
		paired.end <- FALSE;
		if( 'mates' %in% names(fastq.specification) ) {
			paired.end <- TRUE;
		}	

		bam.specification <- run.alignment(
			fastq.specification = fastq.specification, 
			output.directory = output.directory,
			paired.end = paired.end,
			job.group = 'alignment', 
			job.name.prefix = run.name,
			quiet = quiet, 
			verify.options = FALSE
			);

		if( save.specification.files ) {

			file.name <- date.stamp.file.name('bam_specification.txt');
			utils::write.table(
				bam.specification, 
				file.path(output.directory, file.name), 
				sep = '\t', 
				row.names = FALSE
				);
		}

	}

	# STAGE 2: quality control 
	if( 'qc' %in% stages.to.run ) {

		# TO DO:
		#	- add job dependencies from QC to post processing stage

		paired <- FALSE;
		if( 'normal.bam' %in% names(bam.specification) ) {
			paired <- TRUE;
		}

		bam.specification <- run.target.qc(
			bam.specification,
			project.directory = output.directory,
			job.name.prefix = run.name,
			paired = paired,
			quiet = quiet
			);

	}


	# STAGE 2: VARIANT CALLING
	if( 'calling' %in% stages.to.run ) { 

		paired <- FALSE;
		if( 'normal.bam' %in% names(bam.specification) ) {
			paired <- TRUE;
		}

		vcf.specification <- run.variant.calling(
			bam.specification = bam.specification, 
			output.directory = output.directory, 
			variant.callers = variant.callers, 
			job.name.prefix = run.name,
			paired = paired, 
			proton = proton, 
			quiet = quiet, 
			verify.options = FALSE
			);

		if( save.specification.files ) {
			
			file.name <- date.stamp.file.name('variant_vcf_specification.txt');
			utils::write.table(
				vcf.specification, 
				file.path(output.directory, file.name), 
				sep = '\t', 
				row.names = FALSE
				);

		}

	} 

	# STAGE 3: ANNOTATION
	if( 'annotation' %in% stages.to.run ) { 

		job.name.prefix <- 'annotate';

		# add name of run if it exists
		if( !is.null(run.name) && '' != run.name ) {
			job.name.prefix <- paste(run.name, job.name.prefix, sep = '_');
		}

		variant.specification <- run.annotation(
			vcf.specification = vcf.specification, 
			output.directory = output.directory, 
			job.group = 'annotation', 
			job.name.prefix = job.name.prefix,
			quiet = quiet, 
			verify.options = FALSE
			);


		if( save.specification.files ) {
			
			file.name <- date.stamp.file.name('annotated_variants_specification.txt');
			utils::write.table(
				variant.specification, 
				file.path(output.directory, file.name), 
				sep = '\t', 
				row.names = FALSE
				);

		}

	} 

	# STAGE 4: MERGING
	if( 'merging' %in% stages.to.run ) { 

		run.post.processing(
			variant.specification = variant.specification, 
			output.directory = output.directory, 
			job.name.prefix = run.name,
			quiet = quiet,
			email = email
			);
	}
	
	# RUN ALL SCRIPTS IF NOT ON HPC
  if (!HPC) {
    run.all.scripts(
      output.directory = output.directory,
      stages.to.run = stages.to.run,
      variant.callers = variant.callers,
      quiet = quiet
    )
  }

}

Try the varitas package in your browser

Any scripts or data that you put into this service are public.

varitas documentation built on Nov. 14, 2020, 1:07 a.m.