# tests/testthat/test-dispatch.R

# Sends five simple chunks through dispatch_chunks() on a pool created with
# n = 2 and checks that every chunk completes and yields its value plus one.
test_that("dispatch_chunks processes all chunks", {
  skip_on_cran()

  pool <- pool_create(n = 2)
  on.exit(pool_stop())

  # Chunk i carries id = i and value = 10 * i.
  work_items <- lapply(
    seq_len(5),
    function(i) list(id = i, value = i * 10)
  )
  add_one <- function(chunk) chunk$value + 1

  outcome <- dispatch_chunks(work_items, fun = add_one)

  expect_s3_class(outcome, "shard_dispatch_result")

  status <- outcome$queue_status
  expect_equal(status$completed, 5L)
  expect_equal(status$failed, 0L)

  # Completion order is not assumed: strip names and sort before comparing.
  values <- sort(unname(unlist(outcome$results)))
  expect_equal(values, seq(11, 51, by = 10))
})

# pool_lapply() should produce the same values as squaring each input and
# wrapping the results in a list.
test_that("pool_lapply works like lapply", {
  skip_on_cran()

  pool <- pool_create(n = 2)
  on.exit(pool_stop())

  inputs <- seq_len(10)
  squared <- pool_lapply(inputs, function(i) i^2)

  # Names are dropped on both sides so that only the values are compared.
  expected <- as.list(inputs^2)
  expect_equal(unname(squared), unname(expected))
})

# pool_sapply() should hand back the doubled inputs in a form that compares
# equal to a plain vector once names are removed.
test_that("pool_sapply simplifies results", {
  skip_on_cran()

  pool <- pool_create(n = 2)
  on.exit(pool_stop())

  inputs <- seq_len(5)
  doubled <- pool_sapply(inputs, function(i) i * 2)

  # Names are dropped on both sides so that only the values are compared.
  expected <- inputs * 2
  expect_equal(unname(doubled), unname(expected))
})

# Checks that dispatch_chunks() still completes every chunk when the process
# running one of them is killed part-way through.
#
# Mechanism: the chunk with action "kill" creates a flag file and then sends
# SIGKILL to its own process. Because the flag file exists afterwards, any
# later run of that chunk skips the kill branch and returns its id normally.
# The test then expects all five chunks completed, none failed, and at least
# one death recorded in pool_get()$stats$total_deaths.
test_that("dispatch handles actual worker death by restarting and requeueing work", {
  skip_on_cran()
  skip_if_conn_exhausted()
  if (is_windows()) skip("signal-based worker tests require POSIX")

  pool <- pool_create(n = 2)
  # add = TRUE on every handler so that none can overwrite another.
  on.exit(pool_stop(), add = TRUE)

  # Marker file: its existence means the kill has already been triggered once.
  kill_flag <- tempfile("shard-dispatch-kill-")
  on.exit(unlink(kill_flag), add = TRUE)

  # Create chunks, one of which will kill the worker
  chunks <- list(
    list(id = 1, action = "normal"),
    list(id = 2, action = "normal"),
    list(id = 3, action = "kill"),  # This one kills the worker
    list(id = 4, action = "normal"),
    list(id = 5, action = "normal")
  )

  # Warnings raised during dispatch are silenced; only the final state is
  # asserted below.
  result <- suppressWarnings(
    dispatch_chunks(
      chunks,
      fun = function(chunk, flag) {
        if (chunk$action == "kill" && !file.exists(flag)) {
          # Create the flag before dying so the kill happens only once.
          file.create(flag)
          # Named constant instead of the magic number 9.
          tools::pskill(Sys.getpid(), signal = tools::SIGKILL)
        }
        chunk$id
      },
      flag = kill_flag,
      health_check_interval = 1L,
      max_retries = 2L,
      timeout = 1
    )
  )

  expect_equal(result$queue_status$completed, 5L)
  expect_equal(result$queue_status$failed, 0L)
  # Completion order is not assumed: strip names and sort before comparing.
  expect_equal(sort(unname(unlist(result$results))), 1:5)
  expect_gte(pool_get()$stats$total_deaths, 1L)
})

# A chunk whose function always errors should end up counted as failed, with
# at least two retries recorded, when max_retries = 2.
test_that("dispatch respects max_retries", {
  skip_on_cran()

  pool <- pool_create(n = 1)
  on.exit(pool_stop())

  # Single chunk paired with a function that errors on every call.
  always_fail <- function(chunk) stop("always_fails")
  doomed <- list(list(id = 1))

  outcome <- suppressWarnings(
    dispatch_chunks(doomed, fun = always_fail, max_retries = 2)
  )

  # Once retries are used up the chunk must be reported as failed.
  status <- outcome$queue_status
  expect_equal(status$failed, 1L)
  expect_equal(status$completed, 0L)
  expect_gte(status$total_retries, 2L)
})

# Printing a dispatch result should emit at least one line containing
# "shard dispatch result" and at least one containing "Duration".
test_that("print.shard_dispatch_result produces output", {
  skip_on_cran()

  pool <- pool_create(n = 1)
  on.exit(pool_stop(), add = TRUE)

  chunks <- list(list(id = 1, val = 42))
  result <- dispatch_chunks(chunks, fun = function(chunk) chunk$val)

  output <- capture.output(print(result))

  # expect_match() shows the captured lines on failure, unlike
  # expect_true(any(grepl(...))). fixed = TRUE matches the text literally;
  # all = FALSE requires only one of the printed lines to match.
  expect_match(output, "shard dispatch result", fixed = TRUE, all = FALSE)
  expect_match(output, "Duration", fixed = TRUE, all = FALSE)
})

# Try the shard package in your browser
#
# Any scripts or data that you put into this service are public.
#
# shard documentation built on April 3, 2026, 9:08 a.m.