R/train.R

#' Train a fusion model
#'
#' @description
#' Train a fusion model on "donor" data using sequential \href{https://lightgbm.readthedocs.io/en/latest/}{LightGBM} models to model the conditional distributions. The resulting fusion model (.fsn file) can be used with \code{\link{fuse}} to simulate outcomes for a "recipient" dataset.
#'
#' @param data Data frame. Donor dataset. Categorical variables must be factors and ordered whenever possible.
#' @param y Character or list. Variables in \code{data} to eventually fuse to a recipient dataset. Variables are fused in the order provided. If \code{y} is a list, each entry is a character vector possibly indicating multiple variables to fuse as a block.
#' @param x Character or list. Predictor variables in \code{data} common to donor and eventual recipient. If a list, each slot specifies the \code{x} predictors to use for each \code{y}.
#' @param fsn Character. File path where fusion model will be saved. Must use \code{.fsn} suffix.
#' @param weight Character. Name of the observation weights column in \code{data}. If \code{NULL} (default), uniform weights are assumed.
#' @param nfolds Numeric. Number of cross-validation folds used for LightGBM model training. Or, if \code{nfolds < 1}, the fraction of observations to use for the training set; the remainder is used for validation (faster than cross-validation).
#' @param nquantiles Numeric. Number of quantile models to train for continuous \code{y} variables, in addition to the conditional mean. \code{nquantiles} evenly-distributed percentiles are used. For example, the default \code{nquantiles = 2} yields quantile models for the 25th and 75th percentiles. Higher values may produce more accurate conditional distributions at the expense of computation time. An even \code{nquantiles} is recommended, since the conditional mean already captures the central tendency, making a median model superfluous.
#' @param nclusters Numeric. Maximum number of k-means clusters to use. Higher is better but at computational cost. Setting \code{nclusters = 0} or \code{nclusters = Inf} turns off clustering.
#' @param krange Numeric. Minimum and maximum number of nearest neighbors to use for construction of continuous conditional distributions. Higher \code{max(krange)} is better but at computational cost.
#' @param hyper List. LightGBM hyperparameters to be used during model training. If \code{NULL}, default values are used. See Details and Examples.
#' @param fork Logical. Should parallel processing via forking be used, if possible? See Details.
#' @param cores Integer. Number of physical CPU cores used for parallel computation. When \code{fork = FALSE} or on Windows platform (since forking is not possible), the fusion variables/blocks are processed serially but LightGBM uses \code{cores} for internal multithreading via OpenMP. On a Unix system, if \code{fork = TRUE}, \code{cores > 1}, and \code{cores <= length(y)} then the fusion variables/blocks are processed in parallel via \code{\link[parallel]{mclapply}}.
#'
#' @details When \code{y} is a list, each slot indicates either a single variable or, alternatively, multiple variables to fuse as a block. Variables within a block are sampled jointly from the original donor data during fusion. See Examples.
#' @details \code{y} variables that exhibit no variance, or continuous \code{y} variables with fewer than \code{10 * nfolds} non-zero observations (the minimum required for cross-validation), are automatically removed with a warning.
#' @details The fusion model written to \code{fsn} is a zipped archive created by \code{\link[zip]{zip}} containing models and data required by \code{\link{fuse}}.
#' @details The \code{hyper} argument can be used to specify the LightGBM hyperparameter values over which to perform a "grid search" during model training. \href{https://lightgbm.readthedocs.io/en/latest/Parameters.html}{See here} for the full list of parameters. For each combination of hyperparameters, \code{nfolds} cross-validation is performed using \code{\link[lightgbm]{lgb.cv}} with an early stopping condition. The parameter combination with the lowest loss function value is used to fit the final model via \code{\link[lightgbm]{lgb.train}}. The more candidate parameter values specified in \code{hyper}, the longer the processing time. If \code{hyper = NULL}, a single set of parameters is used with the following default values:
#' @details \itemize{
#'   \item boosting = "gbdt"
#'   \item data_sample_strategy = "goss"
#'   \item num_leaves = 31
#'   \item feature_fraction = 0.8
#'   \item max_depth = 5
#'   \item min_data_in_leaf = max(10, round(0.001 * nrow(data)))
#'   \item num_iterations = 2500
#'   \item learning_rate = 0.1
#'   \item max_bin = 255
#'   \item min_data_in_bin = 3
#'   \item max_cat_threshold = 32
#'  }
#'  Typical users will only have reason to modify the hyperparameters listed above. Note that \code{num_iterations} only imposes a ceiling; early stopping typically results in models with fewer iterations. See Examples.
#' @details Testing with small-to-medium size datasets suggests that forking is typically faster than OpenMP multithreading (the default). However, forking will sometimes "hang" (continue to run with no CPU usage or error message) if an OpenMP process has been previously used in the same session. The issue appears to be related to Intel's OpenMP implementation (\href{https://github.com/Rdatatable/data.table/issues/2418}{see here}). This can be triggered when other operations are called before \code{train()} that use \code{\link[data.table]{data.table}} or \code{\link[fst]{fst}} in multithread mode. If forking hangs, try calling \code{data.table::setDTthreads(1)} and \code{fst::threads_fst(1)} immediately after \code{library(fusionModel)} in a new session.
#'
#' @return A fusion model object (.fsn) is saved to \code{fsn}. The \code{fsn} file path is returned invisibly.
#'
#' @examples
#' # Build a fusion model using RECS microdata
#' # Note that "fusion_model.fsn" will be written to the working directory
#' ?recs
#' fusion.vars <- c("electricity", "natural_gas", "aircon")
#' predictor.vars <- names(recs)[2:12]
#' fsn.path <- train(data = recs, y = fusion.vars, x = predictor.vars)
#'
#' # When 'y' is a list, it can specify variables to fuse as a block
#' fusion.vars <- list("electricity", "natural_gas",
#'                     c("heating_share", "cooling_share", "other_share"))
#' fusion.vars
#' train(data = recs, y = fusion.vars, x = predictor.vars)
#'
#' # When 'x' is a list, it specifies which predictor variables to use for each 'y'
#' xlist <- list(predictor.vars[1:4], predictor.vars[2:8], predictor.vars)
#' xlist
#' train(data = recs, y = fusion.vars, x = xlist)
#'
#' # Specify a single set of LightGBM hyperparameters
#' # Here we use Random Forests instead of the default Gradient Boosting Decision Trees
#' train(data = recs, y = fusion.vars, x = predictor.vars,
#'       hyper = list(boosting = "rf",
#'                    feature_fraction = 0.6,
#'                    max_depth = 10
#'       ))
#'
#' # Specify a range of LightGBM hyperparameters to search over
#' # This takes longer, because there are more models to test
#' train(data = recs, y = fusion.vars, x = predictor.vars,
#'       hyper = list(max_depth = c(5, 10),
#'                    feature_fraction = c(0.7, 0.9)
#'       ))
#' @export

#---------------------

# Manual testing
# library(fusionModel)
# library(tidyverse)
# library(data.table)
# library(dplyr)
# source("R/utils.R")
# source("R/fitLGB.R")

# # Inputs for testing - with some modification for harder test cases
# data <- recs[1:28]
# recipient <- subset(recs, select = c(weight, income:heat_type))
# y = setdiff(names(data), names(recipient))[1:8]
# weight <- "weight"
# x <- setdiff(names(recipient), weight)
# fsn = "fusion_model.fsn"
# nfolds = 0.75
# nquantiles = 3
# nclusters = 2000
# hyper = NULL
# krange = c(10, 300)
# fork = FALSE
# cores = 2

# Try with variable block
#y <- c(list(y[1:2]), y[-c(1:2)])



#---------------------

train <- function(data,
                  y,
                  x,
                  fsn = "fusion_model.fsn",
                  weight = NULL,
                  nfolds = 5,
                  nquantiles = 2,
                  nclusters = 2000,
                  krange = c(10, 500),
                  hyper = NULL,
                  fork = FALSE,
                  cores = 1) {

  t0 <- Sys.time()

  # TO DO: Make data.table operations throughout (just applies to pre-loop checks)
  # TO DO: Make collapse operations where possible
  if (is.data.table(data)) data <- as.data.frame(data)

  # Create correct 'y' and 'ylist', depending on input type
  if (is.list(y)) {
    ylist <- y
    y <- unlist(ylist)
  } else {
    ylist <- as.list(y)
  }

  # Create correct 'x' and 'xlist', depending on input type
  if (is.list(x)) {
    xlist <- x
    x <- unique(unlist(x))
    cat("Using specified set of predictors for each fusion variable\n")
  } else {
    #xlist <- rep(list(x), length(ylist))
    xlist <- lapply(1:length(ylist), function(i) c({if (i == 1) NULL else unlist(ylist[1:(i - 1)])}, x))
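    # That is, each step's predictors are the user-supplied 'x' variables plus all 'y' variables fused in earlier steps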
    cat("Using all available predictors for each fusion variable\n")
  }
  x <- setdiff(x, y)

  #---

  # Check validity of other inputs
  stopifnot(exprs = {
    is.data.frame(data)
    all(y %in% names(data))
    !any(c("M", "W..", "R..") %in% y)  # Reserved variable names
    all(lengths(ylist) > 0)
    all(x %in% names(data))
    all(lengths(xlist) > 0)
    length(ylist) == length(xlist)
    length(intersect(y, x)) == 0
    is.character(fsn) & endsWith(fsn, ".fsn")
    is.null(weight) | (length(weight) == 1 & weight %in% names(data) & !weight %in% c(y, x))
    nfolds > 0
    nquantiles > 0
    nclusters >= 0
    all(krange >= 5) & length(krange) == 2
    is.null(hyper) | (is.list(hyper) & !anyDuplicated(names(hyper)))
    is.logical(fork)
    cores %in% 1:parallel::detectCores(logical = FALSE)
  })

  if (!any(x %in% xlist[[1]])) stop("There must be at least one 'x' predictor variable assigned to the first 'y' variable")

  #---

  # Calculate the percentiles to use for quantile models
  ptiles <- seq(from = 1 / nquantiles / 2, length.out = nquantiles, by = 2 * 1 / nquantiles / 2)
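  # For example, nquantiles = 2 yields ptiles = c(0.25, 0.75); nquantiles = 4 yields c(0.125, 0.375, 0.625, 0.875)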

  # Determine if parallel forking will be used
  # This forces use of OpenMP if there are more cores than fusion steps
  fork <- fork & .Platform$OS.type == "unix" & cores > 1 & length(ylist) > 1 & cores <= length(ylist)
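  # e.g. if fork = TRUE is requested with 8 fusion steps on a Unix system, cores = 4 processes the steps
  # in parallel via forking, whereas cores = 16 (more cores than steps) disables forking and falls back
  # to LightGBM's OpenMP multithreading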

  # Check 'fsn' path and create parent directories, if necessary
  dir <- full.path(dirname(fsn), mustWork = FALSE)
  if (!dir.exists(dir)) dir.create(dir, recursive = TRUE)
  fsn <- file.path(dir, basename(fsn))

  # Temporary directory to save outputs to
  td <- tempfile()
  dir.create(td, showWarnings = FALSE)

  #-----

  # Create and/or check observation weights
  W.lgb <- if (is.null(weight)) {
    rep(1L, nrow(data))
  } else {
    if (anyNA(data[[weight]])) stop("Missing (NA) values are not allowed in 'weight'")
    if (any(data[[weight]] < 0)) cat("Setting negative observation weights to zero\n")
    set(data, j = weight, value = pmax(0, data[[weight]]))
    data[[weight]] / mean(data[[weight]]) # Scaled weights to avoid numerical issues
  }

  # Integerized version of the observation weights (requires less space when saved to .fsn object)
  W.int <- integerize(W.lgb, mincor = 0.999)

  #-----

  # Check data and variable name validity
  if (anyNA(data[y])) stop("Missing (NA) values are not allowed in 'y'")

  # Check that the 'x' and 'y' contain only syntactically valid names
  bad <- setdiff(c(x, y), make.names(c(x, y)))
  if (length(bad)) stop("Fix invalid column names (see ?make.names):\n", paste(bad, collapse = ", "))

  # Check for character-type variables; stop with error if any detected
  xc <- sapply(data[c(x, y)], is.character)
  if (any(xc)) stop("Coerce character variables to factor:\n", paste(names(which(xc)), collapse = ", "))

  # 4/28/25: Removed use of checkData(); zero-variance allowed to pass through
  # Initial testing suggests LightGBM handles zero-variance cases without error
  # Check 'data' for type consistency and remove no-variance (constant) variables
  # data <- checkData(data, y, x, nfolds = ceiling(nfolds), impute = FALSE)
  # # To account for possible removal of zero-variance 'x' variables
  # x <- intersect(x, names(data))
  # for (i in seq_along(xlist)) xlist[[i]] <- intersect(xlist[[i]], x)
  # # To account for possible removal of zero-variance 'y' variables
  # y <- intersect(y, names(data))
  # for (i in seq_along(ylist)) ylist[[i]] <- intersect(ylist[[i]], y)
  # # Update 'xlist' and 'ylist' to reflect possible removal of cases with only non-varying x or y
  # drop <- which(lengths(ylist) == 0 | lengths(xlist) == 0)
  # ylist <- ylist[-drop]
  # if (length(ylist) == 0) stop("There are no valid 'y' variables remaining with non-zero variance (nothing to predict)!")
  # xlist <- xlist[-drop]
  # if (length(xlist) == 0) stop("There are no valid 'x' variables remaining with non-zero variance (nothing to predict with)!")

  # Determine fusion variable directory prefixes for saving to disk
  pfixes <- formatC(seq_along(ylist), width = nchar(length(ylist)), format = "d", flag = "0")
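  # e.g. 12 fusion steps yield directory prefixes "01", "02", ..., "12"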

  #-----

  # Extract data classes and levels for the 'y' variables
  d <- data[y]
  yclass <- lapply(d, class)
  ylevels <- lapply(d[grepl("factor", yclass)], levels)

  # Extract data classes and levels for the 'x' variables
  d <- data[x]
  xclass <- lapply(d, class)
  xlevels <- lapply(d[grepl("factor", xclass)], levels)

  # Determine LightGBM response variable types
  ytype <- ifelse(sapply(data[y], is.factor), "multiclass", "continuous")
  ytype <- ifelse(sapply(data[y], function(x) is.logical(x) | length(levels(x)) == 2), "binary", ytype)
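  # e.g. numeric -> "continuous", factor with 3+ levels -> "multiclass", logical or 2-level factor -> "binary"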

  # Nominal/unordered categorical variables (both x and y variables)
  # Passed to LightGBM so it knows which variables to treat as unordered factors
  nominal <- names(select(data, where(is.factor) & !where(is.ordered)))

  rm(d)

  #-----

  # Print variable information to console
  cat(length(y), "fusion variables\n")
  cat(length(x), "predictor variables\n")
  cat(nrow(data), "observations\n")

  # Report if using different sets of predictor variables across the fusion variables
  # if (all(lengths(xlist) == length(x))) {
  #   cat("Using all available predictors for each fusion variable\n")
  # } else {
  #   cat("Using specified set of predictors for each fusion variable\n")
  # }

  # Limit 'data' to the necessary variables
  data <- data[c(x, y)]

  # Coerce 'data' to sparse numeric matrix for use with LightGBM
  dmat <- to_mat(data)
  rm(data)

  # Write to disk (donor.fst)
  # Save the necessary response/fusion variables and observation weights to disk
  # This information is used by fuse() to select simulated values
  # This should include all fusion variables other than solo categorical (multiclass or logical) variables
  ysave <- y[ytype == "continuous" | y %in% unlist(ylist[lengths(ylist) > 1])]
  dtemp <- as.data.frame(dmat[, ysave, drop = FALSE])
  dtemp$W <- W.int
  fst::write_fst(x = dtemp, path = file.path(td, "donor.fst"), compress = 95)
  rm(dtemp)

  #-----

  # Set the 'hyper.default' object
  if (is.null(hyper)) hyper <- list()

  # Default hyperparameter values, per LightGBM documentation
  # Parameters: https://lightgbm.readthedocs.io/en/latest/Parameters.html
  # Parameter tuning: https://lightgbm.readthedocs.io/en/latest/Parameters-Tuning.html
  # More details: https://sites.google.com/view/lauraepp/parameters
  hyper.default <- list(
    boosting = "gbdt",
    data_sample_strategy = "goss",
    num_leaves = 31,
    feature_fraction = 0.8,
    min_data_in_leaf = max(10, round(0.001 * nrow(dmat))),
    num_iterations = 2500,
    learning_rate = 0.1,
    max_depth = 5,
    max_bin = 255,
    min_data_in_bin = 3,
    max_cat_threshold = 32
  )

  # Use default hyperparameters, if not specified by user
  for (v in names(hyper.default)) {
    if (!v %in% names(hyper)) {
      hyper[[v]] <- hyper.default[[v]]
    }
  }

  # Set the number of LightGBM threads
  # If forking, LightGBM uses single core internally
  hyper$num_threads <- ifelse(fork, 1L, cores)

  # The 'dataset' parameters 'min_data_in_leaf', 'max_bin', 'min_data_in_bin', and 'max_cat_threshold' can only have a single value (they are not eligible to be varied within fitLGB)
  # https://lightgbm.readthedocs.io/en/latest/Parameters.html#dataset-parameters
  for (v in c("min_data_in_leaf", "max_bin", "min_data_in_bin", "max_cat_threshold")) {
    if (length(hyper[[v]]) > 1) {
      hyper[[v]] <- hyper[[v]][1]
      cat("Only one", v, "value allowed. Using:", hyper[[v]], "\n")
    }
  }
  # Note that 'feature_pre_filter' is forced to TRUE (for speed), which means only one 'min_data_in_leaf' value is allowed in 'hyper'
  dparams <- list(max_bin = hyper$max_bin,
                  min_data_in_bin = hyper$min_data_in_bin,
                  max_cat_threshold = hyper$max_cat_threshold,
                  min_data_in_leaf = hyper$min_data_in_leaf,
                  feature_pre_filter = TRUE)
  # Remove 'dparams' hyperparameters from 'hyper' object
  hyper[names(dparams)] <- NULL

  # Create hyperparameter grid to search
  hyper.grid <- hyper %>%
    expand.grid() %>%
    distinct() %>%
    split(seq(nrow(.)))
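  # e.g. hyper = list(max_depth = c(5, 10), feature_fraction = c(0.7, 0.9)) expands to 4 candidate
  # parameter combinations (each merged with the single-value defaults) that fitLGB() evaluates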

  #-----

  # Function to build LightGBM prediction model for step 'i' in 'ylist'
  buildFun <- function(i, verbose = FALSE) {

    v <- ylist[[i]]
    block <- length(v) > 1

    # Print message to console
    if (verbose) cat("-- Training step ", i, " of ", length(pfixes), ": ", paste(v, collapse = ", "), "\n", sep = "")

    # 'y' variables from prior clusters to be included as predictors
    yv <- if (i == 1) NULL else unlist(ylist[1:(i - 1)])

    # Full set of predictor variables, including possible 'y' variables from earlier in the sequence
    #xv <- c(xlist[[i]], yv)
    xv <- xlist[[i]]

    path <- file.path(td, pfixes[i])
    dir.create(path)

    # Placeholder objects for necessary outputs
    cd <- yi <- wi <- ycenter <- yscale <- kcenters <- nn <- r2 <- NULL

    #---

    for (y in v) {

      # Response variable type and values
      Y <- dmat[, y]
      type <- ytype[[y]]

      # Placeholder objects
      zc <- mc <- qc <- dtrain <- dvalid <- NULL
      ti <- pi <- rep(TRUE, length(Y))
      hyper.results <- list()

      #-----

      # Build zero model, if necessary
      if (type == "continuous" & inflated(Y)) {

        # Indices to identify which observations to use for subsequent training (ti) and prediction (pi)
        ti <- Y != 0
        if (!block) pi <- ti

        #---

        # Build LightGBM datasets for zero-inflated model

        # List indicating random assignment of folds OR logical vector indicating training observations when nfolds <= 1
        cv.folds <- stratify(y = (Y == 0), ycont = FALSE, tfrac = nfolds, cv_list = TRUE)

        # Create full LGB training dataset with all available observations
        dfull <- lightgbm::lgb.Dataset(data = dmat[, xv, drop = FALSE],
                                       label = as.integer(Y == 0),
                                       weight = W.lgb,
                                       categorical_feature = intersect(xv, nominal),
                                       params = dparams) %>%
          lightgbm::lgb.Dataset.construct()

        # Create 'dtrain' and 'dvalid' sets, if requested
        if (is.logical(cv.folds)) {
          ind <- which(cv.folds)
          dtrain <- lightgbm::lgb.Dataset(data = dmat[ind, xv, drop = FALSE],
                                          label = as.integer(Y == 0)[ind],
                                          weight = W.lgb[ind],
                                          categorical_feature = intersect(xv, nominal),
                                          params = dparams) %>%
            lightgbm::lgb.Dataset.construct()

          dvalid <- lightgbm::lgb.Dataset(data = dmat[-ind, xv, drop = FALSE],
                                          label = as.integer(Y == 0)[-ind],
                                          weight = W.lgb[-ind],
                                          categorical_feature = intersect(xv, nominal),
                                          params = dparams,
                                          reference = dtrain) %>%
            lightgbm::lgb.Dataset.construct()
        }

        #---

        # Set the loss function and performance metric used with LightGBM
        params.obj <- list(objective = "binary", metric = "binary_logloss")

        # Fit model
        zmod <- fitLGB(dfull = dfull,
                       dtrain = dtrain,
                       dvalid = dvalid,
                       hyper.grid = hyper.grid,
                       params.obj = params.obj,
                       cv.folds = cv.folds)
        # TEMP
        hyper.results$z <- zmod$record_evals

        # Save LightGBM mean model (m.txt) to disk
        lightgbm::lgb.save(booster = zmod, filename = file.path(path, paste0(y, "_z.txt")))

        # Conditional probability of zero for the non-zero training observations
        # Note that 'zc' will be NULL in case of single continuous variables (zeros are simulated directly by 'zmod')
        if (block) {
          #zc <- matrix(predict(object = zmod, data = dmat[pi, xv], reshape = TRUE))
          zc <- matrix(predict(object = zmod, newdata = dmat[pi, xv]))
          colnames(zc) <- paste0(y, "_z")
        }

        rm(zmod)

      }

      #---

      # Build LightGBM datasets for mean and quantile models

      # List indicating assignment of folds OR vector indicating training observations when nfolds <= 1
      cv.folds <- stratify(y = Y[ti], ycont = (type == "continuous"), tfrac = nfolds, ntiles = 10, cv_list = TRUE)

      # Create full LGB training dataset with all available observations
      dfull <- lightgbm::lgb.Dataset(data = dmat[ti, xv, drop = FALSE],
                                     label = Y[ti],
                                     weight = W.lgb[ti],
                                     categorical_feature = intersect(xv, nominal),
                                     params = dparams) %>%
        lightgbm::lgb.Dataset.construct()

      # Create 'dtrain' and 'dvalid' sets, if requested (only necessary when nfolds < 1)
      if (is.logical(cv.folds)) {
        ind <- which(ti)[cv.folds]
        dtrain <- lightgbm::lgb.Dataset(data = dmat[ind, xv, drop = FALSE],
                                        label = Y[ind],
                                        weight = W.lgb[ind],
                                        categorical_feature = intersect(xv, nominal),
                                        params = dparams) %>%
          lightgbm::lgb.Dataset.construct()

        dvalid <- lightgbm::lgb.Dataset(data = dmat[-ind, xv, drop = FALSE],
                                        label = Y[-ind],
                                        weight = W.lgb[-ind],
                                        categorical_feature = intersect(xv, nominal),
                                        params = dparams,
                                        reference = dtrain) %>%
          lightgbm::lgb.Dataset.construct()
      }

      #---

      # Set the loss function and performance metric used with LightGBM
      # This is here so that the Huber loss 'alpha' is set based on observed MAD
      # See full list of built-in metrics: https://lightgbm.readthedocs.io/en/latest/Parameters.html#metric-parameters
      # And here about Huber: https://stats.stackexchange.com/questions/465937/how-to-choose-delta-parameter-in-huber-loss-function
      params.obj <- switch(type,
                           #continuous = list(objective = "regression", metric = "huber", alpha = 1.35 * mad(Y)),  # Will throw error if mad(Y) is zero
                           continuous = list(objective = "regression", metric = "l2"),
                           multiclass = list(objective = "multiclass", metric = "multi_logloss", num_class = 1L + max(Y)),
                           binary = list(objective = "binary", metric = "binary_logloss"))

      #-----

      # Fit mean response model
      mmod <- fitLGB(dfull = dfull,
                     dtrain = dtrain,
                     dvalid = dvalid,
                     hyper.grid = hyper.grid,
                     params.obj = params.obj,
                     cv.folds = cv.folds)

      # TEMP
      hyper.results$m <- mmod$record_evals

      # Save LightGBM mean model (m.txt) to disk
      # NOTE: mmod$best_iter and mmod$best_score custom attributes are not retained in save
      lightgbm::lgb.save(booster = mmod, filename = file.path(path, paste0(y, "_m.txt")))

      #-----

      # Predict conditional mean/probabilities for the training observations
      if (block | type == "continuous") {
        #mc <- predict(object = mmod, data = dmat[pi, xv, drop = FALSE], reshape = TRUE)
        mc <- predict(object = mmod, newdata = dmat[pi, xv, drop = FALSE])
        if (!is.matrix(mc)) mc <- matrix(mc)
        colnames(mc) <- paste0(y, "_m", 1:ncol(mc))
      }

      rm(mmod)

      #----

      # Fit quantile models
      if (type == "continuous") {

        # Fit quantile models for each value in 'ptiles'
        qmods <- vector(mode = "list", length = length(ptiles))
        names(qmods) <- paste0("q", 1:length(ptiles))

        for (k in seq_along(ptiles)) {
          qmods[[k]] <- fitLGB(dfull = dfull,
                               dtrain = dtrain,
                               dvalid = dvalid,
                               hyper.grid = hyper.grid,
                               params.obj = list(objective = "quantile", metric = "quantile", alpha = ptiles[k]),
                               cv.folds = cv.folds)
        }

        # TEMP
        for (k in seq_along(ptiles)) hyper.results[[names(qmods)[k]]] <- qmods[[k]]$record_evals

        # Predict conditional quantiles for full dataset
        qc <- matrix(data = NA, nrow = sum(pi), ncol = length(ptiles))
        for (k in seq_along(ptiles)) qc[, k] <- predict(object = qmods[[k]], newdata = dmat[pi, xv, drop = FALSE])
        colnames(qc) <- paste0(y, "_", names(qmods))

        # Save LightGBM quantile models (q**.txt) to disk
        for (k in seq_along(ptiles)) lightgbm::lgb.save(booster = qmods[[k]], filename = file.path(path, paste0(colnames(qc)[k], ".txt")))
        rm(qmods)

      }

      #----

      # Construct objects needed for subsequent nearest-neighbor and clustering operations
      # Conditional expectations for each training observation
      # Note that 'zc' only exists if 'y' is part of a block; NULL otherwise
      if (block | type == "continuous") {
        cd <- cbind(cd, data.table(zc, mc, qc)) # Add conditional expectations to retained 'cd' object
        yi <- cbind(yi, Y[pi])  # Observed values associated with 'cd'
        wi <- cbind(wi, W.int[pi]) # Observed weights associated with 'cd'
      }

    } # Done processing 'y'; loop repeats with remaining variables in 'v' (if any)

    #-----

    if (!is.null(cd)) {

      # Center and scale the conditional expectations
      # Scaling is only applied to conditional expectations that are not probabilities
      # The simple check below for values between 0 and 1 is not entirely safe but unlikely to cause problems in practice
      for (j in 1:ncol(cd)) {
        x <- cd[[j]]
        if (all(x >= 0 & x <= 1)) {  # Checks if the column looks to be conditional probabilities (no scaling applied)
          ncenter <- NA
          nscale <- NA
        } else {
          ncenter <- median(x)
          nscale <- mad(x)
          if (nscale == 0) nscale <- sd(x) # Use sd() if mad() returns zero
          if (nscale == 0) nscale <- 1  # If scale is still zero, set to 1 so that normalized values will all be 0.5 (prevents errors)
          eps <- 0.001
          x <- (x - ncenter) / nscale
          x <- (x - qnorm(eps)) / (2 * qnorm(1 - eps))
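          # With eps = 0.001, robust z-scores in roughly [-3.09, 3.09] are mapped to approximately [0, 1],
          # so scaled columns are on a scale comparable to the probability columns left unscaled above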
          set(cd, i = NULL, j = j, value = x)
        }
        ycenter <- c(ycenter, ncenter)
        yscale <- c(yscale, nscale)
      }
      names(ycenter) <- names(yscale) <- names(cd)

      #-----

      # Potentially partition the training observations into 'nclusters' clusters via kmeans
      if (nclusters == 0) nclusters <- Inf
      ncd <- data.table::uniqueN(cd)
      nclusters <- min(nclusters, ncd)

      if (ncd > nclusters) {

        # Distance of each row in 'cd' from the approximate median point
        temp <- as.matrix(cd)
        for (j in 1:ncol(temp)) temp[, j] <- (temp[, j] - median(temp[, j])) ^ 2
        temp <- sqrt(rowSums(temp))

        # Initial cluster centers for k-means
        # This allows cluster selection to be deterministic and initial centers evenly spread through the distribution
        qt <- quantile(temp, probs = seq(from = 0, to = 1, length.out = nclusters), type = 1)
        ind <- unique(match(qt, temp))

        # Perform k-means to identify optimal cluster centers
        km <- stats::kmeans(x = cd, centers = cd[ind, ], iter.max = 30)
        kcenters <- kc <- km$centers

      } else {
        kcenters <- kc <- as.matrix(cd)
      }

      #-----

      # Find the indices of nearest neighbors in 'cd' for each cluster center

      if (block) {

        # If block = TRUE, we simply retain a fixed number (30) of nearest neighbors
        # For each cluster center, find the 30 nearest neighbors in 'd' (approximate match; eps = 0.1)
        nn <- RANN::nn2(data = cd, query = kcenters, k = 30, eps = 0.1)
        nn <- nn$nn.idx

      } else {

        # If block = FALSE, we construct a conditional expectation neighborhood with up to max(krange) neighbors
        # We are dealing with a single continuous fusion variable in this case

        # For each cluster center, find the max(krange) nearest neighbors in 'd' (approximate match; eps = 0.1)
        nn <- RANN::nn2(data = cd, query = kcenters, k = min(max(krange), nrow(cd)), eps = 0.1)
        nn <- nn$nn.idx

        # Convert the 'kc' cluster centers back to original response units
        # Original units are necessary for computing the objective function below
        for (j in 1:ncol(kc)) kc[, j] <- denormalize(z = kc[, j], center = ycenter[j], scale = yscale[j], eps = 0.001)

        # Create 'm' and 'w' matrices with the nearest-neighbor values and weights
        m <- nn; m[] <- yi[m]  # Neighbor values matrix
        w <- nn; w[] <- wi[w]  # Neighbor weights matrix
        w <- w / mean(w)  # Prevents integer overflow in cumulative sum below

        # Cumulative weighted means
        denom <- matrixStats::rowCumsums(w)
        m1 <- matrixStats::rowCumsums(m * w) / denom

        # Approximate conditional standard deviation, based on conditional quantiles
        sdc <- abs(as.vector(matrixStats::rowDiffs(kc[, c(2, ncol(kc))]))) / diff(qnorm(range(ptiles)))  # Uses the max ptiles as the moment (need to test with larger number of ptiles)
        z <- (m1 - kc[, 1]) / sdc  # Z-score using the assumed SD of the conditional distribution under assumption of normality

        # Mean expectation error
        merr <- 1 - dnorm(z) / dnorm(0)
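        # e.g. with the default ptiles = c(0.25, 0.75), 'sdc' is (q75 - q25) / (qnorm(0.75) - qnorm(0.25)),
        # i.e. roughly IQR / 1.349, and 'merr' is 0 when the neighborhood mean equals the conditional mean
        # and approaches 1 as the standardized difference grows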

        # Quantile expectation error
        qerr <- lapply(2:ncol(kc), function(i) {
          tau <- ptiles[i - 1]  # Target percentile
          emax <- ifelse(tau > 0.5, tau, 1 - tau)  # Maximum possible error
          p <- matrixStats::rowCumsums((m <= kc[, i]) * w) / denom
          abs(p - tau) / emax
        })

        # Visualize the maximum error as function of 'tau' (percentile)
        # tau <- seq(0, 1, length.out = 100)
        # plot(tau, ifelse(tau > 0.5, tau, 1 - tau))
        # plot(tau, pmax(tau, 1 - tau))  # Identical

        #---

        # Average error (mean and quantile) across the conditional values
        # Note that this is effectively a weighted mean using the weights applied previously via the 'colweight' vector
        err <- merr + Reduce("+", qerr)

        # This is only applied if there are enough columns in 'err'
        if (ncol(err) > 5) {

          # Moving average of the 'err' values
          # This smooths the noise when the number of neighbors is low
          # Lagged, 5-neighbor window
          err <- sapply(5:ncol(err), function(j) matrixStats::rowMeans2(err, cols = (j - 4):j))

          # Set full columns of 'err' to Inf to ensure min(krange) number of selected neighbors is respected
          if (min(krange) > 5) err[, 1:(min(krange) - 5)] <- Inf

          # Offset used in following code
          delta <- 4L

        } else {
          delta <- 0L
        }

        # Each row's minimum error column, prior to enforcing 'tol'
        b0 <- max.col(-err, ties.method = "first")

        # Enforce absolute and relative error tolerance
        # Any error within 'tol' of the minimum is considered identical to the minimum and all are set to zero
        # This reduces the number of neighbor indices that need to be retained without affecting results materially
        kerror <- matrixStats::rowMins(err, na.rm = TRUE)
        err[err <= pmax(kerror, 0.01)] <- 0  # Absolute tolerance
        err[err <= pmax(kerror * 1.05, kerror + 0.005)] <- 0  # Relative tolerance
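        # After zeroing, max.col(-err, ties.method = "first") below selects the smallest neighborhood whose
        # error is within tolerance of the row minimum, which shrinks the neighbor indices retained on disk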

        # Error-minimizing column index after absolute tolerance fix
        # Can compare to 'b0' to see how 'tol' affects number of neighbors
        kbest <- max.col(-err, ties.method = "first")

        # Final "best k"
        # Add 4 (delta) because the first moving average window (i.e. column 1 in 'err') includes the 5 nearest neighbors
        kbest <- kbest + delta

        # Manual check of results
        # r <- 1724  # Row number
        # vj <- m[r, 1:kbest[r]]
        # c(mean(vj), quantile(vj, probs = ptiles))
        # kc[r, ]
        # plot(density(vj, from = 0))
        # abline(v = kc[r,], col = 2)

        #---

        # Set neighbors beyond the error-minimizing k to NA
        # Reduces size on disk when 'nn' matrix is saved
        for (j in 1:ncol(err)) nn[, j + delta] <- replace(nn[, j + delta], kbest < (j + delta), NA)

        # This operation can be used in fuse() to recreate 'kbest' quickly from 'nn'
        # test <- matrixStats::rowCounts(is.na(nn), value = FALSE)
        # all.equal(test, kbest)

        # Drop any columns in 'nn' that are only NA's
        # Not necessary to retain on disk
        keep <- matrixStats::colSums2(nn, na.rm = TRUE) > 0
        if (any(!keep)) nn <- nn[, keep]

        # Convert the index values in 'nn' to "complete data" indices
        # This allows fuse() to lookup response values using the full dataset, rather than worry about non-zero subsets
        # Note that this is unnecessary in blocked case, b/c 'pi' necessarily contains all indices (no zero model)
        nn[] <- which(pi)[nn]

        #---

        # Calculate the R-squared value between each cluster's mean expectation and the mean value of the 'kbest' neighbors
        # These values should be pretty close overall; report result to console
        mb <- matrixStats::rowCollapse(m1, kbest)  # Extract the neighborhood mean associated with 'best' k
        ssr <- sum((kc[, 1] - mb) ^ 2)
        sst <- sum((kc[, 1] - mean(kc[, 1])) ^ 2)
        r2 <- 1 - ssr / sst  # r-squared
        if (verbose) cat("  -- R-squared of cluster means:", round(r2, 3), "\n")
        #plot(kc[, 1], mb); abline(0, 1, col = 2)

        # Report to console info about the preferred number of neighbors across clusters
        if (verbose) {
          cat("  -- Number of neighbors in each cluster:\n")
          print(summary(kbest))
        }

      }

      #-----

      # If one wanted to smooth the neighborhood values...
      # Play with kernel density
      # x <- na.omit(m[2196, ])  # Neighborhood values
      # xw <- na.omit(w[2196, ])
      # ds <- density(x, weights = xw / sum(xw), from = 0, to = 1.2 * max(x))
      # plot(ds)
      # weighted.mean(x, xw)
      # weighted.mean(ds$x, ds$y)
      # quantile(x, probs = ptiles)
      # Use weighted quantile function to see how density matches 'kc'
      # Then could optimize bandwidth to minimize quantile error
      # The issue is probably how to estimate it quickly; maybe a manual calculation
      # See here: https://rpubs.com/mcocam12/KDF_byHand


    }

    #---

    # Assemble output list
    out <- list(xpreds = xv,
                ycenter = ycenter,
                yscale = yscale,
                kcenters = kcenters,
                kneighbors = nn,
                cluster_mean_r2 = r2,
                hyper_results = hyper.results)

    return(out)

  }

  #-----

  # Apply buildFun() to each index in 'ylist', using forked parallel processing or serial (depending on 'fork' variable)
  # NOTE: pblapply() has significant overhead, so using straight mclapply for the time being
  if (fork) {
    cat("Processing ", length(pfixes), " training steps in parallel via forking (", cores, " cores)", "\n", sep = "")
    out <- parallel::mclapply(X = 1:length(ylist),
                              FUN = buildFun,
                              mc.cores = cores,
                              mc.preschedule = FALSE,
                              verbose = FALSE)
  } else {
    if (cores > 1) cat("Using OpenMP multithreading within LightGBM (", cores, " cores)", "\n", sep = "")
    out <- lapply(X = 1:length(ylist),
                  FUN = buildFun,
                  verbose = TRUE)
  }

  # Troubleshooting buildFun()
  #for (i in 1:length(ylist)) buildFun(i, verbose = TRUE)

  #-----

  # Assemble metadata
  metadata <- list(
    xclass = xclass,
    xlevels = xlevels,
    yclass = yclass,
    ylevels = ylevels,
    ylist = ylist,
    ytype = ytype,
    ptiles = ptiles,
    dnames = colnames(dmat),
    nobs = nrow(dmat),
    timing = difftime(Sys.time(), t0),
    version = list(fusionModel = utils::packageVersion("fusionModel"), R = getRversion()),
    #call = match.call.defaults()
    call = match.call()
  )

  # Add the metadata information returned by buildFun()
  metadata <- c(metadata, purrr::transpose(out))

  # Save metadata to disk
  saveRDS(metadata, file = file.path(td, "metadata.rds"))

  #-----

  # Save final model object to disk
  # Requires 'zip' package: https://cran.r-project.org/web/packages/zip/index.html
  # Zip up all of the model directories
  zip::zip(zipfile = fsn, files = list.files(td, recursive = TRUE), root = td, mode = "mirror", include_directories = TRUE)  # Zips to the temporary directory
  file.copy(from = list.files(td, "\\.fsn$", full.names = TRUE), to = fsn, overwrite = TRUE)  # Copy .zip/.fsn file to desired location
  cat("Fusion model saved to:\n", fsn, "\n")
  unlink(td)

  # Report processing time
  tout <- difftime(Sys.time(), t0)
  cat("Total processing time:", signif(as.numeric(tout), 3), attr(tout, "units"), "\n", sep = " ")

  # Return .fsn file path invisibly
  invisible(fsn)

}