Nothing
test_that("dispatch_chunks processes all chunks", {
  skip_on_cran()

  # Start a pool (n = 2) for this test and stop it when the test body exits.
  worker_pool <- pool_create(n = 2)
  on.exit(pool_stop())

  # Five chunks with ids 1..5 and values 10, 20, ..., 50.
  make_chunk <- function(k) list(id = k, value = k * 10)
  work_items <- lapply(1:5, make_chunk)

  # Each chunk maps to its value plus one.
  add_one <- function(chunk) chunk$value + 1
  outcome <- dispatch_chunks(work_items, fun = add_one)

  expect_s3_class(outcome, "shard_dispatch_result")

  # Every chunk should be counted as completed and none as failed.
  status <- outcome$queue_status
  expect_equal(status$completed, 5L)
  expect_equal(status$failed, 0L)

  # Names and ordering of the results are not asserted: flatten, drop names,
  # and sort before comparing against the expected values.
  flat_values <- unname(unlist(outcome$results))
  expect_equal(sort(flat_values), c(11, 21, 31, 41, 51))
})
test_that("pool_lapply works like lapply", {
  skip_on_cran()

  # Pool lifetime is bound to this test body.
  worker_pool <- pool_create(n = 2)
  on.exit(pool_stop())

  inputs <- 1:10
  squared <- pool_lapply(inputs, function(i) i^2)

  # Reference answer: the squares as a plain list. Names are stripped from
  # both sides so that only the values take part in the comparison.
  reference <- as.list(inputs^2)
  expect_equal(unname(squared), unname(reference))
})
test_that("pool_sapply simplifies results", {
  skip_on_cran()

  # Pool lifetime is bound to this test body.
  worker_pool <- pool_create(n = 2)
  on.exit(pool_stop())

  inputs <- 1:5
  doubled <- pool_sapply(inputs, function(i) i * 2)

  # The result is compared directly against an atomic vector, with names
  # dropped on both sides so only the values matter.
  reference <- inputs * 2
  expect_equal(unname(doubled), unname(reference))
})
test_that("dispatch handles actual worker death by restarting and requeueing work", {
  skip_on_cran()
  skip_if_conn_exhausted()
  if (is_windows()) skip("signal-based worker tests require POSIX")

  worker_pool <- pool_create(n = 2)
  on.exit(pool_stop())

  # Marker file: once it exists, the "kill" chunk no longer kills its process,
  # so a re-run of that chunk can return normally.
  kill_marker <- tempfile("shard-dispatch-kill-")
  on.exit(unlink(kill_marker), add = TRUE)

  # Five chunks; the third one kills the process that runs it (first time only).
  make_chunk <- function(id, action) list(id = id, action = action)
  work_items <- list(
    make_chunk(1, "normal"),
    make_chunk(2, "normal"),
    make_chunk(3, "kill"),
    make_chunk(4, "normal"),
    make_chunk(5, "normal")
  )

  # Chunk function: on the "kill" chunk, and only while the marker file is
  # absent, create the marker and send signal 9 to the current process.
  # Otherwise (or on a later attempt) return the chunk id.
  kill_once <- function(chunk, flag) {
    first_kill_attempt <- chunk$action == "kill" && !file.exists(flag)
    if (first_kill_attempt) {
      file.create(flag)
      tools::pskill(Sys.getpid(), signal = 9L)
    }
    chunk$id
  }

  outcome <- suppressWarnings(
    dispatch_chunks(
      work_items,
      fun = kill_once,
      flag = kill_marker,
      health_check_interval = 1L,
      max_retries = 2L,
      timeout = 1
    )
  )

  # All five chunks must end up completed despite the killed process.
  status <- outcome$queue_status
  expect_equal(status$completed, 5L)
  expect_equal(status$failed, 0L)

  returned_ids <- unname(unlist(outcome$results))
  expect_equal(sort(returned_ids), 1:5)

  # The pool statistics should record at least one death.
  expect_gte(pool_get()$stats$total_deaths, 1L)
})
test_that("dispatch respects max_retries", {
  skip_on_cran()

  worker_pool <- pool_create(n = 1)
  on.exit(pool_stop())

  # A single chunk whose function raises an error on every call.
  always_error <- function(chunk) stop("always_fails")
  doomed_items <- list(list(id = 1))

  outcome <- suppressWarnings(
    dispatch_chunks(doomed_items, fun = always_error, max_retries = 2)
  )

  # With retries exhausted the chunk is counted as failed, not completed,
  # and at least two retries have been recorded.
  status <- outcome$queue_status
  expect_equal(status$failed, 1L)
  expect_equal(status$completed, 0L)
  expect_gte(status$total_retries, 2L)
})
test_that("print.shard_dispatch_result produces output", {
  skip_on_cran()

  worker_pool <- pool_create(n = 1)
  on.exit(pool_stop())

  # Dispatch one trivial chunk to obtain a result object to print.
  single_item <- list(list(id = 1, val = 42))
  outcome <- dispatch_chunks(single_item, fun = function(chunk) chunk$val)

  printed_lines <- capture.output(print(outcome))

  # TRUE when at least one printed line matches the given pattern.
  mentions <- function(pattern) any(grepl(pattern, printed_lines))

  expect_true(mentions("shard dispatch result"))
  expect_true(mentions("Duration"))
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.