tests/async/test-worker-pool.R

test_that("simple CRUD", {
  skip_on_cran()

  withr::with_options(c(async.worker_pool_size = 4), {
    wp <- worker_pool$new()
    on.exit(wp$kill_workers())
    wl <- wp$list_workers()
    expect_equal(nrow(wl), 4)

    ## Schedule some work, it will not start until a process is ready
    for (i in 1:10) {
      wp$add_task(function() Sys.getpid(), list(), as.character(i), 42)
    }

    ## Wait until they have started
    conns <- wp$get_poll_connections()
    expect_equal(length(conns), 4)

    ready <- NULL
    while (!identical(unlist(ready, use.names = FALSE), rep("ready", 4))) {
      ready <- processx::poll(conns, 1000L)
    }

    ## Let the queue know that they have started
    ## Returns NULL because nothing has finished
    expect_null(wp$notify_event(names(conns), event_loop = 42))

    ## Now four workers must be busy
    expect_equal(is.na(wp$list_workers()$task), rep(FALSE, 4))

    ## Check for result
    pids <- wp$get_pids()
    expect_equal(length(pids), 4)
    ready <- NULL
    while (!identical(unlist(ready, use.names = FALSE), rep("ready", 4))) {
      ready <- processx::poll(conns, 1000L)
    }

    ## Results are in, four more tasks should be queued
    expect_equal(
      sort(wp$notify_event(pids = pids, event_loop = 42)),
      as.character(1:4))
    res <- lapply(as.character(1:4), function(i) wp$get_result(i))
    pids <- viapply(res, "[[", "result")
    expect_equal(pids, wp$list_workers()$pid)
    expect_equal(sum(wp$list_tasks()$status == "running"), 4)

    ## Cancel the rest
    for (i in 5:10) wp$cancel_task(as.character(i))

    ## Workers are idle, no tasks
    expect_equal(is.na(wp$list_workers()$task), rep(TRUE, 4))
    expect_equal(NROW(wp$list_tasks()), 0)

    wp$kill_workers()
    expect_equal(NROW(wp$list_workers()), 0)
  })
})
r-lib/pkgcache documentation built on April 7, 2024, 5:57 a.m.