flowr

library(knitr)
library(flowr)

Get started

library(flowr)
setup()

This will copy the flowr helper script to ~/bin. Please make sure that this folder is in your $PATH variable. For more details refer to setup's help section.
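To confirm from within R that this folder is on your PATH, here is a quick base R check (a minimal sketch):

## TRUE if ~/bin appears in the PATH environment variable
grepl(path.expand("~/bin"), Sys.getenv("PATH"), fixed = TRUE)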

flowr has quite a handy command-line interface, which exposes all functions of the package to the terminal, so you don't have to open an interactive R session each time. To make this work, run the setup() function, which copies the flowr helper script to your ~/bin directory.

Running flowr from the terminal shows the following:

Usage: flowr function [arguments]

status          Detailed status of one or more flows.
rerun           Re-run a previously failed flow.
kill            Kill a flow, given its working directory.
fetch_pipes     Check which modules and pipelines are available; e.g. flowr fetch_pipes

Please use 'flowr -h function' to obtain further information about the usage of a specific function.

Toy example

library(DiagrammeR)
mermaid("
graph LR
A(sleep)-->B(create_few_files) 
B-->C{merge them}
C-->D[get size]
")

ex = file.path(system.file(package = "flowr"), "pipelines")
flow_mat = as.flowmat(file.path(ex, "sleep_pipe.tsv"))
flow_def = as.flowdef(file.path(ex, "sleep_pipe.def"))

Consider a simple example where we have three instances of Linux's sleep command. After these complete, three tmp files are created with some random data. Then a merging step follows, combining the tmp files into one big file. Next, we use du to calculate the size of the merged file.

To create this flow in flowr, we need the actual commands to run, and a set of instructions regarding how to stitch the individual steps into a coherent pipeline.

Here is a table with the commands we would like to run (the flow mat).

kable(flow_mat)

Further, we use an additional file specifying the relationships between the steps, along with other resource requirements: the flow def.

kable(flow_def)

Stitch it

We use the two files described above and stitch them to create a flow object (which contains all the information we need for cluster submission).

ex = file.path(system.file(package = "flowr"), "pipelines")
flowmat = as.flowmat(file.path(ex, "sleep_pipe.tsv"))
flowdef = as.flowdef(file.path(ex, "sleep_pipe.def"))

fobj <- to_flow(x = flowmat, 
                 def = flowdef,
                 flowname = "example1", ## give it a name
                 platform = "lsf")      ## override platform mentioned in flow def

Refer to to_flow's help section for more details.

Plot it

We can use plot_flow to quickly visualize the flow; this really helps when developing complex workflows.

plot_flow(fobj)     # ?plot_flow for more information
plot_flow(flowdef) # plot_flow works on flow definition as well

Refer to plot_flow's help section for more details.
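To keep a copy of the chart, the plot can be sent to a standard R graphics device (a minimal sketch using base R's pdf device; the file name is a placeholder):

pdf("sleep_pipe_design.pdf")  # open a PDF device
plot_flow(fobj)               # draw the flow chart into it
dev.off()                     # close the device, writing the file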

Dry Run

submit_flow(fobj)
Test Successful!
You may check this folder for consistency. Also you may re-run submit with execute=TRUE
 ~/flowr/sleep_pipe-20150520-15-18-27-5mSd32G0

Submit it

Once we have a flow, we can submit it to the cluster using submit_flow.

submit_flow(fobj, execute = TRUE)
Flow has been submitted. Track it from terminal using:
flowr status x=~/flowr/type1-20150520-15-18-46-sySOzZnE
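submit_flow also returns the flow object, which one may keep to track the run from within R. A hedged sketch, assuming the run's working directory is stored in the object's flow_path slot:

## keep the returned object; the run directory should be in the
## flow_path slot (an assumption; check the flow class documentation)
fobj <- submit_flow(fobj, execute = TRUE)
fobj@flow_path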

Refer to submit_flow's help section for more details.

Check its status

Next, you may use the status command to monitor a flow.

flowr status x=~/flowr/runs/sleep_pipe-20150520*

|          | total| started| completed| exit_status|    status|
|:---------|-----:|-------:|---------:|-----------:|---------:|
|001.sleep |    10|      10|        10|           0| completed|
|002.tmp   |    10|      10|        10|           0| completed|
|003.merge |     1|       1|         1|           0| completed|
|004.size  |     1|       1|         1|           0| completed|

Notice how we skipped specifying the complete path. status tries to use the basename and shows the status of any folder it can match. If there are multiple matches, status shows a summary of each.

Alternatively, to check a summarized status of several flows, use the parent folder. In this case the parent folder has 3 flows, and here is the summary:

flowr status x=~/flowr/runs

Showing status of: ~/flowr/runs
|          | total| started| completed| exit_status|    status|
|:---------|-----:|-------:|---------:|-----------:|---------:|
|001.sleep |    30|      30|        10|           0|processing|
|002.tmp   |    30|      30|        10|           0|processing|
|003.merge |     3|       3|         1|           0|   pending|
|004.size  |     3|       3|         1|           0|   pending|
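The same checks can be made from an interactive R session using the status() function (a minimal sketch; the path is a placeholder):

## status() accepts the same partial paths as the command-line version
status(x = "~/flowr/runs/sleep_pipe-20150520*")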

Refer to status's help section for more details.

Kill it

In case something goes wrong, one may use the kill command to terminate all the related jobs of a single flow OR multiple flows.

Kill one flow:

flowr kill_flow x=flow_wd
flowr kill x='~/flowr/runs/sleep_pipe'
found multiple wds:
  ~/flowr/runs/sleep_pipe-20150825-16-24-04-0Lv1PbpI
  ~/flowr/runs/sleep_pipe-20150825-17-47-52-5vFIkrMD
Really kill all of them ? kill again with force=TRUE

To kill multiple flows, set force=TRUE:

flowr kill x='~/flowr/runs/sleep_pipe*' force=TRUE
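From within R, the kill() function does the same (a sketch; the path is a placeholder):

## force = TRUE is needed when the pattern matches multiple flows
kill(x = "~/flowr/runs/sleep_pipe*", force = TRUE)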

Refer to kill's help section for more details.

Re-run a flow

flowr also enables you to re-run a pipeline in case of hardware or software failures.
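For example, using the start_from argument of rerun, one might restart a pipeline from the step that failed (a hedged sketch; the working directory and step name below are placeholders):

## re-run an existing flow, starting from a specific step
rerun(x = "~/flowr/runs/sleep_pipe-20150520-15-18-27-5mSd32G0",
      start_from = "003.merge", execute = TRUE)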

Refer to rerun's help section for more details.

Input files

A quick and easy way to build a workflow is to create a set of two tab-delimited files. The first is a table with commands to run (for each step of the pipeline), while the second details how the steps are stitched together. In the rest of this document we refer to them as flow_mat and flow_def respectively (as introduced in the previous sections).

Let us read in examples of both these files to understand their structure.

ex = file.path(system.file(package = "flowr"), "pipelines")
flow_mat = as.flowmat(file.path(ex, "sleep_pipe.tsv"))
flow_def = as.flowdef(file.path(ex, "sleep_pipe.def"))
fobj = suppressMessages(to_flow(flow_mat, def = flow_def, platform = "torque"))
fobj@jobs[[1]]@nodes   ## peek at the nodes of the first job
fobj = submit_flow(fobj)   ## dry run

1. Flow matrix

Essentially, this is a tab-delimited file with three columns: samplename, jobname and cmd.

Here is an example flow_mat for the flow described above.

kable(subset(flow_mat, samplename == "sample1"))
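A flowmat need not be written by hand; it can also be assembled in R from a named list of commands using to_flowmat (a minimal sketch; the command strings below are illustrative placeholders):

## a named list of commands, one element per step
cmds = list(
  sleep = sprintf("sleep %s", 1:3),
  create_tmp = sprintf("head -c 100000 /dev/urandom > tmp_%s", 1:3),
  merge = "cat tmp_* > merged.txt",
  size = "du -sh merged.txt")
flow_mat = to_flowmat(cmds, samplename = "sample1")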

2. Flow definition

It is a tab-separated file with a minimum of 4 columns: jobname, sub_type, prev_jobs and dep_type.

These are explained in detail below.

Apart from the variables described above, several others defining the resource requirements of each step are also available. These give the user a great amount of flexibility in choosing the CPU, wall time, memory and queue for each step (and are passed along to the cluster platform).

Most cluster platforms accept these resource arguments. Essentially, a submission script is used as a template, and variables defined in curly braces (e.g. {{{CPU}}}) are filled in using the flow definition file.

Here is an example of a typical flow_def file.

kable(head(flow_def))
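Instead of writing a definition from scratch, a skeleton can be generated from an existing flowmat with to_flowdef and then edited (a sketch; the defaults it picks would need customizing):

## creates a basic definition with default submission/dependency
## types and resources, which one would then edit as needed
flow_def = to_flowdef(flow_mat)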

Example:

Let us use an example flow, to understand submission and dependency types.

Consider three steps A, B and C, where A has 10 commands (A1 through A10), B likewise has 10 commands (B1 through B10), and C has a single command, C1. Consider another step D (with commands D1 through D3), which comes after C.

step:       A   ----> B  -----> C -----> D
# of cmds  10        10         1        3

Submission types

This refers to the sub_type column in the flow definition. It takes one of two values: serial (run the step's commands sequentially, as a single job) or scatter (submit all of the step's commands as parallel, independent jobs).

Dependency types

This refers to the dep_type column in the flow definition. It takes one of four values: none, serial, gather or burst, as illustrated by the relationships below.

Relationships

Using the above submission and dependency types, one can create several kinds of relationships between earlier and later jobs. Here are a few examples of relationships one may typically use.

One to One (serial)

                A1 --------> B1
                A2 --------> B2
                .. --------> ..
               A10 --------> B10

   step A: dependency none,   submission scatter
   step B: dependency serial, submission scatter
              relationship: ONE-to-ONE

The relationship between steps A and B is best defined as serial. Step A (A1 through A10) is submitted as scatter. Further, the $i^{th}$ job of B depends on the $i^{th}$ job of A; i.e. B1 requires A1 to complete, B2 requires A2, and so on. Also note that defining the dependency as serial ensures that a job of B does not wait for all elements of A to complete.

qobj <- queue(platform = "lsf", queue = "normal", verbose = FALSE)
A <- job(name = "A", cmds = "sleep1", q_obj = qobj, 
                 submission_type = "scatter")
B <- job(name = "B", cmds = "sleep2", q_obj = qobj,
                 previous_job = "A", 
                 dependency_type = "serial", submission_type = "scatter")
C <- job(name = "C", cmds = "sleep2", q_obj = qobj,
                 previous_job = "B", 
                 dependency_type = "gather", submission_type = "serial")
D <- job(name = "D", cmds = "sleep2", q_obj = qobj,
                 previous_job = "C", 
                 dependency_type = "burst", submission_type = "scatter")

pab <- plot_flow(flow(jobs = list(A, B)))

Many to One (gather)

                B1 ----\ 
                B2 -----\
                ..        -----> C1
                B9 ------/
                B10-----/

   step B: dependency serial, submission scatter
   step C: dependency gather, submission serial
              relationship: MANY-to-ONE

Since C is a single command that requires all steps of B to complete, intuitively it needs to gather the pieces of data generated by B. In this case dep_type would be gather, and sub_type would be serial since it is a single command.

pbc <- plot_flow(flow(jobs = list(B, C)))

One to Many (Burst)

                     /-----> D1
                C1 --------> D2
                     \-----> D3

   step C: dependency gather, submission serial
   step D: dependency burst,  submission scatter
              relationship: ONE-to-MANY

Further, D is a set of three commands (D1 through D3), which need to wait for a single process (C1) to complete. They would be submitted as scatter, after waiting on C in a burst-type dependency.

pcd <- plot_flow(flow(jobs = list(C, D)))
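Putting the four steps together gives the complete pipeline; this short sketch reuses the job objects defined above:

## combine the four steps into one flow and visualize the full pipeline
fobj <- flow(jobs = list(A, B, C, D))
plot_flow(fobj)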

In essence, an example flow_def would look as follows (additional resource requirements not shown for brevity):

ex2def = as.flowdef(file.path(ex, "abcd.def"))
ex2mat = as.flowmat(file.path(ex, "abcd.tsv"))
kable(ex2def[, 1:4])
plot_flow(ex2def)
Available Pipelines

Here are some of the available pipelines, along with their respective locations:

pipes = try(fetch_pipes(silent = TRUE))
if (!inherits(pipes, "try-error") && nrow(pipes) > 0) {
    pipes$pipe = basename(pipes$pipe)
    pipes$def = basename(pipes$def)
    pipes$conf = basename(pipes$conf)
    params::kable(pipes)
}
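Once fetched, a pipeline can also be executed by name using the run() function (a hedged sketch; sleep_pipe ships with the package, and execute = FALSE should perform a dry run):

## fetch the bundled sleep_pipe pipeline and do a dry run;
## set execute = TRUE to really submit it
run("sleep_pipe", platform = "lsf", execute = FALSE)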

Cluster Support

As of now we have tested this on the following clusters:

plat <- params::read_sheet("files/platforms_supported.txt", id_column = "Platform")
kable(plat)

For more details, refer to the configuration section.


