# Nothing
# Scalar-N input with an explicit `dispatch_mode = "shm_queue"`: each shard
# writes its own indices into an mmap-backed integer buffer and returns NULL,
# so the buffer contents are the real output being verified.
test_that("shard_map(N) can run in shm_queue mode for out-buffer workflows", {
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool first, and register another stop for when the test
  # block exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n <- 100L
  out <- buffer("integer", dim = n, init = 0L, backing = "mmap")

  res <- shard_map(
    n,
    out = list(out = out),
    fun = function(sh, out) {
      out[sh$idx] <- sh$idx
      NULL
    },
    workers = 2,
    chunk_size = 1,
    dispatch_mode = "shm_queue",
    diagnostics = TRUE
  )

  expect_true(succeeded(res))
  # `x %||% NULL` is always just `x`, so the field is compared directly.
  expect_equal(res$diagnostics$dispatch_mode, "shm_queue")

  # Results are not gathered in shm_queue mode; expect NULL placeholders.
  rr <- results(res)
  # expect_s3_class() reports the actual class vector when it fails, unlike
  # expect_true(inherits(...)).
  expect_s3_class(rr, "shard_results_placeholder")
  expect_true(all(vapply(rr, is.null, logical(1))))

  # Every slot i of the buffer must hold the value i.
  expect_equal(as.integer(out[]), seq_len(n))
})
# Passes `dispatch_opts = list(block_size = 1L)` and checks that the run
# reports as many shards as there are elements, and that every buffer slot
# was written.
test_that("shm_queue dispatch_opts can override block_size", {
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool now and again when the test block exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 32L
  sink <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker callback: store each index at its own position, return nothing.
  write_own_idx <- function(sh, out) {
    out[sh$idx] <- sh$idx
    NULL
  }

  run <- shard_map(
    n_items,
    out = list(out = sink),
    fun = write_own_idx,
    workers = 2,
    chunk_size = 1,
    dispatch_mode = "shm_queue",
    dispatch_opts = list(block_size = 1L),
    diagnostics = TRUE
  )

  expect_true(succeeded(run))
  expect_equal(run$shards$num_shards, n_items)
  expect_equal(as.integer(sink[]), seq_len(n_items))
})
# No `dispatch_mode` is passed here; only `profile = "speed"`. The diagnostics
# are still expected to report "shm_queue" as the dispatch mode.
test_that("profile='speed' auto-enables shm_queue for scalar-N out-buffer workflows", {
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool now and again when the test block exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 100L
  sink <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker callback: store each index at its own position, return nothing.
  write_own_idx <- function(sh, out) {
    out[sh$idx] <- sh$idx
    NULL
  }

  run <- shard_map(
    n_items,
    out = list(out = sink),
    fun = write_own_idx,
    workers = 2,
    chunk_size = 1,
    profile = "speed",
    diagnostics = TRUE
  )

  expect_true(succeeded(run))

  reported_mode <- run$diagnostics$dispatch_mode
  expect_equal(reported_mode, "shm_queue")

  expect_equal(as.integer(sink[]), seq_len(n_items))
})
# Same out-buffer workflow as the scalar-N case, but the first argument is a
# descriptor built up front with `shards(n, block_size = 10L, workers = 2)`.
test_that("shard_map(shard_descriptor) can run in shm_queue mode for out-buffer workflows", {
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool now and again when the test block exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 100L
  descriptor <- shards(n_items, block_size = 10L, workers = 2)
  sink <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker callback: store the shard's indices at their own positions.
  write_own_idx <- function(shard, out) {
    out[shard$idx] <- shard$idx
    NULL
  }

  run <- shard_map(
    descriptor,
    out = list(out = sink),
    fun = write_own_idx,
    workers = 2,
    chunk_size = 1,
    dispatch_mode = "shm_queue",
    diagnostics = TRUE
  )

  expect_true(succeeded(run))

  reported_mode <- run$diagnostics$dispatch_mode
  expect_equal(reported_mode, "shm_queue")

  expect_equal(as.integer(sink[]), seq_len(n_items))
})
# The callback always errors for index 3. With `max_retries = 1` and one
# element per shard, the test pins the queue-level counters, the failure
# record stored under name "3", and the final buffer contents.
test_that("shm_queue reports retry accounting and per-task retry_count for failures", {
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool now and again when the test block exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 10L
  sink <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker callback: raise "boom" for index 3, otherwise write the index.
  fail_on_third <- function(sh, out) {
    if (identical(sh$idx, 3L)) stop("boom")
    out[sh$idx] <- sh$idx
    NULL
  }

  run <- shard_map(
    n_items,
    out = list(out = sink),
    fun = fail_on_third,
    workers = 2,
    chunk_size = 1,
    max_retries = 1,
    dispatch_mode = "shm_queue",
    dispatch_opts = list(block_size = 1L),
    diagnostics = TRUE
  )

  expect_false(succeeded(run))

  queue <- run$queue_status
  expect_equal(queue$failed, 1L)
  expect_equal(queue$total_retries, 1L)

  # Task 3 errors twice: first schedules a retry, second marks as failed.
  expect_true("3" %in% names(run$failures))
  task3 <- run$failures[["3"]]
  expect_equal(task3$id, 3L)
  expect_equal(task3$retry_count, 2L)

  # Only task 3 fails; others write through.
  written <- as.integer(sink[])
  expect_equal(written[-3], seq_len(n_items)[-3])
  # Slot 3 keeps the buffer's initial value.
  expect_equal(written[3], 0L)
})
# Same failing workload as the retry-accounting test, with `error_log = TRUE`
# and `error_log_max_lines = 10L` added to `dispatch_opts`. The test checks
# that at least one log entry is reported, that every reported path exists,
# and that some log line is exactly "3", a tab, then "boom".
test_that("shm_queue can write bounded per-worker error logs when enabled", {
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool now and again when the test block exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 10L
  sink <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker callback: raise "boom" for index 3, otherwise write the index.
  fail_on_third <- function(sh, out) {
    if (identical(sh$idx, 3L)) stop("boom")
    out[sh$idx] <- sh$idx
    NULL
  }

  run <- shard_map(
    n_items,
    out = list(out = sink),
    fun = fail_on_third,
    workers = 2,
    chunk_size = 1,
    max_retries = 1,
    dispatch_mode = "shm_queue",
    dispatch_opts = list(
      block_size = 1L,
      error_log = TRUE,
      error_log_max_lines = 10L
    ),
    diagnostics = TRUE
  )

  expect_false(succeeded(run))

  # A missing `error_logs` field is treated as an empty list so the length
  # check below fails cleanly.
  log_entries <- run$diagnostics$error_logs %||% list()
  expect_true(length(log_entries) >= 1L)

  # An entry without a `path` contributes NA, which file.exists() reports as
  # FALSE.
  path_of <- function(entry) entry$path %||% NA_character_
  log_paths <- vapply(log_entries, path_of, character(1))
  expect_true(all(file.exists(log_paths)))

  # Pool the lines from every log file and look for the failing task's record.
  read_quietly <- function(path) readLines(path, warn = FALSE)
  log_lines <- unlist(lapply(log_paths, read_quietly), use.names = FALSE)
  expect_true(any(grepl("^3\\tboom$", log_lines)))
})
# Each shard sleeps 0.2s against a 0.05s timeout on a one-worker pool, so the
# shm_queue call is expected to fail with a message matching "timed out".
# A second shard_map call with default dispatch settings must then succeed.
test_that("shm_queue timeout cleans up workers and leaves the pool reusable", {
  skip_on_cran()
  skip_if_conn_exhausted()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  pool_stop()
  # Register teardown BEFORE creating the pool, as the other tests in this
  # file do, so the stop still runs if pool_create() fails partway through.
  on.exit(pool_stop(), add = TRUE)
  pool_create(n = 1)

  out <- buffer("integer", dim = 2L, init = 0L, backing = "mmap")

  expect_error(
    shard_map(
      2L,
      out = list(out = out),
      fun = function(sh, out) {
        # Sleep longer than the timeout below so the call cannot finish in time.
        Sys.sleep(0.2)
        out[sh$idx] <- sh$idx
        NULL
      },
      chunk_size = 1L,
      dispatch_mode = "shm_queue",
      dispatch_opts = list(block_size = 1L),
      timeout = 0.05
    ),
    "timed out"
  )

  # After the timeout, a fresh run that returns each shard's id must succeed.
  follow_up <- shard_map(
    shards(2L, block_size = 1L, workers = 1L),
    fun = function(shard) shard$id
  )

  expect_true(succeeded(follow_up))
  expect_equal(unname(unlist(results(follow_up))), c(1, 2))
})
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.