context("task_bundle")
test_that("simple", {
test_cleanup()
on.exit(test_cleanup())
existing <- queues()
## expect_that(existing, equals(character(0)))
obj <- queue("tmpjobs", sources="myfuns.R")
group <- "mygroup"
x <- obj$task_bundle_get(group)
expect_that(x, is_a("task_bundle"))
expect_that(x$ids(), equals(character(0)))
expect_that(x$groups, equals(group))
expect_that(x$update_groups(), equals(character(0)))
expect_that(x$results(), equals(empty_named_list()))
expect_that(x$wait(), equals(empty_named_list()))
expect_that(as.list(x$overview()),
equals(list(PENDING=0, RUNNING=0, COMPLETE=0, ERROR=0)))
## Queue up a job:
t <- obj$enqueue(sin(1), group=group)
expect_that(x$update_groups(), equals(t$id))
expect_that(x$update_groups(), equals(character(0)))
ids <- t$id
for (i in 1:3) {
t <- obj$enqueue(sin(1), group=group)
ids <- c(ids, t$id)
}
expect_that(x$update_groups(), equals(ids[-1]))
expect_that(x$update_groups(), equals(character(0)))
expect_that(x$ids(), equals(ids))
expect_that(x$status(),
equals(setNames(rep(TASK_PENDING, length(ids)), ids)))
expect_that(x$results(), throws_error("Tasks not yet completed"))
expect_that(x$wait(0), throws_error("Tasks not yet completed"))
expect_that(x$wait(1), throws_error("Exceeded maximum time"))
expect_that(x$status(),
equals(setNames(rep(TASK_PENDING, length(ids)), ids)))
## Start a worker:
logfile <- "worker.log"
Sys.setenv("R_TESTS" = "")
wid <- worker_spawn(obj$queue_name, logfile)
Sys.sleep(.5)
expect_that(x$status(),
equals(setNames(rep(TASK_COMPLETE, length(ids)), ids)))
cmp <- setNames(rep(list(sin(1)), length(ids)), ids)
expect_that(x$results(), equals(cmp))
## Set some names and watch them come out too:
x$names <- letters[1:4]
x$results()
cmp <- setNames(rep(list(sin(1)), length(ids)), letters[1:4])
expect_that(x$results(), equals(cmp))
## Add an new task:
t <- obj$enqueue(sin(1), group=group)
Sys.sleep(.5)
## Nothing has changed in the bundle:
expect_that(x$results(), equals(cmp))
id <- x$update_groups()
expect_that(id, equals(t$id))
ids <- c(ids, id)
## Names have been removed:
expect_that(x$names, equals(NULL))
cmp <- setNames(rep(list(sin(1)), length(ids)), ids)
expect_that(x$results(), equals(cmp))
t <- obj$enqueue(slowdouble(2), group=group)
x$update_groups()
st <- setNames(rep(c(TASK_COMPLETE, TASK_RUNNING), c(length(ids), 1)),
c(ids, t$id))
Sys.sleep(.2)
expect_that(x$status(), equals(st))
r <- x$wait(3)
cmp <- c(cmp, setNames(list(4), t$id))
expect_that(r, equals(cmp))
expect_that(x$status(),
equals(setNames(rep(TASK_COMPLETE, length(r)), names(r))))
expect_that(x$wait1(1), equals(NULL))
expect_that(x$wait1(1), takes_less_than(1))
t1 <- obj$enqueue(slowdouble(1), group=group)
t2 <- obj$enqueue(slowdouble(2), group=group)
x$update_groups()
res <- x$wait1(60)
expect_that(res[[1]], equals(t1$id))
expect_that(res[[2]], equals(2))
res <- x$wait1(60)
expect_that(res[[1]], equals(t2$id))
expect_that(res[[2]], equals(4))
expect_that(x$wait1(), is_null())
t3 <- obj$enqueue(slowdouble(3), group=group)
x$update_groups()
res0 <- x$wait1(1)
expect_that(res0, is_null())
res <- x$wait1(10)
expect_that(res, equals(list(id=t3$id, result=t3$result())))
expect_that(as.list(x$overview()),
equals(list(PENDING=0, RUNNING=0, COMPLETE=9, ERROR=0)))
## Yeah, this is not going to work.
xt <- x$times()
expect_that(xt, is_a("data.frame"))
cols <- c("task_id", "submitted", "started", "finished",
"waiting", "running", "idle")
expect_that(names(xt), equals(cols))
cols <- setdiff(cols, "idle")
expect_that(xt[cols], equals(obj$tasks_times(x$ids())[cols]))
## Get the bundle again:
y <- obj$task_bundle_get(group)
expect_that(y$ids(), equals(x$ids()))
expect_that(y$status(), equals(x$status()))
expect_that(y$results(), equals(x$results()))
obj$stop_workers(wid)
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.