R/fuse.R

Defines functions fuse

Documented in fuse

#' Fuse variables to a recipient dataset
#'
#' @description
#' Fuse variables to a recipient dataset using a .fsn model produced by \code{\link{train}}. Output can be passed to \code{\link{analyze}} and \code{\link{validate}}.
#'
#' @param data Data frame. Recipient dataset. All categorical variables should be factors and ordered whenever possible. Data types and levels are strictly validated against predictor variables defined in \code{fsn}.
#' @param fsn Character. Path to fusion model file (.fsn) generated by \code{\link{train}}.
#' @param fsd Character. Optional fusion output file to be created ending in \code{.fsd} (i.e. "fused data"). This is a compressed binary file that can be read using the \code{\link[fst]{fst}} package. If \code{fsd = NULL} (the default), the fusion results are returned as a \code{\link[data.table]{data.table}}.
#' @param M Integer. Number of implicates to simulate.
#' @param retain Character. Names of columns in \code{data} that should be retained in the output; i.e. repeated across implicates. Useful for retaining ID or weight variables for use in subsequent analysis of fusion output.
#' @param kblock Integer. Fixed number of nearest neighbors to use when fusing variables in a block. Must be >= 5 and <= 30. Not applicable for variables fused on their own (i.e. no block).
#' @param margin Numeric. Safety margin used when estimating how many implicates can be processed in memory at once. Set higher if \code{fuse()} experiences a memory shortfall. Alternatively, can be set to a negative value to manually specify the number of chunks to use. For example, `margin = -3` splits `M` implicates into three chunks of approximately equal size.
#' @param cores Integer. Number of cores used. LightGBM prediction is parallel-enabled on all systems if OpenMP is available.
#'
#' @details TO UPDATE.
#'
#' @return If \code{fsd = NULL}, a \code{\link[data.table]{data.table}} with number of rows equal to \code{M * nrow(data)}. Integer column "M" indicates implicate assignment of each observation. Note that the ordering of recipient observations is consistent within implicates, so do not change the row order if using with \code{\link{analyze}}.
#' @return If \code{fsd} is specified, the path to .fsd file where results were written. Metadata for column classes and factor levels are stored in the column names. \code{\link{read_fsd}} should be used to load files saved via the \code{fsd} argument.
#'
#' @examples
#' # Build a fusion model using RECS microdata
#' # Note that "fusion_model.fsn" will be written to working directory
#' ?recs
#' fusion.vars <- c("electricity", "natural_gas", "aircon")
#' predictor.vars <- names(recs)[2:12]
#' fsn.path <- train(data = recs, y = fusion.vars, x = predictor.vars)
#'
#' # Generate single implicate of synthetic 'fusion.vars',
#' #  using original RECS data as the recipient
#' recipient <- recs[predictor.vars]
#' sim <- fuse(data = recipient, fsn = fsn.path)
#' head(sim)
#'
#' # Calling fuse() again produces different results
#' sim <- fuse(data = recipient, fsn = fsn.path)
#' head(sim)
#'
#' # Generate multiple implicates
#' sim <- fuse(data = recipient, fsn = fsn.path, M = 5)
#' head(sim)
#' table(sim$M)
#'
#' # Optionally, write results directly to disk
#' # Note that "results.fsd" will be written to working directory
#' sim <- fuse(data = recipient, fsn = fsn.path, M = 5, fsd = "results.fsd")
#' sim <- read_fsd(sim)
#' head(sim)
#'
#' @export

# NOT USED -- maybe reintroduce at some point
# @param max_dist Numeric. Controls the maximum allowable distance when identifying up to \code{k} nearest neighbors. \code{max_dist = 0} (default) means no distance restriction is applied.
# @param idw Logical. Should inverse distance weighting be used when randomly selecting a donor observation from the \code{k} nearest neighbors? If \code{idw = FALSE} (the default), sample weights are used instead.
# @details The \code{max_dist} value is a relative scaling factor. The search radius passed to \code{\link[RANN]{nn2}} is \code{max_dist * medDist}, where \code{medDist} is the median pairwise distance calculated among all of the donor observations. This approach allows \code{max_dist} to adjust the search radius in a consistent way across all fusion variables/blocks.
# @param ignore_self Logical. If \code{TRUE}, the simulation step excludes "self matches" (i.e. row 1 in \code{data} cannot match with row 1 in the original donor. Only useful for validation exercises. Do not use otherwise.
# @param seed Set random seed if needed.
# #' # Test to confirm that 'ignore_self' works as intended
# #' recs$electricity <- jitter(recs$electricity)
# #' train(data = recs, y = c("electricity", "natural_gas"), x = predictor.vars, file = fsn.path)
# #'
# #' sim1 <- fuse(data = recipient, fsn = fsn.path, ignore_self = FALSE, k = 10)
# #' sim2 <- fuse(data = recipient, fsn = fsn.path, ignore_self = TRUE, k = 10)
# #'
# #' sum(sim1 == recs$electricity)
# #' sum(sim2 == recs$electricity)

#---------------------

# Manual testing
# library(fusionModel)
# library(tidyverse)
# library(data.table)
# source("R/utils.R")

# fusion.vars <- c("electricity", "natural_gas", "aircon")
# predictor.vars <- names(recs)[2:12]
# fsn <- train(data = recs, y = fusion.vars, x = predictor.vars)
# data <- recs

# fsn <- "~/Documents/Projects/fusionData/fusion_/RECS/2020/2015/H/output/RECS_2020_2015_H_model.fsn"
# train.data <- fst::read_fst("~/Documents/Projects/fusionData/fusion_/RECS/2020/2015/H/input/RECS_2020_2015_H_train.fst")
# spatial.data <- fst::read_fst("~/Documents/Projects/fusionData/fusion_/RECS/2020/2015/H/input/RECS_2020_2015_H_spatial.fst")
# data <- merge(train.data, spatial.data, all.x = TRUE)
# fsd = "results.fsd"
# M <- 2
# kblock = 10
# cores = 1
# margin = 2
# retain = "weight" # TEST retaining certain variables in output

#---------------------

fuse <- function(data,
                 fsn,
                 fsd = NULL,
                 M = 1,
                 retain = NULL,
                 kblock = 10,
                 margin = 2,
                 cores = 1) {

  t0 <- Sys.time()

  stopifnot(exprs = {
    is.data.frame(data)
    file.exists(fsn) & endsWith(fsn, ".fsn")
    is.null(fsd) | is.character(fsd)
    M > 0 & M %% 1 == 0
    is.null(retain) | all(retain %in% names(data))
    kblock >= 5 & kblock <= 30
    cores > 0 & cores %% 1 == 0 & cores <= parallel::detectCores(logical = FALSE)
    margin != 0
  })

  # Ensure 'fsd' argument is valid (is directory creation necessary?)
  if (!is.null(fsd)) {
    if (!endsWith(fsd, ".fsd")) stop("Argument 'fsd' must have file suffix '.fsd'")
    dir <- full.path(dirname(fsd), mustWork = FALSE)
    if (!dir.exists(dir)) dir.create(dir, recursive = TRUE)
    fsd.path <- file.path(dir, basename(fsd))
  }

  # Convert input data to data.table for efficiency
  data <- as.data.table(data)
  dkey <- key(data)  # Store original data.table key(s); if present, enforced in final output

  # Temporary directory to unzip .fsn contents to and possible save .fst result chunks
  td <- tempfile()
  dir.create(td)

  # Names of files within the .fsn object
  fsn.files <- zip::zip_list(fsn)
  pfixes <- sort(unique(dirname(fsn.files$filename)))
  pfixes <- pfixes[pfixes != "."]

  # Unzip all files in .fsn to temporary directory
  zip::unzip(zipfile = fsn, exdir = td)

  # Load model metadata
  meta <- readRDS(file.path(td, "metadata.rds"))

  # Names, order, and 'blocks' of response variables
  ylist <- meta$ylist
  yvars <- unlist(ylist)

  # Check that necessary predictor variables are present
  xvars <- names(meta$xclass)
  miss <- setdiff(xvars, names(data))
  if (length(miss) > 0) stop("The following predictor variables are missing from 'data':\n", paste(miss, collapse = ", "))

  # Variables in 'data' that are requested to retain in output
  dretain <- data[, ..retain]

  # Restrict 'data' to 'xvars'
  data <- data[, ..xvars]

  # Check for appropriate class/type of predictor variables
  xclass <- meta$xclass
  xtest <- lapply(data, class)
  miss <- !purrr::map2_lgl(xclass, xtest, sameClass)
  if (any(miss)) stop("Incompatible data type for the following predictor variables:\n", paste(names(miss)[miss], collapse = ", "))

  # Check for appropriate levels of factor predictor variables
  xlevels <- meta$xlevels
  xtest <- lapply(subset(data, select = names(xlevels)), levels)
  miss <- !purrr::map2_lgl(xlevels, xtest, identical)
  if (any(miss)) stop("Incompatible levels for the following predictor variables\n", paste(names(miss)[miss], collapse = ", "))

  #-----

  # Print variable information to console
  cat(length(yvars), "fusion variables\n")
  cat(length(xvars), "initial predictor variables\n")
  cat(nrow(data), "observations\n")

  # TO DO possibly: If there are NOT NA's in the training but NA's in 'data', might want to simply impute the NA's
  # NOT NEEDED?
  # # Detect and impute any missing values in 'data'
  # na.cols <- names(which(sapply(data, anyNA)))
  # if (length(na.cols) > 0) {
  #   cat("Missing values imputed for the following variable(s):\n", paste(na.cols, collapse = ", "), "\n")
  #   for (j in na.cols) {
  #     x <- data[[j]]
  #     ind <- is.na(x)
  #     data[ind, j] <-  imputationValue(x, ind)
  #   }
  # }

  #-----

  # Load the original numeric donor response data from which simulated values are drawn
  # This should include all fusion variables other than solo categorical (multiclass or logical) variables
  ydata <- fst::fst(path = file.path(td, "donor.fst"))
  W <- ydata$W

  #-----

  # Allocate columns in 'data' for the fusion/response variables
  N0 <- nrow(data)
  rmat <- matrix(data = NA_real_, nrow = N0, ncol = length(yvars), dimnames = list(NULL, yvars))
  rsize <- objectSize(rmat)
  data <- cbind(data, rmat)
  setcolorder(data, meta$dnames)  # This ensures that 'dmat' has columns in correct/original order for purposes of LightGBM prediction
  rm(rmat)

  # Coerce 'data' to matrix compatible with input to LightGBM
  # Ensure that 'dmat' is double precision; LightGBM enforces this internally
  dmat <- to_mat(data)
  storage.mode(dmat) <- "double"
  #if (storage.mode(dmat) != "double") storage.mode(dmat) <- "double"
  dsize <- objectSize(dmat)
  rm(data)

  # Determine how may implicates to process in each chunk, given available memory
  # rsize * M: the maximum size of the results data.table that will be created, if loaded into memory
  # dsize: initial size of the prediction data (including fusion variable placeholder columns) prior to expansion
  # Resulting vector 'nimp' gives the integer number of implicates to process in each of 'nstep' chunks
  # The implicates are spread as evenly as possible across chunks
  mfree <- freeMemory()
  if (rsize * M > mfree) stop("Insufficient memory. Try making 'M' smaller.")

  # Number of implicates that can be safely processed at once in memory
  # The 'margin' factor effectively provides working memory for operations on top of primary data storage
  #n <- floor(mfree / (rsize + dsize * margin))
  n <- floor((mfree + dsize) / (dsize * margin))
  n <- min(n, M)

  # Report on the memory situation
  cat("Detected", signif(mfree / 1e3, 3), "GB of free memory\n")

  # The number of steps/chunks can be overridden by negative 'margin' value (for testing)
  nstep <- ifelse(margin > 0, ceiling(M / n), min(M, ceiling(-margin)))
  nimp <- rep(floor(M / nstep), nstep)
  delta <- M - sum(nimp)
  if (delta > 0) nimp[1:delta] <- nimp[1:delta] + 1L
  stopifnot(M == sum(nimp))
  if (nstep > 1) {
    cat("Generating", M, "implicates in", nstep, "chunks of size:", nimp," \n")
  } else {
    cat("Generating", M, ifelse(M == 1, "implicate", "implicates"), "\n")
  }

  # Replicate the prediction matrix ('dmat'), if necessary
  if (nimp[1] > 1) {
    dind <- rep(seq.int(N0), nimp[1])
    dmat <- dmat[dind, ]
    if (storage.mode(dmat) != "double") storage.mode(dmat) <- "double"
    rm(dind)
    gc(verbose = FALSE)
  }

  # Report parallel processing to console
  if (cores > 1) cat("Using OpenMP multithreading within LightGBM (", cores, " cores)", "\n", sep = "")

  # 'chunk.paths' holds the temporary, uncompressed fst file chunks
  if (nstep > 1) chunk.paths <- file.path(td, paste0("fused.fst", 1:nstep))

  #-----

  for (mstep in 1:nstep) {

    if (nstep > 1) cat("<< Processing implicate chunk", mstep, "of", nstep, ">>\n")

    for (i in 1:length(pfixes)) {

      v <- meta$ylist[[i]]

      cat("Fusion step ", i, " of ", length(pfixes), ": ", paste(v, collapse = ", "), "\n", sep = "")

      block <- length(v) > 1

      # Model predictor variables
      xv <- meta$xpreds[[i]]

      # 'dpred' matrix used for LGB prediction
      # If 'xv' is a subset of the available predictors, 'dmat' is subsetted once here to avoid multiple (expensive) subsetting below
      dpred <- if (all(suppressWarnings(colnames(dmat) == xv))) {
        dmat
      } else {
        dmat[, xv, drop = FALSE]
      }

      # Path to lightGBM model(s) to temp directory
      mods <- grep(pattern = utils::glob2rx(paste0(pfixes[i], "*.txt")), x = fsn.files$filename, value = TRUE)

      # Row indices where to generate predictions
      # Modified, if necessary, in next code chunk when single continuous variables has a zero model
      zind <- rep(FALSE, nrow(dmat))

      #---

      # Simulate zero values for case of single continuous variable
      if (!block & meta$ytype[v[[1]]] == "continuous") {
        m <- grep("z\\.txt$", mods, value = TRUE)
        if (length(m)) {
          mod <- lightgbm::lgb.load(filename = file.path(td, m))
          #p <- predict(object = mod, data = dpred, reshape = TRUE, params = list(num_threads = cores, predict_disable_shape_check = TRUE))
          p <- predict(object = mod, newdata = dpred, params = list(num_threads = cores, predict_disable_shape_check = TRUE))

          # TEMP CHECK: To confirm that prediction is accurate when using 'predict_disable_shape_check = TRUE', above
          # p2 <- predict(object = mod, newdata = dpred, params = list(num_threads = cores))
          # stopifnot(all.equal(p, p2))

          zind <- p > runif(n = nrow(dmat)) # Row indices where a zero has been simulated (TRUE for a zero)
        }
        mods <- setdiff(mods, m)
      }

      #---

      # Make predictions for each model ('pred' object)
      cat("-- Predicting LightGBM models\n")
      pred <- data.table()
      for (m in mods) {
        y <- sub("_[^_]+$", "", basename(m))
        n <- sub(".txt", "", sub(paste0(y, "_"), "", basename(m), fixed = TRUE), fixed = TRUE)
        q <- substring(n, 1, 1) == "q"
        z <- substring(n, 1, 1) == "z"
        mod <- lightgbm::lgb.load(filename = file.path(td, m))

        # It is more memory efficient (and often faster) to simply predict for all observations and then subset for 'zind' afterwards
        #p <- predict(object = mod, data = dpred, reshape = TRUE, params = list(num_threads = cores, predict_disable_shape_check = TRUE))
        p <- predict(object = mod, newdata = dpred, params = list(num_threads = cores, predict_disable_shape_check = TRUE))
        if (any(zind)) p <- p[!zind]

        # TEMP CHECK: To confirm that prediction is accurate when using 'predict_disable_shape_check = TRUE', above
        # p2 <- predict(object = mod, newdata = dpred[!zind, ], params = list(num_threads = cores))
        # stopifnot(identical(p, p2))

        if (!is.matrix(p)) p <- matrix(p)
        set(pred, i = NULL, j = paste0(y, "_", n, if (q | z) NULL else 1:ncol(p)), value = as.data.table(p))
      }

      rm(p)

      #---

      # Simulate values for case of single categorical variable
      if (!block & meta$ytype[v[[1]]] != "continuous") {
        cat("-- Simulating fused values\n")
        ptile <- runif(n = nrow(pred))
        S <- if (ncol(pred) > 1) {
          for (j in 2:ncol(pred)) set(pred, i = NULL, j = j, value = pred[[j - 1]] + pred[[j]])
          for (j in 1:ncol(pred)) set(pred, i = NULL, j = j, value = ptile > pred[[j]])
          rowSums(pred)
        } else {
          pred[[1]] > ptile  # The binary case
        }
      }

      #---

      if (block | meta$ytype[[v[1]]] == "continuous") {

        # The following step is only necessary if there is at least one FALSE value in 'zind'
        # If all 'zind' are TRUE (rare but possible), then all of the final values will be zero.
        # In that case, it is possible to skip simulation of non-zero values assigned to 'S'
        S <- NULL
        if (any(!zind)) {

          # Center and scale the conditional expectations
          ncenter <- meta$ycenter[[i]]
          nscale <- meta$yscale[[i]]

          # Ensure 'pred' and 'ncenter' have consistent column names
          setcolorder(pred, names(ncenter))

          # Scale the conditional expectation columns
          for (j in 1:ncol(pred)) {
            if (!is.na(ncenter[j])) {
              x <- pred[[j]]
              eps <- 0.001
              x <- (x - ncenter[j]) / nscale[j]
              x <- (x - qnorm(eps)) / (2 * qnorm(1 - eps))
              set(pred, i = NULL, j = j, value = x)
            }
          }

          #---

          # Extract necessary elements from 'meta'
          kcenters <- meta$kcenters[[i]]
          kneighbors <- meta$kneighbors[[i]]
          stopifnot(all.equal(names(pred), colnames(kcenters)))

          # Create vector with "best" number of neighbors (columns) in 'kneighbors'
          kbest <- if (block) {
            rep(kblock, nrow(kcenters))  # Fixed number of neighbors in block case
          } else {
            matrixStats::rowCounts(is.na(kneighbors), value = FALSE)  # Column index of best 'k' value
          }

          #---

          cat("-- Simulating fused values\n")

          simChunk <- function(pchunk) {

            # Get the kcenter closest to each observation in 'pred'
            nn <- RANN::nn2(data = kcenters, query = pchunk, k = 1)
            nn <- nn$nn.idx[, 1]

            # Determine how many values we need to draw from each cluster
            nt <- setorder(data.table(n = nn)[,.N, by = n], n)  # Much faster than table()

            sim <- lapply(1:nrow(nt), function(i) {
              ki <- nt$n[i]
              kj <- 1:kbest[ki]
              kn <- kneighbors[ki, kj]  # Nearest neighbor row indices in 'ydata'
              sample(x = kn, size = nt$N[i], prob = W[kn], replace = TRUE)
            })

            nnord <- order(nn)
            S <- vector(mode = "double", length = length(nn))
            S[nnord] <- unlist(sim)

            return(S)

          }

          # KU commented out 4/27/23 to allow simChunk() to be run without parallel chunking to test memory shortfall issue with Berkeley server
          # split.vec <- rep(1:cores, each = ceiling(nrow(pred) / cores))[1:nrow(pred)]
          # pred <- split(pred, f = split.vec)

          # KU commented out 4/27/23 to allow simChunk() to be run without parallel chunking to test memory shortfall issue with Berkeley server
          # Simulated indices
          #S <- unlist(parallel::mclapply(pred, simChunk, mc.cores = cores))

          # KU added 4/27/23 to allow simChunk() to be run without parallel chunking to test memory shortfall issue with Berkeley server
          # TO DO: Figure out how to do the simulation step in parallel without blowing up memory
          S <- simChunk(pred)
          rm(pred)

          # Convert simulated index to donor response values
          S <- ydata[S, v]

        }

        # If zeros are simulated separately, integrate them into 'S'
        # This is only relevant when fusing a single continuous variable
        # If 'S' is NULL, all of the output values are zeros and the set() operation below is skipped
        if (any(zind)) {
          out <- data.table(rep(0, nrow(dmat)))
          setnames(out, v)
          if (!is.null(S)) set(out, i = which(!zind), j = v, value = S)
          S <- out
          rm(out)
        }

      }

      # Add the simulated values to 'dmat' prior to next iteration
      dmat[, v] <- as.matrix(S)
      rm(S)

    }

    #---

    # Extract the fusion variable simulated values
    dtemp <- as.data.table(dmat[, yvars, drop = FALSE])

    # Add 'M' column to 'dtemp' identifying the implicates
    m <- c(0, cumsum(nimp))[c(mstep + c(0, 1))]
    m <- (m[1] + 1):m[2]
    set(dtemp, i = NULL, j = "M", value = rep(m, each = N0))
    setcolorder(dtemp, "M")

    # Assign meta data column names used by 'fsd' file format
    #setnames(dtemp, ymeta)

    # Update 'dmat' prior to next iteration in 'nstep'
    if (mstep < nstep) {
      if (nimp[mstep + 1] < nimp[mstep]) dmat <- dmat[-seq.int(N0), ]  # If the next chunk step has 1 less implicate, remove 1 implicate worth of rows
      dmat[, yvars] <- NA_real_  # Not strictly necessary, since the values are all overwritten, anyway
    } else {
      rm(dmat)  # Remove 'dmat' if the loop is complete
      gc(verbose = FALSE)
    }

    # If necessary, write 'dtemp' to temporary file (uncompressed .fst file)
    # This frees up memory for processing (i.e. larger chunk size), since prior results chunks do not need to be kept in memory
    if (nstep > 1) {
      fst::write_fst(x = dtemp,
                     path = chunk.paths[mstep],
                     compress = 0)
      rm(dtemp)
      gc(verbose = FALSE)
    }

  }

  #-----

  # If fsd = NULL, load the fusion results into memory and return data.table
  # Otherwise, invisibly return path to .csv file containing results

  if (nstep > 1) {
    dtemp <- chunk.paths %>%
      lapply(fst::read_fst, as.data.table = TRUE) %>%
      rbindlist()
  }

  # Remove temporary directory
  unlink(td)
  gc(verbose = FALSE)

  # Coerce simulated fusion variables to correct class and factor labels
  # This is necessary because the raw output in 'dtemp' is integer for factors
  yclass <- meta$yclass
  ylevels <- meta$ylevels
  for (v in names(dtemp)[-1]) {
    if ("factor" %in% yclass[[v]]) {
      lev <- ylevels[[v]]
      ord <- "ordered" %in% yclass[[v]]
      set(dtemp, i = NULL, j = v, value = factor(lev[dtemp[[v]] + 1L], levels = lev, ordered = ord))
    }
    if (yclass[v] == "logical") set(dtemp, i = NULL, j = v, value = as.logical(dtemp[[v]]))
    if (yclass[v] == "integer") set(dtemp, i = NULL, j = v, value = as.integer(dtemp[[v]]))
  }

  # Add any 'retain' variables
  for (v in retain) set(dtemp, i = NULL, j = v, value = rep(dretain[[v]], M))
  setcolorder(dtemp, c("M", retain, yvars))
  rm(dretain)

  # Set data.table key for 'M' column and any original data.table keys still present in output
  setkeyv(dtemp, cols = intersect(names(dtemp), c('M', dkey)))

  # Final result to return or write to disk
  out <- if (is.null(fsd)) {
    dtemp
  } else {
    gc(verbose = FALSE)
    fst::write_fst(dtemp, path = fsd.path, compress = 90)
    cat("Fusion output saved to:\n", fsd.path, "\n")
    invisible(fsd.path)
  }

  # Report processing time
  tout <- difftime(Sys.time(), t0)
  cat("Total processing time:", signif(as.numeric(tout), 3), attr(tout, "units"), "\n", sep = " ")

  # Either return data.table 'out' or invisible path to file on disk
  #return(if(is.null(fsd)) out else invisible(fsd.path))
  return(out)

}
ummel/fusionModel documentation built on June 1, 2025, 11 p.m.