Pipelining with muPipeR

February 22, 2016

Installation and setup

muPipeR can be installed directly from GitHub:

library(devtools)
install_github("MPIIComputationalEpigenetics/muPipeR")

Preliminaries

Loading the muPipeR package

library(muPipeR)
# library(devtools)
# setwd("~/BroadSVN/eclipse_workspace/muPipeR")
# load_all("muPipeR")

Setting the analysis directory

# anaDir <- "~/tmp_work/muPipeR_test"
anaDir <- file.path(tempdir(), "muPipeR_test")

Pipelines

Setting up the pipeline

Create the pipeline object

pipe <- PipR(anaDir)

Note that the analysis directory must not exist yet. Creating a PipR object will create the analysis directory along with its subdirectories:

directory | description
--- | ---
results | Contains the results of the analysis. For each analysis step, a subdirectory will be created that contains the results of that particular step.
log | Contains log files for the analysis jobs that are run.
status | Contains status information on the analysis.
temp | Contains temporary files.
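
After the object has been created, you can verify the directory layout with base R (shown here as a quick check; the subdirectory names are the ones listed in the table above):

list.files(anaDir)
# should contain the subdirectories listed above: log, results, status, temp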

Add an analysis step. Here, we simply create an empty input file

pipe <- addStep(pipe, 
  "create_input_01",
  "touch",
  args="${STEPDIR}/input01.txt"
)

The following variables will be replaced in argument strings

variable | description
--- | ---
${STEPDIR} | result directory of the given analysis step
${STEPDIR:stepname} | result directory of a specific analysis step with name stepname
${BASEDIR} | base analysis directory of the pipeline object
${TEMPDIR} | temporary directory of the pipeline object
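
For illustration, the ${TEMPDIR} placeholder can be used in the same way. The following is a sketch only and not part of the pipeline built in the remainder of this document (hence the separate variable name):

# hypothetical step writing a scratch file into the pipeline's temporary directory
pipeTmp <- addStep(pipe, 
  "write_to_temp",
  "touch",
  args="${TEMPDIR}/scratch.txt"
)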

Add an analysis step with multiple jobs. Here, we create 5 additional empty files

pipe <- addStep(pipe, 
  "create_input_multiple",
  rep("touch", 5),
  args=lapply(LETTERS[1:5], FUN=function(x){paste0("${STEPDIR}/input", x, ".txt")})
)

The next step takes an input file and appends a string to it to obtain its output file. This step requires the input step to have completed successfully, which can be specified via the parents argument.

# a shell script for appending a string to a file
appendTool <- system.file(file.path("extdata", "examples", "appendLine.sh"), package="muPipeR")
pipe <- addStep(pipe, 
  "append_01",
  appendTool,
  args=c(
    "${STEPDIR:create_input_01}/input01.txt",
    "${STEPDIR}/output01.txt",
    "'appended line 01'"
  ),
  parents="create_input_01"
)
# add a step appending a line to each of the input files
# create a list of arguments
aa <- lapply(1:5, FUN=function(i){
  c(
    paste0("${STEPDIR:create_input_multiple}/input", LETTERS[i], ".txt"),
    paste0("${STEPDIR}/output", LETTERS[i], ".txt"),
    paste0("'appended line ", LETTERS[i], "'")
  )
})
pipe <- addStep(pipe, 
  "append_multiple",
  appendTool,
  args=aa,
  parents="create_input_multiple"
)

Add an analysis step combining the output from multiple steps:

# a shell script for concatenating multiple files (2nd to nth argument)
# into one output file (1st argument)
concatFilesTool <- system.file(file.path("extdata", "examples", "concatFiles.sh"), package="muPipeR")
pipe <- addStep(pipe, 
  "join_01_ABC",
  concatFilesTool,
  args=c(
    "${STEPDIR}/concat_01ABC.txt",
    "${STEPDIR:append_01}/output01.txt",
    "${STEPDIR:append_multiple}/outputA.txt",
    "${STEPDIR:append_multiple}/outputB.txt",
    "${STEPDIR:append_multiple}/outputC.txt"
  ),
  parents=c("append_01", "append_multiple")
)

Inspecting the pipeline

Show the pipeline graph

plotGraph(pipe)

Running the pipeline

All steps in the pipeline are executed using the run command:

pipe <- run(pipe)

In the returned object, the steps are marked as completed (e.g., they appear green in the graph):

plotGraph(pipe)

Resetting pipeline steps

Undo/reset a pipeline step and its dependent steps:

pipe <- resetStep(pipe, "append_01")
plotGraph(pipe)

By default, this also deletes the corresponding subdirectories in the result directory. If you run the pipeline again, the reset steps will be executed again:

pipe <- run(pipe)

Cleaning up

To remove temporary files and empty log files, use:

cleanup(pipe)

Running jobs using CommandR and its subclasses

The CommandR class is a virtual class implementing methods for running commands along with corresponding arguments. Currently there are two child classes implemented in the package:

class | description
--- | ---
CommandRsystem | A class for running commands using command line calls. More specifically, the system2 function implemented in R is used to send a given combination of command and arguments to the command line.
CommandRsge | A class for submitting commands to a compute cluster running a Sun Grid Engine (SGE). Jobs are submitted using calls to qsub, so the executing script must be run on a machine capable of submitting jobs to the cluster.

# create a CommandR object that executes commands via local command line calls
cmdr <- CommandRsystem()

CommandR objects can also be initialized with a logging directory to which the output of the commands is written:

cmdr <- CommandRsystem(logDir=tempdir())

Note that for CommandRsge objects, it is mandatory to specify a logging directory that is accessible from all execution nodes in the SGE queue.
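
For example, a CommandRsge object might be set up as follows. This is a sketch only: it assumes that the constructor accepts a logDir argument analogous to CommandRsystem and that the given path resides on a filesystem shared by all cluster nodes.

# hypothetical shared directory; adjust to your cluster setup
# cmdrSge <- CommandRsge(logDir="/shared/scratch/muPipeR_logs")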

Get the logging directory:

getLogDir(cmdr)

In order to run a command with a CommandR, it has to be packaged into a Job object.

echoJob <- Job("echo", args="'Hello!'")

If you want to identify a job later, it might also be useful to name it:

echoJob <- Job("echo", args="'Hello!'", id="say_hello")

You can access the command that will be executed, its arguments, and other job details:

getId(echoJob)
getCmd(echoJob)
getArgs(echoJob)
getCallString(echoJob)

The exec method can be used to actually execute jobs:

jobRes <- exec(cmdr, echoJob)

If you want the output of a job to be stored in the result, use the result argument:

jobRes <- exec(cmdr, echoJob, result=TRUE)

The result of the execution is an object of class JobResult, which contains information on the job's output, errors, and status:

getOut(jobRes)
getStatus(jobRes)
getErr(jobRes)

You can also execute a list of jobs using lexec (list execute):

echoJobList <- lapply(1:5, FUN=function(i){
  Job("echo", args=paste0("'Hello ", i, "!'"), id=paste0("say_hello_", i))
})
jobResList <- lexec(cmdr, echoJobList)

If you want to specify that a job depends on other jobs, you can add them using the dependsOn argument:

echoJob2 <- Job("echo", args="'I mean: Hello, my name is HAL2000.'", 
                id="say_hello_2", dependsOn=list(echoJob))

author: Fabian Müller <fmueller@mpi-inf.mpg.de>


