R/fusionOutput.R

Defines functions fusionOutput

Documented in fusionOutput

#' Generate output files resulting from fusion
#'
#' @description
#' Handles all operations needed to "do fusion" using input files generated by a successful call to \code{fusionInput}. Trains a fusion model, generates internal validation results, and then simulates multiple implicates for recipient microdata.
#'
#' @param input Character. Path to directory containing files created by \code{fusionInput}.
#' @param output Character. Optional path to directory where output files will be saved. If \code{output = NULL} (default), the output directory is automatically constructed from \code{input}.
#' @param M Integer. Desired number of fusion implicates. If \code{M = NULL} (default) it is internally set to 40 or, if \code{test_mode = TRUE}, 2 implicates.
#' @param note Character. Optional note supplied by user. Inserted in the log file for reference.
#' @param test_mode Logical. If \code{test_mode = TRUE} (default), the result files are always saved within a "/fusion_" directory in `output` (possibly created); faster hyperparameters are used for \code{\link[fusionModel]{train}}; and the internal validation step is skipped by default.
#' @param validation Logical or integer. Controls execution of internal validation (Steps 3 and 4). If `validation = 0` or `FALSE`, neither step is performed (default when `test_mode = TRUE`). If `1`, only Step 3. If `2` or `TRUE`, both Steps 3 and 4.
#' @param ncores Integer. Number of physical CPU cores used for parallel computation.
#' @param margin Numeric. Passed to same argument in \code{\link[fusionModel]{fuse}}.
#' @param ... Optional, non-default arguments passed to \code{\link[fusionModel]{train}}. For example, \code{fork = TRUE} to enable forked parallel processing.
#'
#' @details The function checks arguments and determines the file path to the appropriate `output` directory (creating it if necessary). The output files are always placed within the appropriate directory hierarchy, based on the donor and recipient information detected in the \code{input} file names. In practice, \code{output} need only be specified if working in an environment where the output files need to located somewhere different from the input files.
#'
#' The function executes the following steps:
#' 1. **Load training data inputs**. Loads donor training microdata and results of \code{\link[fusionModel]{prepXY}}.
#' 1. **Run fusionModel::train()**. Calls \code{\link[fusionModel]{train}} using sensible defaults and hyperparameters. If \code{test_mode = TRUE}, the hyperparameters are designed to do a fast/rough-and-ready model training.
#' 1. **Fuse onto training data for internal validation**. Optional step (see `validation` argument). Fuses multiple implicates to original donor training data using \code{\link[fusionModel]{fuse}}. Results saved to disk.
#' 1. **Run fusionModel::validate()**. Optional step (see `validation` argument). Passes previous step's results to \code{\link[fusionModel]{validate}}. Results saved to disk.
#' 1. **Fuse onto prediction data**. Fuses multiple implicates to supplied input prediction data using \code{\link[fusionModel]{fuse}}. Results saved to disk.
#' 1. **fusionOutput() is finished!** Upon completion, a log file named \code{"outputlog.txt"} is written to \code{output} for reference.
#'
#' @return Saves resulting `output` data files to appropriate local directory. Also saves a .txt log file alongside data files that records console output from \code{fusionOutput}.
#'
#' @examples
#' # Since 'test_mode = TRUE' by default, this will affect files in local '/fusion_' directory
#' dir <- fusionInput(donor = "RECS_2015",
#'                    recipient = "ACS_2015",
#'                    respondent = "household",
#'                    fuse = c("btung", "btuel", "cooltype"),
#'                    force = c("moneypy", "householder_race", "education", "nhsldmem", "kownrent", "recs_division"),
#'                    note = "Hello world. Reminder: running in test mode by default.")
#'
#' # List files in the /input directory
#' list.files(dir)
#'
#' # Using default settings
#' out <- fusionOutput(input = dir)
#' list.files(out)
#'
#' @export
# @noRd

#-----

# TESTING
# library(tidyverse)
# library(data.table)
# source("R/utils.R")
#
# donor = "RECS_2020"
# acs_year = 2015
# respondent = "household"
# fusion_vars <- c("aircond", "scaleb", "btuel")
# M <- 1
# note = NULL
# test_mode = TRUE
# validation = TRUE
# ncores = getOption("fusionData.cores")
# margin = 2

#-----

# Dummy respondent location data for testing
# geo <- fst::read_fst("geo-processed/concordance/geo_concordance.fst")
# donor = fst::read_fst("fusion_/RECS/2020/2015/RECS_2020_2015_H_donor.fst")
# temp <- geo %>%
#   select(state, county10, tract10) %>%
#   distinct() %>%
#   slice_sample(n = nrow(donor))
# rlocation <- data.frame(hid = donor$hid)
# rlocation <- cbind(rlocation, temp)
# rlocation <- slice_sample(rlocation, prop = 0.95)

# fusionOutput(donor = "RECS_2020",
#              respondent = "H",
#              acs_year = 2015,
#              fusion_vars = c("aircond", "scaleb", "btuel"),
#              fsn = "fusion_/RECS/2020/2015/RECS_2020_2015_H_model.fsn",
#              rlocation = rlocation)

#-----

fusionOutput <- function(donor,
                         respondent,
                         acs_year,
                         fusion_vars,
                         M = 1,
                         fsn = NULL,
                         rlocation = NULL,
                         note = NULL,
                         test_mode = TRUE,
                         validation = TRUE,
                         ncores = 1,
                         margin = 4,
                         ...) {

  tstart <- Sys.time()

  # Check validity of the working directory path
  # Checks if "/fusionData" is part of the path, as this is required
  b <- strsplit(full.path(getwd()), .Platform$file.sep, fixed = TRUE)[[1]]
  i <- which(b == "fusionData")
  if (length(i) == 0) stop("'/fusionData' is not part of the working directory path; this is required.")
  dir <- paste(b[1:i], collapse = .Platform$file.sep)

  # Get path to the fusion file directory
  donor <- strsplit(donor, "_", fixed = TRUE)[[1]]
  dir <- file.path(dir, ifelse(test_mode, "fusion_", "fusion"), donor[1], donor[2], acs_year)

  # Check input arguments
  respondent <- substring(toupper(respondent), 1, 1)
  stopifnot({
    respondent %in% c("H", "P")
    is.character(fusion_vars) | is.list(fusion_vars)
    M > 0 & M %% 1 == 0
    is.null(fsn) | is.character(fsn)
    is.null(rlocation) | is.data.frame(rlocation)
    is.null(note) | is.character(note)
    is.logical(test_mode)
    is.logical(validation)
    ncores > 0 & ncores %% 1 == 0
    is.numeric(margin)
  })

  # Check validity of the 'fusion_vars' argument
  fvars <- unlist(fusion_vars)
  if (anyDuplicated(fvars)) stop("Duplicate fusion variables provided (must be unique)")

  # Check validity of the 'fsn' argument, if present
  if (is.character(fsn)) {
    stopifnot({
      endsWith(fsn, ".fsn")
      file.exists(fsn)
    })
  }

  # Set number of cores for 'fst' and 'data.table' packages to use
  data.table::setDTthreads(ncores)
  fst::threads_fst(ncores)

  # Create log file
  log.temp <- tempfile()
  log.txt <- file(log.temp, open = "wt")
  sink(log.txt, split = TRUE, type = "output")

  #-----

  # Report initial messages to console and log
  # The try() wrapper for fusionData is to allow case of running fusionModel::fusionOutput() on a server
  cat(format(tstart, usetz = TRUE), "\n")
  cat(R.version.string, "\n")
  cat("Platform:", R.Version()$platform, "\n")
  cat("fusionModel v", as.character(utils::packageVersion("fusionModel")), "\n\n", sep = "")

  # Report number of CPU cores and available memory
  gc(verbose = FALSE)
  freecores <-
  cat("Using", ncores, "of", ifelse(Sys.getenv("SLURM_CPUS_PER_TASK") == "", parallel::detectCores(logical = FALSE), as.integer(Sys.getenv("SLURM_CPUS_PER_TASK"))), "available CPU cores\n")
  cat("Detected", signif(freeMemory() / 1e3, 3), "GB of available memory at the start\n\n")

  # Print the original function arguments
  # Excludes 'note', if present, since it is printed separately to log, below
  print(match.call.defaults(exclude = if (is.null(note)) NULL else "note"))
  cat("\n")

  # Print message indicating 'test_mode' value
  cat("Running in", ifelse(test_mode, "TEST", "PRODUCTION"), "mode\n\n")

  # Report the fusion directory or fail with error message
  if (dir.exists(dir)) {
    cat("The fusion directory is:\n", dir, "\n\n")
  } else {
    stop("Requested fusion directory does not exist. Input files not located at: ", dir)
  }

  # Write 'note' argument to log file (and console), if requested
  if (!is.null(note)) cat("User-supplied note:\n", note, "\n\n")

  #-----

  # Check for presence of training dataset
  tfile <- full.path(list.files(dir, pattern = paste0(respondent, "_donor\\.fst$"), full.names = TRUE))
  if (length(tfile) == 0) stop("Cannot locate 'donor.fst' file at: ", tfile)

  # Check for presence of ACS prediction dataset
  rfile <- sub("donor.fst", "recipient.fst", tfile)
  if (!file.exists(rfile)) stop("Cannot locate 'recipient.fst' file at: ", rfile)

  # Check for presence of donor processed microdata
  dfile <- full.path(list.files(path = "survey-processed", pattern = paste(c(donor, respondent, "processed.fst"), collapse = "_"), recursive = TRUE, full.names = TRUE))
  if (length(dfile) == 0) stop("Cannot locate donor processed microdata in /survey-processed")

  # Check for presence of 'geo_predictors.fst' file
  gfile <- full.path("geo-processed/geo_predictors.fst")
  if (!file.exists(gfile)) stop("Cannot locate 'geo_predictors.fst' file")

  # Update 'stub' used for output files
  stub <- file.path(dir, sub("donor.fst", "", basename(tfile), fixed = TRUE))

  # Log file paths
  log.path <- paste0(stub, "outputlog.txt")
  log.path0 <- paste0(stub, "outputlog0.txt") # Possible copy location when training step is skipped

  #-----

  cat("|=== Assemble training data ===|\n\n")

  # Load the training data - TO DO: ADD fusion variables via merge with processed donor microdata
  cat("Loading donor harmonized predictors:", basename(tfile), "\n")
  train.data <- fst::read_fst(tfile, as.data.table = TRUE)
  hvars <- grep("__", names(train.data), fixed = TRUE, value = TRUE)  # Harmonized predictor variables

  # If respondent locations are provided, replace the imputed state and PUMA with the known values
  if (is.data.frame(rlocation)) {
    cat("Updating donor imputed respondent location with disclosed location\n")
    geo <- fst::read_fst("geo-processed/concordance/geo_concordance.fst")
    rlocation <- geo %>%
      select(state, puma10, any_of(names(rlocation))) %>%
      distinct() %>%
      inner_join(rlocation) %>%
      select(hid, state, puma10) %>%
      filter(hid %in% as.character(train.data$hid)) %>%
      na.omit()

    # Update the state and PUMA, when possible, using known respondent locations
    if (nrow(rlocation) > 0) {
      N0 <- nrow(train.data)
      pct <- 100 * signif(nrow(rlocation) / nrow(train.data), 3)
      cat("Disclosed location provided for ", nrow(rlocation), " donor respondents (", pct, "% of households)\n", sep = "")
      train.data <- train.data %>%
        mutate_at(vars(state, puma10), as.character) %>%
        rows_update(rlocation, by = "hid")
    }
    if (nrow(train.data) > N0) stop("Problem with 'rlocation'; merge led to erroneous increase in number of donor observations.")
    rm(rlocation, geo)
  }

  # Load donor variables to be fused
  cat("Loading fusion variables from", basename(dfile), "\n")
  idvars <- intersect(names(train.data), c('hid', 'pid'))
  dvars <- names(fst::fst(dfile))
  miss <- setdiff(fvars, dvars)
  if (length(miss)) stop("The following fusion variables are not present in the donor microdata:\n", paste(miss, collapse = ","), "\n")
  fuse.data <- fst::read_fst(dfile, as.data.table = TRUE, columns = c(idvars, fvars)) %>%
    setkeyv(idvars)

  # Load spatial predictors data (always needed)
  cat("Loading spatial predictors from", basename(gfile), "\n")
  # Survey vintage (year); returns approximate midpoint year in case of range (e.g. "2014-2016" returns 2015)
  svintage <- ceiling(median(eval(parse(text = sub("-", ":", donor[2])))))
  spatial.data <- fst::read_fst(gfile, as.data.table = TRUE) %>%
    setkey(state, puma10) %>%   # TEMP -- should remove 'vintage' as key in gfile
    filter(vintage == svintage) %>%
    select(-vintage)

  # Assemble full dataset needed for model training
  cat("Assembling full training dataset\n")
  full.data <- train.data %>%
    merge(spatial.data, all.x = TRUE, sort = FALSE) %>%
    setkeyv(key(fuse.data)) %>%
    merge(fuse.data, all.x = TRUE, sort = FALSE) %>%
    select(-all_of(c(idvars, key(spatial.data)))) %>%
    select(weight, all_of(fvars), everything())

  # Safety check for excessive number of factor levels
  drop <- names(which(sapply(full.data, nlevels) > 255))
  if (length(drop)) {
    full.data <- select(full.data, -all_of(drop))
    cat("Removed the following variables due to excessive (>255) number of factor levels:\n", paste(drop, collapse = ", "), "\n")
  }

  cat("Number of training observations:", nrow(full.data), "\n")
  rm(train.data, fuse.data)

  #-----

  # Check for presence of existing '*_model.fsn' object
  # If requested, attempt to bypass fusionModel::train() step
  mfile <- sub("donor.fst", "model.fsn", tfile)
  if (file.exists(mfile) & is.null(fsn)) {
    skip.train <- TRUE
    fsn <- fsn.path <- mfile
  } else {
    skip.train <- FALSE
  }

  # If a 'fsn' object/template is provided (or existing .fsn model already present), use those results to create 'prep' list instead of running prepXY()
  if (is.character(fsn)) {

    # Extract metadata from 'fsn'
    td <- tempfile()
    zip::unzip(zipfile = fsn, exdir = td)
    meta <- readRDS(file.path(td, "metadata.rds"))
    unlink(td)

    if (skip.train) {

      # Build 'prep' object
      prep <- list(y = meta$ylist, x = meta$xpreds)

    } else {

      cat("\n|=== Existing .fsn model used as training template ===|\n\n")

      # Original harmonized predictors in training data of existing 'fsn' model
      hvars0 <- names(fst::fst(sub("_model.fsn", "_donor.fst", fsn)))
      hvars0 <- grep("__", hvars0, fixed = TRUE, value = TRUE)

      # Variable importance results for existing 'fsn' model
      vimp <- fusionModel::importance(fsn)$detailed %>%
        distinct(y, x) %>%
        filter(x %in% names(full.data))

      if (!all(fvars %in% vimp$y)) stop("Not all requested 'fusion_vars' are present in the existing 'fsn' model")
      if (!all(fvars %in% vimp$y)) stop("Not all requested 'fusion_vars' are present in the existing 'fsn' model")

      # Harmonized predictors in current training data that were not present for 'fsn' creation and, therefore, should always be included
      hvars <- grep("__", names(full.data), fixed = TRUE, value = TRUE)
      force <- setdiff(hvars, hvars0)

      # Build 'prep' object
      ylist <- lapply(meta$ylist, function(v) intersect(v, fvars))  # Restricts 'fsn' y-ordering to the requested fusion variables, in case only a subset of the 'fsn' fusion variables is requested
      ylist <- ylist[lengths(ylist) > 0]
      prep <- list(y = ylist)
      prep$x <- lapply(prep$y, function(v) {
        filter(vimp, y %in% v) %>%
          pull(x) %>%
          unique() %>%
          c(force)
      })

      xunique <- setdiff(unique(unlist(prep$x)), fvars)
      cat("Retained", length(xunique), "of", ncol(full.data) - length(fvars) - 1, "potential predictor variables\n")

    }

  } else {

    # Check for presence of existing '_prep.rds' file
    pfile <- sub("donor.fst", "prep.rds", tfile)
    if (file.exists(pfile) & !skip.train) {

      cat("Detected existing '_prep.rds' object; bypassing fusionModel::prepXY() step\n")
      prep <- readRDS(pfile)

    } else {

      cat("\n|=== Run fusionModel::prepXY() ===|\n\n")

      n0 <- nrow(full.data)
      pfrac <- min(1, ifelse(test_mode, 5e3, max(20e3, n0 * 0.1)) / n0)
      prep <- fusionModel::prepXY(data = full.data,
                                  y = fusion_vars,
                                  x = setdiff(names(full.data), c(fvars, "weight")),
                                  weight = "weight",
                                  cor_thresh = 0.025,
                                  lasso_thresh = 0.99,
                                  xmax = 100,
                                  xforce = NULL,
                                  fraction = pfrac,
                                  cores = ncores)

      # Save output from prepXY()
      xfile <- paste0(stub, "prep.rds")
      saveRDS(prep, file = xfile)
      fsize <- signif(file.size(xfile) / 1e6, 3)
      cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")

    }

  }

  # Update 'full.data' to reflect results in 'prep'; removes unnecessary predictor variables
  full.data <- full.data %>%
    select(weight, any_of(unique(c(fvars, unlist(prep$x)))))

  #-----


  if (skip.train) {

    cat("\n|=== Detected existing fusion model (.fsn) object; bypassing fusionModel::train() step ===|\n\n")

    # When training is skipped, copy the existing output log file to 'outputlog0.txt'
    # This allows the "0" version to retain the original training step console output
    # If the "0" version already exists, the copying is skipped (i.e. the "0" version should always contain original training output)
    if (!file.exists(log.path0)) file.copy(from = log.path, to = log.path0)
    cat("See", basename(log.path0), "for console output from original model training step\n")

  } else {

    cat("\n|=== Run fusionModel::train() ===|\n\n")

    # If there is a "0" version of the output log file, remove it
    # Since train() is called below, the new log file will contain appropriate training console output
    if (file.exists(log.path0)) unlink(log.path0)

    # LightGBM hyper-parameter settings
    hyper.params <- if (test_mode) {
      cat("Running in TEST mode using fast(er) hyper-parameter settings\n")
      list(
        boosting = "gbdt",
        data_sample_strategy = "goss",
        num_leaves = 7,
        min_data_in_leaf = max(20, ceiling(nrow(full.data) * 0.01)),
        num_iterations = 50,
        feature_fraction = 0.5,
        learning_rate = 0.2,
        max_depth = 3,
        max_bin = 255,
        min_data_in_bin = ceiling(nrow(full.data) * 0.01),
        max_cat_threshold = 32
      )
    } else {
      cat("Running in PRODUCTION mode using the following hyper-parameter settings:\n\n")
      hyper.params <- list(
        boosting = "gbdt",
        data_sample_strategy = "goss",
        num_leaves = 31,
        min_data_in_leaf = max(10, ceiling(nrow(full.data) * 0.001)),
        num_iterations = 2500,
        feature_fraction = 0.75,
        learning_rate = 0.1,
        max_depth = 5,
        max_bin = 255,
        min_data_in_bin = 3,
        max_cat_threshold = 32
      )
      print(hyper.params)
    }

    # Train fusion model
    cat("Training fusion model\n\n")
    fsn.path <- fusionModel::train(data = full.data,
                                   y = prep$y,
                                   x = prep$x,
                                   fsn = paste0(stub, "model.fsn"),
                                   weight = "weight",
                                   hyper = hyper.params,
                                   cores = ncores,
                                   ...)

    xfile <- paste0(stub, "model.fsn")
    fsize <- signif(file.size(xfile) / 1e6, 3)
    cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")

  }

  #-----

  if (validation) {

    cat("\n|=== Fuse onto donor microdata for internal validation ===|\n\n")

    # Fuse multiple implicates to training data for internal validation analysis
    validfsd <- fusionModel::fuse(data = full.data %>% select(-all_of(fvars)),
                                  fsn = fsn.path,
                                  M = M,
                                  fsd = paste0(stub, "valid.fsd"),
                                  cores = ncores,
                                  margin = margin)

    xfile <- paste0(stub, "valid.fsd")
    fsize <- signif(file.size(xfile) / 1e6, 3)
    cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")

  }

  #-----

  # # WHAT TO DO WITH THIS?
  # # TO DO: Skip this step for now??? Or run at very end using combined results from all acs_years?
  # if (validation | validation == 2) {
  #
  #   cat("\n|=== Run fusionModel::validate() ===|\n\n")
  #
  #   # Pass 'valid' implicates to validate() function
  #   validresults <- fusionModel::validate(observed = merge(train.data, spatial.data, all.x = TRUE),
  #                                         implicates = fusionModel::read_fsd(validfsd),
  #                                         subset_vars = attr(prep, "xforce"),
  #                                         weight = "weight",
  #                                         cores = ncores)
  #
  #   # Save 'validation' results as .rds
  #   vout <- paste0(stub, "validation.rds")
  #   saveRDS(validresults, file = vout)
  #   cat("Validation results saved to:\n", vout, "\n")
  #
  # }

  #-----

  cat("\n|=== Fuse onto recipient microdata ===|\n\n")

  # Load the prediction data and merge spatial predictors
  cat("Loading recipient harmonized predictors:", basename(rfile), "\n\n")
  predict.data <- fst::read_fst(rfile, as.data.table = TRUE) %>%
    merge(spatial.data, all.x = TRUE, sort = FALSE) %>%
    select(any_of(c(idvars, names(full.data))))

  # Add 'year' column and set keys
  set(predict.data, j = 'year', value = as.integer(acs_year))
  setkeyv(predict.data, cols = c('year', idvars))

  # Remove unnecessary objects in memory prior to fuse()
  rm(full.data)
  gc(verbose = FALSE)

  # Fuse multiple implicates to ACS and save results to disk
  cat("Fusing to ACS ", acs_year, " microdata (", M, " ", ifelse(M == 1, "implicate", "implicates"), ")\n\n", sep = "")

  fusionModel::fuse(data = predict.data,
                    fsn = fsn.path,
                    M = M,
                    fsd = paste0(stub, "fused.fsd"),
                    retain = key(predict.data),  # Retain the ACS year, household ID, and possibly 'pid' in the output
                    cores = ncores,
                    margin = margin)

  xfile <- paste0(stub, "fused.fsd")
  fsize <- signif(file.size(xfile) / 1e6, 3)
  cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")

  #-----

  # DEPRECATED
  # cat("\n|=== Upload /output files to Google Drive ===|\n\n")
  #
  # if (upload) {
  #   if (interactive()) {
  #     # Check if fusionData package is installed; it is necessary to call uploadFiles()
  #     fd <- !inherits(try(system.file(package='fusionData', mustWork = TRUE), silent = TRUE), "try-error")
  #     if (fd) {
  #       odf <- paste0(stub, c("model.fsn", "valid.fsd", "validation.rds", "fused.fsd"))
  #       odf <- odf[file.exists(odf)]  # Restrict to output files that exist
  #       uploadFiles(files = odf, ask = TRUE)
  #     } else {
  #       cat("fusionData package not installed; file upload skipped.\n")
  #     }
  #   } else {
  #     cat("Non-interactive session: skipping upload to Google Drive\n")
  #   }
  # } else {
  #   cat("'upload = FALSE'; file upload skipped at request of user.\n")
  # }

  #-----

  # Clean up
  rm(predict.data, spatial.data)
  gc(verbose = FALSE)

  cat("\n|=== fusionOutput() is finished! ===|\n\n")

  # Report processing time
  tout <- difftime(Sys.time(), tstart)
  cat("Total processing time:", signif(as.numeric(tout), 3), attr(tout, "units"), "\n", sep = " ")

  # Finish logging and copy log file to /input
  cat("\nLog file saved to:\n", log.path, "\n")
  sink(type = "output")
  close(log.txt)
  invisible(file.copy(from = log.temp, to = log.path, overwrite = TRUE))

  # Return the /output path invisibly
  return(invisible(dir))

}
ummel/fusionModel documentation built on June 1, 2025, 11 p.m.