tests/testthat/test-consolidation.R

test_that("Consolidation stores and retrieves results correctly", {
  outdir <- tempfile("piecemeal_consolidate_")
  sim <- piecemeal::init(outdir)
  sim$factorial(a = 1:2, b = 1:2)$nrep(2)
  sim$worker(function(a, b, .seed) list(result = a + b + .seed))

  # Run the simulation
  sim$run(shuffle = FALSE)

  # Check that results exist
  initial_files <- list.files(outdir, pattern = "\\.rds$", recursive = TRUE)
  expect_true(length(initial_files) > 0)

  # Get initial result count
  initial_results <- sim$result_list()
  initial_count <- length(initial_results)
  expect_equal(initial_count, 8) # 2x2x2

  # Consolidate the results
  count <- sim$consolidate()
  expect_true(count > 0)

  # Check that files were removed
  remaining_files <- list.files(outdir, pattern = "\\.rds$", recursive = TRUE)
  remaining_files <- remaining_files[!grepl("consolidated\\.db", remaining_files)]
  expect_true(length(remaining_files) < length(initial_files))

  # Check that we can still read all results
  consolidated_results <- sim$result_list()
  expect_equal(length(consolidated_results), initial_count)

  # Check that results are identical - verify contents in detail
  for (i in seq_along(initial_results)) {
    expect_equal(initial_results[[i]]$seed, consolidated_results[[i]]$seed)
    expect_equal(initial_results[[i]]$treatment, consolidated_results[[i]]$treatment)
    expect_equal(initial_results[[i]]$output, consolidated_results[[i]]$output)
    expect_equal(initial_results[[i]]$OK, consolidated_results[[i]]$OK)
    # Verify the actual computed result value
    expected_result <- initial_results[[i]]$treatment$a +
                      initial_results[[i]]$treatment$b +
                      initial_results[[i]]$seed
    expect_equal(consolidated_results[[i]]$output$result, expected_result)
  }

  # Check that result_df still works
  df <- sim$result_df()
  expect_equal(nrow(df), 8)

  # Verify df contains correct values
  for (i in seq_len(nrow(df))) {
    expected_result <- df$a[i] + df$b[i] + df$.seed[i]
    expect_equal(df$result[i], expected_result)
  }

  unlink(outdir, recursive = TRUE)
})

test_that("Consolidation only consolidates successful runs", {
  outdir <- tempfile("piecemeal_consolidate_error_")
  sim <- piecemeal::init(outdir)
  sim$factorial(a = 1:3)$nrep(1)
  sim$worker(function(a) {
    if (a == 2) stop("Intentional error")
    list(result = a)
  })

  # Run the simulation (will have errors)
  suppressMessages(sim$run(shuffle = FALSE))

  # Count files before consolidation
  before_files <- list.files(outdir, pattern = "\\.rds$", recursive = TRUE)
  before_files <- before_files[!grepl("consolidated\\.db", before_files)]
  before_count <- length(before_files)
  expect_equal(before_count, 3) # 3 results (2 success, 1 error)

  # Consolidate
  count <- sim$consolidate()
  expect_equal(count, 2) # Only 2 successful runs consolidated

  # Check that error file remains
  after_files <- list.files(outdir, pattern = "\\.rds$", recursive = TRUE)
  after_files <- after_files[!grepl("consolidated\\.db", after_files)]
  expect_equal(length(after_files), 1) # Only the error file remains

  # Check that we can still read all results (including the error)
  results <- sim$result_list()
  expect_equal(length(results), 3)

  # Check that the error is still accessible
  ok_count <- sum(sapply(results, function(r) r$OK))
  expect_equal(ok_count, 2)

  # Verify contents of successful consolidated runs
  successful_results <- results[sapply(results, function(r) r$OK)]
  expect_equal(length(successful_results), 2)
  for (r in successful_results) {
    # Verify the output matches the input
    expect_equal(r$output$result, r$treatment$a)
    # Verify it's either a=1 or a=3 (not a=2 which errored)
    expect_true(r$treatment$a %in% c(1, 3))
  }

  unlink(outdir, recursive = TRUE)
})

test_that("Consolidation can be called multiple times safely", {
  outdir <- tempfile("piecemeal_consolidate_lock_")
  sim <- piecemeal::init(outdir)
  sim$factorial(a = 1:5)$nrep(1)
  sim$worker(function(a) list(result = a))

  # Run the simulation
  sim$run(shuffle = FALSE)

  # Consolidate all files
  count <- sim$consolidate()
  expect_true(count > 0)

  # Try to consolidate again (should return 0 since no files left)
  count2 <- sim$consolidate()
  expect_equal(count2, 0)

  unlink(outdir, recursive = TRUE)
})

test_that("Consolidation processes all files at once", {
  outdir <- tempfile("piecemeal_consolidate_all_")
  sim <- piecemeal::init(outdir)
  sim$factorial(a = 1:10)$nrep(1)
  sim$worker(function(a) list(result = a))

  # Run the simulation
  sim$run(shuffle = FALSE)

  # Consolidate all files
  count <- sim$consolidate()
  expect_equal(count, 10)

  # Check that all files are consolidated
  final_files <- list.files(outdir, pattern = "\\.rds$", recursive = TRUE)
  final_files <- final_files[!grepl("consolidated\\.db", final_files)]
  expect_equal(length(final_files), 0)

  # Verify all results are still accessible
  results <- sim$result_list()
  expect_equal(length(results), 10)

  # Verify contents of all consolidated results
  for (r in results) {
    expect_true(r$OK)
    # Verify the output matches the input
    expect_equal(r$output$result, r$treatment$a)
    # Verify the value is in the expected range
    expect_true(r$treatment$a >= 1 && r$treatment$a <= 10)
  }

  unlink(outdir, recursive = TRUE)
})

test_that("Consolidation removes empty directories", {
  skip_on_os("windows")
  
  outdir <- tempfile("piecemeal_consolidate_dirs_")
  
  # Create a simulation with subdirectories by using the subdir parameter
  sim <- piecemeal::init(outdir)
  
  # Manually create subdirectories and files to test directory cleanup
  subdir1 <- file.path(outdir, "subdir1")
  subdir2 <- file.path(outdir, "subdir2")
  nested_dir <- file.path(subdir1, "nested")
  
  dir.create(subdir1, recursive = TRUE)
  dir.create(subdir2, recursive = TRUE)
  dir.create(nested_dir, recursive = TRUE)
  
  # Create some result files in subdirectories
  # We need to create valid result files
  result1 <- list(treatment = list(x = 1), seed = 1, output = list(result = 1), OK = TRUE)
  result2 <- list(treatment = list(x = 2), seed = 2, output = list(result = 2), OK = TRUE)
  result3 <- list(treatment = list(x = 3), seed = 3, output = list(result = 3), OK = TRUE)
  
  saveRDS(result1, file.path(subdir1, "result1.rds"))
  saveRDS(result2, file.path(nested_dir, "result2.rds"))
  saveRDS(result3, file.path(subdir2, "result3.rds"))
  
  # Verify directories exist before consolidation
  expect_true(dir.exists(subdir1))
  expect_true(dir.exists(subdir2))
  expect_true(dir.exists(nested_dir))
  
  # Consolidate
  count <- sim$consolidate()
  expect_equal(count, 3)
  
  # Check that empty subdirectories were removed
  expect_false(dir.exists(subdir1), info = "subdir1 should be removed after consolidation")
  expect_false(dir.exists(subdir2), info = "subdir2 should be removed after consolidation")
  expect_false(dir.exists(nested_dir), info = "nested directory should be removed after consolidation")
  
  # Verify the outdir still exists
  expect_true(dir.exists(outdir))
  
  # Verify consolidated.db exists
  expect_true(file.exists(file.path(outdir, "consolidated.db")))
  
  unlink(outdir, recursive = TRUE)
})

test_that("Consolidation preserves file modification times for ETA", {
  outdir <- tempfile("piecemeal_mtime_")
  sim <- piecemeal::init(outdir)
  sim$factorial(a = 1:3)$nrep(2)
  sim$worker(function(a) {
    Sys.sleep(0.1) # Small delay to ensure different mtimes
    list(result = a)
  })
  
  # Run the simulation
  sim$run(shuffle = FALSE)
  
  # Get the result count before consolidation
  results_before <- sim$result_list()
  count_before <- length(results_before)
  expect_equal(count_before, 6)
  
  # Consolidate
  count <- sim$consolidate()
  expect_equal(count, 6)
  
  # Get the result count after consolidation
  results_after <- sim$result_list()
  count_after <- length(results_after)
  
  # The count should be preserved
  expect_equal(count_before, count_after)
  
  # Test that ETA calculation works with consolidated files
  # (This should not throw an error)
  eta_result <- sim$eta(window = 3600)
  expect_true(!is.null(eta_result))
  expect_true(is.numeric(eta_result$recent))
  
  # Verify that the eta() method can access mtimes from consolidated database
  # If mtimes weren't preserved, eta() would fail or return incorrect results
  # The fact that it runs without error is our test that mtimes work
  
  unlink(outdir, recursive = TRUE)
})

test_that("status() works efficiently with consolidated results", {
  outdir <- tempfile("piecemeal_status_")
  sim <- piecemeal::init(outdir)
  sim$factorial(a = 1:3, b = 1:2)$nrep(2)
  sim$worker(function(a, b) {
    if (a == 2 && b == 1) stop("Intentional error")
    list(result = a + b)
  })
  
  # Run the simulation
  # 3 treatments × 2 b-values × 2 reps = 12 total runs
  # Error when a==2 and b==1: 1 treatment combo × 2 reps = 2 error runs
  # Successful runs: 12 - 2 = 10
  suppressMessages(sim$run(shuffle = FALSE))
  
  # Check status before consolidation
  status_before <- sim$status()
  expect_true("Done" %in% names(status_before))
  expect_equal(as.numeric(status_before["Done"]), 10)  # 10 successful runs
  
  # There should be errors
  expect_true(any(grepl("Intentional error", names(status_before))))
  
  # Consolidate (should consolidate only the 10 successful runs)
  count <- sim$consolidate()
  expect_equal(count, 10)
  
  # Check status after consolidation
  # This tests that status() correctly counts consolidated files without reading blob contents
  status_after <- sim$status()
  expect_equal(status_before["Done"], status_after["Done"])
  
  # Error should still be present
  expect_true(any(grepl("Intentional error", names(status_after))))
  
  # Verify counts are consistent
  expect_equal(sum(status_before), sum(status_after))
  
  unlink(outdir, recursive = TRUE)
})

test_that("status() counts only consolidated successful runs", {
  outdir <- tempfile("piecemeal_status_consolidated_")
  sim <- piecemeal::init(outdir)
  sim$factorial(x = 1:5)$nrep(1)
  sim$worker(function(x) list(result = x * 2))
  
  # Run all successful
  sim$run(shuffle = FALSE)
  
  # Status before consolidation - all should be Done
  status_before <- sim$status()
  expect_equal(as.numeric(status_before["Done"]), 5)
  expect_equal(sum(status_before), 5)
  
  # Consolidate all
  count <- sim$consolidate()
  expect_equal(count, 5)
  
  # Status after consolidation - should still be all Done
  status_after <- sim$status()
  expect_equal(as.numeric(status_after["Done"]), 5)
  expect_equal(sum(status_after), 5)
  
  # The results should be identical
  expect_equal(as.numeric(status_before), as.numeric(status_after))
  expect_equal(names(status_before), names(status_after))
  
  unlink(outdir, recursive = TRUE)
})

test_that("run_config() skips runs already in consolidated database", {
  outdir <- tempfile("piecemeal_skip_consolidated_")
  sim <- piecemeal::init(outdir)
  sim$factorial(a = 1:3)$nrep(2)
  sim$worker(function(a) list(result = a * 10))

  # Run the simulation
  sim$run(shuffle = FALSE)

  # Verify all runs completed
  initial_results <- sim$result_list()
  expect_equal(length(initial_results), 6) # 3 treatments × 2 reps

  # Consolidate all results
  count <- sim$consolidate()
  expect_equal(count, 6)

  # Verify all individual files were removed
  remaining_files <- list.files(outdir, pattern = "\\.rds$", recursive = TRUE)
  expect_equal(length(remaining_files), 0)

  # Now try to run again - all runs should be skipped
  # because they're in the consolidated database
  statuses <- sim$run(shuffle = FALSE)

  # Check that all runs were skipped
  # statuses is a character vector with messages like "path\nSKIPPED"
  # Split by \n and check the second part
  status_parts <- strsplit(statuses, "\n", fixed = TRUE)
  status_codes <- sapply(status_parts, function(x) x[2])
  expect_true(all(status_codes == "SKIPPED"))

  # Verify results are still correct and unchanged
  final_results <- sim$result_list()
  expect_equal(length(final_results), 6)

  # Verify the results are identical to before
  for (i in seq_along(initial_results)) {
    expect_equal(initial_results[[i]]$seed, final_results[[i]]$seed)
    expect_equal(initial_results[[i]]$treatment, final_results[[i]]$treatment)
    expect_equal(initial_results[[i]]$output, final_results[[i]]$output)
  }

  unlink(outdir, recursive = TRUE)
})

Try the piecemeal package in your browser

Any scripts or data that you put into this service are public.

piecemeal documentation built on March 11, 2026, 5:07 p.m.