#library(flowr)
#system("env")
#set_opts(verbose = TRUE)
#opts_flow$get()
# Switch for the optional diagnostics printed by this script.
verbose <- FALSE
opts_flow$set(verbose = 0)

context("\n\nTesting running a pipeline from scratch")

# Only the first row of the fetch_pipes() result is used below.
pip <- fetch_pipes("sleep_pipe", silent = TRUE)[1, ]
if (verbose) {
  print(pip)
}

test_that("check return of fetch_pipes", {
  # length() of a data.frame is its number of columns.
  expect_equal(length(pip), 4)
  expect_identical(class(pip), "data.frame")
})

# Extra diagnostics about the pipeline file that gets sourced later.
if (verbose) {
  message("class: ", class(pip$pipe))
  message("file ", pip$pipe, " exists: ", file.exists(pip$pipe))
}
# ----- flowmat ---------
context("Test creation of flowmat")

# The pipeline script is sourced before sleep_pipe() is called
# (presumably it defines sleep_pipe(); confirm against the pipeline file).
source(pip$pipe)
out <- sleep_pipe(x = 3, "sample1")

test_that("test output of sleep_pipe", {
  # The result is a plain two-element list carrying a flowmat.
  expect_equal(length(out), 2)
  expect_identical(class(out), "list")
  expect_is(out$flowmat, "flowmat")
  # check() is expected to hand back a flowmat as well.
  expect_is(check(out$flowmat), "flowmat")
})
# ----- flowdef ---------
context("Test creation of flowdef")

## --- specify the resource requirement
def <- to_flowdef(
  out$flowmat,
  platform = "lsf",
  queue = "short",
  memory_reserved = "16384", ## LSF wants more than 16GB
  cpu_reserved = 1,
  walltime = "00:59"
)

test_that("test creation of flowdef", {
  # A flowdef is also a data.frame; this pipeline yields 4 rows x 11 columns.
  expect_is(def, "flowdef")
  expect_is(def, "data.frame")
  expect_equal(nrow(def), 4)
  expect_equal(ncol(def), 11)
})
# ----- flow ---------
# From here on the tests build flow objects from the flowmat and flowdef.
context("Test creation of flow")
# Drop the columns named in `nm` from a data.frame.
#
# x:  a data.frame (or other object with colnames() and two-dimensional `[`).
# nm: character vector of column names to remove.
#
# Returns `x` without the named columns. Names in `nm` that are not present
# are ignored, so asking to remove a missing column returns `x` unchanged.
# A negative index built with which() would select zero columns in that case.
# The result always stays two-dimensional (`drop = FALSE`), even when only a
# single column remains.
rm_col <- function(x, nm) {
  keep <- !(colnames(x) %in% nm)
  x[, keep, drop = FALSE]
}
# Remember the starting directory; it is reused for the cleanup at the end.
wd <- getwd()
context(paste("using dir, ", wd))

# Build a flow from the flowmat and flowdef, rooted in the current directory.
fobj <- to_flow(
  out$flowmat,
  def = def,
  flowname = "sleep_pipe",
  flow_run_path = wd
)

test_that("test creation of flow, works", {
  expect_is(fobj, "flow")
})
test_that("test creation of flow, fails; when it should", {
  ## a flowdef with its first row removed
  expect_error(to_flow(out$flowmat, def = def[-1, ]), "extra jobs in prev_jobs")

  ## dropping any one of the first three columns
  for (idx in seq_len(3)) {
    expect_error(
      to_flow(out$flowmat, def = def[, -idx]),
      "flowdef needs these columns"
    )
  }

  ## dropping a resource column
  for (resource in c("cpu_reserved", "memory_reserved")) {
    expect_error(
      to_flow(out$flowmat, def = rm_col(def, resource)),
      "flowdef: missing resource columns"
    )
  }

  ## the literal string "NA" as dependency type
  bad_dep <- def
  bad_dep$dep_type[1] <- "NA"
  expect_error(to_flow(out$flowmat, def = bad_dep), "dep_type: invalid")

  ## the literal string "NA" as submission type
  bad_sub <- def
  bad_sub$sub_type[1] <- "NA"
  expect_error(to_flow(out$flowmat, def = bad_sub), "sub_type: invalid")
})
context("Test flowr on multiple platform, including creation, status, rerun and kill")

plats <- c("lsf", "sge", "torque")

# Repeat the same create / dry-run / kill / rerun checks for every platform.
for (plat in plats) {
  # The queue is built with "echo" as its submit executable.
  qobj <- queue(platform = plat, submit_exe = "echo")
  #set_opts(verbose = 0)
  fobj <- to_flow(
    out$flowmat,
    def = def,
    flowname = "sleep_pipe",
    flow_run_path = getwd(),
    qobj = qobj
  )

  # The submit executable should be carried from the queue into the jobs.
  expect_is(fobj, "flow")
  expect_equal(qobj@submit_exe, "echo")
  expect_equal(fobj@jobs[[1]]@submit_exe, "echo")

  fobj_dry <- suppressMessages(submit_flow(fobj))
  #expect_warning(suppressMessages(submit_flow(fobj, execute = TRUE)), "Unable to parse JOB IDs")

  ## fake submit: copy the dry-run flow and flip its status by hand
  fobj_sub <- fobj_dry
  fobj_sub@status <- "submitted"

  test_that("test fobj is correct", {
    expect_is(fobj, "flow")
    expect_is(fobj_dry, "flow")
    expect_equal(fobj_dry@status, "dry-run")

    ## create flowdet
    expect_is(to_flowdet(fobj_dry), "data.frame")
    #expect_error(to_flowdet(fobj), "fobj: no execution details")
  })

  test_that("flow is killable", {
    ## killing a flow that was never executed must raise an error
    expect_error(kill(fobj), "This flow was has not executed, nothing to kill.")
  })

  test_that("flow is re-runnable", {
    # skip() ends this test here, so nothing below it currently runs.
    skip("rerunning")

    ## ----- re-running
    expect_error(rerun(fobj), "fobj: not submitted yet")
    expect_error(rerun(fobj, start_from = "create_tmp"), "fobj: not submitted yet")

    ## rerun the fake submit
    expect_error(rerun(fobj_sub), "start_from, select, ignore: missing")

    ## check rerun output
    fobj_re <- suppressMessages(
      rerun(fobj_sub, start_from = "create_tmp", kill = FALSE, execute = FALSE)
    )
    #suppressMessages(rerun(fobj2, start_from = "create_tmp", kill = TRUE, qobj = qobj)), "kill"
    fobj_re <- suppressMessages(
      rerun(fobj_sub, start_from = "create_tmp", kill = FALSE, qobj = qobj)
    )

    ## check the flow
    expect_is(fobj_re, "flow")
  })
}
# cleanup after
context("cleaning up dirs")

# Collect every entry in the starting directory whose name matches
# "sleep_pipe", with full paths.
rm_dirs <- list.files(path = wd, pattern = "sleep_pipe", full.names = TRUE)

# message() concatenates the elements of a character vector without any
# separator, so the paths are joined explicitly to keep the log readable.
message(paste(rm_dirs, collapse = "\n"))

# Remove the collected entries, including directory contents.
unlink(rm_dirs, recursive = TRUE)
## add re-run test, specifying without start_from, with select and ignore: FAILS
## specify none: fails
## count jobs after select, ignore and start_from, and match them to expectation.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.