# tests/testthat/test-shm_queue-dispatch.R

test_that("shard_map(N) can run in shm_queue mode for out-buffer workflows", {
  # Covers the scalar-N entry point of shard_map() with an explicit
  # dispatch_mode = "shm_queue" and an mmap-backed output buffer.
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool up front, and stop again when the test exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n <- 100L
  out <- buffer("integer", dim = n, init = 0L, backing = "mmap")

  # The worker writes each shard's indices into `out` at those same positions
  # and returns NULL, so the buffer is the only place output can be observed.
  res <- shard_map(
    n,
    out = list(out = out),
    fun = function(sh, out) {
      out[sh$idx] <- sh$idx
      NULL
    },
    workers = 2,
    chunk_size = 1,
    dispatch_mode = "shm_queue",
    diagnostics = TRUE
  )

  expect_true(succeeded(res))
  expect_equal(res$diagnostics$dispatch_mode %||% NULL, "shm_queue")

  # Results are not gathered in shm_queue mode; expect NULL placeholders.
  rr <- results(res)
  # expect_s3_class() reports the actual class on failure, unlike
  # expect_true(inherits(...)), which only reports "not TRUE".
  expect_s3_class(rr, "shard_results_placeholder")
  expect_true(all(vapply(rr, is.null, logical(1))))

  # Every position must hold its own index (the buffer was initialised to 0).
  expect_equal(as.integer(out[]), 1:n)
})

test_that("shm_queue dispatch_opts can override block_size", {
  skip_on_cran()
  if (!shard:::taskq_supported()) {
    skip("shm_queue not supported (no atomics)")
  }

  # Reset the pool before the run and again on exit.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 32L
  out_buf <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker body: store each shard's indices at those same positions.
  fill_own_idx <- function(sh, out) {
    out[sh$idx] <- sh$idx
    NULL
  }

  run <- shard_map(
    n_items,
    out = list(out = out_buf),
    fun = fill_own_idx,
    workers = 2,
    chunk_size = 1,
    dispatch_mode = "shm_queue",
    dispatch_opts = list(block_size = 1L),
    diagnostics = TRUE
  )

  expect_true(succeeded(run))

  # With block_size = 1L the run is expected to report one shard per item.
  expect_equal(run$shards$num_shards, n_items)
  expect_equal(as.integer(out_buf[]), seq_len(n_items))
})

test_that("profile='speed' auto-enables shm_queue for scalar-N out-buffer workflows", {
  skip_on_cran()
  if (!shard:::taskq_supported()) {
    skip("shm_queue not supported (no atomics)")
  }

  # Reset the pool before the run and again on exit.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  total <- 100L
  sink_buf <- buffer("integer", dim = total, init = 0L, backing = "mmap")

  # No dispatch_mode is passed here; only profile = "speed" is requested.
  run <- shard_map(
    total,
    out = list(out = sink_buf),
    fun = function(sh, out) {
      out[sh$idx] <- sh$idx
      NULL
    },
    workers = 2,
    chunk_size = 1,
    profile = "speed",
    diagnostics = TRUE
  )

  expect_true(succeeded(run))

  # The diagnostics should report that shm_queue was the mode actually used.
  mode_used <- run$diagnostics$dispatch_mode %||% NULL
  expect_equal(mode_used, "shm_queue")

  expect_equal(as.integer(sink_buf[]), seq_len(total))
})

test_that("shard_map(shard_descriptor) can run in shm_queue mode for out-buffer workflows", {
  skip_on_cran()
  if (!shard:::taskq_supported()) {
    skip("shm_queue not supported (no atomics)")
  }

  # Reset the pool before the run and again on exit.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 100L
  # Here shard_map() receives the object returned by shards() rather than a
  # bare item count.
  descriptor <- shards(n_items, block_size = 10L, workers = 2)
  out_buf <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker body: store each shard's indices at those same positions.
  copy_idx <- function(shard, out) {
    out[shard$idx] <- shard$idx
    NULL
  }

  run <- shard_map(
    descriptor,
    out = list(out = out_buf),
    fun = copy_idx,
    workers = 2,
    chunk_size = 1,
    dispatch_mode = "shm_queue",
    diagnostics = TRUE
  )

  expect_true(succeeded(run))

  mode_used <- run$diagnostics$dispatch_mode %||% NULL
  expect_equal(mode_used, "shm_queue")

  expect_equal(as.integer(out_buf[]), seq_len(n_items))
})

test_that("shm_queue reports retry accounting and per-task retry_count for failures", {
  skip_on_cran()
  if (!shard:::taskq_supported()) {
    skip("shm_queue not supported (no atomics)")
  }

  # Reset the pool before the run and again on exit.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n_items <- 10L
  out_buf <- buffer("integer", dim = n_items, init = 0L, backing = "mmap")

  # Worker body: the shard whose index is exactly 3L always errors; every
  # other shard writes its index into the buffer.
  fail_on_three <- function(sh, out) {
    if (identical(sh$idx, 3L)) stop("boom")
    out[sh$idx] <- sh$idx
    NULL
  }

  run <- shard_map(
    n_items,
    out = list(out = out_buf),
    fun = fail_on_three,
    workers = 2,
    chunk_size = 1,
    max_retries = 1,
    dispatch_mode = "shm_queue",
    dispatch_opts = list(block_size = 1L),
    diagnostics = TRUE
  )

  expect_false(succeeded(run))

  # Queue-level accounting: one failed task and one retry in total.
  n_failed <- run$queue_status$failed %||% NULL
  n_retries <- run$queue_status$total_retries %||% NULL
  expect_equal(n_failed, 1L)
  expect_equal(n_retries, 1L)

  # Task 3 errors twice: the first error schedules a retry, the second marks
  # the task as failed.
  expect_true("3" %in% names(run$failures))
  failure_3 <- run$failures[["3"]]
  expect_equal(failure_3$id %||% NULL, 3L)
  expect_equal(failure_3$retry_count %||% NULL, 2L)

  # Only task 3 fails; the others write through, and slot 3 keeps its
  # initial value of 0.
  written <- as.integer(out_buf[])
  expect_equal(written[-3], seq_len(n_items)[-3])
  expect_equal(written[3], 0L)
})

test_that("shm_queue can write bounded per-worker error logs when enabled", {
  # Runs a job in which one task always errors, with error logging switched on
  # through dispatch_opts, then checks that log files exist and mention it.
  skip_on_cran()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  # Stop any existing pool up front, and stop again when the test exits.
  pool_stop()
  on.exit(pool_stop(), add = TRUE)

  n <- 10L
  out <- buffer("integer", dim = n, init = 0L, backing = "mmap")

  res <- shard_map(
    n,
    out = list(out = out),
    fun = function(sh, out) {
      # The shard whose index is exactly 3L always fails.
      if (identical(sh$idx, 3L)) stop("boom")
      out[sh$idx] <- sh$idx
      NULL
    },
    workers = 2,
    chunk_size = 1,
    max_retries = 1,
    dispatch_mode = "shm_queue",
    dispatch_opts = list(
      block_size = 1L,
      error_log = TRUE,
      error_log_max_lines = 10L
    ),
    diagnostics = TRUE
  )

  expect_false(succeeded(res))

  # Fall back to an empty list so a missing field fails the length check
  # below instead of erroring inside vapply().
  logs <- res$diagnostics$error_logs %||% list()
  # expect_gte() reports the actual length on failure, unlike
  # expect_true(length(logs) >= 1L).
  expect_gte(length(logs), 1L)

  # An entry without a path becomes NA, which file.exists() reports as FALSE.
  paths <- vapply(logs, function(x) x$path %||% NA_character_, character(1))
  expect_true(all(file.exists(paths)))

  # Pool the lines of every log; at least one must be "3<TAB>boom".
  lines <- unlist(
    lapply(paths, function(p) readLines(p, warn = FALSE)),
    use.names = FALSE
  )
  expect_true(any(grepl("^3\\tboom$", lines)))
})

test_that("shm_queue timeout cleans up workers and leaves the pool reusable", {
  # Forces a timeout in shm_queue mode on a one-worker pool, then checks that
  # a follow-up shard_map() call still succeeds.
  skip_on_cran()
  skip_if_conn_exhausted()
  if (!shard:::taskq_supported()) skip("shm_queue not supported (no atomics)")

  pool_stop()
  # Register cleanup before creating the pool, so the pool is stopped even if
  # pool_create() itself fails part-way; this matches the other tests here.
  on.exit(pool_stop(), add = TRUE)
  pool_create(n = 1)

  out <- buffer("integer", dim = 2L, init = 0L, backing = "mmap")

  # Each task sleeps 0.2 s while timeout is 0.05 (presumably seconds as well),
  # so the call is expected to fail with a "timed out" error.
  expect_error(
    shard_map(
      2L,
      out = list(out = out),
      fun = function(sh, out) {
        Sys.sleep(0.2)
        out[sh$idx] <- sh$idx
        NULL
      },
      chunk_size = 1L,
      dispatch_mode = "shm_queue",
      dispatch_opts = list(block_size = 1L),
      timeout = 0.05
    ),
    "timed out"
  )

  # A plain run without dispatch_mode or timeout must still work afterwards.
  follow_up <- shard_map(
    shards(2L, block_size = 1L, workers = 1L),
    fun = function(shard) shard$id
  )

  expect_true(succeeded(follow_up))
  expect_equal(unname(unlist(results(follow_up))), c(1, 2))
})

# Try the shard package in your browser
#
# Any scripts or data that you put into this service are public.
#
# shard documentation built on April 3, 2026, 9:08 a.m.