```r
library(knitr)
library(flowr)
s <- file.path(system.file(package = 'flowr'), 'pipelines/sleep_pipe.R')
knitr::read_chunk(s)
```
Let us use the same example described in the overview section. We start by getting a set of commands we would like to run.
```bash
## wait for a few seconds…
sleep 5
sleep 5
## create two small files
cat $RANDOM > tmp1
cat $RANDOM > tmp2
## merge the two files
cat tmp1 tmp2 > tmp
## check the size of the resulting file
du -sh tmp
```
Wrap these commands into R:
```r
sleep = c('sleep 5', 'sleep 5')
tmp = c('cat $RANDOM > tmp1', 'cat $RANDOM > tmp2')
merge = 'cat tmp1 tmp2 > tmp'
size = 'du -sh tmp'
```
Next, we create a table using the above commands:
```r
## create a table of all commands
library(flowr)
lst = list(sleep = sleep,
           create_tmp = tmp,
           merge = merge,
           size = size)
flowmat = to_flowmat(lst, "samp1")
kable(flowmat)
```
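For orientation, the resulting flowmat is a simple table with three columns (samplename, jobname, cmd); roughly, it looks like this:

```
samplename  jobname     cmd
samp1       sleep       sleep 5
samp1       sleep       sleep 5
samp1       create_tmp  cat $RANDOM > tmp1
samp1       create_tmp  cat $RANDOM > tmp2
samp1       merge       cat tmp1 tmp2 > tmp
samp1       size        du -sh tmp
```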
We now have a few steps of a pipeline; next, we use a flow definition to describe how they flow. Flowr lets us quickly create a skeleton flow definition from a flowmat, which we can then alter to suit our needs. A handy function, `to_flowdef`, accepts a flowmat and creates a flow definition.
```r
## create a skeleton flow definition
def = to_flowdef(flowmat)
suppressMessages(plot_flow(def))
```
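A flow definition is itself a tab-delimited table; besides the jobname, its columns (the exact set may vary slightly by flowr version) describe how each step is submitted, what it depends on, and what resources it requests:

```r
## columns of a flow definition typically include:
## jobname, sub_type, prev_jobs, dep_type, queue,
## memory_reserved, walltime, cpu_reserved, platform
names(def)
```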
We can make a few changes to render this pipeline a little more efficient; briefly, we run a few steps in a scatter fashion (in parallel).

A few points to note:

- sleep and create_tmp are submitted as scatter steps, i.e. their commands run as multiple parallel jobs.
- each create_tmp job depends on its corresponding sleep job (dep_type serial), while merge waits for all create_tmp jobs to finish (dep_type gather).
- size runs as a single job after merge completes (dep_type serial).
```r
##               sleep      create_tmp  merge     size
def$sub_type = c("scatter", "scatter",  "serial", "serial")
def$dep_type = c("none",    "serial",   "gather", "serial")
kable(def)
```
```r
suppressMessages(plot_flow(def))
```
Next, we create a flow object:
```r
fobj = to_flow(flowmat, def, flowname = "sleep_pipe")
```
Finally, we can submit this to the cluster:
```r
plot_flow(fobj)
submit_flow(fobj)                          ## dry run
fobj2 = submit_flow(fobj, execute = TRUE)  ## submission to an LSF cluster

## after submission, we can use the following:
status(fobj2)  ## check status
rerun(fobj2)   ## re-run from an intermediate step
kill(fobj2)    ## kill it!
```
So far, we used a simple example where a single function created all the commands. This is easy, but such a step (or module) is not re-usable in another pipeline. Instead, we may write a module for each step, so that one can mix and match them to create new pipelines.
NOTE: A module always returns a flowmat. A module may have one or several steps. A module plus a flowdef becomes a pipeline.
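As a minimal sketch (the name `sleep_mod` is made up for illustration, not part of flowr), a module is just a function that builds a named list of commands and wraps it with `to_flowmat`:

```r
## a minimal module: a single step, returning a flowmat
sleep_mod <- function(samplename = "samp1"){
  cmds = list(sleep = c("sleep 5", "sleep 5"))
  flowmat = to_flowmat(cmds, samplename)
  return(list(flowmat = flowmat))
}
```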
```bash
## to follow this tutorial, you may download them:
url=https://raw.githubusercontent.com/sahilseth/flowr/master/inst/pipelines
cd ~/flowr/pipelines
wget $url/sleep_pipe.R    ## an R script with sleep_pipe(), which creates a flowmat
wget $url/sleep_pipe.def  ## a tab-delimited flow definition file
wget $url/sleep_pipe.conf ## an *optional* tab-delimited conf file, defining default params
```
The `sleep_pipe` function calls the three other functions (modules), fetches a flowmat from each, then rbinds them into one larger flowmat. You may refer to the `sleep_pipe.R` file for the source.
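A rough sketch of that stitching, assuming illustrative module names `sleep_mod`, `create_tmp_mod` and `merge_size_mod` (see `sleep_pipe.R` for the real code):

```r
sleep_pipe <- function(samplename = "samp1"){
  ## fetch a flowmat from each module
  f1 = sleep_mod(samplename)
  f2 = create_tmp_mod(samplename)
  f3 = merge_size_mod(samplename)
  ## rbind them, creating one larger flowmat
  flowmat = rbind(f1$flowmat, f2$flowmat, f3$flowmat)
  return(list(flowmat = flowmat))
}
```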
Using run

One may use the `run` function to create the flowmat, fetch the flowdef and execute the pipeline, all in a single step. Here, though, we go through each of these steps in detail.
```r
## 1. single-step submission:
fobj = run("sleep_pipe", execute = TRUE)

## 2. step-by-step:
## change the wd, so that we can source the files downloaded in the previous step
setwd("~/flowr/pipelines")
## 2a. optionally, load default parameters
load_opts("sleep_pipe.conf")
## 2b. get the sleep_pipe() function
source("sleep_pipe.R")
## and create a flowmat
flowmat = sleep_pipe()
## 2c. read a flow definition
flowdef = as.flowdef("sleep_pipe.def")
## 2d. create a flow object and submit it to the cluster
fobj = to_flow(flowmat, flowdef, execute = TRUE)
```
These are some of the practices we follow in-house; we feel they make stitching custom pipelines from a set of modules quite easy. Consider this a checklist of ideas, and a work in progress.
```r
picard_merge <- function(x,
                         samplename = opts_flow$get("samplename"),
                         mergedbam,
                         java_exe = opts_flow$get("java_exe"),
                         java_mem = opts_flow$get("java_mem"),
                         java_tmp = opts_flow$get("java_tmp"),
                         picard_jar = opts_flow$get("picard_jar")){
  ## make sure all args have a value (are not NULL);
  ## if a variable was not defined in a conf file, opts_flow$get returns NULL
  check_args()

  bam_list = paste("INPUT=", x, sep = "", collapse = " ")

  ## create a named list of commands
  cmds = list(merge = sprintf(
    "%s %s -Djava.io.tmpdir=%s -jar %s MergeSamFiles %s OUTPUT=%s ASSUME_SORTED=TRUE VALIDATION_STRINGENCY=LENIENT CREATE_INDEX=true USE_THREADING=true",
    java_exe, java_mem, java_tmp, picard_jar, bam_list, mergedbam))

  ## create a flowmat
  flowmat = to_flowmat(cmds, samplename)

  ## return a list with the outfiles AND the flowmat
  return(list(outfiles = mergedbam, flowmat = flowmat))
}
```
opts_flow$get("param")
to use their value.## Example conf file: cat my.conf bwa_exe /apps/bwa/bin/bwa
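To pick such a file up in a session, something like the following should work (`load_opts` and `opts_flow` ship with flowr; `my.conf` is the example file above):

```r
## load the conf file, then fetch a parameter by name
load_opts("my.conf")
opts_flow$get("bwa_exe")
## [1] "/apps/bwa/bin/bwa"
```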
Use check_args() to make sure none of the default parameters are NULL.

```r
## check_args() checks ALL the arguments of the enclosing function,
## and throws an error if any is NULL; see ?check_args for more details.
## fetching a parameter that was never defined returns NULL:
opts_flow$get("my_new_tool")
```
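A sketch of the pattern (the module name `my_module` and the parameter `my_new_tool_exe` are hypothetical):

```r
my_module <- function(x,
                      samplename = opts_flow$get("samplename"),
                      my_new_tool_exe = opts_flow$get("my_new_tool_exe")){
  ## if my_new_tool_exe was missing from the conf file, its default is NULL,
  ## and check_args() stops with an informative error
  check_args()
  cmds = list(my_new_tool = sprintf("%s %s", my_new_tool_exe, x))
  flowmat = to_flowmat(cmds, samplename)
  return(list(flowmat = flowmat))
}
```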
For example, take a pipeline consisting of alignment using bwa (aln1, aln2, sampe), fixing RG tags using picard, and merging the files. We would create three files:
```
fastq_bam_bwa.R     ## an R script with fastq_bam_bwa(), which creates a flowmat
fastq_bam_bwa.conf  ## an *optional* tab-delimited conf file, defining default params
fastq_bam_bwa.def   ## a tab-delimited flow definition file
```
Notice how all files have the same basename; this is essential for the run function to find all these files.
Reason for using the same basename: when one calls run("fastq_bam_bwa", ....), it tries to look for an .R file with this basename inside flowr's package, in ~/flowr/pipelines, or in the current working directory; if there are multiple matches, the latter is chosen. It then loads fastq_bam_bwa.conf (if available), calls the function fastq_bam_bwa() to create a flowmat, and finally stitches a flow using fastq_bam_bwa.def as the flow definition.
Further, one can have multiple flowdefs, like fastq_bam_bwa_lsf.def, fastq_bam_bwa_torque.def, etc., where each targets a different computing platform.
Feature: One may also supply a custom flow definition (and, optionally, a conf file) directly:

```r
run('fastq_bam_bwa', def = 'path/myflowdef.def', ....)
run('fastq_bam_bwa', def = 'path/myflowdef.def', conf = 'path/myconf.conf', ....)
```
This is quite useful for portability: to use the same pipeline across institutions/computing clusters, one only needs to change the flow definition; the R function remains intact.
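Porting may amount to editing only the flowdef; a sketch (torque is just one of flowr's supported platforms):

```r
## illustrative: adapt an existing flowdef to another cluster
def = as.flowdef("fastq_bam_bwa.def")
def$platform = "torque"  ## e.g., switch every step from lsf to torque
## resource columns like queue, memory_reserved and walltime may need tweaks too
```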
Refer to the help section on `run` for more details.
Here is a good example: https://github.com/flow-r/flowr/blob/master/inst/pipelines/fastq_bam_bwa.conf
A few tips for the conf file:

- Use a tab-delimited file to define default parameters (recommended for increased compatibility).
- Use the placeholder <%CPU%> for the number of cores in a command; this makes the value dynamic, and it is picked up from the flow definition.
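As a sketch of using the <%CPU%> placeholder inside a module command (bwa_exe, ref_fasta, fastq and sai are hypothetical variables; note the doubled %% needed for a literal % inside sprintf):

```r
## build an alignment command whose core count stays dynamic;
## flowr fills in <%CPU%> using the CPU value from the flow definition
cmds = list(aln = sprintf("%s aln -t <%%CPU%%> %s %s > %s",
                          bwa_exe, ref_fasta, fastq, sai))
```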