R/simulation_output.R

Defines functions load_simulation_output init_simulation_output_dir map_by_condition.process_chunk map_by_condition.parallel.heuristic map_by_condition new_simulation_output

Documented in init_simulation_output_dir load_simulation_output map_by_condition map_by_condition.parallel.heuristic map_by_condition.process_chunk new_simulation_output

#' Create a eam_simulation_output object
#'
#' @keywords internal
new_simulation_output <- function(
    simulation_config,
    output_dir) {
  # validate inputs
  if (!inherits(simulation_config, "eam_simulation_config")) {
    stop("simulation_config must be a eam_simulation_config object")
  }

  if (!dir.exists(output_dir)) {
    stop("output_dir does not exist: ", output_dir)
  }

  simulation_dataset_dir <- file.path(
    output_dir,
    simulation_output_fs_proto$dataset_dir
  )

  evaluated_conditions_dir <- file.path(
    output_dir,
    simulation_output_fs_proto$evaluated_conditions_dir
  )

  # Create the output object
  ret <- list(
    simulation_config = simulation_config,
    output_dir = output_dir,
    open_dataset = local({
      dataset_dir <- simulation_dataset_dir
      function() {
        arrow::open_dataset(dataset_dir)
      }
    }),
    evaluated_conditions_dir = evaluated_conditions_dir,
    open_evaluated_conditions = local({
      eval_dir <- evaluated_conditions_dir
      function() {
        arrow::open_dataset(eval_dir)
      }
    })
  )

  # Create S3 object
  structure(ret, class = "eam_simulation_output")
}

#' Map a function by condition across simulation output chunks
#'
#' This function processes simulation output by gathering all chunks, iterating
#' through them one by one, filtering and collecting data by chunk, then
#' applying a user-defined function by condition within each chunk.
#'
#' @details
#' This function handles out-of-core computation automatically using Apache Arrow,
#' so you don't need to understand Arrow internals. It loads data chunk by chunk
#' to avoid memory issues with large simulations.
#'
#' If you prefer to manually work with the raw Arrow dataset, you can access it
#' via \code{simulation_output$open_dataset()}, which returns an Arrow Dataset
#' object. You can then use \code{dplyr} verbs to filter and query before calling
#' \code{dplyr::collect()} to load data into memory.
#'
#' @param simulation_output A eam_simulation_output object containing the
#' dataset and configuration
#' @param .f A function to apply to each condition's data. The function should
#' accept a data frame representing one condition's results
#' @param ... Additional arguments passed to the function .f
#' @param .combine Function to combine results (default: dplyr::bind_rows)
#' @param .parallel Logical or NULL.
#' @param .n_cores Integer. Number of CPU cores to use for parallel processing.
#'   If NULL, uses \code{detectCores() - 1}. Only used when
#' \code{.parallel = TRUE}.
#' @param .progress Logical, whether to show a progress bar (default: FALSE)
#' @return A list containing the results of applying .f to each condition,
#' with names corresponding to condition indices
#'
#' @examples
#' # Load simulation output
#' sim_output_path <- system.file(
#'   "extdata", "rdm_minimal", "simulation",
#'   package = "eam"
#' )
#' sim_output <- load_simulation_output(sim_output_path)
#'
#' # Define a summary pipeline
#' summary_pipe <- summarise_by(
#'   .by = c("condition_idx"),
#'   rt_mean = mean(rt),
#'   rt_quantiles = quantile(rt, probs = c(0.1, 0.5, 0.9))
#' )
#'
#' # Apply function to each condition
#' sim_sumstat <- map_by_condition(
#'   sim_output,
#'   .progress = FALSE,
#'   .parallel = FALSE,
#'   function(cond_df) {
#'     summary_pipe(cond_df)
#'   }
#' )
#'
#' @export
map_by_condition <- function(
    simulation_output,
    .f,
    ...,
    .combine = dplyr::bind_rows,
    .parallel = NULL,
    .n_cores = NULL,
    .progress = FALSE) {
  # NSE variable bindings for R CMD check
  chunk_idx <- NULL

  # Validate input
  if (!inherits(simulation_output, "eam_simulation_output")) {
    stop("simulation_output must be a eam_simulation_output object")
  }

  if (!is.function(.f)) {
    stop(".f must be a function")
  }

  # Get the dataset
  dataset <- simulation_output$open_dataset()

  # Gather all chunk indices
  chunk_indices <- dataset |>
    dplyr::select(chunk_idx) |>
    dplyr::distinct() |>
    dplyr::collect() |>
    dplyr::pull(chunk_idx) |>
    sort()

  # Apply heuristic to determine parallel processing
  if (is.null(.parallel)) {
    .parallel <- map_by_condition.parallel.heuristic(chunk_indices)
  }

  # Set up progress bar if requested
  if (.progress && requireNamespace("pbapply", quietly = TRUE)) {
    pb_fun <- pbapply::pblapply
  } else {
    pb_fun <- lapply
    if (.progress) {
      message("Install 'pbapply' package for progress bar support")
    }
  }

  # Define the chunk processing function
  process_chunk <- map_by_condition.process_chunk(dataset, .f, ...)

  # Process chunks based on parallel preference
  if (.parallel) {
    # Setup parallel cluster
    if (is.null(.n_cores)) {
      .n_cores <- max(1, parallel::detectCores(logical = FALSE) - 1)
    }

    cl <- parallel::makeCluster(min(.n_cores, length(chunk_indices)))
    on.exit(parallel::stopCluster(cl), add = TRUE)

    # Note: arrow dataset is not thread-safe, export the open_dataset
    open_dataset_fn <- simulation_output$open_dataset
    process_chunk_parallel <- map_by_condition.process_chunk(
      open_dataset_fn, .f, ...
    )

    # Export required objects and functions to cluster
    f_globals <- codetools::findGlobals(.f, merge = FALSE)
    f_env <- environment(.f)
    all_globals <- c(f_globals$functions, f_globals$variables)
    detected_deps <- all_globals[
      sapply(all_globals, exists, envir = f_env, inherits = TRUE)
    ]
    export_vars <- unique(c(
      "open_dataset_fn",
      ".f",
      "process_chunk_parallel",
      "map_by_condition.process_chunk",
      detected_deps
    ))

    parallel::clusterExport(cl, export_vars, envir = environment())

    # Load required packages on cluster nodes
    parallel::clusterEvalQ(cl, {
      library(dplyr)
      library(arrow)
      library(eam)
    })

    # Run parallel processing
    if (.progress && requireNamespace("pbapply", quietly = TRUE)) {
      all_condition_results <- pbapply::pblapply(
        chunk_indices,
        process_chunk_parallel,
        cl = cl
      )
    } else {
      if (.progress) {
        message("Install 'pbapply' package for progress bar support")
      }
      all_condition_results <- parallel::parLapply(
        cl,
        chunk_indices,
        process_chunk_parallel
      )
    }
  } else {
    # Sequential processing
    all_condition_results <- pb_fun(chunk_indices, process_chunk)
  }

  # Concatenate all chunk results while preserving condition indexing
  all_results <- do.call(c, all_condition_results)

  # If .combine is provided and we have results, combine them
  if (length(all_results) > 0 && !is.null(.combine)) {
    return(do.call(.combine, all_results))
  } else {
    return(all_results)
  }
}


#' Heuristic to determine if parallel processing should be used
#'
#' @param chunk_indices Vector of chunk indices
#' @return Logical value indicating whether to use parallel processing
#' @keywords internal
map_by_condition.parallel.heuristic <- function(chunk_indices) {
  # Check if parallel package is available and has more than 2 cores
  # then check if chunk_indices > 5 times the number of cores
  if (requireNamespace("parallel", quietly = TRUE)) {
    num_cores <- parallel::detectCores(logical = FALSE)
    if (!is.na(num_cores) && num_cores > 2 &&
      length(chunk_indices) > 5 * num_cores) {
      return(TRUE)
    } else {
      return(FALSE)
    }
  } else {
    return(FALSE)
  }
}


#' Process a single chunk for map_by_condition
#'
#' @param open_dataset_fn Arrow dataset object or function that returns a dataset
#' @param .f Function to apply to each condition's data
#' @param ... Additional arguments passed to .f
#' @return Function that processes a chunk_idx
#' @keywords internal
map_by_condition.process_chunk <- function(open_dataset_fn, .f, ...) {
  function(chunk_idx) {
    # NSE variable bindings for R CMD check
    condition_idx <- NULL
    
    # Get the dataset (either use directly or call function)
    dataset <- if (is.function(open_dataset_fn)) {
      open_dataset_fn()
    } else {
      open_dataset_fn
    }

    # Load this chunk's data
    chunk_data <- dataset |>
      dplyr::filter(chunk_idx == !!chunk_idx) |>
      dplyr::collect()

    # Split data by condition and get condition indices
    condition_groups <- chunk_data |>
      dplyr::group_by(condition_idx) |>
      dplyr::group_split(.keep = TRUE)

    # Extract condition indices for naming
    condition_indices <- sapply(
      condition_groups,
      function(x) unique(x$condition_idx)
    )

    # Apply function to each condition's data
    chunk_results <- lapply(condition_groups, .f, ...)

    # Name the results with condition indices
    names(chunk_results) <- as.character(condition_indices)

    return(chunk_results)
  }
}

simulation_output_fs_proto <- list(
  config_file = "simulation_config.rds",
  dataset_dir = "simulation_dataset",
  evaluated_conditions_dir = "evaluated_conditions"
)

#' Initialize simulation output directory structure
#'
#' Creates and validates the output directory structure for a simulation.
#' This function ensures the directory is empty (or creates it), then creates
#' the required subdirectories based on simulation_output_fs_proto.
#'
#' @param output_dir The base output directory path
#' @return The output_dir path (invisibly for chaining)
#' @keywords internal
init_simulation_output_dir <- function(output_dir) {
  # check empty output directory
  if (dir.exists(output_dir) &&
    length(list.files(output_dir, all.files = FALSE, no.. = TRUE)) > 0
  ) {
    stop(
      "The specified simulation output directory must be empty: ",
      output_dir
    )
  }

  # Create subdirectories based on fs_proto
  evaluated_conditions_dir <- file.path(
    output_dir,
    simulation_output_fs_proto$evaluated_conditions_dir
  )
  simulation_dataset_dir <- file.path(
    output_dir,
    simulation_output_fs_proto$dataset_dir
  )

  if (!dir.exists(evaluated_conditions_dir)) {
    dir.create(evaluated_conditions_dir, recursive = TRUE)
  }
  if (!dir.exists(simulation_dataset_dir)) {
    dir.create(simulation_dataset_dir, recursive = TRUE)
  }

  invisible(output_dir)
}


#' Rebuild eam_simulation_output from an existing output directory
#'
#' This function reconstructs a eam_simulation_output object from a
#' previously saved simulation output directory.
#'
#' @param output_dir The directory containing the simulation results and config
#' @return A eam_simulation_output object
#'
#' @examples
#' # Load simulation output from package data
#' sim_output_path <- system.file(
#'   "extdata", "rdm_minimal", "simulation",
#'   package = "eam"
#' )
#' sim_output <- load_simulation_output(sim_output_path)
#'
#' # Access the configuration
#' sim_output$simulation_config
#' 
#' # Access the dataset (check arrow documentation for working with the dataset)
#' dataset <- sim_output$open_dataset()
#'
#' @export
load_simulation_output <- function(output_dir) {
  # Validate that output_dir exists
  if (!dir.exists(output_dir)) {
    stop("Output directory does not exist: ", output_dir)
  }

  # Check for config file
  config_path <- file.path(
    output_dir,
    simulation_output_fs_proto$config_file
  )
  if (!file.exists(config_path)) {
    stop(
      "Simulation config not found in output directory: ", config_path,
      "\nTypically, this simulation did not complete successfully."
    )
  }

  # Check for simulation dataset directory
  simulation_dataset_dir <- file.path(
    output_dir,
    simulation_output_fs_proto$dataset_dir
  )
  if (!dir.exists(simulation_dataset_dir)) {
    stop(
      "Simulation dataset directory not found: ", simulation_dataset_dir,
      "\nThat means, this simulation output directory is incomplete."
    )
  }

  # Load the config
  config <- readRDS(config_path)

  # Validate config
  if (!inherits(config, "eam_simulation_config")) {
    stop("Invalid simulation config found in: ", config_path)
  }

  # Rebuild the output object
  ret <- new_simulation_output(
    simulation_config = config,
    output_dir = output_dir
  )

  return(ret)
}

Try the eam package in your browser

Any scripts or data that you put into this service are public.

eam documentation built on March 29, 2026, 5:07 p.m.