Nothing
# Debugging aids, intentionally left disabled:
# library(flowr)
# system("env")
# set_opts(verbose = TRUE)
# opts_flow$get()

# Script-level switch for the optional print()/message() diagnostics below.
verbose <- FALSE

# Set flowr's own `verbose` option to 0 for this test run.
opts_flow$set(verbose = 0)

context("\n\nTesting running a pipeline from scratch")

# Keep only the first row of what fetch_pipes() returns for "sleep_pipe".
pip <- fetch_pipes("sleep_pipe", silent = TRUE)[1, ]
if (verbose) {
  print(pip)
}
# The single-row result of fetch_pipes() should be a plain data.frame
# with four columns (length() of a data.frame is its column count).
test_that("check return of fetch_pipes", {
  n_fields <- length(pip)
  pip_class <- class(pip)

  expect_equal(n_fields, 4)
  expect_identical(pip_class, "data.frame")
})
# Optional diagnostics about the pipeline script that is sourced below.
if (verbose) {
  message("class: ", class(pip$pipe))
  message("file ", pip$pipe, " exists: ", file.exists(pip$pipe))
}

# ----- flowmat ---------
context("Test creation of flowmat")

# Source the pipeline script found above, then call sleep_pipe();
# presumably the sourced file is what defines it.
source(pip$pipe)
out <- sleep_pipe(x = 3, "sample1")
test_that("test output of sleep_pipe", {
  # sleep_pipe() is expected to return a plain two-element list.
  expect_equal(length(out), 2)
  expect_identical(class(out), "list")

  # Its flowmat element must be a flowmat, and stay one after check().
  expect_is(out$flowmat, "flowmat")
  expect_is(check(out$flowmat), "flowmat")
})
# ----- flowdef ---------
context("Test creation of flowdef")

## --- specify the resource requirement
def <- to_flowdef(
  out$flowmat,
  platform = "lsf",
  queue = "short",
  memory_reserved = "16384", ## LSF wants more than 16GB
  cpu_reserved = 1,
  walltime = "00:59"
)
test_that("test creation of flowdef", {
  # Class checks: def must be a flowdef and also a data.frame.
  expect_is(def, "flowdef")
  expect_is(def, "data.frame")

  # Shape checks: 4 rows by 11 columns.
  def_dims <- dim(def)
  expect_equal(def_dims[1], 4)
  expect_equal(def_dims[2], 11)
})
# ----- flow ---------
# Section marker: the tests that follow exercise to_flow().
context("Test creation of flow")
# Drop the named columns from a data.frame-like object.
#
# Args:
#   x:  object supporting colnames() and two-dimensional `[` indexing
#       (a data.frame in this script).
#   nm: character vector of column names to remove; names that are not
#       present in `x` are ignored.
#
# Returns:
#   `x` without the columns listed in `nm`, still two-dimensional.
rm_col <- function(x, nm) {
  # A logical mask avoids the `x[, -integer(0)]` trap: negating an empty
  # index selects nothing, so a non-matching `nm` would drop every column.
  keep <- !(colnames(x) %in% nm)
  # drop = FALSE keeps the container type even if only one column remains.
  x[, keep, drop = FALSE]
}
# Remember the starting directory; the cleanup step at the end of the
# script lists entries in it.
wd <- getwd()
context(paste("using dir, ", getwd()))

fobj <- to_flow(
  out$flowmat,
  def = def,
  flowname = "sleep_pipe",
  flow_run_path = getwd()
)
test_that("test creation of flow, works", {
  # to_flow() should have produced an object of class "flow".
  expect_is(object = fobj, class = "flow")
})
test_that("test creation of flow, fails; when it should", {
  # Error fragments shared by several expectations below.
  err_columns <- "flowdef needs these columns"
  err_resources <- "flowdef: missing resource columns"

  ## remove a row
  expect_error(to_flow(out$flowmat, def = def[-1, ]), "extra jobs in prev_jobs")

  # Dropping any one of the first three columns must be rejected.
  expect_error(to_flow(out$flowmat, def = def[, -1]), err_columns)
  expect_error(to_flow(out$flowmat, def = def[, -2]), err_columns)
  expect_error(to_flow(out$flowmat, def = def[, -3]), err_columns)

  # Dropping a resource column must be rejected.
  expect_error(
    to_flow(out$flowmat, def = rm_col(def, "cpu_reserved")),
    err_resources
  )
  expect_error(
    to_flow(out$flowmat, def = rm_col(def, "memory_reserved")),
    err_resources
  )

  # The literal string "NA" is not an accepted dep_type.
  bad_dep <- def
  bad_dep$dep_type[1] <- "NA"
  expect_error(to_flow(out$flowmat, def = bad_dep), "dep_type: invalid")

  # ... nor an accepted sub_type.
  bad_sub <- def
  bad_sub$sub_type[1] <- "NA"
  expect_error(to_flow(out$flowmat, def = bad_sub), "sub_type: invalid")
})
context("Test flowr on multiple platform, including creation, status, rerun and kill")

# Repeat the same create / dry-run / kill / rerun checks for each platform.
plats <- c("lsf", "sge", "torque")
for (plat in plats) {
  # Queue for this platform with `echo` as the submit executable.
  qobj <- queue(platform = plat, submit_exe = "echo")
  # set_opts(verbose = 0)

  fobj <- to_flow(
    out$flowmat,
    def = def,
    flowname = "sleep_pipe",
    flow_run_path = getwd(),
    qobj = qobj
  )

  # The submit executable must be carried from the queue into the jobs.
  expect_is(fobj, "flow")
  expect_equal(qobj@submit_exe, "echo")
  expect_equal(fobj@jobs[[1]]@submit_exe, "echo")

  fobj_dry <- suppressMessages(submit_flow(fobj))
  # expect_warning(suppressMessages(submit_flow(fobj, execute = TRUE)), "Unable to parse JOB IDs")

  ## fake submit
  fobj_sub <- fobj_dry
  fobj_sub@status <- "submitted"

  test_that("test fobj is correct", {
    expect_is(fobj, "flow")
    expect_is(fobj_dry, "flow")
    expect_equal(fobj_dry@status, "dry-run")

    ## create flowdet
    expect_is(to_flowdet(fobj_dry), "data.frame")
    # expect_error(to_flowdet(fobj), "fobj: no execution details")
  })

  test_that("flow is killable", {
    ## killing
    expect_error(
      kill(fobj),
      "This flow was has not executed, nothing to kill."
    )
  })

  test_that("flow is re-runnable", {
    # Everything after skip() is currently not executed.
    skip("rerunning")

    ## ----- re-running
    expect_error(rerun(fobj), "fobj: not submitted yet")
    expect_error(
      rerun(fobj, start_from = "create_tmp"),
      "fobj: not submitted yet"
    )

    ## rerun the fake submit
    expect_error(rerun(fobj_sub), "start_from, select, ignore: missing")

    ## check rerun output
    fobj_re <- suppressMessages(
      rerun(fobj_sub, start_from = "create_tmp", kill = FALSE, execute = FALSE)
    )
    # suppressMessages(rerun(fobj2, start_from = "create_tmp", kill = TRUE, qobj = qobj)), "kill"
    fobj_re <- suppressMessages(
      rerun(fobj_sub, start_from = "create_tmp", kill = FALSE, qobj = qobj)
    )

    ## check the flow
    expect_is(fobj_re, "flow")
  })
}
# cleanup after
context("cleaning up dirs")

# Entries in the starting directory whose names match "sleep_pipe".
rm_dirs <- list.files(path = wd, pattern = "sleep_pipe", full.names = TRUE)
message(rm_dirs)
unlink(rm_dirs, recursive = TRUE)

## TODO: add re-run test, specifying without start_from, with select and ignore: FAILS
## TODO: specify none: fails
## TODO: count jobs after select, ignore and start_from, and match them to expectation.
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.