#' Fuse variables to a recipient dataset
#'
#' @description
#' Fuse variables to a recipient dataset using a .fsn model produced by \code{\link{train}}. Output can be passed to \code{\link{analyze}} and \code{\link{validate}}.
#'
#' @param data Data frame. Recipient dataset. All categorical variables should be factors and ordered whenever possible. Data types and levels are strictly validated against predictor variables defined in \code{fsn}.
#' @param fsn Character. Path to fusion model file (.fsn) generated by \code{\link{train}}.
#' @param fsd Character. Optional fusion output file to be created ending in \code{.fsd} (i.e. "fused data"). This is a compressed binary file that can be read using the \code{\link[fst]{fst}} package. If \code{fsd = NULL} (the default), the fusion results are returned as a \code{\link[data.table]{data.table}}.
#' @param M Integer. Number of implicates to simulate.
#' @param retain Character. Names of columns in \code{data} that should be retained in the output; i.e. repeated across implicates. Useful for retaining ID or weight variables for use in subsequent analysis of fusion output.
#' @param kblock Integer. Fixed number of nearest neighbors to use when fusing variables in a block. Must be >= 5 and <= 30. Not applicable for variables fused on their own (i.e. no block).
#' @param margin Numeric. Safety margin used when estimating how many implicates can be processed in memory at once. Set higher if \code{fuse()} experiences a memory shortfall. Alternatively, can be set to a negative value to manually specify the number of chunks to use. For example, \code{margin = -3} splits \code{M} implicates into three chunks of approximately equal size.
#' @param cores Integer. Number of cores used. LightGBM prediction is parallel-enabled on all systems if OpenMP is available.
#'
#' @details TO UPDATE.
#'
#' @return If \code{fsd = NULL}, a \code{\link[data.table]{data.table}} with number of rows equal to \code{M * nrow(data)}. Integer column "M" indicates implicate assignment of each observation. Note that the ordering of recipient observations is consistent within implicates, so do not change the row order if using with \code{\link{analyze}}.
#' @return If \code{fsd} is specified, the path to .fsd file where results were written. Metadata for column classes and factor levels are stored in the column names. \code{\link{read_fsd}} should be used to load files saved via the \code{fsd} argument.
#'
#' @examples
#' # Build a fusion model using RECS microdata
#' # Note that "fusion_model.fsn" will be written to working directory
#' ?recs
#' fusion.vars <- c("electricity", "natural_gas", "aircon")
#' predictor.vars <- names(recs)[2:12]
#' fsn.path <- train(data = recs, y = fusion.vars, x = predictor.vars)
#'
#' # Generate single implicate of synthetic 'fusion.vars',
#' # using original RECS data as the recipient
#' recipient <- recs[predictor.vars]
#' sim <- fuse(data = recipient, fsn = fsn.path)
#' head(sim)
#'
#' # Calling fuse() again produces different results
#' sim <- fuse(data = recipient, fsn = fsn.path)
#' head(sim)
#'
#' # Generate multiple implicates
#' sim <- fuse(data = recipient, fsn = fsn.path, M = 5)
#' head(sim)
#' table(sim$M)
#'
#' # Optionally, write results directly to disk
#' # Note that "results.fsd" will be written to working directory
#' sim <- fuse(data = recipient, fsn = fsn.path, M = 5, fsd = "results.fsd")
#' sim <- read_fsd(sim)
#' head(sim)
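#'
#' # Optionally, retain a recipient column across implicates; this sketch
#' # assumes 'recs' includes a household "weight" column
#' recipient <- recs[c("weight", predictor.vars)]
#' sim <- fuse(data = recipient, fsn = fsn.path, M = 2, retain = "weight")
#' head(sim)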
#'
#' @export
# NOT USED -- maybe reintroduce at some point
# @param max_dist Numeric. Controls the maximum allowable distance when identifying up to \code{k} nearest neighbors. \code{max_dist = 0} (default) means no distance restriction is applied.
# @param idw Logical. Should inverse distance weighting be used when randomly selecting a donor observation from the \code{k} nearest neighbors? If \code{idw = FALSE} (the default), sample weights are used instead.
# @details The \code{max_dist} value is a relative scaling factor. The search radius passed to \code{\link[RANN]{nn2}} is \code{max_dist * medDist}, where \code{medDist} is the median pairwise distance calculated among all of the donor observations. This approach allows \code{max_dist} to adjust the search radius in a consistent way across all fusion variables/blocks.
# @param ignore_self Logical. If \code{TRUE}, the simulation step excludes "self matches" (i.e. row 1 in \code{data} cannot match with row 1 in the original donor). Only useful for validation exercises. Do not use otherwise.
# @param seed Set random seed if needed.
# #' # Test to confirm that 'ignore_self' works as intended
# #' recs$electricity <- jitter(recs$electricity)
# #' train(data = recs, y = c("electricity", "natural_gas"), x = predictor.vars, file = fsn.path)
# #'
# #' sim1 <- fuse(data = recipient, fsn = fsn.path, ignore_self = FALSE, k = 10)
# #' sim2 <- fuse(data = recipient, fsn = fsn.path, ignore_self = TRUE, k = 10)
# #'
# #' sum(sim1 == recs$electricity)
# #' sum(sim2 == recs$electricity)
#---------------------
# Manual testing
# library(fusionModel)
# library(tidyverse)
# library(data.table)
# source("R/utils.R")
# fusion.vars <- c("electricity", "natural_gas", "aircon")
# predictor.vars <- names(recs)[2:12]
# fsn <- train(data = recs, y = fusion.vars, x = predictor.vars)
# data <- recs
# fsn <- "~/Documents/Projects/fusionData/fusion_/RECS/2020/2015/H/output/RECS_2020_2015_H_model.fsn"
# train.data <- fst::read_fst("~/Documents/Projects/fusionData/fusion_/RECS/2020/2015/H/input/RECS_2020_2015_H_train.fst")
# spatial.data <- fst::read_fst("~/Documents/Projects/fusionData/fusion_/RECS/2020/2015/H/input/RECS_2020_2015_H_spatial.fst")
# data <- merge(train.data, spatial.data, all.x = TRUE)
# fsd = "results.fsd"
# M <- 2
# kblock = 10
# cores = 1
# margin = 2
# retain = "weight" # TEST retaining certain variables in output
#---------------------
fuse <- function(data,
fsn,
fsd = NULL,
M = 1,
retain = NULL,
kblock = 10,
margin = 2,
cores = 1) {
t0 <- Sys.time()
stopifnot(exprs = {
is.data.frame(data)
file.exists(fsn) & endsWith(fsn, ".fsn")
is.null(fsd) | is.character(fsd)
M > 0 & M %% 1 == 0
is.null(retain) | all(retain %in% names(data))
kblock >= 5 & kblock <= 30
cores > 0 & cores %% 1 == 0 & cores <= parallel::detectCores(logical = FALSE)
margin != 0
})
# Ensure 'fsd' argument is valid (is directory creation necessary?)
if (!is.null(fsd)) {
if (!endsWith(fsd, ".fsd")) stop("Argument 'fsd' must have file suffix '.fsd'")
dir <- full.path(dirname(fsd), mustWork = FALSE)
if (!dir.exists(dir)) dir.create(dir, recursive = TRUE)
fsd.path <- file.path(dir, basename(fsd))
}
# Convert input data to data.table for efficiency
data <- as.data.table(data)
dkey <- key(data) # Store original data.table key(s); if present, enforced in final output
# Temporary directory in which to unzip the .fsn contents and possibly save .fst result chunks
td <- tempfile()
dir.create(td)
# Names of files within the .fsn object
fsn.files <- zip::zip_list(fsn)
pfixes <- sort(unique(dirname(fsn.files$filename)))
pfixes <- pfixes[pfixes != "."]
# Unzip all files in .fsn to temporary directory
zip::unzip(zipfile = fsn, exdir = td)
# Load model metadata
meta <- readRDS(file.path(td, "metadata.rds"))
# Names, order, and 'blocks' of response variables
ylist <- meta$ylist
yvars <- unlist(ylist)
# Check that necessary predictor variables are present
xvars <- names(meta$xclass)
miss <- setdiff(xvars, names(data))
if (length(miss) > 0) stop("The following predictor variables are missing from 'data':\n", paste(miss, collapse = ", "))
# Variables in 'data' requested to be retained in the output
dretain <- data[, ..retain]
# Restrict 'data' to 'xvars'
data <- data[, ..xvars]
# Check for appropriate class/type of predictor variables
xclass <- meta$xclass
xtest <- lapply(data, class)
miss <- !purrr::map2_lgl(xclass, xtest, sameClass)
if (any(miss)) stop("Incompatible data type for the following predictor variables:\n", paste(names(miss)[miss], collapse = ", "))
# Check for appropriate levels of factor predictor variables
xlevels <- meta$xlevels
xtest <- lapply(subset(data, select = names(xlevels)), levels)
miss <- !purrr::map2_lgl(xlevels, xtest, identical)
if (any(miss)) stop("Incompatible levels for the following predictor variables\n", paste(names(miss)[miss], collapse = ", "))
#-----
# Print variable information to console
cat(length(yvars), "fusion variables\n")
cat(length(xvars), "initial predictor variables\n")
cat(nrow(data), "observations\n")
# TO DO possibly: If there are no NA's in the training data but NA's in 'data', we might want to simply impute the NA's
# NOT NEEDED?
# # Detect and impute any missing values in 'data'
# na.cols <- names(which(sapply(data, anyNA)))
# if (length(na.cols) > 0) {
# cat("Missing values imputed for the following variable(s):\n", paste(na.cols, collapse = ", "), "\n")
# for (j in na.cols) {
# x <- data[[j]]
# ind <- is.na(x)
# data[ind, j] <- imputationValue(x, ind)
# }
# }
#-----
# Load the original numeric donor response data from which simulated values are drawn
# This should include all fusion variables other than solo categorical (multiclass or logical) variables
ydata <- fst::fst(path = file.path(td, "donor.fst"))
W <- ydata$W
#-----
# Allocate columns in 'data' for the fusion/response variables
N0 <- nrow(data)
rmat <- matrix(data = NA_real_, nrow = N0, ncol = length(yvars), dimnames = list(NULL, yvars))
rsize <- objectSize(rmat)
data <- cbind(data, rmat)
setcolorder(data, meta$dnames) # This ensures that 'dmat' has columns in correct/original order for purposes of LightGBM prediction
rm(rmat)
# Coerce 'data' to matrix compatible with input to LightGBM
# Ensure that 'dmat' is double precision; LightGBM enforces this internally
dmat <- to_mat(data)
storage.mode(dmat) <- "double"
#if (storage.mode(dmat) != "double") storage.mode(dmat) <- "double"
dsize <- objectSize(dmat)
rm(data)
# Determine how many implicates to process in each chunk, given available memory
# rsize * M: the maximum size of the results data.table that will be created, if loaded into memory
# dsize: initial size of the prediction data (including fusion variable placeholder columns) prior to expansion
# Resulting vector 'nimp' gives the integer number of implicates to process in each of 'nstep' chunks
# The implicates are spread as evenly as possible across chunks
mfree <- freeMemory()
if (rsize * M > mfree) stop("Insufficient memory. Try making 'M' smaller.")
# Number of implicates that can be safely processed at once in memory
# The 'margin' factor effectively provides working memory for operations on top of primary data storage
#n <- floor(mfree / (rsize + dsize * margin))
n <- floor((mfree + dsize) / (dsize * margin))
n <- min(n, M)
# Report on the memory situation
cat("Detected", signif(mfree / 1e3, 3), "GB of free memory\n")
# The number of steps/chunks can be overridden by negative 'margin' value (for testing)
nstep <- ifelse(margin > 0, ceiling(M / n), min(M, ceiling(-margin)))
nimp <- rep(floor(M / nstep), nstep)
delta <- M - sum(nimp)
if (delta > 0) nimp[1:delta] <- nimp[1:delta] + 1L
stopifnot(M == sum(nimp))
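# Example (illustrative): with M = 5 and nstep = 2, 'nimp' starts as c(2, 2);
# delta = 1, so the remainder is added to the first chunk, giving nimp = c(3, 2)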
if (nstep > 1) {
cat("Generating", M, "implicates in", nstep, "chunks of size:", nimp," \n")
} else {
cat("Generating", M, ifelse(M == 1, "implicate", "implicates"), "\n")
}
# Replicate the prediction matrix ('dmat'), if necessary
if (nimp[1] > 1) {
dind <- rep(seq.int(N0), nimp[1])
dmat <- dmat[dind, ]
if (storage.mode(dmat) != "double") storage.mode(dmat) <- "double"
rm(dind)
gc(verbose = FALSE)
}
# Report parallel processing to console
if (cores > 1) cat("Using OpenMP multithreading within LightGBM (", cores, " cores)", "\n", sep = "")
# 'chunk.paths' holds the temporary, uncompressed fst file chunks
if (nstep > 1) chunk.paths <- file.path(td, paste0("fused.fst", 1:nstep))
#-----
for (mstep in 1:nstep) {
if (nstep > 1) cat("<< Processing implicate chunk", mstep, "of", nstep, ">>\n")
for (i in 1:length(pfixes)) {
v <- meta$ylist[[i]]
cat("Fusion step ", i, " of ", length(pfixes), ": ", paste(v, collapse = ", "), "\n", sep = "")
block <- length(v) > 1
# Model predictor variables
xv <- meta$xpreds[[i]]
# 'dpred' matrix used for LGB prediction
# If 'xv' is a subset of the available predictors, 'dmat' is subset once here to avoid repeated (expensive) subsetting below
dpred <- if (identical(colnames(dmat), xv)) {
dmat
} else {
dmat[, xv, drop = FALSE]
}
# Path to lightGBM model(s) to temp directory
mods <- grep(pattern = utils::glob2rx(paste0(pfixes[i], "*.txt")), x = fsn.files$filename, value = TRUE)
# Logical vector flagging rows where a zero value is simulated (no prediction needed)
# Modified, if necessary, in the next code chunk when a single continuous variable has a zero model
zind <- rep(FALSE, nrow(dmat))
#---
# Simulate zero values for case of single continuous variable
if (!block & meta$ytype[v[[1]]] == "continuous") {
m <- grep("z\\.txt$", mods, value = TRUE)
if (length(m)) {
mod <- lightgbm::lgb.load(filename = file.path(td, m))
#p <- predict(object = mod, data = dpred, reshape = TRUE, params = list(num_threads = cores, predict_disable_shape_check = TRUE))
p <- predict(object = mod, newdata = dpred, params = list(num_threads = cores, predict_disable_shape_check = TRUE))
# TEMP CHECK: To confirm that prediction is accurate when using 'predict_disable_shape_check = TRUE', above
# p2 <- predict(object = mod, newdata = dpred, params = list(num_threads = cores))
# stopifnot(all.equal(p, p2))
zind <- p > runif(n = nrow(dmat)) # Row indices where a zero has been simulated (TRUE for a zero)
}
mods <- setdiff(mods, m)
}
#---
# Make predictions for each model ('pred' object)
cat("-- Predicting LightGBM models\n")
pred <- data.table()
for (m in mods) {
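# Model file names appear to follow "<yvar>_<suffix>.txt"; e.g. an illustrative
# "electricity_q1.txt" would parse to y = "electricity" and n = "q1", where a
# leading "q" marks a quantile model and a leading "z" a zero model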
y <- sub("_[^_]+$", "", basename(m))
n <- sub(".txt", "", sub(paste0(y, "_"), "", basename(m), fixed = TRUE), fixed = TRUE)
q <- substring(n, 1, 1) == "q"
z <- substring(n, 1, 1) == "z"
mod <- lightgbm::lgb.load(filename = file.path(td, m))
# It is more memory efficient (and often faster) to simply predict for all observations and then subset for 'zind' afterwards
#p <- predict(object = mod, data = dpred, reshape = TRUE, params = list(num_threads = cores, predict_disable_shape_check = TRUE))
p <- predict(object = mod, newdata = dpred, params = list(num_threads = cores, predict_disable_shape_check = TRUE))
if (any(zind)) p <- p[!zind]
# TEMP CHECK: To confirm that prediction is accurate when using 'predict_disable_shape_check = TRUE', above
# p2 <- predict(object = mod, newdata = dpred[!zind, ], params = list(num_threads = cores))
# stopifnot(identical(p, p2))
if (!is.matrix(p)) p <- matrix(p)
set(pred, i = NULL, j = paste0(y, "_", n, if (q | z) NULL else 1:ncol(p)), value = as.data.table(p))
}
rm(p)
#---
# Simulate values for case of single categorical variable
if (!block & meta$ytype[v[[1]]] != "continuous") {
cat("-- Simulating fused values\n")
ptile <- runif(n = nrow(pred))
S <- if (ncol(pred) > 1) {
for (j in 2:ncol(pred)) set(pred, i = NULL, j = j, value = pred[[j - 1]] + pred[[j]])
for (j in 1:ncol(pred)) set(pred, i = NULL, j = j, value = ptile > pred[[j]])
rowSums(pred)
} else {
pred[[1]] > ptile # The binary case
}
}
#---
if (block | meta$ytype[[v[1]]] == "continuous") {
# The following step is only necessary if there is at least one FALSE value in 'zind'
# If all 'zind' are TRUE (rare but possible), then all of the final values will be zero.
# In that case, it is possible to skip simulation of non-zero values assigned to 'S'
S <- NULL
if (any(!zind)) {
# Center and scale the conditional expectations
ncenter <- meta$ycenter[[i]]
nscale <- meta$yscale[[i]]
# Ensure 'pred' columns are ordered consistently with 'ncenter'
setcolorder(pred, names(ncenter))
# Scale the conditional expectation columns
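# The qnorm() transformation below maps standardized predictions from roughly
# [qnorm(eps), qnorm(1 - eps)] (about +/- 3.09 for eps = 0.001) onto the unit
# interval, presumably matching the scale of the stored 'kcenters'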
for (j in 1:ncol(pred)) {
if (!is.na(ncenter[j])) {
x <- pred[[j]]
eps <- 0.001
x <- (x - ncenter[j]) / nscale[j]
x <- (x - qnorm(eps)) / (2 * qnorm(1 - eps))
set(pred, i = NULL, j = j, value = x)
}
}
#---
# Extract necessary elements from 'meta'
kcenters <- meta$kcenters[[i]]
kneighbors <- meta$kneighbors[[i]]
stopifnot(all.equal(names(pred), colnames(kcenters)))
# Create vector with "best" number of neighbors (columns) in 'kneighbors'
kbest <- if (block) {
rep(kblock, nrow(kcenters)) # Fixed number of neighbors in block case
} else {
matrixStats::rowCounts(is.na(kneighbors), value = FALSE) # Number of non-NA neighbors in each row; i.e. the "best" k for each cluster
}
#---
cat("-- Simulating fused values\n")
simChunk <- function(pchunk) {
# Get the kcenter closest to each observation in 'pred'
nn <- RANN::nn2(data = kcenters, query = pchunk, k = 1)
nn <- nn$nn.idx[, 1]
# Determine how many values we need to draw from each cluster
nt <- setorder(data.table(n = nn)[,.N, by = n], n) # Much faster than table()
sim <- lapply(1:nrow(nt), function(i) {
ki <- nt$n[i]
kj <- 1:kbest[ki]
kn <- kneighbors[ki, kj] # Nearest neighbor row indices in 'ydata'
sample(x = kn, size = nt$N[i], prob = W[kn], replace = TRUE)
})
nnord <- order(nn)
S <- vector(mode = "double", length = length(nn))
S[nnord] <- unlist(sim)
return(S)
}
# KU commented out 4/27/23 to allow simChunk() to be run without parallel chunking,
# in order to test a memory shortfall issue with the Berkeley server
# split.vec <- rep(1:cores, each = ceiling(nrow(pred) / cores))[1:nrow(pred)]
# pred <- split(pred, f = split.vec)
# S <- unlist(parallel::mclapply(pred, simChunk, mc.cores = cores))
# TO DO: Figure out how to do the simulation step in parallel without blowing up memory
# Simulated indices
S <- simChunk(pred)
rm(pred)
# Convert simulated index to donor response values
S <- ydata[S, v]
}
# If zeros are simulated separately, integrate them into 'S'
# This is only relevant when fusing a single continuous variable
# If 'S' is NULL, all of the output values are zeros and the set() operation below is skipped
if (any(zind)) {
out <- data.table(rep(0, nrow(dmat)))
setnames(out, v)
if (!is.null(S)) set(out, i = which(!zind), j = v, value = S)
S <- out
rm(out)
}
}
# Add the simulated values to 'dmat' prior to next iteration
dmat[, v] <- as.matrix(S)
rm(S)
}
#---
# Extract the fusion variable simulated values
dtemp <- as.data.table(dmat[, yvars, drop = FALSE])
# Add 'M' column to 'dtemp' identifying the implicates
m <- c(0, cumsum(nimp))[c(mstep + c(0, 1))]
m <- (m[1] + 1):m[2]
set(dtemp, i = NULL, j = "M", value = rep(m, each = N0))
setcolorder(dtemp, "M")
# Assign meta data column names used by 'fsd' file format
#setnames(dtemp, ymeta)
# Update 'dmat' prior to next iteration in 'nstep'
if (mstep < nstep) {
if (nimp[mstep + 1] < nimp[mstep]) dmat <- dmat[-seq.int(N0), ] # If the next chunk step has 1 less implicate, remove 1 implicate worth of rows
dmat[, yvars] <- NA_real_ # Not strictly necessary, since the values are all overwritten, anyway
} else {
rm(dmat) # Remove 'dmat' if the loop is complete
gc(verbose = FALSE)
}
# If necessary, write 'dtemp' to temporary file (uncompressed .fst file)
# This frees up memory for processing (i.e. larger chunk size), since prior results chunks do not need to be kept in memory
if (nstep > 1) {
fst::write_fst(x = dtemp,
path = chunk.paths[mstep],
compress = 0)
rm(dtemp)
gc(verbose = FALSE)
}
}
#-----
# If fsd = NULL, load the fusion results into memory and return a data.table
# Otherwise, invisibly return the path to the .fsd file containing the results
if (nstep > 1) {
dtemp <- chunk.paths %>%
lapply(fst::read_fst, as.data.table = TRUE) %>%
rbindlist()
}
# Remove temporary directory
unlink(td, recursive = TRUE)
gc(verbose = FALSE)
# Coerce simulated fusion variables to correct class and factor labels
# This is necessary because the raw output in 'dtemp' is integer for factors
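# e.g. a raw simulated value of 0 maps to the first factor level via lev[0 + 1L]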
yclass <- meta$yclass
ylevels <- meta$ylevels
for (v in names(dtemp)[-1]) {
if ("factor" %in% yclass[[v]]) {
lev <- ylevels[[v]]
ord <- "ordered" %in% yclass[[v]]
set(dtemp, i = NULL, j = v, value = factor(lev[dtemp[[v]] + 1L], levels = lev, ordered = ord))
}
if (yclass[v] == "logical") set(dtemp, i = NULL, j = v, value = as.logical(dtemp[[v]]))
if (yclass[v] == "integer") set(dtemp, i = NULL, j = v, value = as.integer(dtemp[[v]]))
}
# Add any 'retain' variables
for (v in retain) set(dtemp, i = NULL, j = v, value = rep(dretain[[v]], M))
setcolorder(dtemp, c("M", retain, yvars))
rm(dretain)
# Set data.table key for 'M' column and any original data.table keys still present in output
setkeyv(dtemp, cols = intersect(names(dtemp), c('M', dkey)))
# Final result to return or write to disk
out <- if (is.null(fsd)) {
dtemp
} else {
gc(verbose = FALSE)
fst::write_fst(dtemp, path = fsd.path, compress = 90)
cat("Fusion output saved to:\n", fsd.path, "\n")
invisible(fsd.path)
}
# Report processing time
tout <- difftime(Sys.time(), t0)
cat("Total processing time:", signif(as.numeric(tout), 3), attr(tout, "units"), "\n", sep = " ")
# Either return data.table 'out' or invisible path to file on disk
#return(if(is.null(fsd)) out else invisible(fsd.path))
return(out)
}