test_that("simple CRUD", {
skip_on_cran()
withr::with_options(c(async.worker_pool_size = 4), {
wp <- worker_pool$new()
on.exit(wp$kill_workers())
wl <- wp$list_workers()
expect_equal(nrow(wl), 4)
## Schedule some work, it will not start until a process is ready
for (i in 1:10) {
wp$add_task(function() Sys.getpid(), list(), as.character(i), 42)
}
## Wait until they have started
conns <- wp$get_poll_connections()
expect_equal(length(conns), 4)
ready <- NULL
while (!identical(unlist(ready, use.names = FALSE), rep("ready", 4))) {
ready <- processx::poll(conns, 1000L)
}
## Let the queue know that they have started
## Returns NULL because nothing has finished
expect_null(wp$notify_event(names(conns), event_loop = 42))
## Now four workers must be busy
expect_equal(is.na(wp$list_workers()$task), rep(FALSE, 4))
## Check for result
pids <- wp$get_pids()
expect_equal(length(pids), 4)
ready <- NULL
while (!identical(unlist(ready, use.names = FALSE), rep("ready", 4))) {
ready <- processx::poll(conns, 1000L)
}
## Results are in, four more tasks should be queued
expect_equal(
sort(wp$notify_event(pids = pids, event_loop = 42)),
as.character(1:4))
res <- lapply(as.character(1:4), function(i) wp$get_result(i))
pids <- viapply(res, "[[", "result")
expect_equal(pids, wp$list_workers()$pid)
expect_equal(sum(wp$list_tasks()$status == "running"), 4)
## Cancel the rest
for (i in 5:10) wp$cancel_task(as.character(i))
## Workers are idle, no tasks
expect_equal(is.na(wp$list_workers()$task), rep(TRUE, 4))
expect_equal(NROW(wp$list_tasks()), 0)
wp$kill_workers()
expect_equal(NROW(wp$list_workers()), 0)
})
})
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.