#' SummaryMeasureGenerator
#'
#' This is a decorator for a general Data.Base object. It retrieves data from
#' it, processes it, and returns a data frame containing the summary measure
#' provided. Currently it only provides the lag of the data, but this could be
#' extended with for example the mean, variance, MSSD of the data to the
#' super learner machine learning model.
#'
#' @docType class
#' @importFrom R6 R6Class
#' @include SMG.Mock.R
#'
#' @section Methods:
#' \describe{
#' \item{\code{initialize(data = NULL, SMG.list, verbose = FALSE, pre_processor = NULL)}}{
#' Initializes a new \code{SummaryMeasureGenerator}. Note that generating a
#' new SMG can also be done using the \code{SMGFactory} class.
#'
#' @param data Data.Base (default = NULL) the data to use as the basis to
#' generate the summary measures from. This should be an instance of a
#' \code{Data.Base} object or \code{NULL}. When \code{NULL}, the actual
#' data should be provided later, prior to actually using this object.
#'
#' @param SMG.list list the list of \code{SummaryMeasureGenerator} objects
#' to use.
#'
#' @param verbose (default = FALSE) the verbosity of the \code{SummaryMeasureGenerator}.
#'
#' @param pre_processor PreProcessor (default = NULL) the
#' \code{PreProcessor} to use for processing the data. This is an optional
#' parameter. If \code{NULL} the data won't be preprocessed / normalized.
#' }
#'
#' \item{\code{reset() }}{
#' Removes all data in the SMG and resets it to a neutral state.
#' }
#'
#' \item{\code{check_enough_data_available(relevantVariables) }}{
#' This function asks each of the provided SMGs how much data they need to
#' generate a new block. This function returns a boolean representing
#' whether or not this condition is met.
#'
#' @param relevantVariables list the relevant variables for which we want to
#' check if enough data is available.
#' }
#'
#' \item{\code{setData(data) }}{
#' The object can be initialized without any data. This function can then
#' be used to inject the data to use.
#'
#' @param data Data.Base the data to set in the instance.
#' }
#'
#' \item{\code{fillCache() }}{
#' Some of the SMG's need a cache of data to be available before they can
#' generate a new observation. This function fills this cache and makes
#' sure enough data is available.
#'
#' @return boolean representing whether the cache was actually updated.
#' }
#'
#' \item{\code{get_latest_covariates(data) }}{
#' For online learning we need to be able to create new data blocks on the
#' fly (as not all data is available beforehand). This function updates a
#' set of variables to contain the current variables as well. This function
#' can be used to get the last set of covariates given a data.table of
#' current blocks
#'
#' @param data data.table the currently available data.
#'
#' @return the updated / latest available data.table.
#' }
#'
#' \item{\code{summarize_data(data, n = 1) }}{
#' Returns the latest \code{n} rows from the processed (the blocks) data.
#'
#' @param data data.table the data currently available for the SMGs (e.g.,
#' the cache of this class).
#'
#' @param n integer (default = 1) the number of blocks to retrieve from the
#' data.
#' }
#'
#' \item{\code{getNext(n = 1) }}{
#' Gets the next \code{n} blocks from a data set. This uses most functions
#' in this class and is probably the most useful function. It summarizes
#' the data, refreshes the cache, and returns the new blocks.
#'
#' @param n integer (default = 1) the number of blocks one wants to retrieve.
#'
#' @return data.table the data.table generated by this function.
#' }
#'
#' \item{\code{set_minimal_measurements_needed(minimal_measurements_needed) }}{
#' Setter for the minimal measurements needed by all SMGs.
#'
#' @param minimal_measurements_needed integer the minimal number of
#' measurements needed. This is the maximum of all maxima of the SMGs.
#' }
#'
#' \item{\code{get_pre_processor}}{
#' Active method. Returns the preprocessor provided on initialization.
#'
#' @return PreProcessor the \code{PreProcessor} instance (or \code{NULL} if
#' none is available).
#' }
#'
#' \item{\code{is_normalized}}{
#' Active method. Returns a boolean representing whether we have normalized
#' the data.
#'
#' @return boolean representing whether we have normalized the data.
#' }
#'
#' \item{\code{getCache}}{
#' Active method. Returns the cache of the \code{SummaryMeasureGenerator}.
#'
#' @return data.table the stored cache.
#' }
#'
#' \item{\code{get_data_object}}{
#' Active method. Returns the data of the current instance.
#'
#' @return Data.Base the instance of the data passed in on initialization
#' or using the setter.
#' }
#'
#' \item{\code{get_smg_list}}{
#' Active method. Returns the list of SMG objects that were provided on
#' initialization.
#'
#' @return list the used SMG instances.
#' }
#'
#' \item{\code{get_minimal_measurements_needed}}{
#' Active method. Returns the number of minimal measurements needed for the
#' \code{SummaryMeasureGenerator}s to function properly.
#'
#' @return integer the minimal number of measurements needed.
#' }
#'
#' }
#' @export
SummaryMeasureGenerator <- R6Class("SummaryMeasureGenerator",
  public =
    list(
      ## Stores the trajectories, the SMG list, the verbosity and the optional
      ## pre processor, and derives the number of historical measurements that
      ## have to be cached in order to serve every SMG.
      ##
      ## @param data a single data object, a list of them (one per trajectory),
      ##   or NULL when the data is injected later through set_trajectories.
      ## @param SMG.list non-empty list of SMG objects; each has to expose
      ##   minimalObservations.
      ## @param verbose stored as-is and used for logging in summarize_data.
      ## @param pre_processor NULL, or an object accepted by
      ##   Arguments$getInstanceOf(., 'PreProcessor'). When provided, all
      ##   retrieved data is passed through its normalize function.
      initialize = function(data = NULL, SMG.list, verbose = FALSE, pre_processor = NULL) {
        self$set_trajectories(data)
        private$SMG.list <- SMG.list
        private$verbose <- verbose

        if (is.null(pre_processor)) {
          private$normalized <- FALSE
        } else {
          private$pre_processor <- Arguments$getInstanceOf(pre_processor, 'PreProcessor')
          private$normalized <- TRUE
        }

        ## Without any SMG there is no maximum to take. Fail with a clear
        ## message instead of the cryptic error max() would raise.
        if (length(SMG.list) == 0) {
          throw('At least one summary measure generator should be provided in SMG.list')
        }

        ## Determine the minimal number of measurements we need in order to be
        ## able to support all our SMGs.
        ## TODO: We do the -1 so we can just get new measurements, without
        ## caring about the cache getting filled or not. This could be made
        ## more explicit.
        observations_per_smg <- vapply(
          SMG.list,
          function(obj) obj$minimalObservations,
          numeric(1)
        )
        minimal_measurements_needed <- max(observations_per_smg) - 1
        self$set_minimal_measurements_needed(minimal_measurements_needed = minimal_measurements_needed)
      },

      ## Replaces the cache by one empty data.table per trajectory, named after
      ## the trajectory names.
      reset = function() {
        cache <- lapply(private$trajectories, function(x) data.table())
        names(cache) <- self$get_trajectory_names
        private$cache <- cache
      },

      ## Checks that every variable needed by the relevant variables (their
      ## getX entries) is exposed by at least one of the SMGs.
      ##
      ## @param relevantVariables list of objects exposing getX.
      ## @return TRUE when everything is covered; throws otherwise.
      check_enough_data_available = function(relevantVariables) {
        ## Currently we do not support interactions
        needed <- self$get_needed_data(relevantVariables)
        available <- unlist(lapply(private$SMG.list, function(smg) smg$exposedVariables))
        uncovered <- setdiff(needed, available)

        ## An empty difference means the SMGs cover all needed variables
        if (length(uncovered) != 0) {
          missing <- paste(uncovered, collapse = ', ')
          throw('Not all provided variables (', missing, ') are included in the SMGs, include the correct SMGs')
        }
        TRUE
      },

      ## Collects the unique set of getX entries over all relevant variables.
      get_needed_data = function(relevantVariables) {
        unique(unlist(lapply(relevantVariables, function(rv) rv$getX)))
      },

      ## Stores the trajectories to generate summary measures for and resets
      ## the cache.
      ##
      ## @param data a single data object, NULL, or a list of data objects.
      ##   Unnamed lists get the names traj_1, traj_2, ...
      ## @return the stored (named) list of trajectories.
      set_trajectories = function(data) {
        ## We can also work with a single trajectory. The class documentation
        ## describes the input as a Data.Base object, so any Data.Base is
        ## wrapped here, not only Data.Static.
        ## NOTE(review): assumes a single data source carries the Data.Base
        ## class -- confirm against the Data.* class definitions.
        is_single_trajectory <- is(data, 'Data.Static') || is(data, 'Data.Base') || is.null(data)
        if (is_single_trajectory) data <- list(data)

        ## Set some names in the list to make the data sets recognizable
        ## afterwards. sprintf returns character(0) for an empty list.
        if (is.null(names(data))) {
          names(data) <- sprintf('traj_%d', seq_along(data))
        }

        private$trajectory_names <- names(data)
        private$trajectories <- data

        ## The cache belongs to the previous trajectories, so start over
        self$reset()
        private$trajectories
      },

      ## Deprecated alias of set_trajectories; emits a warning.
      setData = function(data) {
        warning('deprecated function: setData in SummaryMeasureGenerator')
        self$set_trajectories(data)
      },

      ## Makes sure the cache holds the minimal number of measurements needed
      ## by retrieving the missing measurements from the trajectories.
      ##
      ## @return TRUE when new measurements were added to the cache, FALSE when
      ##   no history is needed or the cache was already large enough.
      fillCache = function() {
        private$check_data_available()

        ## If no history is needed, we don't have to fill the cache
        if (self$get_minimal_measurements_needed == 0) return(FALSE)

        extra_measurements_needed <- self$get_cache_size - self$get_minimal_measurements_needed
        if (extra_measurements_needed < 0) {
          ## Only the missing number of measurements is retrieved, so they have
          ## to be appended to what is already cached. Overwriting the cache
          ## would drop the cached rows and leave the cache too small.
          missing_measurements <- private$get_next_normalized(n = abs(extra_measurements_needed))
          private$build_cache(current_data = missing_measurements)
          return(TRUE)
        }
        FALSE
      },

      ## Lets every SMG update the provided single row and combines the results
      ## into a one-row data.table.
      ##
      ## @param data data.table with exactly one row; throws otherwise.
      get_latest_covariates = function(data) {
        if (nrow(data) != 1) {
          throw('Not enough data provided to support all summary measures')
        }

        ## NOTE(review): the first SMG is updated here on the uncopied data and
        ## again in the lapply below. Whether update() has side effects that
        ## the other SMGs rely on is not visible from this class, so the call
        ## is kept -- confirm and remove if it is redundant.
        private$SMG.list[[1]]$update(data)

        datas <- lapply(private$SMG.list, function(smg) {
          smg$update(copy(data))
        })
        unlist(datas) %>%
          t %>%
          as.data.table
      },

      ## Runs every SMG over every trajectory and returns, per trajectory, the
      ## column-bound last n rows of the SMG outputs (complete cases only).
      ##
      ## @param data list of data.tables (one per trajectory), a single
      ##   data.table, or NULL to use the cache.
      ## @param n number of rows to keep from each SMG output.
      ## @return named list of data.tables; the (named) input itself when the
      ##   first trajectory does not contain more rows than the minimal number
      ##   of measurements needed; NULL when an SMG returned a number of rows
      ##   different from n.
      summarize_data = function(data = NULL, n = 1) {
        # TODO: Rename data to trajectories
        trajectories <- data

        ## Make sure that we are able to work with more than one trajectory
        if (is.null(trajectories)) trajectories <- private$cache

        ## In case trajectories is (by mistake) still a data.table, embed it in
        ## a list here
        if (is(trajectories, 'data.table')) trajectories <- list(trajectories)

        if (length(trajectories) < length(self$get_trajectory_names)) {
          throw('When summarizing data, always summarize for all the trajectories')
        }
        names(trajectories) <- self$get_trajectory_names

        ## Not enough data provided to support all summary measures. We only
        ## inspect the first trajectory and assume the others are equally long.
        if (nrow(trajectories[[1]]) <= self$get_minimal_measurements_needed) return(trajectories)

        ## Run each of the processors over each of the trajectories. Each SMG
        ## gets its own copy of the trajectory.
        summarized_trajectories <- lapply(trajectories, function(trajectory) {
          lapply(private$SMG.list, function(smg) {
            result <- smg$process(copy(trajectory))
            tail(result, n)
          })
        })

        ## Perform an extra check to make sure all SMGs give enough data. If
        ## this isn't the case, return NULL.
        enough_data <- lapply(summarized_trajectories, function(trajectory) {
          lapply(trajectory, function(out) {
            if (nrow(out) != n) {
              private$verbose && cat(private$verbose, 'The output of all summary measures should be equal to n. The colnames of the failing entry: ', colnames(out))
              return(FALSE)
            }
            return(TRUE)
          })
        }) %>% unlist %>% all
        if (!enough_data) return(NULL)

        names(summarized_trajectories) <- self$get_trajectory_names
        lapply(summarized_trajectories, function(trajectory) {
          combined <- Reduce(cbind, trajectory)
          combined[complete.cases(combined), ]
        })
      },

      ## Retrieves the next n measurements of every trajectory, adds them to
      ## the cache and returns the summary measures for them.
      ##
      ## @param n number of new measurements to retrieve.
      ## @return the result of summarize_data on the updated cache.
      getNext = function(n = 1) {
        private$check_data_available()
        ## TODO: Make this much more efficient
        ## TODO: This is an exact copy of the Data.Base function
        has_filled_cache <- self$fillCache()

        ## When the cache was already filled, drop the first (oldest) n
        ## measurements to make room for the new ones
        if (!has_filled_cache) private$remove_last_measurements_from_cache(n)

        ## Now, this combined with the cache, should be enough to get the new
        ## observations
        current <- private$get_next_normalized(n = n)
        private$build_cache(current_data = current)
        self$summarize_data(private$cache, n = n)
      },

      ## Setter for the number of measurements that have to be cached.
      set_minimal_measurements_needed = function(minimal_measurements_needed) {
        private$minimal_measurements_needed <- minimal_measurements_needed
      }
    ),
  active =
    list(
      ## The pre processor provided on initialization (NULL if none)
      get_pre_processor = function() {
        private$pre_processor
      },

      ## TRUE when a pre processor was provided on initialization
      is_normalized = function() {
        private$normalized
      },

      ## The cache: a list with one entry per trajectory
      getCache = function() {
        return(private$cache)
      },

      ## The number of rows of the first cache entry. We assume that each
      ## entry in the cache is of the same size.
      get_cache_size = function() {
        nrow(self$getCache[[1]])
      },

      ## Deprecated: returns the first trajectory and emits a warning
      get_data_object = function() {
        warning('deprecated function: get_data_object in SummaryMeasureGenerator')
        return(self$get_trajectories[[1]])
      },

      ## The list of SMG objects provided on initialization
      get_smg_list = function() {
        return(private$SMG.list)
      },

      get_minimal_measurements_needed = function() {
        ## This value is equal to the highest lag in the system. That is, if
        ## we have a lag_2 variable, this variable is 2.
        return(private$minimal_measurements_needed)
      },

      ## The named list of trajectories as stored by set_trajectories
      get_trajectories = function() {
        return(private$trajectories)
      },

      ## The names of the trajectories as stored by set_trajectories
      get_trajectory_names = function() {
        return(private$trajectory_names)
      }
    ),
  private =
    list(
      trajectories = NULL,
      trajectory_names = NULL,
      minimal_measurements_needed = NULL,
      cache = NULL,
      SMG.list = NULL,
      verbose = NULL,
      normalized = NULL,
      pre_processor = NULL,

      ## Drops the first n rows of every cache entry. Despite the name these
      ## are the rows at the top of the cache, i.e., the ones added first.
      remove_last_measurements_from_cache = function(n) {
        private$cache <- lapply(private$cache, function(cache) tail(cache, -n))
      },

      ## Appends the new data of each trajectory (matched by position) to the
      ## corresponding cache entry and returns the updated cache.
      build_cache = function(current_data) {
        private$cache <- lapply(seq_along(current_data), function(i) {
          rbindlist(list(private$cache[[i]], current_data[[i]]))
        })
        names(private$cache) <- self$get_trajectory_names
        private$cache
      },

      ## Gets the next n observations from every trajectory, relying on the
      ## data source to do this efficiently, and normalizes them when a pre
      ## processor is available. Entries are NULL for trajectories whose
      ## getNextN returned NULL.
      get_next_normalized = function(n) {
        lapply(self$get_trajectories, function(trajectory) {
          current <- trajectory$getNextN(n = n)
          if (is.null(current)) return(NULL)
          if (self$is_normalized) {
            current <- self$get_pre_processor$normalize(current)
          }
          current
        })
      },

      ## Throws when no trajectories were set, or when the only trajectory is
      ## NULL (the state after initializing without data).
      check_data_available = function() {
        trajs <- self$get_trajectories
        if (is.null(trajs) || (length(trajs) == 1 && is.null(trajs[[1]]))) {
          throw('Please set the data of the summary measure generator first')
        }
      }
    )
)
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.