WORKER_IDLE <- "IDLE"
WORKER_BUSY <- "BUSY"
WORKER_EXITED <- "EXITED"
WORKER_LOST <- "LOST"
WORKER_PAUSED <- "PAUSED"
## TODO: Part way through restructuring the task timeout / heartbeat
## thing, but it might be wise to have the timeout be slower during
## the idle phase? That could be done with a multiplier on the
## heartbeat.
##' @importFrom R6 R6Class
.R6_worker <- R6::R6Class(
"worker",
public=list(
con=NULL,
queue_name=NULL,
keys=NULL,
key_queue=NULL,
envir=list(),
heartbeat=NULL,
heartbeat_period=NULL,
heartbeat_expire=NULL,
name=NULL,
files=NULL,
objects=NULL,
styles=NULL,
paused=FALSE,
initialize=function(queue_name, redis_host, redis_port,
heartbeat_period, heartbeat_expire, key_worker_alive) {
#
self$name <- sprintf("%s::%d", hostname(), process_id())
self$queue_name <- queue_name
## TODO: Set these differently if there is no heartbeat support.
self$heartbeat_period <- as.numeric(heartbeat_period)
self$heartbeat_expire <- as.numeric(heartbeat_expire)
self$keys <- rrqueue_keys(self$queue_name, worker_name=self$name)
self$styles <- worker_styles()
self$con <- redis_connection(redis_host, redis_port)
## NOTE: This needs to happen *before* running the
## initialize_worker; it checks that we can actually use the
## database and that we will not write anything to an existing
## worker. An error here will not be caught.
##
## TODO: Provide some guidance as to what to do here!
if (self$con$SISMEMBER(self$keys$workers_name, self$name) == 1L) {
stop("Looks like this worker exists already...")
}
withCallingHandlers(self$initialize_worker(key_worker_alive),
error=self$catch_error)
self$main()
},
## This is in its own function so that error handling can be done
## gracefully; it's only called by initialize()
initialize_worker=function(key_worker_alive) {
info <- object_to_string(self$print_info())
self$heartbeat <- heartbeat(self$con,
self$keys$heartbeat,
self$heartbeat_period,
self$heartbeat_expire)
redis_multi(self$con, {
self$con$SADD(self$keys$workers_name, self$name)
self$con$HSET(self$keys$workers_status, self$name, WORKER_IDLE)
self$con$HDEL(self$keys$workers_task, self$name)
self$con$DEL(self$keys$log)
self$con$HSET(self$keys$workers_info, self$name, info)
})
self$log("ALIVE")
self$files <- file_cache(self$keys$files, self$con)
self$objects <- object_cache(self$keys$objects, self$con)
## Always listen to the message queue, even if no environments
## will be loaded (this could be merged with the loop below for
## better robustness).
self$key_queue <- self$set_key_queue(clear=TRUE)
## load *existing* environments that the controller knows about.
envir_ids <- as.character(self$con$HKEYS(self$keys$envirs_contents))
for (envir_id in envir_ids) {
self$initialize_environment(envir_id)
}
## This announces that we're up; things may monitor this
## queue, and worker_spawn does a BLPOP to
if (!is.null(key_worker_alive)) {
self$con$RPUSH(key_worker_alive, self$name)
}
},
initialize_environment=function(envir_id) {
self$log("ENVIR", envir_id)
dat_str <- self$con$HGET(self$keys$envirs_contents, envir_id)
dat <- string_to_object(dat_str)
protocol <- rrqueue_protocol()
if (protocol != dat$protocol) {
self$log("ENVIR ERROR",
sprintf("protocol mismatch (queue: %s, worker: %s)",
dat$protocol, protocol))
return(FALSE)
}
## TODO: Probably refactor this into something easily testable...
## Check the hashes of the files
hash_expected <- dat$source_files
if (compare_hash(hash_expected)) {
e <- tryCatch(create_environment(dat$packages, dat$sources),
error=identity)
} else {
tmp <- tempfile("rrqueue_")
files_unpack(self$files, hash_expected, tmp)
owd <- setwd(tmp)
e <- tryCatch(create_environment(dat$packages, dat$sources),
error=identity,
finally=setwd(owd))
}
## TODO: a stack trace here would be great, but requires some
## withCallingHandlers magic in the above code.
if (inherits(e, "error")) {
msg <- sprintf("%s (in %s)", e$message,
paste(deparse(e$call), collapse="\n"))
self$log("ENVIR ERROR", msg)
return(FALSE)
}
self$envir[[envir_id]] <- e
fmt <- function(x) {
if (is.null(x)) {
"(none)"
} else {
paste(x, collapse=" ")
}
}
## Read from the message queue *first* as that allows a STOP
## command to prevent the worker continuing with job.
if (!self$paused) {
self$key_queue <- self$set_key_queue()
}
self$con$SADD(self$keys$envir, envir_id)
self$log("ENVIR PACKAGES", fmt(dat$packages), push=FALSE)
self$log("ENVIR SOURCES", fmt(dat$sources), push=FALSE)
TRUE
},
set_key_queue=function(clear=FALSE) {
if (clear) {
keys <- character(0)
} else {
keys <- rrqueue_key_queue(self$queue_name, names(self$envir))
}
self$key_queue <- c(self$keys$message, keys)
},
get_environment=function(envir_id) {
self$envir[[envir_id]]
},
## TODO: if we're not running in a terminal, then we should output
## the worker id into the screen message.
log=function(label, message=NULL, push=TRUE) {
t <- Sys.time()
ti <- as.integer(t) # to nearest second
ts <- self$styles$info(as.character(t))
if (is.null(message)) {
msg_log <- sprintf("%d %s", ti, label)
msg_scr <- sprintf("[%s] %s", ts, self$styles$key(label))
} else {
msg_log <- sprintf("%d %s %s", ti, label, paste(message, collapse="\n"))
## Try and make nicely printing logs for the case where the
## message length is longer than 1:
lab <- c(label, rep_len(blank(nchar(label)), length(message) - 1L))
msg_scr <- paste(sprintf("[%s] %s %s", ts,
self$styles$key(lab),
self$styles$value(message)),
collapse="\n")
}
message(msg_scr)
if (push) {
self$con$RPUSH(self$keys$log, msg_log)
}
},
main=function() {
con <- self$con
task <- NULL
continue <- TRUE
catch_interrupt <- function(e) {
## NOTE: this won't recursively catch interrupts. Especially
## on a high-latency connection this might be long enough for
## a second interrupt to arrive. We don't deal with that and
## it will be about the same as a SIGTERM - we'll just die.
## But that will disable the heartbeat so it should all end up
## OK.
self$log("INTERRUPT")
## This condition is not quite enough; I need to know if we're
## working on a job at all.
task_running <- self$con$HGET(self$keys$workers_task, self$name)
if (!is.null(task_running)) {
self$task_cleanup(e, task_running, TASK_ORPHAN)
}
## Depending on when the interrupt was recieved in the loop
## below (I think BLPOP vs R code) we might have picked a task
## off the list but not yet be running it (this is the
## situation where a SIGTERM will lose a job, but we'd pick
## that up with the orphaning approach). So here, we test
## that if we have a task and it's not the same as the task
## that was running we immediately push it back onto the queue
## in first place.
if (!is.null(task[[2]]) && !identical(task[[2]], task_running)) {
self$log("REQUEUE", task[[2]])
self$con$LPUSH(task[[1]], task[[2]])
}
worker_stop_message(self)
}
catch_worker_stop <- function(e) {
self$shutdown("OK")
continue <<- FALSE
}
catch_worker_error <- function(e) {
if (!is.null(e$task_id)) {
self$task_cleanup(e, e$task_id, e$task_status)
}
}
while (continue) {
tryCatch({
task <- con$BLPOP(self$key_queue, self$heartbeat_period)
if (is.null(task)) {
## self$log("WAITING", push=FALSE)
} else {
if (task[[1]] == self$keys$message) {
self$run_message(task[[2]])
} else { # is a task
self$run_task(task[[2]])
}
}
},
interrupt=catch_interrupt,
WorkerStop=catch_worker_stop,
WorkerError=catch_worker_error,
error=self$catch_error)
}
},
run_task=function(task_id) {
keys <- self$keys
con <- self$con
self$log("TASK_START", task_id)
expr <- self$task_retrieve(task_id)
context <- self$task_prepare(task_id, expr)
## Here, we get time from the Redis server, not R; that means
## that all ideas of time are centralised.
time <- redis_time(con)
redis_multi(con, {
con$HSET(keys$workers_status, self$name, WORKER_BUSY)
con$HSET(keys$workers_task, self$name, task_id)
con$HSET(keys$tasks_worker, task_id, self$name)
con$HSET(keys$tasks_status, task_id, TASK_RUNNING)
con$HSET(keys$tasks_time_beg, task_id, time)
})
expr_str <- capture.output(print(context$expr))
self$log("EXPR", expr_str, push=FALSE)
res <- tryCatch(eval(context$expr, context$envir),
error=WorkerTaskError)
if (inherits(res, "WorkerTaskError")) {
status <- TASK_ERROR
} else {
status <- TASK_COMPLETE
}
self$task_cleanup(res, task_id, status)
},
run_message=function(msg) {
content <- string_to_object(msg)
message_id <- content$id
cmd <- content$command
args <- content$args
## NOTE: This is a departure from previous because we no longer
## print the *arguments* to args. That could be modified into
## here pretty easily by appending args iff they are a scalar
## character. Better might be to serialise to json here, but
## that's going to be more work and not work for everything, not
## necessarily transitive without assumptions and YAGNI.
self$log("MESSAGE", cmd)
## TODO: purge object cache (save on memory)
## TODO: file(s) get, put (debugging, deployment)
## TODO: environment load, purge, etc.
## TODO: worker restart? Possible?
res <- switch(cmd,
PING=run_message_PING(),
ECHO=run_message_ECHO(args),
EVAL=run_message_EVAL(args),
STOP=run_message_STOP(self, message_id, args), # noreturn
INFO=run_message_INFO(self),
ENVIR=run_message_ENVIR(self, args),
PUSH=run_message_PUSH(self, args),
PULL=run_message_PULL(self, args),
DIR=run_message_DIR(args),
PAUSE=run_message_PAUSE(self, args),
RESUME=run_message_RESUME(self, args),
## FINISH=run_message_FINISH(args),
run_message_unknown(cmd, args))
self$send_response(message_id, cmd, res)
},
send_response=function(message_id, cmd, result) {
self$log("RESPONSE", cmd)
self$con$HSET(self$keys$response, message_id,
response_prepare(message_id, cmd, result))
},
task_retrieve=function(task_id) {
expr_stored <- self$con$HGET(self$keys$tasks_expr, task_id)
if (is.null(expr_stored)) {
stop(WorkerTaskMissing(self, task_id))
}
expr_stored
},
task_prepare=function(task_id, expr_stored) {
envir_id <- self$con$HGET(self$keys$tasks_envir, task_id)
envir <- new.env(parent=self$get_environment(envir_id))
expr <- restore_expression(expr_stored, envir, self$objects)
list(expr=expr, envir=envir)
},
task_cleanup=function(data, task_id, task_status) {
con <- self$con
keys <- self$keys
key_complete <- con$HGET(keys$tasks_complete, task_id)
time <- redis_time(con)
redis_multi(con, {
con$HSET(keys$tasks_result, task_id, object_to_string(data))
con$HSET(keys$tasks_status, task_id, task_status)
con$HSET(keys$tasks_time_end, task_id, time)
con$HSET(keys$workers_status, self$name, WORKER_IDLE)
con$HDEL(keys$workers_task, self$name)
## This advertises to the controller that we're done
con$RPUSH(key_complete, task_id)
self$log(paste0("TASK_", task_status), task_id)
})
},
print_info=function() {
print(worker_info(self), banner=TRUE, styles=self$styles)
},
## Error handler of last resort:
catch_error=function(e) {
self$shutdown("ERROR")
message("This is an uncaught error in rrqueue, probably a bug!")
stop(e)
},
shutdown=function(status="OK") {
## Conditional here because the heartbeat can fail to start, in
## which case we can't run the heartbeat shutdown.
if (!is.null(self$heartbeat$stop)) {
self$heartbeat$stop()
}
self$con$DEL(self$keys$heartbeat)
self$con$SREM(self$keys$workers_name, self$name)
self$con$HSET(self$keys$workers_status, self$name, WORKER_EXITED)
self$log("STOP", status)
}))
##' Create an rrqueue worker. This blocks the main loop.
##' @title Create an rrqueue worker
##' @param queue_name Queue name
##' @param redis_host Host name/IP for the Redis server
##' @param redis_port Port for the Redis server
##' @param heartbeat_period Period between heartbeat pulses
##' @param heartbeat_expire Time that heartbeat pulses will persist
##' for (must be greater than \code{heartbeat_period})
##' @param key_worker_alive Optional key to write to when the worker
##' becomes alive. The worker will push onto this key so that
##' another process can monitor it and determine when a worker has
##' come up.
##' @export
worker <- function(queue_name,
redis_host="127.0.0.1", redis_port=6379,
heartbeat_period=30,
heartbeat_expire=90,
key_worker_alive=NULL) {
.R6_worker$new(queue_name, redis_host, redis_port,
heartbeat_period, heartbeat_expire, key_worker_alive)
}
workers_len <- function(con, keys) {
## NOTE: this is going to be an *estimate* because there might
## be old workers floating around.
##
## TODO: Drop orphan workers here, at which point this becomes a
## bit slower than we'd like...
con$SCARD(keys$workers_name)
}
workers_list <- function(con, keys) {
as.character(con$SMEMBERS(keys$workers_name))
}
workers_list_exited <- function(con, keys) {
active <- workers_list(con, keys)
ids <- as.character(con$HKEYS(keys$workers_info))
setdiff(ids, active)
}
workers_status <- function(con, keys, worker_ids=NULL) {
from_redis_hash(con, keys$workers_status, worker_ids)
}
workers_times <- function(con, keys, worker_ids=NULL, unit_elapsed="secs") {
if (is.null(worker_ids)) {
worker_ids <- workers_list(con, keys)
}
f_expire_max <- function(key) {
t <- con$GET(key)
if (is.null(t)) NA_real_ else as.numeric(t)
}
key <- rrqueue_key_worker_heartbeat(keys$queue_name, worker_ids)
expire_max <- vnapply(key, f_expire_max, USE.NAMES=FALSE)
## Current time left to expire:
t_expire <- clean_pttl(vnapply(key, con$PTTL, USE.NAMES=FALSE))
log <- workers_log_tail(con, keys, worker_ids, 1)
if (nrow(log) > 0L) {
t_last <- log$time[match(worker_ids, log$worker_id)]
} else {
t_last <- rep_len(NA_real_, length(worker_ids))
}
t_curr <- as.numeric(redis_time(con))
data.frame(worker_id=worker_ids,
expire_max=expire_max,
expire=t_expire,
last_seen=as.numeric(expire_max - t_expire, unit_elapsed),
last_action=as.numeric(t_curr - t_last, unit_elapsed),
stringsAsFactors=FALSE)
}
worker_log_tail <- function(con, keys, worker_id, n=1) {
## More intuitive `n` behaviour for "print all entries"; n of Inf
if (identical(n, Inf)) {
n <- 0
}
log_key <- rrqueue_key_worker_log(keys$queue_name, worker_id)
parse_worker_log(as.character(con$LRANGE(log_key, -n, -1)))
}
workers_log_tail <- function(con, keys, worker_ids=NULL, n=1) {
if (is.null(worker_ids)) {
worker_ids <- workers_list(con, keys)
}
tmp <- lapply(worker_ids, function(i) worker_log_tail(con, keys, i, n))
if (length(tmp) > 0L) {
n <- viapply(tmp, nrow)
cbind(worker_id=rep(worker_ids, n), do.call("rbind", tmp, quote=TRUE))
} else {
## NOTE: Need to keep this in sync with parse_worker_log; get some
## tests in here to make sure...
data.frame(worker_id=worker_ids, time=character(0),
command=character(0), message=character(0),
stringsAsFactors=FALSE)
}
}
workers_task_id <- function(con, keys, worker_id) {
from_redis_hash(con, keys$workers_task, worker_id)
}
worker_overview <- function(con, keys) {
lvls <- c(WORKER_IDLE, WORKER_BUSY, WORKER_LOST)
status <- workers_status(con, keys)
table(factor(status, lvls))
}
worker_envir <- function(con, keys, worker_id) {
key <- rrqueue_key_worker_envir(keys$queue_name, worker_id)
as.character(con$SMEMBERS(key))
}
run_message_PING <- function() {
message("PONG")
"PONG"
}
run_message_ECHO <- function(msg) {
message(msg)
"OK"
}
run_message_EVAL <- function(args) {
print(try(eval(parse(text=args), .GlobalEnv)))
}
run_message_STOP <- function(worker, message_id, args) {
worker$send_response(message_id, "STOP", "BYE")
if (is.null(args)) {
args <- "BYE"
}
stop(WorkerStop(worker, args))
}
run_message_INFO <- function(worker) {
info <- worker$print_info()
worker$con$HSET(worker$keys$workers_info, worker$name,
object_to_string(info))
info
}
run_message_ENVIR <- function(worker, args) {
if (worker$initialize_environment(args)) {
"ENVIR OK"
} else {
"ENVIR ERROR"
}
}
## Push and pull
run_message_PUSH <- function(worker, args) {
## Push files from the worker into the DB.
worker$log("PUSH")
files_pack(worker$files, files=args)
}
run_message_PULL <- function(worker, args) {
worker$log("PULL")
if (!compare_hash(args)) {
files_unpack(worker$files, args, ".")
}
"OK"
}
run_message_DIR <- function(args) {
if (length(args) == 0L) {
args <- list()
}
res <- try(do.call("dir", args))
if (!inherits(res, "try-error")) {
path <- if (is.null(args$path)) res else file.path(args$path, res)
res <- setNames(rep(NA_character_, length(res)), res)
is_file <- !vlapply(path, is_directory, USE.NAMES=FALSE)
res[is_file] <- hash_files(path[is_file])
}
res
}
run_message_PAUSE <- function(worker, args) {
worker$paused <- TRUE
worker$set_key_queue(clear=TRUE)
worker$con$HSET(worker$keys$workers_status, worker$name, WORKER_PAUSED)
"OK"
}
run_message_RESUME <- function(worker, args) {
worker$paused <- FALSE
worker$set_key_queue()
worker$con$HSET(worker$keys$workers_status, worker$name, WORKER_IDLE)
"OK"
}
run_message_unknown <- function(cmd, args) {
msg <- sprintf("Recieved unknown message: [%s]", cmd)
message(msg)
structure(list(message=msg, command=cmd, args=args),
class=c("condition"))
}
##' @importFrom crayon make_style
worker_styles <- function() {
list(info=crayon::make_style("grey"),
key=crayon::make_style("gold"),
value=crayon::make_style("dodgerblue2"))
}
## To regenerate / change:
## fig <- rfiglet::figlet(sprintf("_- %s -_", "rrqueue!"), "slant")
## dput(rstrip(strsplit(as.character(fig), "\n")[[1]]))
worker_banner_text <- function() {
c(" __",
" ______________ ___ _____ __ _____ / /",
" ______ / ___/ ___/ __ `/ / / / _ \\/ / / / _ \\/ / ______",
" /_____/ / / / / / /_/ / /_/ / __/ /_/ / __/_/ /_____/",
" ______ /_/ /_/ \\__, /\\__,_/\\___/\\__,_/\\___(_) ______",
"/_____/ /_/ /_____/"
) -> txt
paste(txt, collapse="\n")
}
worker_info <- function(worker) {
sys <- sessionInfo()
redis_config <- worker$con$config()
dat <- list(version=version_string(),
protocol=rrqueue_protocol(),
platform=sys$platform,
running=sys$running,
hostname=Sys.info()[["nodename"]],
pid=Sys.getpid(),
redis_host=redis_config$host,
redis_port=redis_config$port,
worker=worker$name,
queue_name=worker$queue_name,
heartbeat_period=worker$heartbeat_period,
heartbeat_expire=worker$heartbeat_expire,
message=worker$keys$message,
response=worker$keys$response,
log=worker$keys$log,
envir=as.character(worker$con$SMEMBERS(worker$keys$envir)))
class(dat) <- "worker_info"
dat
}
##' @export
print.worker_info <- function(x, banner=FALSE, styles=worker_styles(), ...) {
xx <- x
xx$envir <- sprintf("{%s}", paste(x$envir, collapse=", "))
xx$protocol <- as.character(xx$protocol)
n <- nchar(names(xx))
pad <- vcapply(max(n) - n, strrep, str=" ")
ret <- sprintf(" %s:%s %s",
styles$key(names(xx)), pad,
styles$value(as.character(xx)))
if (banner) {
message(crayon::make_style(random_colour())(worker_banner_text()))
}
message(paste(ret, collapse="\n"))
invisible(x)
}
workers_running <- function(con, keys, worker_ids=NULL, include_times=FALSE) {
if (is.null(worker_ids)) {
worker_ids <- workers_list(con, keys)
}
hkeys <- rrqueue_key_worker_heartbeat(keys$queue_name, worker_ids)
pttl <- vnapply(hkeys, con$PTTL, USE.NAMES=FALSE)
status <- setNames(rep.int(TRUE, length(worker_ids)), worker_ids)
status[pttl == -1] <- NA
status[pttl == -2] <- FALSE
if (include_times) {
attr(status, "time") <- clean_pttl(pttl)
}
status
}
workers_identify_lost <- function(con, keys, worker_ids=NULL) {
lost <- !workers_running(con, keys, worker_ids)
if (any(lost)) {
lost_worker_ids <- names(lost)[lost]
con$SREM(keys$workers_name, lost_worker_ids)
## TODO: Not sure; perhaps this should be allowed?
con$HMSET(keys$workers_status, lost_worker_ids,
rep_len(WORKER_LOST, length(lost_worker_ids)))
## Also pick up the *tasks* that are lost here.
task_ids <- con$HMGET(keys$workers_task, lost_worker_ids)
task_ids <- as.character(unlist(task_ids))
if (length(task_ids) > 0L) {
time <- RedisAPI::redis_time(con)
n <- length(task_ids)
con$HMSET(keys$tasks_time_end, task_ids, rep_len(time, n))
con$HMSET(keys$tasks_status, task_ids, rep_len(TASK_ORPHAN, n))
con$HDEL(keys$workers_task, lost_worker_ids)
}
list(workers=lost_worker_ids, tasks=task_ids)
} else {
list(workers=character(0), tasks=character(0))
}
}
## A better way through here would be to only do exited workers:
workers_delete_exited <- function(con, keys, worker_ids=NULL) {
## This only includes things that have been processed and had task
## orphaning completed.
if (is.null(worker_ids)) {
worker_ids <- workers_list_exited(con, keys)
} else {
extra <- setdiff(worker_ids, workers_list_exited(con, keys))
if (length(extra)) {
stop(sprintf("Workers %s may not have exited;\n\trun workers_identify_lost first",
paste(extra, collapse=", ")))
}
}
if (length(worker_ids) > 0L) {
con$HDEL(keys$workers_info, worker_ids)
con$HDEL(keys$workers_status, worker_ids)
con$HDEL(keys$workers_task, worker_ids)
con$DEL(c(rrqueue_key_worker_log(keys$queue_name, worker_ids),
rrqueue_key_worker_message(keys$queue_name, worker_ids),
rrqueue_key_worker_message(keys$queue_name, worker_ids),
rrqueue_key_worker_heartbeat(keys$queue_name, worker_ids),
rrqueue_key_worker_envir(keys$queue_name, worker_ids)))
}
worker_ids
}
workers_info <- function(con, keys, worker_ids=NULL) {
from_redis_hash(con, keys$workers_info, worker_ids,
f=Vectorize(string_to_object, SIMPLIFY=FALSE))
}
## TODO: this is not really the complement of worker_envir; This
## returns a true/false vector over workers, while worker_envir
## returns a character vector of environments that a worker can do.
## Not sure that's 100% desirable.
envir_workers <- function(con, keys, envir_id, worker_ids=NULL) {
if (is.null(worker_ids)) {
worker_ids <- workers_list(con, keys)
}
keys <- rrqueue_key_worker_envir(keys$queue_name, worker_ids)
ret <- vnapply(keys, con$SISMEMBER, envir_id)
storage.mode(ret) <- "logical"
names(ret) <- worker_ids
ret
}
##' Try to stop a worker. You won't need to call this very often, hopefully.
##' @title Stop a worker
##' @param queue Name of the queue
##' @param worker_id Name of a single worker
##' @param type Either "message" to send a message or "kill" or kill the
##' worker.
##' @param host Redis hostname
##' @param port Redis port
##' @export
worker_stop <- function(queue, worker_id, type="message",
host="127.0.0.1", port=6379) {
type <- match_value(type, c("message", "kill"))
con <- redux::hiredis(host=host, port=port)
keys <- rrqueue_keys(queue)
if (type == "message") {
queue_send_message(con, keys, "STOP", worker_ids=worker_id)
} else {
queue_send_signal(con, keys, tools::SIGTERM, worker_id)
}
}
worker_stop_message <- function(worker) {
args <- list(worker$keys$queue_name, worker$name)
redis_config <- worker$con$config()
if (!(redis_config$host %in% c("127.0.0.1", "localhost"))) {
args <- c(args, list(host=redis_config$host))
}
if (redis_config$port != 6379) {
args <- c(args, list(port=redis_config$port))
}
fun <- call("::", quote(rrqueue), quote(worker_stop))
str <- deparse(as.call(c(list(fun), args)),
width.cutoff=getOption("width") - 2L)
msg <- c("If you're trying to exit with Escape/Ctrl-C that won't work",
"Instead, run this from an R instance on this machine:",
"",
paste(" ", str))
message(paste(c(worker$styles$value(msg[1:3]),
worker$styles$key(msg[-(1:3)])),
collapse="\n"))
invisible(msg)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.