knitr::opts_chunk$set(
  error = FALSE,
  collapse = TRUE,
  comment = "#>"
)

lang_output <- function(x, lang) {
  cat(c(sprintf("```%s", lang), x, "```"), sep = "\n")
}

In a perfect world, nothing would fail, and this vignette would not be needed. But here we are.

When you run tasks within a single process and things go wrong, you know immediately because you see it happen. With a queuing system like rrq it's less obvious when something bad has happened because the task is running in another process, possibly on a different machine!

This vignette discusses how you can mitigate and recover from these sorts of issues. We start with the expected errors and build up to considerations for creating a fault-tolerant queue.

Error handling

We start with the simplest sort of fault, one that can just as easily happen locally as remotely with rrq. In this case the handling is fairly well defined and there's not much you need to do. This section is not really "fault tolerance" at all, but simply how rrq handles errors and what you can do about them.

Consider this simple function which would fit a linear model between two variables:

lang_output(readLines("fault.R"), "r")
library(rrq)
obj <- rrq_controller(paste0("rrq:", ids::random_id(bytes = 4)))
rrq_default_controller_set(obj)
rrq_worker_envir_set(rrq_envir(sources = "fault.R"))
w <- rrq_worker_spawn()

In the happy case, everything works as expected:

x <- runif(5)
y <- 2 * x + rnorm(length(x), 0, 0.2)
t <- rrq_task_create_expr(fit_model(x, y))
rrq_task_wait(t, 10)
rrq_task_status(t)
rrq_task_result(t)

But if we provide invalid input, the task will error:

t <- rrq_task_create_expr(fit_model(x, NULL))
rrq_task_wait(t, 10)
rrq_task_result(t)

The first thing to note here is that the task does not throw an error when you wait on it with rrq_task_wait() or fetch its result with rrq_task_result(), as above:

r <- rrq_task_result(t)
r

However, the result will be an object of class rrq_task_error, which you can test for using inherits():

inherits(r, "rrq_task_error")

The default behaviour of rrq is not to error when fetching a task result, as that would require that you use tryCatch everywhere that you retrieve tasks that might have failed, and because errors are often interesting in themselves. For example, rrq_task_error objects include stack traces alongside the error:

r$trace

These are rlang stack traces, which are somewhat richer than those produced by traceback(), containing the same set of stacks but arranged in a tree. See the rlang documentation for details. This error object will also include any warnings captured while the task ran.

Objects of class rrq_task_error inherit from error and condition so, once thrown, will behave as expected in programs using errors for flow control (e.g., with tryCatch); you can throw them yourself with stop():

stop(r)

You can also change the default behaviour to error on failure by passing error = TRUE to rrq_task_result() or rrq_task_results(), which will immediately rethrow the error in your R session (your program could then stop, or you could catch the error with tryCatch):

rrq_task_result(t, error = TRUE)
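
As a rough sketch of the flow-control pattern described above, the following base R snippet dispatches on the rrq_task_error class with tryCatch. The object `fake` and the helper `value_or_default()` are hypothetical names introduced for illustration; `fake` is a hand-built stand-in so that the snippet runs without Redis, and a real error object returned by rrq_task_result() carries more fields (such as the trace).

```r
# Hand-built stand-in for a failed task result (hypothetical; a real
# rrq_task_error object has more fields, such as the trace).
fake <- structure(
  list(message = "model failed to fit", call = NULL),
  class = c("rrq_task_error", "error", "condition"))

# Return `result` unchanged on success, or `default` if it is a task error.
# stop() rethrows the condition so that tryCatch can dispatch on its class.
value_or_default <- function(result, default = NA) {
  tryCatch({
    if (inherits(result, "rrq_task_error")) {
      stop(result)
    }
    result
  }, rrq_task_error = function(e) {
    message("task failed: ", conditionMessage(e))
    default
  })
}
```

With a running queue you might call this as `value_or_default(rrq_task_result(t))`; handling only the rrq_task_error class leaves any other error free to propagate.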

This process is unchanged if the task is run in a separate process (with separate_process = TRUE passed to rrq_task_create_expr()), with the same status and return type, and with the trace information available after failure.

Increasing resilience via separate processes

Running tasks in separate processes (e.g., rrq_task_create_expr(mytask(), separate_process = TRUE)) is the simplest way of making things more resilient because this creates a layer of isolation between the worker and the task. If your task crashes R (e.g., a segmentation fault due to a bug in your C/C++ code) or is killed by the operating system then the worker process survives and can update the keys in Redis directly to advertise this fact. This is generally much nicer than when the worker dies and the task status cannot be updated.

The downside of using separate processes is that it is much slower; compare the time taken to queue, run and retrieve a trivial task run in the same worker process (look at the elapsed entry):

system.time(
  rrq_task_wait(rrq_task_create_expr(identity(1)), 10))

with the same task run in a separate process on the worker

system.time(
  rrq_task_wait(rrq_task_create_expr(identity(1), separate_process = TRUE), 10))

We expect this difference to be roughly 100-fold for local workers. Almost all the cost is due to the overhead of starting and terminating a fresh R session. If your queue uses a lot of packages there will be additional overhead here too as these are loaded. However, this cost is fixed and will decrease as a fraction of the total running time as the running time of your task increases. So for long-running tasks the additional safety of separate processes is probably worthwhile. For smaller tasks you may want to make sure you run a heartbeat process so that you can handle failures via that route.
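
To make the "fixed cost" argument concrete, here is a small arithmetic sketch. The helper `overhead_fraction()` is a hypothetical name, and the 0.5 second overhead used in the usage note is an assumed figure for illustration only; measure your own with system.time() as above.

```r
# Fraction of the total wall time that is spent on the fixed start-up
# overhead of a separate process, for a task that itself takes
# `task_seconds` to run.
overhead_fraction <- function(task_seconds, overhead_seconds) {
  overhead_seconds / (overhead_seconds + task_seconds)
}
```

Under that assumed overhead, `overhead_fraction(0.5, 0.5)` says that half the time of a half-second task is overhead, while `overhead_fraction(3600, 0.5)` shows that it is negligible for an hour-long task.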

Consider running some long-running job; this one simply sleeps for an hour:

t <- rrq_task_create_expr(Sys.sleep(3600), separate_process = TRUE)
# small sleep here needed so that the pid is actually available;
# two small conditions to pass.
rrq:::wait_status_change(obj, t, rrq:::TASK_PENDING)
rrq:::wait_timeout("pid not available in time", 10,
                   function() is.null(rrq_task_info(t)$pid))

To simulate the process crashing, we will kill it. Because we have queued this on a separate process we can use rrq_task_info() to fetch the process id (PID) of the process that the task is running in (this is different to that of the worker).

info <- rrq_task_info(t)
tools::pskill(info$pid)
rrq:::wait_status_change(obj, t, rrq:::TASK_RUNNING)

Because the task was run in a separate process, our worker could detect that the task has died unexpectedly:

rrq_task_status(t)

This is different to the error status in the previous section (that was ERROR). Note that if we had not run the task in a separate process the task status would be unchanged as RUNNING because nothing could ever update the task status!

Retrieving the result has similar behaviour to the error case; we don't throw but instead return an object of class rrq_task_error (which also inherits from error and condition). However, this time there's really not much extra information in the error:

r <- rrq_task_result(t)
r

There's also no trace available

r$trace

Loss of workers

In this section we outline what you can do about unexplained and unreported failures in your task, or loss of workers. Typically this happens because your worker has crashed (perhaps because your task crashed it), has been killed (e.g., by the operating system or an administrator), or because the machine that it was running on has been lost.

In order to enable fault tolerance for this sort of issue, you first need to enable a "heartbeat" on the worker processes. This is a second process on each worker that periodically writes to the Redis database on a key that will expire in a time slightly longer than that period, in effect making a dead man's switch (see rrq::rrq_heartbeat for details). So if the worker process dies for any reason, then after a while we'll detect that because its key has expired. We can then take some action.

rrq_worker_stop(timeout = 10)
rrq_worker_delete_exited()

To enable the heartbeat, save a worker configuration before starting a worker:

rrq_worker_config_save(
  "localhost",
  rrq_worker_config(heartbeat_period = 3))

When you spawn a worker it will pick up this configuration, and we'll be able to detect if it has died.

w <- rrq_worker_spawn()

In order to simulate the loss of the worker, we will kill it after starting a long-running task. First, we queue the task, which this worker will start:

t <- rrq_task_create_expr(Sys.sleep(3600))
rrq:::wait_status_change(obj, t, rrq:::TASK_PENDING)

The worker will pick the task up fairly quickly, and the status will change to RUNNING:

rrq_task_status(t)
rrq_worker_status(w$id)

We kill the worker (simulating the job crashing, or the machine turning off, etc):

w$kill()

Because the worker has been killed, it can't write to Redis to tell us that the task can't be completed, so this status will never change:

rrq_task_status(t)
rrq_worker_status(w$id)

The heartbeat will persist for 3 times the period given above (this multiplier is not configurable and while any number greater than 1 should be OK, we picked this as it allows for occasional network connectivity issues or slowness on the node - we may reduce it in a future version). This means that after 9s (3 * 3s) the key will have expired:

Sys.sleep(10)
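
The timing rule described above can be modelled in a few lines of base R. This is a simplified sketch of the dead man's switch logic only, not rrq's implementation; `expiry_multiplier` and `heartbeat_expired()` are hypothetical names, and times are plain numbers of seconds on a shared clock.

```r
# Multiplier applied to the heartbeat period before the key expires,
# as stated in the text above.
expiry_multiplier <- 3

# TRUE if a key last refreshed at `last_beat` would have expired by `now`,
# given a heartbeat period of `period` seconds.
heartbeat_expired <- function(last_beat, now, period) {
  (now - last_beat) > expiry_multiplier * period
}
```

With the 3 second period configured above, a worker whose last beat was at time 0 is still considered alive at 8 seconds but not at 10, which is why the example sleeps for 10 seconds before looking for exited workers.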

We can then use rrq_worker_detect_exited() to clean up:

rrq_worker_detect_exited()

At this point, the statuses of our task and worker are correct:

rrq_task_status(t)
rrq_worker_status(w$id)

Fetching the task result provides the same DIED error as above:

rrq_task_result(t)

This is still not terribly useful, as we have not provided any mechanism to automatically requeue a lost task or restart a dead worker, which we cover in the next section.

Retrying tasks

Regardless of how your tasks got to be in a broken situation, the mechanism for getting them out of this situation remains the same: you want to retry the task.

Alternatively, perhaps your task ran to successful completion but you simply want to rerun it (e.g., some stochastic algorithm that is displaying a non-fatal pathology).

You can use rrq_task_retry() to retry any number of tasks that have completed (that is, are in one of the terminal states, including COMPLETE but also ERROR, CANCELLED, DIED or TIMEOUT). This is nondestructive to any of the task data, in particular its result, so you will still have access to the failed run and its stack trace (but see below).
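
One way to combine the pieces so far is to retry only those tasks that were lost with a worker. The sketch below is an illustration rather than a feature of rrq: `retry_died()` is a hypothetical helper, it assumes that a default controller has been set, and it takes the status and retry functions as arguments so that the logic can be exercised without a running queue.

```r
# Retry every task in `task_ids` whose status is DIED, returning the new
# task ids (or an empty character vector if nothing needed retrying).
# `status` and `retry` default to the rrq functions used in this vignette.
retry_died <- function(task_ids,
                       status = rrq::rrq_task_status,
                       retry = rrq::rrq_task_retry) {
  died <- task_ids[status(task_ids) == "DIED"]
  if (length(died) == 0) {
    return(character(0))
  }
  retry(died)
}
```

You would typically call this after rrq_worker_detect_exited(), since the status of a task on a dead worker is only updated at that point.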

An example where retrying fixes an error

In this example, we'll run some "model" that may or may not converge, and we want to retry until it does. This represents some badly behaved task with a stochastic component that will eventually work if tried enough times:

lang_output(readLines("fault2.R"), "r")

We can create a new environment for our worker, to pick up this function:

rrq_worker_envir_set(rrq_envir(sources = "fault2.R"))
w <- rrq::rrq_worker_spawn()
# this ensures that the worker produces the random numbers
#   0.2655087, 0.3721239, 0.5728534
# in order, which we rely on below
rrq::rrq_message_send_and_wait(
  "EVAL",
  quote(set.seed(1)),
  w$id,
  progress = FALSE,
  timeout = 10)

Now, we enqueue a task as usual, then wait on it

t1 <- rrq_task_create_expr(stochastic_failure(0.5))
rrq_task_wait(t1, timeout = 10)
rrq_task_result(t1)

Oh no, this task has failed!

We can use rrq_task_retry() to requeue this job, meaning that it will run exactly as before. However, the random number stream has moved on and this task will return a different answer. Perhaps it will work this time?

t2 <- rrq_task_retry(t1)
rrq_task_wait(t2, timeout = 10)
rrq_task_result(t2)

The task has failed again. Note here that rrq_task_retry() has taken a task id and returned a new task id, which we have waited on. However, we could also have used the original id (more on this below).

Let's give it one more go:

t3 <- rrq_task_retry(t2)
rrq_task_wait(t3, timeout = 10)
rrq_task_result(t3)

Success! The task has run to completion and we no longer have an error.
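
The manual retries above can be wrapped in a loop. This is a minimal sketch under stated assumptions: `retry_until_success()` is a hypothetical helper (not part of rrq), the starting task has already finished, a default controller has been set, and the rrq functions are passed in as arguments so that the loop can be tested with stand-ins.

```r
# Retry a finished task until its result is no longer an rrq_task_error,
# giving up after `max_retries` retries. Returns the last result, which
# may still be an error object if every attempt failed.
retry_until_success <- function(task_id, max_retries = 5, timeout = 10,
                                retry = rrq::rrq_task_retry,
                                wait = rrq::rrq_task_wait,
                                result = rrq::rrq_task_result) {
  for (i in seq_len(max_retries)) {
    res <- result(task_id)
    if (!inherits(res, "rrq_task_error")) {
      return(res)
    }
    # Each retry returns a new task id, which we wait on.
    task_id <- retry(task_id)
    wait(task_id, timeout = timeout)
  }
  result(task_id)
}
```

Capping the number of retries matters here: a task that fails deterministically (such as the invalid input in the first section) would otherwise be retried forever.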

When you retry a task, rrq sets up redirects that point from the old task id to the new one, and in most cases you can use whichever you find more convenient. For example, we can now read the successful task result from any of the task ids:

rrq_task_results(c(t1, t2, t3))

To prevent this, pass follow = FALSE, which returns the original result of each task:

rrq_task_results(c(t1, t2, t3), follow = FALSE)

Here, we can see that the first two times we ran the task they failed, along with the error and any backtraces. The same logic applies to other functions, such as rrq_task_status()

rrq_task_status(c(t1, t2, t3))
rrq_task_status(c(t1, t2, t3), follow = FALSE)

Note that the task status above has overwritten the original status (previously it was ERROR, but now it is MOVED). However, you can always read the original status in the error object's status field, should you need it.

You can also extract times that events happened with rrq_task_times()

rrq_task_times(c(t1, t2, t3))
rrq_task_times(c(t1, t2, t3), follow = FALSE)

This means that most of the time you can ignore that a task has been retried and work with whatever task id you have handy.

You can inspect the chain of tasks by looking at the return value of rrq_task_info(). In the above example we have a chain of three tasks (t1 -> t2 -> t3), which we can discover this way:

rrq_task_info(t1)

Here, the moved field shows the tasks that are upstream (up) and downstream (down) of this task. For tasks that have never been retried both elements are NULL. The elements are always ordered from oldest to newest, and chains of tasks are always linear (i.e., there is no forking).

For the middle task in the chain, both up and down point at different tasks:

rrq_task_info(t2)$moved

and for the leaf task:

rrq_task_info(t3)$moved
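
Given the up and down elements described above, the full chain for any task can be reassembled with a one-line helper. This is a sketch that assumes the moved field is a list with elements up and down, each either NULL or a character vector ordered oldest to newest, as the text states; `task_chain()` is a hypothetical name.

```r
# Rebuild the whole retry chain, oldest first, from a task id and the
# `moved` element of its task info.
task_chain <- function(task_id, moved) {
  c(moved$up, task_id, moved$down)
}
```

For example, `task_chain(t2, rrq_task_info(t2)$moved)` would be expected to give the same three ids regardless of which task in the chain you start from.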

Performance considerations

It is possible to change the default behaviour of follow to not follow through a task chain. Doing this might be slightly more efficient in cases where you are interested in running large numbers of small tasks and do not want to retry failed tasks. This is because almost every request that involves a task id will have to check to see if it has been moved.

To change this behaviour, pass the follow argument to the queue constructor, for example:

obj_nofollow <- rrq_controller(obj$queue_id, follow = FALSE)
rrq_task_result(t1, controller = obj_nofollow)

Here, we would create a queue controller object with the default follow behaviour set to FALSE and so accessing a task result of a moved task would not search down through any potential retries unless follow = TRUE was explicitly passed.

Deleting tasks that have been retried

Deletion will operate on all tasks in a chain. So if you delete a task that has been retried then it deletes everything upstream (up to the original task) and downstream (down to the last time that the task was retried). This is because otherwise it is too easy to end up with inconsistent state. So if we run

rrq_task_delete(t1)

all three tasks in the chain are deleted:

rrq_task_status(c(t1, t2, t3), follow = FALSE)

Going further

Currently this system does not allow you to do a few things that would be useful

We will relax these limitations soon.



mrc-ide/rrq documentation built on May 1, 2024, 5:35 a.m.