context("queue")
test_that("queue", {
test_cleanup()
on.exit(test_cleanup())
existing <- queues()
expect_that(existing, equals(character(0)))
obj <- queue("tmpjobs", sources="myfuns.R")
expect_that(obj, is_a("queue"))
expect_that(obj, is_a("observer"))
expect_that(obj$con, is_a("redis_api"))
expect_that(obj$queue_name, equals("tmpjobs"))
expect_that(length(obj$envirs_list()), equals(1))
expect_that(setdiff(queues(), existing), equals("tmpjobs"))
expect_that(obj$workers_list(), equals(character(0)))
expect_that(obj$workers_list(TRUE), equals(character(0)))
con <- obj$con
keys <- rrqueue_keys(obj$queue_name)
## TODO: add rrqueue version here too so we know we're speaking the
## right dialect.
## There is going to be a nasty bit here because the *files* data
## storage creates a couple of keys that are implementation
## dependent.
keys_startup <- c(keys$envirs_contents,
keys$envirs_files)
tmp <- sort(as.character(con$KEYS("tmpjobs*")))
tmp <- tmp[!grepl(paste0(keys$files, ":.*"), tmp)]
expect_that(tmp, equals(sort(keys_startup)))
dat <- obj$envirs_contents()[[obj$envir_id]]
expect_that(dat$packages, equals(NULL))
expect_that(dat$sources, equals("myfuns.R"))
expect_that(dat$source_files, equals(hash_files("myfuns.R")))
expect_that(obj$envirs_contents(obj$envir_id),
equals(setNames(list(dat), obj$envir_id)))
expect_that(obj$envirs_contents("xxx"),
throws_error("Environment 'xxx' not found"))
expect_that(obj$envirs_contents(c(obj$envir_id, "xxx")),
throws_error("Environment 'xxx' not found"))
expect_that(obj$envirs_contents(rep(obj$envir_id, 2)),
equals(rep(setNames(list(dat), obj$envir_id), 2)))
expect_that(obj$tasks_groups_list(), equals(character(0)))
## Queue two tasks:
grp <- "mygroup"
task1 <- obj$enqueue(sin(1))
task2 <- obj$enqueue(sin(2), group=grp)
expect_that(task1, is_a("task"))
expect_that(task1$id, equals("1"))
expect_that(task2$id, equals("2"))
expect_that(con$GET(keys$tasks_counter), equals("2"))
expect_that(obj$tasks_list(), equals(c("1", "2")))
expect_that(obj$tasks_status(),
equals(c("1"=TASK_PENDING, "2"=TASK_PENDING)))
expect_that(task1$expr(), equals(quote(sin(1))))
## as.list because 'table' objects are weird.
expect_that(as.list(obj$tasks_overview()),
equals(list(PENDING=2, RUNNING=0, COMPLETE=0, ERROR=0)))
expect_that(obj$tasks_envir(),
equals(c("1"=obj$envir_id, "2"=obj$envir_id)))
t <- obj$tasks_times()
expect_that(t, is_a("data.frame"))
now <- RedisAPI::redis_time_to_r(RedisAPI::redis_time(obj$con))
expect_that(t$task_id, equals(obj$tasks_list()))
expect_that(all(t$submitted <= now), is_true())
expect_that(all(t$waiting >= 0.0), is_true())
expect_that(all(is.na(t$started)), is_true())
expect_that(t$started, is_a("POSIXct"))
expect_that(names(t), equals(c("task_id",
"submitted", "started", "finished",
"waiting", "running", "idle")))
key_queue <- rrqueue_key_queue(obj$queue_name, obj$envir_id)
keys_tasks <- c(keys$tasks_expr, keys$tasks_counter,
keys$tasks_status, keys$tasks_envir,
keys$tasks_complete, keys$tasks_group,
keys$tasks_time_sub, key_queue)
tmp <- sort(as.character(RedisAPI::scan_find(con, "tmpjobs*")))
tmp <- tmp[!grepl(paste0(keys$files, ":.*"), tmp)]
expect_that(tmp,
equals(sort(c(keys_startup, keys_tasks))))
redis_status <- function(x) structure(x, class="redis_status")
expect_that(con$TYPE(key_queue), equals(redis_status("list")))
expect_that(con$TYPE(keys$tasks_expr), equals(redis_status("hash")))
expect_that(con$TYPE(keys$tasks_counter), equals(redis_status("string")))
expect_that(con$TYPE(keys$tasks_status), equals(redis_status("hash")))
expect_that(con$TYPE(keys$tasks_complete), equals(redis_status("hash")))
expect_that(con$TYPE(keys$tasks_group), equals(redis_status("hash")))
expect_that(con$TYPE(keys$tasks_envir), equals(redis_status("hash")))
expect_that(con$TYPE(keys$tasks_time_sub), equals(redis_status("hash")))
ids <- con$LRANGE(key_queue, 0, -1)
expect_that(ids, equals(list("1", "2")))
ids <- as.character(ids) # unlist
expect_that(task1$expr(), equals(quote(sin(1))))
expect_that(task2$expr(), equals(quote(sin(2))))
tmp <- task2$expr(locals=TRUE)
expect_that(attr(tmp, "envir"), is_a("environment"))
expect_that(ls(attr(tmp, "envir")), equals(character(0)))
attr(tmp, "envir") <- NULL
expect_that(tmp, equals(quote(sin(2))))
## TODO: check that envir is 1 and that the complete queue is empty,
## but that it is registered
expect_that(obj$con$HGET(keys$tasks_envir, ids[[1]]),
equals(obj$envir_id))
expect_that(obj$con$HGET(keys$tasks_envir, ids[[2]]),
equals(obj$envir_id))
expect_that(obj$con$HGET(keys$tasks_complete, ids[[1]]),
equals(rrqueue_key_task_complete(obj$queue_name, ids[[1]])))
expect_that(obj$con$HGET(keys$tasks_complete, ids[[2]]),
equals(rrqueue_key_task_complete(obj$queue_name, ids[[2]])))
expect_that(obj$con$HGET(keys$tasks_group, ids[[1]]), is_null())
expect_that(obj$con$HGET(keys$tasks_group, ids[[2]]), equals(grp))
expect_that(obj$tasks_groups_list(), equals(grp))
expect_that(obj$tasks_in_groups(grp), equals(ids[[2]]))
expect_that(obj$tasks_in_groups("xxx"), equals(character(0)))
grp2 <- create_group(NULL, FALSE)
grp3 <- create_group(NULL, FALSE)
obj$tasks_set_group(ids[[1]], grp2)
expect_that(sort(obj$tasks_groups_list()), equals(sort(c(grp, grp2))))
expect_that(obj$tasks_in_groups(grp2), equals(ids[[1]]))
## No error when setting a group to the same thing:
expect_that(obj$tasks_set_group(ids[[1]], grp2),
not(throws_error()))
msg <- paste("Groups already exist for tasks:", ids[[1]])
expect_that(obj$tasks_set_group(ids[[1]], grp3),
throws_error(msg))
expect_that(obj$tasks_set_group(ids[[1]], grp3, "warn"),
gives_warning(msg))
expect_that(obj$tasks_set_group(ids[[1]], grp3, "pass"),
not(gives_warning()))
expect_that(obj$tasks_in_groups(grp2), equals(ids[[1]]))
expect_that(obj$tasks_set_group(ids[[1]], grp3, "overwrite"),
not(throws_error()))
expect_that(obj$tasks_in_groups(grp2), equals(character(0)))
expect_that(obj$tasks_in_groups(grp3), equals(ids[[1]]))
## Delete the groups:
obj$tasks_set_group(ids, NULL)
expect_that(sort(obj$tasks_groups_list()), equals(character(0)))
expect_that(task1$status(), equals(TASK_PENDING))
expect_that(task2$status(), equals(TASK_PENDING))
expect_that(task1$result(), throws_error("unfetchable: PENDING"))
expect_that(task2$result(), throws_error("unfetchable: PENDING"))
expect_that(task1$key_complete,
equals(rrqueue_key_task_complete(obj$queue_name, ids[[1]])))
expect_that(task2$key_complete,
equals(rrqueue_key_task_complete(obj$queue_name, ids[[2]])))
expect_that(obj$tasks_drop(ids), equals(setNames(c(TRUE, TRUE), ids)))
expect_that(obj$tasks_drop(ids), equals(setNames(c(FALSE, FALSE), ids)))
## TODO:
## expect_that(obj$tasks(), equals(empty_named_character()))
expect_that(con$LRANGE(key_queue, 0, -1), equals(list()))
expect_that(task1$status(), equals(TASK_MISSING))
expect_that(task2$status(), equals(TASK_MISSING))
## This needs to send output to a file and not to stdout!
logfile <- "worker.log"
## See: https://github.com/hadley/testthat/issues/144
Sys.setenv("R_TESTS" = "")
wid <- worker_spawn(obj$queue_name, logfile)
## or!
## w <- rrqueue::worker("tmpjobs", heartbeat_period=10)
expect_that(obj$workers_len(), equals(1))
expect_that(con$TYPE(keys$workers_status), equals(redis_status("hash")))
expect_that(con$TYPE(keys$workers_name), equals(redis_status("set")))
w <- obj$workers_list()
expect_that(length(w), equals(1L))
expect_that(w, is_identical_to(wid))
expect_that(obj$workers_list(envir_only=TRUE), equals(wid))
ww <- parse_worker_name(w)
expect_that(ww$host, equals(Sys.info()[["nodename"]]))
expect_that(obj$workers_status(),
equals(structure(as.character(WORKER_IDLE),
names=w[[1]])))
## OK, submit a trivial job
task <- obj$enqueue(sin(1))
## Blocking check for job completion.
done <- obj$con$BLPOP(task$key_complete, 10)
expect_that(done[[1]], equals(task$key_complete))
expect_that(done[[2]], equals(task$id))
## TODO: if the test above fails, everything below here will pack a sad.
## TODO: Fix $tasks() here
## expect_that(restore_expression(obj$tasks()[["3"]], e, obj$objects),
## equals(quote(sin(1))))
## expect_that(ls(e), equals(character(0)))
expect_that(obj$tasks_status()[[task$id]], equals(TASK_COMPLETE))
expect_that(obj$task_result(task$id), equals(sin(1)))
expect_that(task$status(), equals(TASK_COMPLETE))
expect_that(task$result(), equals(sin(1)))
## Another, using arguments:
x <- 1
e <- environment()
task <- obj$enqueue(sin(x), e)
done <- obj$con$BLPOP(task$key_complete, 10)
expect_that(obj$tasks_status()[[task$id]], equals(TASK_COMPLETE))
expect_that(obj$task_result(task$id), equals(sin(x)))
## TODO: factor out the mangling here.
expect_that(obj$objects$list(),
equals(paste0(task_object_prefix(task$id), "x")))
## TODO:
## e2 <- new.env(parent=.GlobalEnv)
## expect_that(restore_expression(obj$tasks()[[task$id]], e2, obj$objects),
## equals(quote(sin(x))))
## expect_that(ls(e2), equals("x"))
## TODO:
## on job removal, clean up objects that were created. That's easy
## enough to set up with enqueue, and then trigger later.
obj$send_message("STOP", w)
Sys.sleep(.5)
expect_that(obj$workers_len(), equals(0))
expect_that(obj$workers_list(), equals(character(0)))
expect_that(obj$workers_list_exited(), equals(wid))
expect_that(obj$workers_list_exited(TRUE), equals(wid))
expect_that(obj$workers_log_tail(wid)[["command"]], equals("STOP"))
expect_that(obj$workers_status(wid), equals(setNames("EXITED", wid)))
dlog <- obj$workers_log_tail(w, n=0)
expect_that(dlog, is_a("data.frame"))
expect_that(dlog$command, equals(c("ALIVE", "ENVIR",
"TASK_START", "TASK_COMPLETE",
"TASK_START", "TASK_COMPLETE",
"MESSAGE", "RESPONSE", "STOP")))
expect_that(dlog$message, equals(c("", obj$envir_id, "3", "3", "4", "4",
"STOP", "STOP", "OK")))
## TODO: cleanup properly.
test_cleanup()
})
test_that("worker responses", {
test_cleanup()
on.exit(test_cleanup())
obj <- queue("tmpjobs", sources="myfuns.R")
con <- obj$con
wid <- worker_spawn(obj$queue_name, "worker.log")
## or!
## w <- rrqueue::worker("tmpjobs")
wid <- obj$workers_list()
response <- rrqueue_key_worker_response(obj$queue_name, wid)
expect_that(con$HLEN(response), equals(0L))
## First, PING:
id <- obj$send_message("PING")
## sending messages returns the set of workers that the message was
## sent to. That makes it easier to retrieve messages from their
## queue.
Sys.sleep(.2)
expect_that(obj$has_responses(id),
equals(setNames(TRUE, wid)))
expect_that(obj$response_ids(wid), equals(id))
res <- obj$get_responses(id)
expect_that(res, equals(setNames(list("PONG"), wid)))
res <- obj$get_responses(id, delete=TRUE)
expect_that(res, equals(setNames(list("PONG"), wid)))
expect_that(obj$get_responses(id, delete=TRUE),
throws_error("Response missing for workers"))
expect_that(obj$has_responses(id), equals(setNames(FALSE, wid)))
expect_that(obj$response_ids(wid), equals(character(0)))
id <- obj$send_message("ECHO", "hello")
Sys.sleep(.2)
res <- obj$get_response(id, wid, delete=TRUE)
expect_that(res, equals("OK"))
expect_that(obj$get_response(id, wid, delete=TRUE),
throws_error("Response missing for workers"))
id <- obj$send_message("EVAL", "1 + 1")
Sys.sleep(.2)
res <- obj$get_response(id, wid, delete=TRUE)
expect_that(res, equals(2))
id <- obj$send_message("INFO")
Sys.sleep(.2)
res <- obj$get_response(id, wid, delete=TRUE)
expect_that(res, is_a("worker_info"))
expect_that(res$worker, equals(wid))
## I think I'm not getting the ids correct here.
id <- obj$send_message("STOP")
Sys.sleep(.2)
res <- obj$get_response(id, wid, delete=TRUE)
expect_that(res, equals("BYE"))
})
test_that("cleanup", {
test_cleanup()
queues()
obj <- queue("tmpjobs", sources="myfuns.R")
obj$enqueue(Sys.sleep(20))
wid <- worker_spawn(obj$queue_name, "worker.log")
## or!
## w <- rrqueue::worker("tmpjobs")
info <- obj$workers_info()
pid <- info[[1]]$pid
expect_that(pid_exists(pid), is_true())
## Better; poll for a change in the status...
Sys.sleep(.5)
expect_that(obj$workers_status(), equals(setNames("BUSY", wid)))
queue_clean(obj$con, "tmpjobs", stop_workers="kill")
Sys.sleep(.5)
expect_that(pid_exists(pid), is_false())
test_cleanup()
})
test_that("worker-first startup", {
test_cleanup()
wid <- worker_spawn("tmpjobs", "worker.log")
## or!
## w <- rrqueue::worker("tmpjobs")
obs <- observer("tmpjobs")
expect_that(obs$workers_list(), equals(wid))
## No environments:
expect_that(obs$workers_info()[[wid]]$envir, equals(character(0)))
expect_that(obs$worker_envir(wid), equals(character(0)))
## startup a queue:
obj <- queue("tmpjobs", sources="myfuns.R")
envir_id <- obj$envir_id
## This is not updated:
expect_that(obs$workers_info()[[1]]$envir, equals(character(0)))
## But this is (might need a little sleep here)
Sys.sleep(.1)
expect_that(obs$worker_envir(wid), equals(envir_id))
## Ask the worker to update the environment:
obj$send_message("INFO")
Sys.sleep(.1) # Do a BLPOP on the response queue.
expect_that(obs$workers_info()[[1]]$envir, equals(envir_id))
expect_that(obj$envirs_list(), equals(envir_id))
expect_that(obj$envir_workers(envir_id),
equals(setNames(TRUE, wid)))
t <- obj$enqueue(sin(1))
Sys.sleep(.1)
expect_that(t$result(), equals(sin(1)))
obj$send_message("STOP")
Sys.sleep(.1)
expect_that(obj$envir_workers(envir_id),
equals(setNames(TRUE, wid)[-1]))
})
test_that("file info", {
test_cleanup()
on.exit(test_cleanup())
existing <- queues()
expect_that(existing, equals(character(0)))
filename <- "myfuns.R"
obj <- queue("tmpjobs", sources=filename)
dat <- string_to_object(obj$con$HGET(obj$keys$envirs_files, obj$envir_id))
tmp <- tempfile("rrqueue_")
files_unpack(obj$files, dat, tmp)
expect_that(dir(tmp), equals(filename))
expect_that(hash_file(file.path(tmp, filename)),
equals(hash_file(filename)))
})
test_that("fetch files", {
test_cleanup()
queues()
obj <- queue("tmpjobs", sources="myfuns.R")
Sys.setenv("R_TESTS" = "")
wid <- worker_spawn(obj$queue_name, "worker.log")
## or!
## w <- rrqueue::worker("tmpjobs", heartbeat_period=10)
id <- obj$send_message("PUSH", "myfuns.R")
dat <- obj$get_response(id, wid, wait=5, delete=TRUE)
tmp <- tempfile("rrqueue_")
files_unpack(obj$files, dat, tmp)
expect_that(dir(tmp), equals("myfuns.R"))
expect_that(hash_file(file.path(tmp, "myfuns.R")),
equals(hash_file("myfuns.R")))
obj$send_message("STOP")
})
test_that("empty start", {
path <- tempfile("rrqueue_")
dir.create(path)
test_cleanup()
queues()
obj <- queue("tmpjobs", sources="myfuns.R")
Sys.setenv("R_TESTS" = "")
logfile <- tempfile("rrqueue_", fileext=".log")
wid <- worker_spawn(obj$queue_name, logfile, path=path)
expect_that(file.exists(logfile), is_true())
## The directory is empty:
id <- obj$send_message("DIR")
dat <- obj$get_response(id, wid, wait=5, delete=TRUE)
expect_that(dat, equals(empty_named_character()))
## But we still do have a worker listening on this queue:
expect_that(obj$worker_envir(wid), equals(obj$envir_id))
expect_that(obj$envir_workers(obj$envir_id), equals(setNames(TRUE, wid)))
## We can pull files onto the worker too:
res <- obj$files_pack("myfuns.R")
dat <- obj$envirs_contents(obj$envir_id)[[1]]$source_files
id <- obj$send_message("PULL", dat)
expect_that(obj$get_response(id, wid, wait=5, delete=TRUE), equals("OK"))
id <- obj$send_message("DIR")
dat <- obj$get_response(id, wid, wait=5)
expect_that(dat, equals(hash_files("myfuns.R")))
## can also exectute commands on the worker:
id <- obj$send_message("EVAL", "getwd()", wid)
dat <- obj$get_response(id, wid, wait=5)
## Might be platform dependent, unfortunately.
expect_that(dat, equals(normalizePath(path)))
id <- obj$send_message("STOP")
dat <- obj$get_response(id, wid, wait=5)
expect_that(dat, equals("BYE"))
})
test_that("complex expressions", {
obj <- queue("tmpjobs")
expect_that(obj$enqueue(sin(cos(1))),
throws_error("complex expressions not yet supported"))
})
test_that("controlled failures", {
test_cleanup()
obj <- queue("tmpjobs", sources="myfuns.R")
wid <- worker_spawn(obj$queue_name, tempfile())
t <- obj$enqueue(failure(controlled=TRUE))
Sys.sleep(.1)
expect_that(t$status(), equals("COMPLETE"))
expect_that(is_error(t$result()), is_true())
expect_that(t$result(), not(is_a("WorkerTaskError")))
t <- obj$enqueue(failure(controlled=FALSE))
Sys.sleep(.1)
expect_that(t$status(), equals("ERROR"))
expect_that(is_error(t$result()), is_true())
expect_that(t$result(), is_a("WorkerTaskError"))
obj$send_message("STOP")
})
test_that("wait", {
test_cleanup()
obj <- queue("tmpjobs", sources="myfuns.R")
t <- obj$enqueue(slowdouble(0.3))
expect_that(t$wait(0), throws_error("task not returned in time"))
expect_that(t$wait(0.1), throws_error("task not returned in time"))
wid <- worker_spawn(obj$queue_name, tempfile())
expect_that(t$wait(1), equals(0.6))
expect_that(t$wait(1), takes_less_than(1))
t <- obj$enqueue(slowdouble(1))
expect_that(t$wait(0), throws_error("task not returned in time"))
expect_that(t$wait(2), equals(2))
obj$send_message("STOP")
Sys.sleep(.5)
})
test_that("config", {
obj <- queue(sources="myfuns.R", config="config.yml")
dat <- load_config("config.yml")
expect_that(obj$queue_name, equals(dat$queue_name))
cfg <- obj$con$config()
expect_that(cfg$host, equals(dat$redis$host))
expect_that(cfg$port, equals(dat$redis$port))
env <- obj$envirs_contents()[[obj$envir_id]]
expect_that(env$packages, equals(dat$packages))
expect_that(env$sources, equals(dat$sources))
})
test_that("config observer", {
obj <- observer(config="config.yml")
dat <- load_config("config.yml")
expect_that(obj$queue_name, equals(dat$queue_name))
cfg <- obj$con$config()
expect_that(cfg$host, equals(dat$redis$host))
expect_that(cfg$port, equals(dat$redis$port))
})
test_that("refresh_environent", {
writeLines("f <- function(x) 1", "update.R")
on.exit(file.remove("update.R"))
test_cleanup()
obj <- queue("tmpjobs", sources="update.R")
envir_id <- obj$envir_id
wid <- worker_spawn(obj$queue_name, tempfile())
r1 <- obj$enqueue(f(1))$wait(60)
expect_that(r1, equals(1))
writeLines("f <- function(x) 2", "update.R")
r2 <- obj$enqueue(f(2))$wait(60)
expect_that(r2, equals(1))
expect_that(val <- obj$refresh_environment(),
shows_message("Initialising environment"))
expect_that(val, is_true())
expect_that(obj$envir_id, not(equals(envir_id)))
expect_that(val <- obj$refresh_environment(), not(shows_message()))
expect_that(val, is_false())
r3 <- obj$enqueue(f(2))$wait(60)
expect_that(r3, equals(2))
expect_that(sort(obj$worker_envir(wid)),
equals(sort(c(envir_id, obj$envir_id))))
obj$send_message("STOP")
})
test_that("pause", {
test_cleanup()
obj <- queue("tmpjobs", sources="myfuns.R")
wid <- worker_spawn(obj$queue_name, tempfile())
expect_that(obj$workers_status(),
equals(setNames(WORKER_IDLE, wid)))
id <- obj$send_message("PAUSE")
res <- obj$get_response(id, wid, wait=1)
expect_that(res, equals("OK"))
expect_that(obj$workers_status(),
equals(setNames(WORKER_PAUSED, wid)))
## Can still read the current environment:
expect_that(obj$worker_envir(wid), equals(obj$envir_id))
t <- obj$enqueue(sin(1))
## should have been queued by now if the worker was interested:
Sys.sleep(.5)
expect_that(t$status(), equals(TASK_PENDING))
id <- obj$send_message("RESUME")
res <- obj$get_response(id, wid, wait=1)
expect_that(res, equals("OK"))
Sys.sleep(.5)
expect_that(t$status(), equals(TASK_COMPLETE))
log <- obj$workers_log_tail(wid, 0)
expect_that(log$command, equals(c("ALIVE", "ENVIR",
"MESSAGE", "RESPONSE",
"MESSAGE", "RESPONSE",
"TASK_START", "TASK_COMPLETE")))
expect_that(log$message, equals(c("", obj$envir_id,
"PAUSE", "PAUSE",
"RESUME", "RESUME",
t$id, t$id)))
## Pausing twice is fine:
id <- obj$send_message("PAUSE")
res <- obj$get_response(id, wid, wait=1)
expect_that(res, equals("OK"))
id <- obj$send_message("PAUSE")
res <- obj$get_response(id, wid, wait=1)
expect_that(res, equals("OK"))
## Will stop when paused:
id <- obj$send_message("STOP")
res <- obj$get_response(id, wid, wait=1)
expect_that(res, equals("BYE"))
expect_that(obj$workers_list_exited(), equals(wid))
})
test_that("unknown command", {
test_cleanup()
obj <- queue("tmpjobs", sources="myfuns.R")
wid <- worker_spawn(obj$queue_name, tempfile())
expect_that(obj$workers_status(),
equals(setNames(WORKER_IDLE, wid)))
id <- obj$send_message("XXXX")
res <- obj$get_response(id, wid, wait=1)
expect_that(res, is_a("condition"))
expect_that(res$message, matches("Recieved unknown message"))
expect_that(res$command, equals("XXXX"))
expect_that(res$args, equals(NULL))
id <- obj$send_message("YYYY", "ZZZZ")
res <- obj$get_response(id, wid, wait=1)
expect_that(res, is_a("condition"))
expect_that(res$message, matches("Recieved unknown message"))
expect_that(res$command, equals("YYYY"))
expect_that(res$args, equals("ZZZZ"))
## Complex arguments are supported:
d <- data.frame(a=1, b=2)
id <- obj$send_message("YYYY", d)
res <- obj$get_response(id, wid, wait=1)
expect_that(res, is_a("condition"))
expect_that(res$message, matches("Recieved unknown message"))
expect_that(res$command, equals("YYYY"))
expect_that(res$args, equals(d))
obj$stop_workers()
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.