## Integration tests for starting/stopping worker processes and for
## detecting dead workers through their heartbeat.
context("heartbeat")
## This file collects the really nasty parts of starting and destroying
## worker processes, and of detecting their death via the heartbeat
## process. There is a lot of mocking up, and it is slow to develop and
## test. Failures tend to cascade: every test reuses the "tmpjobs" queue,
## spawns real worker processes and relies on wall-clock Sys.sleep()
## calls, so many failures may stem from a single root cause.
##
## NOTE(review): each test calls Sys.setenv("R_TESTS" = "") before
## spawning workers; presumably this keeps spawned R processes from
## picking up R CMD check's R_TESTS startup file -- confirm.
test_that("heartbeat with signals", {
  test_cleanup()
  ## Fairly fast worker timeout (seconds). It is also the time we sleep
  ## below to let a killed worker's heartbeat lapse.
  expire <- 3
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile,
                      heartbeat_period=1, heartbeat_expire=expire)
  ## worker("tmpjobs", heartbeat_expire=3, heartbeat_period=1)
  pid <- obj$workers_info()[[wid]]$pid
  expect_true(pid_exists(pid))
  expect_equal(obj$workers_running(wid), setNames(TRUE, wid))
  expect_equal(obj$workers_status(), setNames(WORKER_IDLE, wid))

  ## SIGINT during a task orphans the task but keeps the process alive:
  t <- obj$enqueue(slowdouble(100))
  Sys.sleep(.5)
  expect_equal(obj$workers_status(wid), setNames("BUSY", wid))
  expect_true(is.na(t$times()[["finished"]]))
  obj$send_signal(tools::SIGINT, wid)
  Sys.sleep(.5)
  expect_equal(t$status(), TASK_ORPHAN)
  expect_false(is.na(t$times()[["finished"]]))
  ## Machine still alive:
  expect_true(pid_exists(pid))

  ## Keep interrupting the idle worker until it logs a REQUEUE.
  ## Realistically, this will trigger fairly quickly.
  for (i in seq_len(10)) {
    ## Another interrupt at this point will not kill the worker:
    obj$send_signal(tools::SIGINT, wid)
    Sys.sleep(.5)
    expect_true(pid_exists(pid))
    ## Check that an additional job will work OK and not get caught by
    ## the *second* interrupt (this did happen to me before).
    tt <- obj$enqueue(sin(1))
    ## `regexp = NA` asserts that *no* error is thrown; this replaces
    ## the removed testthat `not(throws_error())` matcher.
    expect_error(res <- tt$wait(1), NA)
    expect_equal(res, sin(1))
    if ("REQUEUE" %in% obj$workers_log_tail(wid, 0)$command) {
      break
    }
  }
  expect_true("REQUEUE" %in% obj$workers_log_tail(wid, 0)$command)

  ## SIGTERM kills the process; until the heartbeat lapses the worker
  ## is still reported as running.
  t <- obj$enqueue(slowdouble(100))
  Sys.sleep(.5)
  obj$send_signal(tools::SIGTERM, wid)
  Sys.sleep(.5)
  expect_false(pid_exists(pid))
  expect_equal(obj$workers_running(wid), setNames(TRUE, wid))
  expect_equal(obj$workers_list(), wid)
  Sys.sleep(expire)
  expect_equal(obj$workers_running(wid), setNames(FALSE, wid))
  ## Still in the workers list, though:
  expect_equal(obj$workers_list(), wid)

  ret <- obj$workers_identify_lost()
  expect_equal(ret, list(workers=wid, tasks=t$id))
  expect_equal(t$status(), TASK_ORPHAN)
  expect_false(is.na(t$times()[["finished"]]))
  expect_equal(obj$workers_status(wid), setNames(WORKER_LOST, wid))
  expect_equal(obj$workers_list_exited(), wid)
  ## We were killed in the act:
  log <- obj$workers_log_tail(wid, 1)
  expect_equal(log$command, "TASK_START")

  ## Then we can clean this mess up:
  res <- obj$workers_delete_exited()
  expect_equal(res, wid)
  expect_equal(obj$workers_list_exited(), character(0))
  expect_equal(obj$workers_delete_exited(), character(0))
  ## No worker keys remain, but task keys are kept:
  expect_gt(length(RedisAPI::scan_find(obj$con, "tmpjobs:tasks:*")), 0)
  expect_equal(RedisAPI::scan_find(obj$con, "tmpjobs:workers:*"),
               character(0))
})
test_that("heartbeat shutdown when running job", {
  expire <- 3
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile,
                      heartbeat_period=1, heartbeat_expire=expire)
  ## worker("tmpjobs", heartbeat_expire=3, heartbeat_period=1)
  pid <- obj$workers_info()[[wid]]$pid
  expect_true(pid_exists(pid))

  t <- obj$enqueue(slowdouble(100))
  Sys.sleep(.5)
  expect_equal(obj$workers_status(wid), setNames("BUSY", wid))

  ## A STOP message is not acted on while the task is running: the
  ## worker stays busy and the task keeps running.
  obj$send_message("STOP")
  Sys.sleep(.5)
  expect_equal(obj$workers_status(wid), setNames("BUSY", wid))
  expect_true(pid_exists(pid))
  expect_equal(t$status(), TASK_RUNNING)

  ## Once the task is interrupted (SIGINT), the pending STOP is honoured
  ## without sending another message:
  obj$send_signal(tools::SIGINT, wid)
  Sys.sleep(.5)
  ## The shutdown *is* clean:
  expect_equal(obj$workers_status(wid), setNames(WORKER_EXITED, wid))
  log <- obj$workers_log_tail(wid, 1)
  expect_equal(log$command, "STOP")
  expect_equal(log$message, "OK")
  expect_false(pid_exists(pid))
  ## The task was orphaned:
  expect_equal(t$status(), TASK_ORPHAN)
})
test_that("requeue orphaned jobs", {
  expire <- 3
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile,
                      heartbeat_period=1, heartbeat_expire=expire)
  ## worker("tmpjobs", heartbeat_expire=3, heartbeat_period=1)
  pid <- obj$workers_info()[[wid]]$pid
  expect_true(pid_exists(pid))

  ## Orphan a running task by interrupting its worker:
  t_double <- 5
  t <- obj$enqueue(slowdouble(t_double))
  Sys.sleep(.5)
  expect_equal(obj$workers_status(wid), setNames("BUSY", wid))
  expect_true(is.na(t$times()[["finished"]]))
  obj$send_signal(tools::SIGINT, wid)
  Sys.sleep(.5)
  expect_equal(t$status(), TASK_ORPHAN)
  expect_false(is.na(t$times()[["finished"]]))

  ## Requeueing returns a new task; the original becomes a redirect:
  t2 <- obj$requeue(t$id)
  expect_equal(t$status(), TASK_REDIRECT)
  Sys.sleep(0.5)
  expect_equal(t2$status(), TASK_RUNNING)
  Sys.sleep(t_double)
  expect_equal(t2$status(), TASK_COMPLETE)
  expect_equal(t$status(follow_redirect=TRUE), TASK_COMPLETE)
  expect_equal(t$result(follow_redirect=TRUE), t_double * 2)
  ## Without following the redirect, the original task has no result:
  expect_error(t$result(), "task [0-9]+ is unfetchable")

  ## The requeued task's recorded running time is within 1 of t_double:
  tt <- obj$tasks_times(t2$id)
  expect_s3_class(tt, "data.frame")
  expect_gt(tt$running, t_double - 1)
  expect_lt(tt$running, t_double + 1)
  obj$send_message("STOP")
})
test_that("workers stop", {
  expire <- 3
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile,
                      heartbeat_period=1, heartbeat_expire=expire)
  ## worker("tmpjobs", heartbeat_expire=3, heartbeat_period=1)
  pid <- obj$workers_info()[[wid]]$pid
  expect_true(pid_exists(pid))

  ## worker_stop_message() is called on a hand-built list standing in
  ## for a worker object; it must emit a message, and its return value
  ## is kept in `msg`.
  ## TODO: worker handles could help avoid this bit of mock up.
  expect_message(msg <- worker_stop_message(
    list(con=obj$con,
         keys=list(queue_name=obj$queue_name),
         name=wid,
         styles=worker_styles())))

  ## The final element of `msg` parses to a worker_stop() call for
  ## this queue/worker; evaluating it stops the worker.
  expected_call <- bquote(rrqueue::worker_stop(.(obj$queue_name), .(wid)))
  last_line <- msg[[length(msg)]]
  expect_equal(parse(text=last_line)[[1]], expected_call)
  eval(expected_call, .GlobalEnv)
  Sys.sleep(.5)

  expect_false(pid_exists(pid))
  log <- obj$workers_log_tail(wid, 1)
  expect_equal(log$command, "STOP")
  expect_equal(log$message, "OK")
})
test_that("stop_workers (simple case)", {
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile)

  ## Stopping an idle worker:
  stopped <- obj$stop_workers(wid, wait=10)
  expect_equal(stopped, wid)
  expect_equal(obj$workers_list(), character(0))
  expect_equal(obj$workers_list_exited(), wid)

  ## The last three log entries: STOP received as a message, responded
  ## to, then a clean STOP.
  log <- obj$workers_log_tail(wid, 3)
  expect_equal(log$command, c("MESSAGE", "RESPONSE", "STOP"))
  expect_equal(log$message, c("STOP", "STOP", "OK"))

  ## A cleanly stopped worker is not reported as lost...
  lost <- obj$workers_identify_lost()
  expect_equal(lost, list(workers=character(), tasks=character()))
  ## ...and after deleting exited workers its log is empty.
  expect_equal(obj$workers_delete_exited(), wid)
  expect_equal(obj$workers_list_exited(), character(0))
  expect_equal(nrow(obj$workers_log_tail(wid, 1)), 0)
})
test_that("stop_workers (running, interrupt)", {
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile)

  duration <- 2
  t <- obj$enqueue(slowdouble(duration))
  Sys.sleep(.25)
  expect_equal(t$status(), TASK_RUNNING)

  ## interrupt=TRUE: the running task is orphaned and the worker exits.
  stopped <- obj$stop_workers(wid, interrupt=TRUE)
  expect_equal(stopped, wid)
  Sys.sleep(.25)
  expect_equal(t$status(), TASK_ORPHAN)
  expect_equal(obj$workers_list(), character(0))
  expect_equal(obj$workers_list_exited(), wid)

  log <- obj$workers_log_tail(wid, 5)
  expect_equal(log$command,
               c("INTERRUPT", "TASK_ORPHAN", "MESSAGE", "RESPONSE", "STOP"))
  expect_equal(log$message, c("", t$id, "STOP", "STOP", "OK"))

  lost <- obj$workers_identify_lost()
  expect_equal(lost, list(workers=character(), tasks=character()))
  expect_equal(obj$workers_delete_exited(), wid)
  expect_equal(obj$workers_list_exited(), character(0))
  expect_equal(nrow(obj$workers_log_tail(wid, 1)), 0)
})
test_that("stop_workers (running, wait)", {
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile)

  duration <- 2
  t <- obj$enqueue(slowdouble(duration))
  Sys.sleep(.25)
  expect_equal(t$status(), TASK_RUNNING)

  ## interrupt=FALSE: the running task is allowed to complete, and the
  ## worker has exited cleanly afterwards.
  stopped <- obj$stop_workers(wid, interrupt=FALSE)
  expect_equal(stopped, wid)
  Sys.sleep(duration + .25)
  expect_equal(t$status(), TASK_COMPLETE)
  expect_equal(obj$workers_list(), character(0))
  expect_equal(obj$workers_list_exited(), wid)

  log <- obj$workers_log_tail(wid, 1)
  expect_equal(log$command, "STOP")
  expect_equal(log$message, "OK")

  expect_equal(obj$workers_delete_exited(), wid)
  expect_equal(obj$workers_list_exited(), character(0))
  expect_equal(nrow(obj$workers_log_tail(wid, 1)), 0)
})
test_that("stop_workers (blocking)", {
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  expire <- 3
  logfile <- "worker_heartbeat.log"
  Sys.setenv("R_TESTS" = "")
  wid <- worker_spawn(obj$queue_name, logfile,
                      heartbeat_expire=expire, heartbeat_period=1)

  ## NOTE(review): block(10) presumably ties the worker up so it cannot
  ## service the STOP within `wait` -- confirm. The task is still
  ## running when stop_workers() returns.
  t <- obj$enqueue(block(10))
  Sys.sleep(.5)
  stopped <- obj$stop_workers(wid, wait=1)
  expect_equal(stopped, wid)
  expect_equal(t$status(), TASK_RUNNING)

  ## After `expire` seconds the worker is identified as lost and its
  ## task as orphaned.
  Sys.sleep(expire)
  lost <- obj$workers_identify_lost()
  expect_equal(lost, list(workers=wid, tasks=t$id))
  expect_equal(t$status(), TASK_ORPHAN)
})
test_that("clean up multiple workers", {
  test_cleanup()
  obj <- queue("tmpjobs", sources="myfuns.R")
  expect_equal(obj$workers_list_exited(), character(0))
  expire <- 3
  n <- 4
  logfile <- sprintf("worker_%d.log", seq_len(n))
  wid <- worker_spawn(obj$queue_name, logfile, n=n,
                      heartbeat_expire=expire, heartbeat_period=1)
  expect_equal(sort(obj$workers_list()), sort(wid))

  ## Give every worker a long-running task:
  for (i in seq_len(n)) {
    obj$enqueue(slowdouble(1000))
  }
  Sys.sleep(.5)
  ids <- obj$workers_task_id()

  ## Kill two savagely, with SIGTERM sent straight to the processes:
  wkill <- obj$workers_list()[2:3]
  pid <- viapply(obj$workers_info()[wkill], "[[", "pid")
  ok <- tools::pskill(pid, tools::SIGTERM) == PSKILL_SUCCESS
  ## Fail here, clearly, if the kill itself did not succeed, rather than
  ## through confusing downstream expectations.
  expect_true(all(ok))
  Sys.sleep(expire + 1)

  ## Killed workers stay listed until they are identified as lost:
  expect_equal(sort(obj$workers_list()), sort(wid))
  x <- obj$workers_identify_lost()
  expect_equal(sort(x$workers), sort(wkill))
  expect_equal(sort(x$tasks), sort(unname(ids[wkill])))
  expect_equal(obj$tasks_status(x$tasks),
               setNames(rep(TASK_ORPHAN, length(x$tasks)), x$tasks))
  ## Name the expectation by x$workers (the order workers_status()
  ## returns), not wkill: the two only agree after sorting.
  expect_equal(obj$workers_status(x$workers),
               setNames(rep(WORKER_LOST, length(x$workers)), x$workers))
  obj$stop_workers()
})
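## NOTE: the two lines below are stray web-page boilerplate and not part
## of these tests; they are commented out so the file parses as R.
## Add the following code to your website.
## For more information on customizing the embed code, read Embedding Snippets.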