tests/testthat/test-diagnostics.R

# Tests for diagnostics API

test_that("report() returns shard_report with correct structure", {
  # Without pool
  rpt <- report()
  expect_s3_class(rpt, "shard_report")
  expect_equal(rpt$level, "summary")
  expect_true(!is.null(rpt$timestamp))
  expect_true(!is.null(rpt$memory))

  # With pool
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  rpt <- report()
  expect_s3_class(rpt, "shard_report")
  expect_true(!is.null(rpt$pool))
  expect_equal(rpt$pool$n_workers, 2L)
})

test_that("report() respects detail levels", {
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  # Summary level - no workers
  rpt_summary <- report("summary")
  expect_null(rpt_summary$workers)
  expect_null(rpt_summary$segments)

  # Workers level - has workers
  rpt_workers <- report("workers")
  expect_true(!is.null(rpt_workers$workers))
  expect_length(rpt_workers$workers, 2)

  # Segments level - has segments
  rpt_segments <- report("segments")
  expect_true(!is.null(rpt_segments$segments))
  expect_true(!is.null(rpt_segments$workers))
})

test_that("report() includes shard_result diagnostics", {
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  # Use explicit block_size=1 to get exact shard count
  result <- shard_map(shards(10, block_size = 1), function(s) sum(s$idx), workers = 2)

  rpt <- report("tasks", result = result)
  expect_true(!is.null(rpt$tasks))
  expect_s3_class(rpt$tasks, "shard_report")
  expect_equal(rpt$tasks$shards_total, 10)
  expect_true(!is.null(rpt$result_diagnostics))
})

test_that("mem_report() returns memory statistics", {
  # Without pool
  rpt <- mem_report()
  expect_s3_class(rpt, "shard_report")
  expect_equal(rpt$type, "memory")
  expect_false(rpt$pool_active)

  # With pool
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  rpt <- mem_report()
  expect_true(rpt$pool_active)
  expect_equal(rpt$n_workers, 2)
  expect_length(rpt$workers, 2)
  expect_true(all(vapply(rpt$workers, function(w) !is.null(w$id), logical(1))))
})

test_that("mem_report() computes aggregates correctly", {
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  rpt <- mem_report()

  # Should have computed total, peak, mean from worker RSS values
  expect_true(is.numeric(rpt$total_rss))
  expect_true(is.numeric(rpt$peak_rss))
  expect_true(is.numeric(rpt$mean_rss))

  # Total should be sum of workers
  worker_rss <- vapply(rpt$workers, function(w) w$rss %||% 0, numeric(1))
  valid_rss <- worker_rss[!is.na(worker_rss)]
  if (length(valid_rss) > 0) {
    expect_equal(rpt$total_rss, sum(valid_rss))
  }
})

test_that("cow_report() returns COW statistics", {
  rpt <- cow_report()
  expect_s3_class(rpt, "shard_report")
  expect_equal(rpt$type, "cow")
  expect_true(is.na(rpt$policy))

  # With result
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  result <- shard_map(shards(5), function(s) s$idx, cow = "audit", workers = 2)
  rpt <- cow_report(result)
  expect_equal(rpt$policy, "audit")
})

test_that("copy_report() returns data copy statistics", {
  rpt <- copy_report()
  expect_s3_class(rpt, "shard_report")
  expect_equal(rpt$type, "copy")
  expect_equal(rpt$borrow_exports, 0L)

  # With result
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  result <- shard_map(shards(5), function(s) sum(s$idx), workers = 2)
  rpt <- copy_report(result)
  expect_equal(rpt$result_imports, 5)  # 5 shards = 5 results
  expect_true(rpt$result_bytes > 0)  # Results have some size
})

test_that("task_report() returns task execution statistics", {
  rpt <- task_report()
  expect_s3_class(rpt, "shard_report")
  expect_equal(rpt$type, "task")
  expect_equal(rpt$shards_total, 0L)

  # With result - use explicit block_size=1 for exact shard count
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  result <- shard_map(shards(10, block_size = 1), function(s) sum(s$idx), workers = 2)
  rpt <- task_report(result)

  expect_equal(rpt$shards_total, 10)
  expect_equal(rpt$shards_processed, 10)
  expect_equal(rpt$shards_failed, 0L)
  expect_true(!is.na(rpt$duration))
  expect_true(rpt$duration > 0)
  expect_true(!is.na(rpt$throughput))
  expect_true(rpt$throughput > 0)
})

test_that("task_report() reports failures correctly", {
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  # Create a task that fails for some shards - use block_size=1 for predictable shards
  result <- suppressWarnings(
    shard_map(shards(5, block_size = 1), function(s) {
      if (s$id == 3) stop("deliberate failure")
      sum(s$idx)
    }, workers = 2, max_retries = 1L)
  )

  rpt <- task_report(result)
  expect_true(rpt$shards_failed >= 1)  # At least one failure
})

test_that("segment_report() returns segment information", {
  rpt <- segment_report()
  expect_s3_class(rpt, "shard_report")
  expect_equal(rpt$type, "segment")
  expect_true(!is.null(rpt$backing_summary))
  expect_true(!is.null(rpt$backing_summary$available))
  expect_true(!is.null(rpt$backing_summary$platform))
})

test_that("print methods work without error", {
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  result <- shard_map(shards(5), function(s) sum(s$idx), workers = 2)

  # Test all print methods
  expect_output(print(report()))
  expect_output(print(report("workers")))
  expect_output(print(report("tasks", result = result)))
  expect_output(print(report("segments")))
  expect_output(print(mem_report()))
  expect_output(print(cow_report()))
  expect_output(print(cow_report(result)))
  expect_output(print(copy_report()))
  expect_output(print(copy_report(result)))
  expect_output(print(task_report()))
  expect_output(print(task_report(result)))
  expect_output(print(segment_report()))
})

test_that("report() handles missing/dead workers", {
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  # Get report with live workers
  rpt <- report("workers")

  for (w in rpt$workers) {
    expect_true(w$status %in% c("alive", "ok", "missing", "dead"))
  }
})

test_that("recommendations() is deterministic given the same run telemetry", {
  skip_on_cran()

  pool_stop()
  pool_create(2)
  on.exit(pool_stop(), add = TRUE)

  X <- share(matrix(rnorm(200), nrow = 20), backing = "mmap")
  blocks <- shards(10, block_size = 1)

  # Force view materialization inside the task.
  result <- shard_map(blocks, borrow = list(X = X), fun = function(s, X) {
    v <- view_block(X, cols = idx_range(1, 5))
    sum(materialize(v))
  }, workers = 2, diagnostics = TRUE)

  r1 <- recommendations(result)
  r2 <- recommendations(result)
  expect_identical(r1, r2)
  expect_true(any(grepl("Views were materialized", r1, fixed = TRUE)))
})

test_that("reports have consistent timestamp format", {
  rpt1 <- report()
  rpt2 <- mem_report()
  rpt3 <- cow_report()
  rpt4 <- copy_report()
  rpt5 <- task_report()
  rpt6 <- segment_report()

  expect_s3_class(rpt1$timestamp, "POSIXct")
  expect_s3_class(rpt2$timestamp, "POSIXct")
  expect_s3_class(rpt3$timestamp, "POSIXct")
  expect_s3_class(rpt4$timestamp, "POSIXct")
  expect_s3_class(rpt5$timestamp, "POSIXct")
  expect_s3_class(rpt6$timestamp, "POSIXct")
})

Try the shard package in your browser

Any scripts or data that you put into this service are public.

shard documentation built on April 3, 2026, 9:08 a.m.