#' Generate output files resulting from fusion
#'
#' @description
#' Handles all operations needed to "do fusion" using input files generated by a successful call to \code{fusionInput}. Trains a fusion model, generates internal validation results, and then simulates multiple implicates for recipient microdata.
#'
#' @param input Character. Path to directory containing files created by \code{fusionInput}.
#' @param output Character. Optional path to directory where output files will be saved. If \code{output = NULL} (default), the output directory is automatically constructed from \code{input}.
#' @param M Integer. Desired number of fusion implicates. If \code{M = NULL} (default) it is internally set to 40 or, if \code{test_mode = TRUE}, 2 implicates.
#' @param note Character. Optional note supplied by user. Inserted in the log file for reference.
#' @param test_mode Logical. If \code{test_mode = TRUE} (default), the result files are always saved within a "/fusion_" directory in `output` (possibly created); faster hyperparameters are used for \code{\link[fusionModel]{train}}; and the internal validation step is skipped by default.
#' @param validation Logical or integer. Controls execution of internal validation (Steps 3 and 4). If `validation = 0` or `FALSE`, neither step is performed (default when `test_mode = TRUE`). If `1`, only Step 3. If `2` or `TRUE`, both Steps 3 and 4.
#' @param ncores Integer. Number of physical CPU cores used for parallel computation.
#' @param margin Numeric. Passed to same argument in \code{\link[fusionModel]{fuse}}.
#' @param ... Optional, non-default arguments passed to \code{\link[fusionModel]{train}}. For example, \code{fork = TRUE} to enable forked parallel processing.
#'
#' @details The function checks arguments and determines the file path to the appropriate `output` directory (creating it if necessary). The output files are always placed within the appropriate directory hierarchy, based on the donor and recipient information detected in the \code{input} file names. In practice, \code{output} need only be specified if working in an environment where the output files need to located somewhere different from the input files.
#'
#' The function executes the following steps:
#' 1. **Load training data inputs**. Loads donor training microdata and results of \code{\link[fusionModel]{prepXY}}.
#' 1. **Run fusionModel::train()**. Calls \code{\link[fusionModel]{train}} using sensible defaults and hyperparameters. If \code{test_mode = TRUE}, the hyperparameters are designed to do a fast/rough-and-ready model training.
#' 1. **Fuse onto training data for internal validation**. Optional step (see `validation` argument). Fuses multiple implicates to original donor training data using \code{\link[fusionModel]{fuse}}. Results saved to disk.
#' 1. **Run fusionModel::validate()**. Optional step (see `validation` argument). Passes previous step's results to \code{\link[fusionModel]{validate}}. Results saved to disk.
#' 1. **Fuse onto prediction data**. Fuses multiple implicates to supplied input prediction data using \code{\link[fusionModel]{fuse}}. Results saved to disk.
#' 1. **fusionOutput() is finished!** Upon completion, a log file named \code{"outputlog.txt"} is written to \code{output} for reference.
#'
#' @return Saves resulting `output` data files to appropriate local directory. Also saves a .txt log file alongside data files that records console output from \code{fusionOutput}.
#'
#' @examples
#' # Since 'test_mode = TRUE' by default, this will affect files in local '/fusion_' directory
#' dir <- fusionInput(donor = "RECS_2015",
#' recipient = "ACS_2015",
#' respondent = "household",
#' fuse = c("btung", "btuel", "cooltype"),
#' force = c("moneypy", "householder_race", "education", "nhsldmem", "kownrent", "recs_division"),
#' note = "Hello world. Reminder: running in test mode by default.")
#'
#' # List files in the /input directory
#' list.files(dir)
#'
#' # Using default settings
#' out <- fusionOutput(input = dir)
#' list.files(out)
#'
#' @export
# @noRd
#-----
# TESTING
# library(tidyverse)
# library(data.table)
# source("R/utils.R")
#
# donor = "RECS_2020"
# acs_year = 2015
# respondent = "household"
# fusion_vars <- c("aircond", "scaleb", "btuel")
# M <- 1
# note = NULL
# test_mode = TRUE
# validation = TRUE
# ncores = getOption("fusionData.cores")
# margin = 2
#-----
# Dummy respondent location data for testing
# geo <- fst::read_fst("geo-processed/concordance/geo_concordance.fst")
# donor = fst::read_fst("fusion_/RECS/2020/2015/RECS_2020_2015_H_donor.fst")
# temp <- geo %>%
# select(state, county10, tract10) %>%
# distinct() %>%
# slice_sample(n = nrow(donor))
# rlocation <- data.frame(hid = donor$hid)
# rlocation <- cbind(rlocation, temp)
# rlocation <- slice_sample(rlocation, prop = 0.95)
# fusionOutput(donor = "RECS_2020",
# respondent = "H",
# acs_year = 2015,
# fusion_vars = c("aircond", "scaleb", "btuel"),
# fsn = "fusion_/RECS/2020/2015/RECS_2020_2015_H_model.fsn",
# rlocation = rlocation)
#-----
fusionOutput <- function(donor,
respondent,
acs_year,
fusion_vars,
M = 1,
fsn = NULL,
rlocation = NULL,
note = NULL,
test_mode = TRUE,
validation = TRUE,
ncores = 1,
margin = 4,
...) {
tstart <- Sys.time()
# Check validity of the working directory path
# Checks if "/fusionData" is part of the path, as this is required
b <- strsplit(full.path(getwd()), .Platform$file.sep, fixed = TRUE)[[1]]
i <- which(b == "fusionData")
if (length(i) == 0) stop("'/fusionData' is not part of the working directory path; this is required.")
dir <- paste(b[1:i], collapse = .Platform$file.sep)
# Get path to the fusion file directory
donor <- strsplit(donor, "_", fixed = TRUE)[[1]]
dir <- file.path(dir, ifelse(test_mode, "fusion_", "fusion"), donor[1], donor[2], acs_year)
# Check input arguments
respondent <- substring(toupper(respondent), 1, 1)
stopifnot({
respondent %in% c("H", "P")
is.character(fusion_vars) | is.list(fusion_vars)
M > 0 & M %% 1 == 0
is.null(fsn) | is.character(fsn)
is.null(rlocation) | is.data.frame(rlocation)
is.null(note) | is.character(note)
is.logical(test_mode)
is.logical(validation)
ncores > 0 & ncores %% 1 == 0
is.numeric(margin)
})
# Check validity of the 'fusion_vars' argument
fvars <- unlist(fusion_vars)
if (anyDuplicated(fvars)) stop("Duplicate fusion variables provided (must be unique)")
# Check validity of the 'fsn' argument, if present
if (is.character(fsn)) {
stopifnot({
endsWith(fsn, ".fsn")
file.exists(fsn)
})
}
# Set number of cores for 'fst' and 'data.table' packages to use
data.table::setDTthreads(ncores)
fst::threads_fst(ncores)
# Create log file
log.temp <- tempfile()
log.txt <- file(log.temp, open = "wt")
sink(log.txt, split = TRUE, type = "output")
#-----
# Report initial messages to console and log
# The try() wrapper for fusionData is to allow case of running fusionModel::fusionOutput() on a server
cat(format(tstart, usetz = TRUE), "\n")
cat(R.version.string, "\n")
cat("Platform:", R.Version()$platform, "\n")
cat("fusionModel v", as.character(utils::packageVersion("fusionModel")), "\n\n", sep = "")
# Report number of CPU cores and available memory
gc(verbose = FALSE)
freecores <-
cat("Using", ncores, "of", ifelse(Sys.getenv("SLURM_CPUS_PER_TASK") == "", parallel::detectCores(logical = FALSE), as.integer(Sys.getenv("SLURM_CPUS_PER_TASK"))), "available CPU cores\n")
cat("Detected", signif(freeMemory() / 1e3, 3), "GB of available memory at the start\n\n")
# Print the original function arguments
# Excludes 'note', if present, since it is printed separately to log, below
print(match.call.defaults(exclude = if (is.null(note)) NULL else "note"))
cat("\n")
# Print message indicating 'test_mode' value
cat("Running in", ifelse(test_mode, "TEST", "PRODUCTION"), "mode\n\n")
# Report the fusion directory or fail with error message
if (dir.exists(dir)) {
cat("The fusion directory is:\n", dir, "\n\n")
} else {
stop("Requested fusion directory does not exist. Input files not located at: ", dir)
}
# Write 'note' argument to log file (and console), if requested
if (!is.null(note)) cat("User-supplied note:\n", note, "\n\n")
#-----
# Check for presence of training dataset
tfile <- full.path(list.files(dir, pattern = paste0(respondent, "_donor\\.fst$"), full.names = TRUE))
if (length(tfile) == 0) stop("Cannot locate 'donor.fst' file at: ", tfile)
# Check for presence of ACS prediction dataset
rfile <- sub("donor.fst", "recipient.fst", tfile)
if (!file.exists(rfile)) stop("Cannot locate 'recipient.fst' file at: ", rfile)
# Check for presence of donor processed microdata
dfile <- full.path(list.files(path = "survey-processed", pattern = paste(c(donor, respondent, "processed.fst"), collapse = "_"), recursive = TRUE, full.names = TRUE))
if (length(dfile) == 0) stop("Cannot locate donor processed microdata in /survey-processed")
# Check for presence of 'geo_predictors.fst' file
gfile <- full.path("geo-processed/geo_predictors.fst")
if (!file.exists(gfile)) stop("Cannot locate 'geo_predictors.fst' file")
# Update 'stub' used for output files
stub <- file.path(dir, sub("donor.fst", "", basename(tfile), fixed = TRUE))
# Log file paths
log.path <- paste0(stub, "outputlog.txt")
log.path0 <- paste0(stub, "outputlog0.txt") # Possible copy location when training step is skipped
#-----
cat("|=== Assemble training data ===|\n\n")
# Load the training data - TO DO: ADD fusion variables via merge with processed donor microdata
cat("Loading donor harmonized predictors:", basename(tfile), "\n")
train.data <- fst::read_fst(tfile, as.data.table = TRUE)
hvars <- grep("__", names(train.data), fixed = TRUE, value = TRUE) # Harmonized predictor variables
# If respondent locations are provided, replace the imputed state and PUMA with the known values
if (is.data.frame(rlocation)) {
cat("Updating donor imputed respondent location with disclosed location\n")
geo <- fst::read_fst("geo-processed/concordance/geo_concordance.fst")
rlocation <- geo %>%
select(state, puma10, any_of(names(rlocation))) %>%
distinct() %>%
inner_join(rlocation) %>%
select(hid, state, puma10) %>%
filter(hid %in% as.character(train.data$hid)) %>%
na.omit()
# Update the state and PUMA, when possible, using known respondent locations
if (nrow(rlocation) > 0) {
N0 <- nrow(train.data)
pct <- 100 * signif(nrow(rlocation) / nrow(train.data), 3)
cat("Disclosed location provided for ", nrow(rlocation), " donor respondents (", pct, "% of households)\n", sep = "")
train.data <- train.data %>%
mutate_at(vars(state, puma10), as.character) %>%
rows_update(rlocation, by = "hid")
}
if (nrow(train.data) > N0) stop("Problem with 'rlocation'; merge led to erroneous increase in number of donor observations.")
rm(rlocation, geo)
}
# Load donor variables to be fused
cat("Loading fusion variables from", basename(dfile), "\n")
idvars <- intersect(names(train.data), c('hid', 'pid'))
dvars <- names(fst::fst(dfile))
miss <- setdiff(fvars, dvars)
if (length(miss)) stop("The following fusion variables are not present in the donor microdata:\n", paste(miss, collapse = ","), "\n")
fuse.data <- fst::read_fst(dfile, as.data.table = TRUE, columns = c(idvars, fvars)) %>%
setkeyv(idvars)
# Load spatial predictors data (always needed)
cat("Loading spatial predictors from", basename(gfile), "\n")
# Survey vintage (year); returns approximate midpoint year in case of range (e.g. "2014-2016" returns 2015)
svintage <- ceiling(median(eval(parse(text = sub("-", ":", donor[2])))))
spatial.data <- fst::read_fst(gfile, as.data.table = TRUE) %>%
setkey(state, puma10) %>% # TEMP -- should remove 'vintage' as key in gfile
filter(vintage == svintage) %>%
select(-vintage)
# Assemble full dataset needed for model training
cat("Assembling full training dataset\n")
full.data <- train.data %>%
merge(spatial.data, all.x = TRUE, sort = FALSE) %>%
setkeyv(key(fuse.data)) %>%
merge(fuse.data, all.x = TRUE, sort = FALSE) %>%
select(-all_of(c(idvars, key(spatial.data)))) %>%
select(weight, all_of(fvars), everything())
# Safety check for excessive number of factor levels
drop <- names(which(sapply(full.data, nlevels) > 255))
if (length(drop)) {
full.data <- select(full.data, -all_of(drop))
cat("Removed the following variables due to excessive (>255) number of factor levels:\n", paste(drop, collapse = ", "), "\n")
}
cat("Number of training observations:", nrow(full.data), "\n")
rm(train.data, fuse.data)
#-----
# Check for presence of existing '*_model.fsn' object
# If requested, attempt to bypass fusionModel::train() step
mfile <- sub("donor.fst", "model.fsn", tfile)
if (file.exists(mfile) & is.null(fsn)) {
skip.train <- TRUE
fsn <- fsn.path <- mfile
} else {
skip.train <- FALSE
}
# If a 'fsn' object/template is provided (or existing .fsn model already present), use those results to create 'prep' list instead of running prepXY()
if (is.character(fsn)) {
# Extract metadata from 'fsn'
td <- tempfile()
zip::unzip(zipfile = fsn, exdir = td)
meta <- readRDS(file.path(td, "metadata.rds"))
unlink(td)
if (skip.train) {
# Build 'prep' object
prep <- list(y = meta$ylist, x = meta$xpreds)
} else {
cat("\n|=== Existing .fsn model used as training template ===|\n\n")
# Original harmonized predictors in training data of existing 'fsn' model
hvars0 <- names(fst::fst(sub("_model.fsn", "_donor.fst", fsn)))
hvars0 <- grep("__", hvars0, fixed = TRUE, value = TRUE)
# Variable importance results for existing 'fsn' model
vimp <- fusionModel::importance(fsn)$detailed %>%
distinct(y, x) %>%
filter(x %in% names(full.data))
if (!all(fvars %in% vimp$y)) stop("Not all requested 'fusion_vars' are present in the existing 'fsn' model")
if (!all(fvars %in% vimp$y)) stop("Not all requested 'fusion_vars' are present in the existing 'fsn' model")
# Harmonized predictors in current training data that were not present for 'fsn' creation and, therefore, should always be included
hvars <- grep("__", names(full.data), fixed = TRUE, value = TRUE)
force <- setdiff(hvars, hvars0)
# Build 'prep' object
ylist <- lapply(meta$ylist, function(v) intersect(v, fvars)) # Restricts 'fsn' y-ordering to the requested fusion variables, in case only a subset of the 'fsn' fusion variables is requested
ylist <- ylist[lengths(ylist) > 0]
prep <- list(y = ylist)
prep$x <- lapply(prep$y, function(v) {
filter(vimp, y %in% v) %>%
pull(x) %>%
unique() %>%
c(force)
})
xunique <- setdiff(unique(unlist(prep$x)), fvars)
cat("Retained", length(xunique), "of", ncol(full.data) - length(fvars) - 1, "potential predictor variables\n")
}
} else {
# Check for presence of existing '_prep.rds' file
pfile <- sub("donor.fst", "prep.rds", tfile)
if (file.exists(pfile) & !skip.train) {
cat("Detected existing '_prep.rds' object; bypassing fusionModel::prepXY() step\n")
prep <- readRDS(pfile)
} else {
cat("\n|=== Run fusionModel::prepXY() ===|\n\n")
n0 <- nrow(full.data)
pfrac <- min(1, ifelse(test_mode, 5e3, max(20e3, n0 * 0.1)) / n0)
prep <- fusionModel::prepXY(data = full.data,
y = fusion_vars,
x = setdiff(names(full.data), c(fvars, "weight")),
weight = "weight",
cor_thresh = 0.025,
lasso_thresh = 0.99,
xmax = 100,
xforce = NULL,
fraction = pfrac,
cores = ncores)
# Save output from prepXY()
xfile <- paste0(stub, "prep.rds")
saveRDS(prep, file = xfile)
fsize <- signif(file.size(xfile) / 1e6, 3)
cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")
}
}
# Update 'full.data' to reflect results in 'prep'; removes unnecessary predictor variables
full.data <- full.data %>%
select(weight, any_of(unique(c(fvars, unlist(prep$x)))))
#-----
if (skip.train) {
cat("\n|=== Detected existing fusion model (.fsn) object; bypassing fusionModel::train() step ===|\n\n")
# When training is skipped, copy the existing output log file to 'outputlog0.txt'
# This allows the "0" version to retain the original training step console output
# If the "0" version already exists, the copying is skipped (i.e. the "0" version should always contain original training output)
if (!file.exists(log.path0)) file.copy(from = log.path, to = log.path0)
cat("See", basename(log.path0), "for console output from original model training step\n")
} else {
cat("\n|=== Run fusionModel::train() ===|\n\n")
# If there is a "0" version of the output log file, remove it
# Since train() is called below, the new log file will contain appropriate training console output
if (file.exists(log.path0)) unlink(log.path0)
# LightGBM hyper-parameter settings
hyper.params <- if (test_mode) {
cat("Running in TEST mode using fast(er) hyper-parameter settings\n")
list(
boosting = "gbdt",
data_sample_strategy = "goss",
num_leaves = 7,
min_data_in_leaf = max(20, ceiling(nrow(full.data) * 0.01)),
num_iterations = 50,
feature_fraction = 0.5,
learning_rate = 0.2,
max_depth = 3,
max_bin = 255,
min_data_in_bin = ceiling(nrow(full.data) * 0.01),
max_cat_threshold = 32
)
} else {
cat("Running in PRODUCTION mode using the following hyper-parameter settings:\n\n")
hyper.params <- list(
boosting = "gbdt",
data_sample_strategy = "goss",
num_leaves = 31,
min_data_in_leaf = max(10, ceiling(nrow(full.data) * 0.001)),
num_iterations = 2500,
feature_fraction = 0.75,
learning_rate = 0.1,
max_depth = 5,
max_bin = 255,
min_data_in_bin = 3,
max_cat_threshold = 32
)
print(hyper.params)
}
# Train fusion model
cat("Training fusion model\n\n")
fsn.path <- fusionModel::train(data = full.data,
y = prep$y,
x = prep$x,
fsn = paste0(stub, "model.fsn"),
weight = "weight",
hyper = hyper.params,
cores = ncores,
...)
xfile <- paste0(stub, "model.fsn")
fsize <- signif(file.size(xfile) / 1e6, 3)
cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")
}
#-----
if (validation) {
cat("\n|=== Fuse onto donor microdata for internal validation ===|\n\n")
# Fuse multiple implicates to training data for internal validation analysis
validfsd <- fusionModel::fuse(data = full.data %>% select(-all_of(fvars)),
fsn = fsn.path,
M = M,
fsd = paste0(stub, "valid.fsd"),
cores = ncores,
margin = margin)
xfile <- paste0(stub, "valid.fsd")
fsize <- signif(file.size(xfile) / 1e6, 3)
cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")
}
#-----
# # WHAT TO DO WITH THIS?
# # TO DO: Skip this step for now??? Or run at very end using combined results from all acs_years?
# if (validation | validation == 2) {
#
# cat("\n|=== Run fusionModel::validate() ===|\n\n")
#
# # Pass 'valid' implicates to validate() function
# validresults <- fusionModel::validate(observed = merge(train.data, spatial.data, all.x = TRUE),
# implicates = fusionModel::read_fsd(validfsd),
# subset_vars = attr(prep, "xforce"),
# weight = "weight",
# cores = ncores)
#
# # Save 'validation' results as .rds
# vout <- paste0(stub, "validation.rds")
# saveRDS(validresults, file = vout)
# cat("Validation results saved to:\n", vout, "\n")
#
# }
#-----
cat("\n|=== Fuse onto recipient microdata ===|\n\n")
# Load the prediction data and merge spatial predictors
cat("Loading recipient harmonized predictors:", basename(rfile), "\n\n")
predict.data <- fst::read_fst(rfile, as.data.table = TRUE) %>%
merge(spatial.data, all.x = TRUE, sort = FALSE) %>%
select(any_of(c(idvars, names(full.data))))
# Add 'year' column and set keys
set(predict.data, j = 'year', value = as.integer(acs_year))
setkeyv(predict.data, cols = c('year', idvars))
# Remove unnecessary objects in memory prior to fuse()
rm(full.data)
gc(verbose = FALSE)
# Fuse multiple implicates to ACS and save results to disk
cat("Fusing to ACS ", acs_year, " microdata (", M, " ", ifelse(M == 1, "implicate", "implicates"), ")\n\n", sep = "")
fusionModel::fuse(data = predict.data,
fsn = fsn.path,
M = M,
fsd = paste0(stub, "fused.fsd"),
retain = key(predict.data), # Retain the ACS year, household ID, and possibly 'pid' in the output
cores = ncores,
margin = margin)
xfile <- paste0(stub, "fused.fsd")
fsize <- signif(file.size(xfile) / 1e6, 3)
cat("\nResults saved to:", paste0(basename(xfile), " (", fsize, " MB)"), "\n")
#-----
# DEPRECATED
# cat("\n|=== Upload /output files to Google Drive ===|\n\n")
#
# if (upload) {
# if (interactive()) {
# # Check if fusionData package is installed; it is necessary to call uploadFiles()
# fd <- !inherits(try(system.file(package='fusionData', mustWork = TRUE), silent = TRUE), "try-error")
# if (fd) {
# odf <- paste0(stub, c("model.fsn", "valid.fsd", "validation.rds", "fused.fsd"))
# odf <- odf[file.exists(odf)] # Restrict to output files that exist
# uploadFiles(files = odf, ask = TRUE)
# } else {
# cat("fusionData package not installed; file upload skipped.\n")
# }
# } else {
# cat("Non-interactive session: skipping upload to Google Drive\n")
# }
# } else {
# cat("'upload = FALSE'; file upload skipped at request of user.\n")
# }
#-----
# Clean up
rm(predict.data, spatial.data)
gc(verbose = FALSE)
cat("\n|=== fusionOutput() is finished! ===|\n\n")
# Report processing time
tout <- difftime(Sys.time(), tstart)
cat("Total processing time:", signif(as.numeric(tout), 3), attr(tout, "units"), "\n", sep = " ")
# Finish logging and copy log file to /input
cat("\nLog file saved to:\n", log.path, "\n")
sink(type = "output")
close(log.txt)
invisible(file.copy(from = log.temp, to = log.path, overwrite = TRUE))
# Return the /output path invisibly
return(invisible(dir))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.