tests/testthat/helper.R

mlr3misc::walk(
  list.files(system.file("testthat", package = "rush"), pattern = "^helper.*\\.[rR]", full.names = TRUE),
  source
)


wl_default = function(rush) {
  while (TRUE) {
    xs = list(x1 = runif(1, 0, 10), x2 = runif(1, 0, 10))
    keys = rush$push_running_tasks(list(xs))
    ys = list(y = xs$x1 + xs$x2)
    rush$finish_tasks(keys, yss = list(ys))
    Sys.sleep(1)
  }

  NULL
}

wl_finished = function(rush) {
  xss = list(list(x1 = 1, x2 = 2))
  keys = rush$push_running_tasks(xss)
  rush$finish_tasks(keys, yss = list(list(y = 3)))
  NULL
}

# pops tasks from the queue and finishes them
wl_queue = function(rush) {
  while (!rush$terminated) {
    task = rush$pop_task(fields = c("xs"))
    if (!is.null(task)) {
      fun = function(x1, x2) list(y = x1 + x2)
      ys = mlr3misc::invoke(fun, .args = task$xs)
      rush$finish_tasks(task$key, yss = list(ys))
    }
  }

  NULL
}

# fails a task with an R error if x1 < 1
wl_fail = function(rush) {
  while (!rush$terminated) {
    task = rush$pop_task(fields = c("xs"))

    if (!is.null(task)) {
      if (task$xs$x1 < 1) {
        condition = list(message = "Test error")
        rush$fail_tasks(task$key, conditions = list(condition))
      } else {
        fun = function(x1, x2, ...) {
          list(y = x1 + x2)
        }
        ys = mlr3misc::invoke(fun, .args = task$xs)
        rush$finish_tasks(task$key, yss = list(ys))
      }
    }
  }

  NULL
}

# simulates a segfault by killing the worker process after adding a running task
wl_segfault = function(rush) {
  xs = list(x1 = 1, x2 = 2)
  rush$push_running_tasks(list(xs))
  Sys.sleep(1)
  tools::pskill(Sys.getpid(), tools::SIGKILL)
}

Try the rush package in your browser

Any scripts or data that you put into this service are public.

rush documentation built on April 24, 2026, 5:08 p.m.