context("dbWork: runIDs organization")

#--- Inputs shared by the tests in this file
# Location of the dbWork files and path used as lock file by the tests
dbpath <- tempdir()
flock <- tempfile(pattern = "rSFSW2lock", tmpdir = normalizePath(dbpath))

# Size of the mock simulation: master runs repeated for each experiment
runsN_master <- 25L
expN <- 4L
runsN_total <- runsN_master * expN

# Master runs 1, 10, 24, and 25 are flagged FALSE; all others are TRUE
include_YN <- !(seq_len(runsN_master) %in% c(1, 10, 24:25))

# runIDs: those of all possible run IDs whose (recycled) flag is TRUE
runIDs_total <- seq_len(runsN_total)
runIDs <- which(rep(include_YN, times = expN))

sim_size <- list(
  runsN_master = runsN_master,
  runsN_total = runsN_total,
  expN = expN
)

# Timings assigned to the first three runIDs in the access/manipulation test
time_set3 <- c(50, 75, 125)
verbose <- FALSE
# Mock the processing of one run `i`: mark it "inwork" and then either
# "failed" (if it is still listed as todo) or "completed".
#
# Arguments:
#   i       run ID to process
#   dbpath  passed as first argument to the dbWork_* functions
#   flock   accepted for call compatibility; not used in the body
#   verbose forwarded to `dbWork_update_job`
#
# Returns FALSE if the run could not be set to "inwork"; otherwise the value
# of the final `dbWork_update_job` call.
#
# Note: `.node_id` is not an argument; it must be found via the search path
# at call time (the parallel test assigns it in each worker's global
# environment).
test_update <- function(i, dbpath, flock = NULL, verbose) {
  if (!dbWork_update_job(dbpath, i, "inwork", verbose = verbose)) {
    return(FALSE)
  }

  # A run that was set to "inwork" should no longer be among the todos
  still_todo <- any(i == dbWork_todos(dbpath))

  if (still_todo) {
    dbWork_update_job(dbpath, i, "failed", verbose = verbose)
  } else {
    dbWork_update_job(dbpath, i, "completed", time_s = .node_id,
      verbose = verbose)
  }
}
# Custom expectation for the value returned by `dbWork_check_run`.
#
# Arguments:
#   x    object to check; expected to be a data.frame with the three columns
#        "completed", "failed", and "inwork"
#   len  expected number of rows (compared with `expect_identical`, so the
#        type must match that of `dim(x)`)
#   sum  expected total over all cells of `x`; only checked if `x` has cells
#
# Returns NULL invisibly.
expect_dbWork_check <- function(x, len, sum) {
  expect_s3_class(x, "data.frame")
  expect_identical(dim(x), c(len, 3L))
  expect_identical(colnames(x), c("completed", "failed", "inwork"))

  has_cells <- prod(dim(x)) > 0
  if (has_cells) {
    expect_equal(sum(x), sum)
  }

  invisible(NULL)
}
# Run the mock simulation, i.e., call `test_update` for every run ID, either
# sequentially (`cl` is NULL) or load-balanced across the cluster `cl`.
#
# Arguments:
#   cl      a cluster object for `parallel::clusterApplyLB`, or NULL
#   runIDs  run IDs to process
#   dbpath, flock, verbose  forwarded to `test_update`
#
# Returns a `table` of the timings reported by `dbWork_timing`; signals an
# error if the number of timings differs from the number of run IDs.
pretend_sim <- function(cl, runIDs, dbpath, flock, verbose) {
  # The per-run return values are not needed; only the state of dbWork counts
  if (is.null(cl)) {
    sapply(runIDs, test_update, dbpath = dbpath, flock = flock,
      verbose = verbose)
  } else {
    parallel::clusterApplyLB(cl, runIDs, test_update,
      dbpath = dbpath, flock = flock, verbose = verbose)
  }

  # Reached only if the calls above did not signal an error
  times <- dbWork_timing(dbpath)

  if (length(times) != length(runIDs)) {
    stop("not all runs completed or some failed")
  }

  table(times)
}
#--- Unit tests
test_that("dbWork: mock simulation in parallel", {
  # Skip these tests on CIs and CRAN because parallel code
  # will (likely) fail on CIs and on CRAN, e.g.,
  #   - travis on July 21, 2017: "Error in .check_ncores(length(names)) :
  #     10 simultaneous processes spawned"
  skip_on_cran()
  skip_on_travis()
  skip_on_appveyor()

  # Only run these tests in parallel if not on cran, if not on travis, and if
  # not on appveyor
  do_parallel <- !identical(tolower(Sys.getenv("NOT_CRAN")), "false") &&
    !(identical(tolower(Sys.getenv("TRAVIS")), "true") ||
      identical(tolower(Sys.getenv("APPVEYOR")), "true"))

  .node_id <- 0L

  if (do_parallel) {
    skip_if_not_installed("pkgload")

    # Parallel setup: use between 2 and 10 workers; fall back to 2 workers if
    # the number of cores cannot be determined (`detectCores()` returns NA)
    temp <- max(2L, min(10L, parallel::detectCores() - 2L))
    ncores <- if (is.finite(temp)) temp else 2L
    cl <- parallel::makePSOCKcluster(ncores)

    # Stop the workers whenever this test exits, i.e., also if any of the
    # code below signals an error before the end of the test is reached
    on.exit(parallel::stopCluster(cl), add = TRUE)

    # Give each worker a distinct `.node_id` in its global environment;
    # `test_update` records it as `time_s` of completed runs
    temp <- parallel::clusterApply(cl, seq_len(ncores),
      function(i) assign(".node_id", i, envir = globalenv()))
    parallel::clusterSetRNGStream(cl, iseed = 127)
    parallel::clusterCall(cl, fun = pkgload::load_all,
      path = pkg_temp_dir(), reset = FALSE, quiet = TRUE)

  } else {
    # NOTE(review): `cl` is not assigned on this path; unless a `cl` exists
    # in an enclosing environment, the `pretend_sim(cl, ...)` calls below
    # will signal an error after this failure has been recorded
    fail("dbWork: mock simulation in parallel: cannot run in parallel!")
  }

  # Init
  unlink(flock, recursive = TRUE)
  expect_true(setup_dbWork(dbpath, sim_size, include_YN))
  expect_identical(dbWork_todos(dbpath), runIDs)

  #--- Error due to locked database
  if (utils::packageVersion("rSFSW2") <= "2.5.5") {
    # This should fail with
    #Error in checkForRemoteErrors(val) :
    #  100 nodes produced errors; first error: rsqlite_query_send: could not
    #  execute1: database is locked
    expect_error(pretend_sim(cl, runIDs, dbpath, flock = NULL, verbose))

  } else {
    # PR #256 introduced code to prevent such failures
    expect_silent(pretend_sim(cl, runIDs, dbpath, flock = NULL, verbose))
  }

  #--- No error expected because dbWork is run with file locking
  # Init
  unlink(file.path(dbpath, "dbWork.sqlite3*"))
  unlink(flock, recursive = TRUE)
  expect_true(setup_dbWork(dbpath, sim_size, include_YN))
  expect_identical(dbWork_todos(dbpath), runIDs)

  expect_s3_class(pretend_sim(cl, runIDs, dbpath, flock, verbose), "table")

  #--- Clean up
  # (the cluster is stopped by the exit handler registered at its creation)
  unlink(file.path(dbpath, "dbWork.sqlite3*"))
  unlink(flock, recursive = TRUE)
})
test_that("dbWork: access and manipulation functions", {
  # Start from a newly set up dbWork
  unlink(file.path(dbpath, "dbWork.sqlite3*"))
  unlink(flock, recursive = TRUE)
  expect_true(setup_dbWork(dbpath, sim_size, include_YN))

  # Run IDs that will (and will not) be marked as completed with a timing
  n_timed <- length(time_set3)
  ids_timed <- runIDs[seq_len(n_timed)]
  ids_untimed <- runIDs[-seq_len(n_timed)]

  # Values that are not valid run IDs (coerced to one character vector)
  invalid_ids <- c(NULL, numeric(), -Inf, Inf, NA, NaN, FALSE, TRUE, "a", -1,
    runsN_total + 1)

  #--- 'dbWork_todos' and 'dbWork_Ntodo'
  expect_identical(dbWork_todos(dbpath), runIDs)
  expect_identical(dbWork_Ntodo(dbpath), length(runIDs))

  #--- 'dbWork_timing'
  # Nothing has been completed and timed yet
  expect_length(dbWork_timing(dbpath), 0)

  # Complete one run ID at a time with the timing from 'time_set3' and check
  # that the timings reported so far grow accordingly
  for (idx in seq_len(n_timed)) {
    expect_true(dbWork_update_job(dbpath, ids_timed[idx], "completed",
      time_s = time_set3[idx], verbose = verbose))

    expect_identical(dbWork_timing(dbpath), time_set3[seq_len(idx)])
  }

  # Aggregated timing agrees with summaries of the individual timings
  timing <- dbWork_timing(dbpath)
  agg_timing <- dbWork_agg_timing(dbpath)
  expect_equivalent(agg_timing["mean"], mean(timing))
  expect_equivalent(agg_timing["sd"], sd(timing))
  expect_equivalent(agg_timing["n"], length(timing))

  #--- 'dbWork_report_completion'
  expect_equivalent(dbWork_report_completion(dbpath),
    100 * length(timing) / length(runIDs))

  #--- 'dbWork_check_run' (part 1 of 2): timed run IDs sum to their count
  checked <- dbWork_check_run(dbpath, runIDs = ids_timed)
  expect_dbWork_check(checked, n_timed, n_timed)

  #--- 'dbWork_redo'
  # Invalid run IDs leave the todos unchanged
  expect_true(dbWork_redo(dbpath, runIDs = invalid_ids))
  expect_identical(dbWork_todos(dbpath), ids_untimed)

  # Resetting run IDs that have not completed yet leaves the todos unchanged
  expect_true(dbWork_redo(dbpath, runIDs = ids_untimed))
  expect_identical(dbWork_todos(dbpath), ids_untimed)

  # Resetting the previously completed run IDs restores the full todo list
  expect_true(dbWork_redo(dbpath, runIDs = ids_timed))
  expect_identical(dbWork_todos(dbpath), runIDs)

  #--- 'dbWork_check_run' (part 2): after the reset, the same rows sum to 0
  checked <- dbWork_check_run(dbpath, runIDs = ids_timed)
  expect_dbWork_check(checked, n_timed, 0L)

  # Invalid run IDs return a 0-row data.frame
  checked <- dbWork_check_run(dbpath, runIDs = invalid_ids)
  expect_dbWork_check(checked, 0L, 0L)

  #--- 'dbWork_clean': todos are the same before and after cleaning
  expect_identical(dbWork_todos(dbpath), runIDs)
  expect_true(dbWork_clean(dbpath))
  expect_identical(dbWork_todos(dbpath), runIDs)
})
#--- Clean up
# Runs once after all tests in this file: delete every file under `dbpath`
# matching the wildcard, i.e., "dbWork.sqlite3" and its companion files
# "dbWork.sqlite3-shm" and "dbWork.sqlite3-wal"
unlink(file.path(dbpath, "dbWork.sqlite3*"))
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.