tests/hpc/test-parallel.R

# This interactive test checks parallel efficiency.
# We should not have to wait for all the `x2` branches to finish
# before we move on to `x3`-`x6` branches downstream of the
# first x1 branches.
library(targets)
library(testthat)
tar_script({
  options(clustermq.scheduler = "multiprocess")
  list(
    tar_target(x1, c(0, 0, 5)),
    tar_target(x2, Sys.sleep(x1), pattern = map(x1)),
    tar_target(x3, x2, pattern = map(x2)),
    tar_target(x4, x3, pattern = map(x3)),
    tar_target(x5, x4, pattern = map(x4)),
    tar_target(x6, x5, pattern = map(x5))
  )
})
tar_make_clustermq(workers = 3L, reporter = "timestamp")
tar_destroy()

# All but 1 worker should quit while x4 is running.
# Run in a basic terminal, monitor with htop -d 1, and filter on R.home().
library(targets)
tar_script({
  options(clustermq.scheduler = "multiprocess")
  list(
    tar_target(x1, Sys.sleep(1)),
    tar_target(x2, list(Sys.sleep(1), x1)),
    tar_target(x3, list(Sys.sleep(0), x2)),
    tar_target(x4, list(Sys.sleep(5), x3))
  )
})
tar_make_clustermq(workers = 4L, reporter = "timestamp", callr_function = NULL)
tar_destroy()

# All but 1 worker should quit while the last x4 branch is running.
# Run in a basic terminal, monitor with htop -d 1, and filter on R.home().
library(targets)
tar_script({
  options(clustermq.scheduler = "multiprocess")
  sleep <- function(value, sleep) {
    Sys.sleep(sleep)
    value
  }
  list(
    tar_target(x1, c(0, 0, 0, 5)),
    tar_target(x2, sleep(x1, 5), pattern = map(x1)),
    tar_target(x3, sleep(x2, 5), pattern = map(x2)),
    tar_target(x4, sleep(x3, x3), pattern = map(x3))
  )
})
tar_make_clustermq(workers = 4L, reporter = "timestamp", callr_function = NULL)
expect_equal(unname(tar_read(x4)), c(0, 0, 0, 5))
expect_equal(tar_progress_branches()$built, c(4, 4, 4))
tar_destroy()

# All but 1 worker should quit while x3 is running,
# then all workers should quit while x4 is running.
# Run in a basic terminal, monitor with htop -d 1, and filter on R.home().
library(targets)
tar_script({
  options(clustermq.scheduler = "multiprocess")
  sleep <- function(value, sleep) {
    Sys.sleep(sleep)
    value
  }
  list(
    tar_target(x1, "pass"),
    tar_target(x2, sleep(x1, 5)),
    tar_target(x3, sleep(x2, 5)),
    tar_target(x4, sleep(x3, 5), deployment = "main")
  )
})
tar_make_clustermq(workers = 4L, reporter = "timestamp", callr_function = NULL)
tar_destroy()
unlink("_targets.R")

# Should not try to communicate with workers if they all
# shut down early: https://github.com/ropensci/targets/issues/404
tar_script({
  options(clustermq.scheduler = "multiprocess")
  tar_option_set(deployment = "main")
  list(
    tar_target(index, seq_len(4)),
    tar_target(run, index, pattern = map(index), deployment = "worker"),
    tar_target(end, run)
  )
})
tar_make_clustermq()
tar_destroy()
unlink("_targets.R")

# error = "abridge" should keep current targets going.
# Workers should clean up.
tar_script({
  options(clustermq.scheduler = "multiprocess")
  error_middle <- function() {
    Sys.sleep(4)
    stop("time up")
  }
  just_sleep_short <- function() {
    Sys.sleep(8)
  }
  just_sleep_long <- function() {
    Sys.sleep(12)
  }
  list(
    tar_target(w, error_middle(), error = "abridge"),
    tar_target(x, just_sleep_short()),
    tar_target(y, just_sleep_long()),
    tar_target(z, list(w, x, y))
  )
})
tar_make_clustermq(workers = 3)
out <- tar_progress()
expect_equal(nrow(out), 3L)
expect_equal(out$progress[out$name == "w"], "errored")
expect_equal(out$progress[out$name == "x"], "built")
expect_equal(out$progress[out$name == "y"], "built")
expect_false("z" %in% out$name)
tar_destroy()
unlink("_targets.R")

Try the targets package in your browser

Any scripts or data that you put into this service are public.

targets documentation built on Oct. 12, 2023, 5:07 p.m.