wd = "/rsrch2/iacs/ngs_runs/1412_tcga_normals/ESCA/logs/esca-2015-03-09-00-05-36-tfhaScFP/"
wd = "/rsrch2/iacs/ngs_runs/1412_tcga_normals/BRCA/logs/brca-2015-02-17-12-42-32-MCscE2AW"
#' @title
#' Re-run a pipeline in case of hardware or software failures.
#' @description
#' \itemize{
#' \item \strong{hardware} no change required, simple rerun: \code{rerun(x=flow_wd)}
#' \item \strong{software} either a change to flowmat or flowdef has been made: \code{rerun(x=flow_wd, mat = new_flowmat, def = new_flowdef)}
#' }
#' \strong{NOTE:}
#' \emph{flow_wd}: flow working directory, same input as used for \link{status}
#' @param x flow working directory
#' @param execute [logical] whether to execute or not
#' @param start_from (required) which job to start from, this is a job name.
#' @param select (optional) select a subset of jobs to rerun [character vector]
#' @param ignore (optional) ignore a subset of jobs to rerun [character vector]
#' @param mat (optional) flowmat fetched from previous submission if missing. For more information regarding the format refer to \link{to_flowmat}
#' @param def (optional) flowdef fetched from previous submission if missing. For more information regarding the format refer to \link{to_flowdef}
#' @param kill (optional) logical indicating whether to kill the jobs from the previous execution of flow.
#' @inheritParams to_flow
#' @param samplename (optional) If flowmat contains multiple samples, provide the samplename,
#' corresponding to the flow working directory provided.
#' @param ... passed onto to_flow
#' @details
#' This function fetches details regarding the previous execution from the flow working directory (flow_wd).
#' It reads the \link{flow} object from the flow_details.rds file, and extracts flowdef and flowmat from it
#' using \link{to_flowmat} and \link{to_flowdef} functions.
#' \strong{Using new flowmat OR flowdef for re-run}:
#' Optionally, if either of flowmat or flowdef are supplied; supplied ones are used instead of those
#' extracted from previous submission.
#' This functions efficiently updates job details of the latest submission into the previous file; thus information
#' regarding previous job ids and their status is not lost.
#' @export
#' @examples \dontrun{
#' #
#' rerun(wd = wd, fobj = fobj, execute = TRUE, kill = TRUE)
#' }
rerun <- function(x, ...) {
if(opts_flow$get("verbose") > 1)
message("rerun: input x is ", class(x))
#' @rdname rerun
#' @export
rerun.character <- function(x, ...){
message("> input looks like a path, seeing if multiple paths match ...")
wds = get_wds(x)
tmp <- lapply(wds, function(wd){
message("> reading flow_details.rds from: ", wd)
wd = file_path_as_absolute(wd)
fobj <- read_fobj(wd)
stop("x does not seems to be a correct path to the flow submission, missing flow_details.rds")
# assume platform of the last job is the final platform
f.platform = tail(fobj@jobs, 1)[[1]]@platform
if(f.platform == "local" & fobj@status == "created")
stop("Currently rerun of killed or processing local jobs is not supported.")
args = list(...)
if(any(names(args) %in% c("flowmat", "flowdef", "flow_mat", "flow_def")))
stop("some arguments not recognized\n",
"Valid arguments for rerun are: mat and def, for flow matrix and flow definition respectively.")
# updating this with the new path
fobj@flow_run_path = dirname(wd)
fobj@flow_path = wd
rerun(fobj, ...)
#' @rdname rerun
#' @importFrom utils capture.output
#' @importFrom params kable
#' @export
rerun.flow <- function(x, mat, def,
execute = TRUE,
kill = TRUE,
verbose = opts_flow$get("verbose"),
fobj = x
wd = fobj@flow_path
assert_version(fobj, '')
assert_status(fobj, "submitted")
## converting missing to NA, so that passing to subsequent
## steps is easier
if( &[1]) &[1])){
stop("start_from, select, ignore: missing\n",
"Detection of failure point is currently not supported. ",
"Please mention what steps need to be re-run.\n",
"Use start_from=<jobname>\n",
"OR select=<jobnames> OR ignore=<jobnames>")
#start_from = detect_redo()
if( ![1]) & ![1]) )
stop("both ignore and select specified\n",
"Either specify jobs to re-run using select OR ",
"jobs to be ignored using re-run, not both")
#stop("Please metion where to start from. Detection currently no supported")
message("\n> extracting flow definition from previous run.")
def = to_flowdef(fobj)
message("\nReading flow definition supplied.")
def = as.flowdef(def) ## as jobids now !
message("> Extracting flow mat (shell cmds) from previous run.")
message("> Hope the reason for previous failure was fixed...")
mat = to_flowmat(fobj)
mat = as.flowmat(mat)
message("> subsetting for sample: ",
samplename, " starting with rows ", nrow(mat))
samp = samplename
mat = subset(mat, mat$samplename == samp)
message("--> now we have: ", nrow(mat), " rows")
if(nrow(mat) == 0)
stop("--> no jobs left after subsetting ...")
}else if(missing(samplename) & unique(mat$samplename) > 1){
message("multiple samplenames present in flowmat, subsetting")
samplename = get_samplename(fobj)
samp = samplename
mat = subset(mat, mat$samplename == samp)
message("--> now we have: ", nrow(mat), " rows")
if(nrow(mat) == 0)
stop("--> no jobs left after subsetting ...")
message("\n> subsetting... get steps to re-run:")
mat = subset_fmat(fobj = fobj, mat = mat, start_from = start_from, select, ignore)
def = subset_fdef(fobj = fobj, def = def, start_from = start_from, select, ignore)
message(paste("--> ", def$jobname, collapse = "\n"), "\n")
## reset few things before we start the new flow
## kill the flow
## remove trigger files
det = to_flowdet(fobj)
newdet = subset_fdet(fobj, det, start_from = start_from, select, ignore)
newdet = file.remove(newdet$trigger)
## jobname, has ids as well.
# flow_run_path: does not matter, since in the next step we would use flow_path
fobj2 <- to_flow(x = mat, def = def,
flowname = fobj@name,
flow_run_path = fobj@flow_run_path,
module_cmds = fobj@module_cmds,
fobj2@status = "rerun"
fobj2 <- submit_flow(fobj2,
uuid = fobj@flow_path,
execute = execute,
dump = FALSE)
## -- need a function to read and update the old flow object with new job submission ids
fobj = update.flow(fobj, child = fobj2)
## new flowdet of the new flow
flowdet = to_flowdet(fobj)
write_flow_details(wd, fobj, flow_det = flowdet)
update.flow <- function(x, child){
child_jobs = jobnames(child)
## --- for each job in child update ids
## updat the whole job, and not just IDs
for(j in child_jobs){
#x@jobs[[j]]@id = child@jobs[[j]]@id
x@jobs[[j]] = child@jobs[[j]]
#' @importFrom params read_sheet
detect_redo <- function(fobj, wd){
## get detail file
det_file = to_flowdet(wd)
get_status(x = wd)
## subset those which need to be rerun
flow_status = read_sheet(det_file)
#plot_flow(x = fobj, pdf = TRUE)
## ingest everything in the fobj !
mods = unique(as.character(flow_status$jobnm))
fobj2 = fobj
fobj2@status = "" ## reset flow status, will be submitted as a independent flow
for(m in mods){
fobj@jobs[[m]]@exit_code = subset(flow_status, flow_status$jobnm == m)$exit_code
## if we do not know the exit code, we redo
redo = !fobj@jobs[[m]]@exit_code == 0;redo = ifelse(, TRUE, redo)
## need to subset CMDS,
fobj2@jobs[[m]]@cmds = fobj2@jobs[[m]]@cmds[ redo ]
fobj2@jobs[[m]]@dependency = list() ## dependent job ids
fobj2@jobs[[m]]@id = vector(mode = "character")
fobj2@jobs[[m]]@exit_code = vector(mode = "numeric")
subset_mods <- function(fobj, start_from, select, ignore){
mods = names(fobj@jobs)
## subset jobs using, start from, ignore and select
if(!missing(start_from) & !all( )
mods = mods[which(mods == start_from):length(mods)]
##mods = mods[which(grepl(start_from, mods)):length(mods)]
if(!missing(select) & !all(
mods = mods[mods %in% select]
if(!missing(ignore) & !all(
mods = mods[!mods %in% ignore]
# subset_fmat
# @param mat a part of flowmat
# @param fobj flow object
#' @inheritParams rerun
subset_fmat <- function(fobj, mat, start_from, select, ignore){
mods = subset_mods(fobj, start_from, select, ignore)
## get mat
mat = subset(mat, mat$jobname %in% mods)
## subset and get jobs which failed
#rerun = cbind(module = mods, rerun = unlist(lapply(mods, function(m) length(fobj@jobs[[m]]@cmds))))
# subset_fdef
# @param fobj flow object
# @param def flowdef
#' @inheritParams rerun
subset_fdef <- function(fobj, def, start_from, select, ignore){
stop("Please supply a flow def file")
mods = subset_mods(fobj, start_from, select, ignore)
## get mat
def = subset(def, def$jobname %in% mods)
def$prev_jobs = ifelse(def$prev_jobs %in% mods, def$prev_jobs, "none")
def$dep_type = ifelse(def$prev_jobs %in% mods, def$dep_type, "none")
# subset flow details file
# @param fobj flow object
# @param det flowdet
#' @inheritParams rerun
subset_fdet <- function(fobj, det, start_from, select, ignore){
stop("Please supply a flow det file")
mods = subset_mods(fobj, start_from, select, ignore)
## get mat
det = subset(det, det$jobnm %in% mods)
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.