R/SummaryMeasureGenerator.R

#' SummaryMeasureGenerator
#'
#' This is a decorator for one or more general Data.Base objects. It retrieves
#' data from them, processes it with the provided summary measure generators
#' (SMGs), and returns the resulting summary measures per trajectory.
#' Currently it only provides the lag of the data, but this could be extended
#' with, for example, the mean, variance, or MSSD of the data, to be fed to the
#' super learner machine learning model.
#'
#' @docType class
#' @importFrom R6 R6Class
#' @include SMG.Mock.R
#'
#' @section Methods: 
#' \describe{  
#'   \item{\code{initialize(data = NULL, SMG.list, verbose = FALSE, pre_processor = NULL)}}{ 
#'     Initializes a new \code{SummaryMeasureGenerator}. Note that generating a
#'     new SMG can also be done using the \code{SMGFactory} class.
#'
#'     @param data Data.Base (default = NULL) the data to use as the basis to
#'     generate the summary measures from. This should be an instance of a
#'     \code{Data.Base} object, a list of such objects (one per trajectory), or
#'     \code{NULL}. When \code{NULL}, the actual data should be provided later,
#'     prior to actually using this object.
#'
#'     @param SMG.list list the list of summary measure generator (SMG) objects
#'      to use.
#'
#'     @param verbose (default = FALSE) the verbosity of the \code{SummaryMeasureGenerator}.
#'
#'     @param pre_processor PreProcessor (default = NULL) the
#'      \code{PreProcessor} to use for processing the data. This is an optional
#'      parameter. If \code{NULL} the data won't be preprocessed / normalized.
#'   } 
#' 
#'   \item{\code{reset() }}{ 
#'     Removes all data in the SMG and resets it to a neutral state. 
#'   } 
#' 
#'   \item{\code{check_enough_data_available(relevantVariables) }}{ 
#'     Checks whether every variable needed by the provided relevant variables
#'     (the variables returned by their \code{getX}) is exposed by at least one
#'     of the SMGs. Returns \code{TRUE} when this is the case, and throws an
#'     error listing the missing variables otherwise.
#'
#'     @param relevantVariables list the relevant variables for which we want to
#'      check if enough data is available.
#'   } 
#' 
#'   \item{\code{setData(data) }}{ 
#'     The object can be initialized without any data. This function can then
#'     be used to inject the data to use.
#'
#'     @param data Data.Base the data to set in the instance.
#'   } 
#' 
#'   \item{\code{fillCache() }}{ 
#'     Some of the SMGs need a cache of data to be available before they can
#'     generate a new observation. This function fills this cache and makes
#'     sure enough data is available.
#'
#'     @return boolean representing whether the cache was actually updated.
#'   } 
#' 
#'   \item{\code{get_latest_covariates(data) }}{ 
#'     For online learning we need to be able to create new data blocks on the
#'     fly (as not all data is available beforehand). This function lets each
#'     SMG update a copy of the provided data, so that the last set of
#'     covariates can be obtained from a data.table of the current block. The
#'     data.table must contain exactly one row; otherwise an error is thrown.
#'
#'     @param data data.table the currently available data. 
#'
#'     @return the updated / latest available data.table.
#'   } 
#' 
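#'   \item{\code{summarize_data(data = NULL, n = 1) }}{ 
#'     Returns the latest \code{n} rows from the processed (the blocks) data,
#'     one entry per trajectory.
#'
#'     @param data list (default = NULL) the trajectories currently available
#'      for the SMGs (e.g., the cache of this class). When \code{NULL}, the
#'      cache of this class is used.
#'
#'     @param n integer (default = 1) the number of blocks to retrieve from the
#'      data.
#'
#'     @return list the summarized trajectories. The input is returned as-is
#'      when it does not contain more rows than the minimal number of
#'      measurements needed, and \code{NULL} is returned when an SMG does not
#'      produce \code{n} rows.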
#'   } 
#' 
#'   \item{\code{getNext(n = 1) }}{ 
#'     Gets the next \code{n} blocks from a data set. This uses most functions
#'     in this class and is probably the most useful function. It summarizes
#'     the data, refreshes the cache, and returns the new blocks.
#'
#'     @param n integer (default = 1) the number of blocks one wants to retrieve.
#'
#'     @return data.table the data.table generated by this function.
#'   } 
#' 
#'   \item{\code{set_minimal_measurements_needed(minimal_measurements_needed) }}{ 
#'     Setter for the minimal measurements needed by all SMGs.
#'
#'     @param minimal_measurements_needed integer the minimal number of
#'      measurements needed. On initialization this is set to the largest
#'      \code{minimalObservations} of all SMGs, minus one.
#'   } 
#' 
#'   \item{\code{get_pre_processor}}{ 
#'     Active method. Returns the preprocessor provided on initialization.
#'  
#'     @return PreProcessor the \code{PreProcessor} instance (or \code{NULL} if
#'      none is available).
#'   } 
#'
#'   \item{\code{is_normalized}}{ 
#'     Active method. Returns a boolean representing whether we have normalized
#'     the data.
#'  
#'     @return boolean representing whether we have normalized the data.
#'   } 
#'
#'   \item{\code{getCache}}{ 
#'     Active method. Returns the cache of the \code{SummaryMeasureGenerator}.
#'  
#'     @return data.table the stored cache.
#'   } 
#'
#'   \item{\code{get_data_object}}{ 
#'     Active method. Returns the data of the current instance.
#'  
#'     @return Data.Base the instance of the data passed in on initialization
#'      or using the setter. 
#'   } 
#'
#'   \item{\code{get_smg_list}}{ 
#'     Active method. Returns the list of SMG objects that were provided on
#'     initialization.
#'  
#'     @return list the used SMG instances.
#'   } 
#'
#'   \item{\code{get_minimal_measurements_needed}}{ 
#'     Active method. Returns the number of minimal measurements needed for the
#'     \code{SummaryMeasureGenerator}s to function properly. 
#'  
#'     @return integer the minimal number of measurements needed.
#'   } 
#' 
#' }  
#' @export
SummaryMeasureGenerator <- R6Class("SummaryMeasureGenerator",
  public =
    list(
      ## Set up the generator: register the trajectories (if any), the SMG
      ## objects, the verbosity and the optional pre processor, and derive how
      ## many historical measurements the SMGs need.
      initialize = function(data = NULL, SMG.list, verbose = FALSE, pre_processor = NULL) {
        self$set_trajectories(data)

        private$SMG.list <- SMG.list
        private$verbose <- verbose

        if (is.null(pre_processor)) {
          private$normalized <- FALSE
        } else {
          private$pre_processor <- Arguments$getInstanceOf(pre_processor, 'PreProcessor')
          private$normalized <- TRUE
        }

        ## Determine the minimal number of measurements we need in order to be
        ## able to support all our SMGs. vapply (instead of sapply) guarantees
        ## a numeric vector, so an SMG exposing a malformed value fails here.
        ## TODO: We do the -1 so we can just get new measurements, without
        ## caring about the cache getting filled or not. This could be made
        ## more explicit.
        minimal_observations <- vapply(SMG.list, function(obj) obj$minimalObservations, numeric(1))
        self$set_minimal_measurements_needed(
          minimal_measurements_needed = max(minimal_observations) - 1
        )
      },

      ## Empty the cache: one empty data.table per trajectory, named after the
      ## trajectories.
      reset = function() {
        cache <- lapply(private$trajectories, function(x) data.table())
        names(cache) <- self$get_trajectory_names
        private$cache <- cache
      },

      ## Verify that every variable needed by relevantVariables is exposed by
      ## at least one SMG. Throws an error listing the missing variables;
      ## otherwise returns TRUE.
      check_enough_data_available = function(relevantVariables) {
        ## Currently we do not support interactions
        needed <- self$get_needed_data(relevantVariables)
        available <- unlist(lapply(private$SMG.list, function(smg) smg$exposedVariables))
        missing_variables <- setdiff(needed, available)

        ## An empty set difference means the SMGs cover every needed variable
        if (length(missing_variables) != 0) {
          missing <- paste(missing_variables, collapse = ', ')
          throw('Not all provided variables (', missing, ') are included in the SMGs, include the correct SMGs')
        }
        TRUE
      },

      ## The unique variable names returned by `getX` of each relevant variable.
      get_needed_data = function(relevantVariables) {
        unique(unlist(lapply(relevantVariables, function(rv) rv$getX)))
      },

      ## Register the trajectories to summarize. Accepts NULL, a single data
      ## object, or a list of data objects (one per trajectory). Unnamed
      ## trajectories are named traj_1, traj_2, ... . Resets the cache and
      ## returns the stored, named list of trajectories.
      set_trajectories = function(data) {
        ## A single trajectory (or no data at all) is wrapped in a list. Any
        ## Data.Base is wrapped, not only Data.Static: an R6 object is an
        ## environment, and treating it as a list of trajectories would iterate
        ## over its members instead.
        if (is.null(data) || inherits(data, c('Data.Static', 'Data.Base'))) {
          data <- list(data)
        }

        ## Set some names in the list to make the data sets recognizable afterwards
        if (is.null(names(data))) {
          names(data) <- paste('traj', seq_along(data), sep = '_')
        }
        private$trajectory_names <- names(data)
        private$trajectories <- data

        ## Reset the cache
        self$reset()

        private$trajectories
      },

      ## Deprecated alias of set_trajectories.
      setData = function(data) {
        warning('deprecated function: setData in SummaryMeasureGenerator')
        self$set_trajectories(data)
      },

      ## Make sure the cache holds at least get_minimal_measurements_needed rows
      ## (per trajectory) by appending the missing measurements. Returns TRUE
      ## when measurements were added, FALSE otherwise.
      fillCache = function() {
        private$check_data_available()

        ## If no history is needed, we don't have to fill the cache
        if (self$get_minimal_measurements_needed == 0) return(FALSE)

        shortage <- self$get_minimal_measurements_needed - self$get_cache_size
        if (shortage <= 0) return(FALSE)

        ## Append rather than replace, so rows already in a partially filled
        ## cache are kept and the cache actually reaches the required size.
        private$build_cache(current_data = private$get_next_normalized(n = shortage))
        TRUE
      },

      ## Let every SMG update its own copy of a single row of data and flatten
      ## the results into a one-row data.table.
      get_latest_covariates = function(data) {
        if (nrow(data) != 1) {
          throw('Not enough data provided to support all summary measures')
        }

        ## Each SMG receives its own copy, so the caller's `data` is passed to
        ## every SMG exactly once and is never handed over directly.
        datas <- lapply(private$SMG.list, function(smg) {
          smg$update(copy(data))
        })

        unlist(datas) %>%
          t %>%
          as.data.table
      },

      ## Run every SMG over every trajectory (default: the cache) and return,
      ## per trajectory, the last n rows of the column-bound SMG outputs with
      ## incomplete rows removed. Returns the input unchanged when it does not
      ## hold more rows than get_minimal_measurements_needed, and NULL when
      ## some SMG does not produce n rows.
      summarize_data = function(data = NULL, n = 1) {
        # TODO: Rename data to trajectories
        trajectories <- data

        ## Make sure that we are able to work with more than one trajectory
        if (is.null(trajectories)) trajectories <- private$cache

        ## In case trajectories is (by mistake) still a datatable, embed it in a list here
        if (is(trajectories, 'data.table')) trajectories <- list(trajectories)

        trajectory_names <- self$get_trajectory_names
        if (length(trajectories) < length(trajectory_names)) {
          throw('When summarizing data, always summarize for all the trajectories')
        }
        names(trajectories) <- trajectory_names

        ## Data here is a list of trajectories. We only inspect the first one
        ## and assume that each of them has the same number of rows.
        if (nrow(trajectories[[1]]) <= self$get_minimal_measurements_needed) return(trajectories)

        ## Run each of the processors over each of the trajectories
        summarized_trajectories <- lapply(trajectories, function(trajectory) {
          lapply(private$SMG.list, function(smg) {
            result <- smg$process(copy(trajectory))
            tail(result, n)
          })
        })

        ## Perform an extra check to make sure all SMGs give enough data. If
        ## this isn't the case, return NULL.
        ## A plain `if` is used for logging: `verbose && cat(...)` fails when
        ## verbose is TRUE, because cat() returns NULL and `&&` rejects NULL.
        be_verbose <- isTRUE(as.logical(private$verbose))
        enough_data <- lapply(summarized_trajectories, function(trajectory) lapply(trajectory, function(out) {
          if (nrow(out) != n) {
            if (be_verbose) {
              cat('The output of all summary measures should be equal to n. The colnames of the failing entry: ', colnames(out), '\n')
            }
            return(FALSE)
          }
          TRUE
        })) %>% unlist %>% all

        if (!enough_data) return(NULL)

        names(summarized_trajectories) <- trajectory_names
        lapply(summarized_trajectories, function(trajectory) {
          tmp <- Reduce(cbind, trajectory)
          tmp[complete.cases(tmp), ]
        })
      },

      ## Fetch the next n measurements of every trajectory, slide them into
      ## the cache (dropping the oldest n rows unless the cache was just
      ## filled), and return the summarized last n blocks.
      getNext = function(n = 1) {
        private$check_data_available()

        ## TODO: Make this much more efficient
        ## TODO: This is an exact copy of the Data.Base function
        has_filled_cache <- self$fillCache()

        ## Remove the first n measurements from the cache
        if (!has_filled_cache) private$remove_last_measurements_from_cache(n)

        ## Now, this combined with the cache, should be enough to get the new observations
        current <- private$get_next_normalized(n = n)
        private$build_cache(current_data = current)

        self$summarize_data(private$cache, n = n)
      },

      ## Setter for the number of historical measurements the SMGs need.
      set_minimal_measurements_needed = function(minimal_measurements_needed) {
        private$minimal_measurements_needed <- minimal_measurements_needed
      }
    ),
  active =
    list(
      ## The PreProcessor given on initialization (NULL if none).
      get_pre_processor = function() {
        private$pre_processor
      },

      ## TRUE when a PreProcessor was provided, i.e. data gets normalized.
      is_normalized = function() {
        private$normalized
      },

      ## The cache: a named list with one data.table per trajectory.
      getCache = function() {
        return(private$cache)
      },

      ## Number of rows in the cache of the first trajectory.
      get_cache_size = function() {
        ## We assume that each entry in the cache is of the same size
        nrow(self$getCache[[1]])
      },

      ## Deprecated: the first registered trajectory.
      get_data_object = function() {
        warning('deprecated function: get_data_object in SummaryMeasureGenerator')
        return(self$get_trajectories[[1]])
      },

      ## The SMG objects given on initialization.
      get_smg_list = function() {
        return(private$SMG.list)
      },

      get_minimal_measurements_needed = function() {
        ## This value is equal to the highest lag in the system. That is, if
        ## we have a lag_2 variable, this variable is 2.
        return(private$minimal_measurements_needed)
      },

      ## The named list of registered trajectories.
      get_trajectories = function() {
        return(private$trajectories)
      },

      ## The names of the registered trajectories.
      get_trajectory_names = function() {
        return(private$trajectory_names)
      }

    ),
  private =
    list(
      trajectories = NULL,
      trajectory_names = NULL,
      minimal_measurements_needed = NULL,
      cache = NULL,
      SMG.list = NULL,
      verbose = NULL,
      normalized = NULL,
      pre_processor = NULL,

      ## Drop the first (oldest) n rows of every cached trajectory; new rows
      ## are always appended at the end by build_cache.
      remove_last_measurements_from_cache = function(n) {
        private$cache <- lapply(private$cache, function(cache) tail(cache, -n))
      },

      ## Append current_data (a list aligned with the trajectories) to the
      ## cache, name the entries after the trajectories, and return the cache.
      build_cache = function(current_data) {
        private$cache <- lapply(seq_along(current_data), function(i) {
          list(private$cache[[i]], current_data[[i]]) %>% rbindlist
        })
        names(private$cache) <- self$get_trajectory_names
        private$cache
      },

      ## Get the next n observations of every trajectory (NULL for exhausted
      ## ones), normalized with the pre processor when one is configured.
      get_next_normalized = function(n) {
        ## Rely on the data source to get this data efficiently
        lapply(self$get_trajectories, function(trajectory) {
          current <- trajectory$getNextN(n = n)
          if (is.null(current)) return(NULL)

          if (self$is_normalized) current %<>% self$get_pre_processor$normalize(.)
          current
        })
      },

      ## Throw an error when no data has been registered yet.
      check_data_available = function() {
        trajs <- self$get_trajectories
        if (is.null(trajs) || (length(trajs) == 1 && is.null(trajs[[1]]))) {
          throw('Please set the data of the summary measure generator first')
        }
      }
    )
)
frbl/OnlineSuperLearner documentation built on Feb. 9, 2020, 9:28 p.m.