R/xtractor.R

#' R6 Object for Feature Extraction.
#'
#' @description
#' \code{Xtractor} calculates features from raw data for each ID of a grouping variable individually. This process can be parallelized with the package future.
#'
#' @format \code{\link{R6Class}} object.
#' @name Xtractor
#'
#' @section Usage:
#' \preformatted{
#' xtractor = Xtractor$new("xtractor")
#' }
#'
#' @section Arguments:
#'
#' For Xtractor$new():
#' \describe{
#' \item{\code{name}: }{(`character(1)`): A user defined name of the Xtractor. All necessary data will be saved on the path: ./fxtract_files/name/}
#' \item{\code{load}: }{(`logical(1)`): If TRUE, an existing Xtractor will be loaded.}
#' \item{\code{file.dir}: }{(`character(1)`): Path where all files of the Xtractor are saved. Default is the current working directory.}
#' }
#' @section Details:
#' All datasets and feature functions are saved in this R6 object.
#' Datasets will be saved as single RDS files (for each ID) and feature functions are calculated on each single dataset.
#' A big advantage of this method is that it scales nicely for larger datasets. Data is only read into RAM, when needed.
#'
#' @section Fields:
#' \describe{
#' \item{\code{error_messages}: }{(`data.frame()`): Active binding. A dataframe with information about error messages.}
#' \item{\code{ids}: }{(`character()`): Active binding. A character vector with the IDs of the grouping variable.}
#' \item{\code{features}: }{(`character()`): Active binding. A character vector with the feature functions which were added.}
#' \item{\code{status}: }{(`data.frame()`): Active binding. A dataframe with an overview over which features are calculated on which datasets.}
#' \item{\code{results}: }{(`data.frame()`): Active binding. A dataframe with all calculated features of all IDs.}
#' }
#'
#' @section Methods:
#' \describe{
#' \item{\code{add_data(data, group_by)}}{[data: (`data.frame` | `data.table`)] A dataframe or data.table which shall be added to the R6 object. \cr
#'  [group_by: (`character(1)`)] The grouping variable's name of the dataframe. \cr \cr
#'  This method writes single RDS files for each group.}
#' \item{\code{preprocess_data(fun)}}{[fun: (`function`)] A function, which has a dataframe as input and a dataframe as output. \cr \cr
#'  This method loads the RDS files and applies this function on them. The old RDS files are overwritten.}
#' \item{\code{remove_data(ids)}}{[ids: (`character()`)] One or many IDs of the grouping variable. \cr \cr
#'  This method deletes the RDS files of the given IDs.}
#' \item{\code{get_data(ids)}}{[ids: (`character()`)] One or many IDs of the grouping variable. \cr \cr
#'  This method returns one dataframe with the chosen IDs.}
#' \item{\code{add_feature(fun, check_fun)}}{[fun: (`function`)] A function, which has a dataframe as input and a named vector or list as output. \cr
#'  [check_fun: (`logical(1)`)] The function will be checked if it returns a vector or a  list. Defaults to \code{TRUE}. Disable, if calculation takes too long. \cr \cr
#'  This method adds the feature function to the R6 object. It writes an RDS file of the function which can be retrieved later.}
#' \item{\code{remove_feature(fun)}}{[fun: (`function | character(1)`)] A function (or the name of the function as character) which shall be removed. \cr \cr
#'  This method removes the function from the object and deletes all corresponding files and results.}
#' \item{\code{get_feature(fun)}}{[fun: (`character(1)`)] The name of a function as character. \cr \cr
#'  This method reads the RDS file of the function. Useful for debugging after loading an Xtractor.}
#' \item{\code{calc_features(features, ids)}}{[features: (`character()`)] A character vector of the names of the features which shall be calculated. Defaults to all features. \cr
#' [ids: (`character()`)] One or many IDs of the grouping variable. Defaults to all IDs. \cr \cr
#' This method calculates all features on the chosen IDs.}
#' \item{\code{retry_failed_features(features)}}{[features: (`character()`)] A character vector of the names of the features which shall be calculated. Defaults to all features. \cr \cr
#' This method retries calculation of failed features. Useful if calculation failed because of memory problems.}
#' \item{\code{plot()}}{[internal] method to print the R6 object.}
#' \item{\code{clone()}}{[internal] method to clone the R6 object.}
#' \item{\code{initialize()}}{[internal] method to initialize the R6 object.}
#' }
#'
#' @examples
#' # one feature function
#' dir = tempdir()
#' xtractor = Xtractor$new("xtractor", file.dir = dir)
#' xtractor$add_data(iris, group_by = "Species")
#' xtractor$ids
#' fun = function(data) {
#'   c(mean_sepal_length = mean(data$Sepal.Length))
#' }
#' xtractor$add_feature(fun)
#' xtractor$features
#' xtractor$calc_features()
#' xtractor$results
#' xtractor$status
#' xtractor
#'
#' # failing function on only one ID
#' fun2 = function(data) {
#'   if ("setosa" %in% data$Species) stop("my error")
#'   c(sd_sepal_length = sd(data$Sepal.Length))
#' }
#' xtractor$add_feature(fun2)
#' xtractor$calc_features()
#' xtractor$results
#' xtractor$error_messages
#' xtractor
#'
#' # remove feature function
#' xtractor$remove_feature("fun2")
#' xtractor$results
#' xtractor
#'
#' # remove ID
#' xtractor$remove_data("setosa")
#' xtractor$results
#' xtractor$ids
#' xtractor
#'
#' # get datasets and functions
#' fun3 = xtractor$get_feature("fun")
#' df = xtractor$get_data()
#' dplyr_wrapper(data = df, group_by = "Species", fun = fun3)
#'
#' @import R6
#' @import dplyr
#' @import future.apply
#' @importFrom data.table setDF rbindlist
NULL

#' @export
Xtractor = R6Class("Xtractor",
  public = list(
    initialize = function(name, file.dir = ".", load = FALSE) {
      private$name = checkmate::assert_character(name)
      newDirPath = file.path(file.dir, "fxtract_files", name)
      private$dir = newDirPath
      if (!load) {
        if (!dir.exists(file.path(file.dir, "fxtract_files"))) dir.create(file.path(file.dir, "fxtract_files"))
        if (dir.exists(newDirPath)) stop("The Xtractor name already exists. Please choose another name, delete the existing Xtractor, or set load = TRUE, if you want to load the old Xtractor.")
        dir.create(newDirPath)
        dir.create(file.path(newDirPath, "rds_files"))
        dir.create(file.path(newDirPath, "rds_files", "data"))
        dir.create(file.path(newDirPath, "rds_files", "features"))
        dir.create(file.path(newDirPath, "rds_files", "results"))
        dir.create(file.path(newDirPath, "rds_files", "results", "done"))
        dir.create(file.path(newDirPath, "rds_files", "results", "failed"))
        saveRDS(NULL, file = file.path(private$dir, "rds_files", "group_by.RDS"))
      } else {
        checkmate::assert_subset(name, list.files(file.path(file.dir, "fxtract_files")))
        private$group_by = readRDS(file.path(newDirPath, "rds_files", "group_by.RDS"))
      }
    },
    print = function() {
      ids = self$ids
      feats = self$features

      cat("R6 Object: Xtractor\n")
      cat(paste0("Name: ", private$name, "\n"))
      cat(paste0("Grouping variable: ", private$group_by, "\n"))
      if (length(ids) <= 10) {
        cat(paste0("IDs: ", paste0(ids, collapse = ", "), "\n"))
      } else {
        cat(paste0("Number IDs: ", length(ids), ". See $ids for all ids.\n"))
      }
      if (length(feats) <= 10) {
        cat(paste0("Feature functions: ", paste0(feats, collapse = ", "), "\n"))
      } else {
        cat(paste0("Number feature functions: ", length(feats), ". See $features for all feature functions.\n"))
      }
      if (ncol(self$status[, -1, drop = FALSE]) >= 1) cat(paste0("Extraction done: ", mean(as.matrix(self$status[, -1]) == "done") * 100, "%\n"))
      cat(paste0("Errors during calculation: ", nrow(self$error_messages), " \n"))
      invisible(self)
    },
    add_data = function(data, group_by) {
      checkmate::assert_subset(group_by, colnames(data))
      checkmate::assert(
        checkmate::checkClass(data, "data.frame"),
        checkmate::checkClass(data, "data.table")
      )
      checkmate::assert_subset(group_by, colnames(data))
      checkmate::assert_character(group_by, len = 1)
      if (is.null(private$group_by)) {
        private$group_by = group_by
        saveRDS(group_by, file = file.path(private$dir, "rds_files", "group_by.RDS"))
      }
      if (group_by != private$group_by) stop(paste0("The group_by variable was set to ", private$group_by,
        ". Only one group_by variable is allowed per Xtractor!"))
      gb = data %>% dplyr::distinct_(.dots = group_by) %>% data.frame() %>% unlist()
      if (any(gb %in% self$ids)) stop(paste0("Adding data multiple times is not allowed! Following ID(s) are already added to the R6 object: ",
        paste0(gb[which(gb %in% self$ids)], collapse = ", ")))

      #save rds files
      message("Saving raw RDS files.")
      pb = utils::txtProgressBar(min = 0, max = length(gb), style = 3)
      for (i in seq_along(gb)) {
        data_i = data %>% dplyr::filter(!!as.name(group_by) == gb[i]) %>% data.frame()
        saveRDS(data_i, file = file.path(private$dir, "rds_files", "data", paste0(gb[i], ".RDS")))
        utils::setTxtProgressBar(pb, i)
      }
      close(pb)
      return(invisible(self))
    },
    preprocess_data = function(fun) {
      message("Updating raw RDS files.")
      future.apply::future_lapply(self$ids, function(i) {
        data_i = readRDS(file.path(private$dir, "rds_files", "data", paste0(i, ".RDS")))
        data_preproc = fun(data_i)
        saveRDS(data_preproc, file = file.path(private$dir, "rds_files", "data", paste0(i, ".RDS")))
      }, future.seed = TRUE)
      return(invisible(self))
    },
    remove_data = function(ids) {
      checkmate::assert_character(ids, min.len = 1L)
      checkmate::assert_subset(ids, self$ids)
      for (id in ids) {
        message("Deleting RDS file ", id, ".RDS")
        unlink(file.path(file.path(private$dir, "rds_files", "data", paste0(id, ".RDS"))), recursive = TRUE)
      }

      #delete done
      done_dir = file.path(private$dir, "rds_files", "results", "done")
      done_features = list.files(done_dir)
      for (feature in done_features) {
        for (id in ids) {
          done_feat_path = file.path(private$dir, "rds_files", "results", "done", feature, paste0(id, ".RDS"))
          if (file.exists(done_feat_path)) {
            message(paste0("Deleting results from id: ", id))
            unlink(done_feat_path, recursive = TRUE)
          }
        }
      }
      #delete error
      failed_dir = file.path(private$dir, "rds_files", "results", "failed")
      failed_features = list.files(failed_dir)
      for (feature in failed_features) {
        for (id in ids) {
          failed_feat_path = file.path(private$dir, "rds_files", "results", "failed", feature, paste0(id, ".RDS"))
          if (file.exists(failed_feat_path)) {
            message(paste0("Deleting error messages from id: ", id))
            unlink(failed_feat_path, recursive = TRUE)
          }
        }
      }
      return(invisible(self))
    },
    get_data = function(ids) {
      if (missing(ids)) ids = self$ids
      checkmate::assert_character(ids, min.len = 1L)
      checkmate::assert_subset(ids, self$ids)
      data = future.apply::future_lapply(ids, function(i) {
        readRDS(file.path(private$dir, "rds_files", "data", paste0(i, ".RDS")))
      })
      if (length(unique(lapply(data, function(df) typeof(df[, private$group_by])))) > 1) {
        message("Different vector types detected. Converting group column to character.")
        for (i in seq_along(data)) {
          data[[i]][, private$group_by] = as.character(data[[i]][, private$group_by])
        }
      }
      dplyr::bind_rows(data)
    },
    add_feature = function(fun, check_fun = TRUE) {
      checkmate::assert_logical(check_fun)
      checkmate::assert_function(fun)
      if (deparse(substitute(fun)) %in% self$features) stop(paste0("Feature function '", deparse(substitute(fun)), "' was already added."))
      saveRDS(list(fun = fun, check_fun = check_fun), file = file.path(private$dir, "rds_files", "features", paste0(deparse(substitute(fun)), ".RDS")))
      dir.create(file.path(private$dir, "rds_files", "results", "done", deparse(substitute(fun))))
      dir.create(file.path(private$dir, "rds_files", "results", "failed", deparse(substitute(fun))))
      return(invisible(self))
    },
    remove_feature = function(fun) {
      if (is.function(fun)) fun = as.character(substitute(fun))
      checkmate::assert_character(fun, min.len = 1L)
      checkmate::assert_subset(fun, self$features)
      for (f in fun) {
        unlink(file.path(private$dir, "rds_files", "features", paste0(f, ".RDS")), recursive = TRUE)
        unlink(file.path(private$dir, "rds_files", "results", "done", f), recursive = TRUE)
        unlink(file.path(private$dir, "rds_files", "results", "failed", f), recursive = TRUE)
      }
      return(invisible(self))
    },
    get_feature = function(fun) {
      checkmate::assert_character(fun, len = 1L)
      checkmate::assert_subset(fun, self$features)
      readRDS(file.path(private$dir, "rds_files", "features", paste0(fun, ".RDS")))$fun
    },
    calc_features = function(features, ids, retry_failed = TRUE) {
      if (missing(features)) features = self$features
      checkmate::assert_character(features)
      checkmate::assert_subset(features, self$features)
      checkmate::assert_logical(retry_failed)
      if (!missing(ids)) checkmate::assert_character(ids)
      if (!missing(ids)) checkmate::assert_subset(ids, self$ids)

      if (length(self$ids) == 0) stop("Please add datasets with method $add_data().")
      if (length(self$features) == 0) stop("Please add feature functions with method $add_feature().")

      features_new = features
      status = self$status
      for (feature in features) {
        if (all(self$status[[feature]] == "done")) {
          features_new = setdiff(features_new, feature)
          message(paste0("Feature function '", feature, "' was already applied on every ID and will be skipped."))
        }
      }

      #calculating features using future.apply
      message(paste0("Calculating features on ", future::nbrOfWorkers(), " core(s)."))
      for (feature in features_new) {
        idm = ifelse(missing(ids), "", paste0(" on IDs: ", paste0(ids, collapse = ", ")))
        message(paste0("Calculating feature function: ", feature, idm))
        feat_fun = self$get_feature(feature)

        if (missing(ids)) {
          ids_calc = status[which(status[, feature] == "not_done"), private$group_by]
        } else {
          ids_calc = ids
        }

        if (retry_failed) {
          ids_failed = status[which(status[, feature] == "failed"), private$group_by]
          if (length(ids_failed) > 0) message("Failed features will be calculated again. For stochastical features set a seed inside your function!")
          ids_calc = unique(c(ids_calc, ids_failed))
        }

        if (length(ids_calc) == 0) message("Nothing to calculate.")

        future.apply::future_lapply(ids_calc, function(x) {
          data = self$get_data(x)
          group_by = private$group_by
          res_id = tryCatch(fxtract::dplyr_wrapper(data, group_by, feat_fun, check_fun = private$get_check_fun(feature)), error = function(e) e$message)

          #if error, save as error, else save result
          feat_fail_path = file.path(private$dir, "rds_files", "results", "failed", feature, paste0(x, ".RDS"))
          if (is.character(res_id)) {
            saveRDS(res_id, file = feat_fail_path)
          } else {
            if (file.exists(feat_fail_path)) unlink(feat_fail_path, recursive = TRUE)
            saveRDS(res_id, file = file.path(private$dir, "rds_files", "results", "done", feature,  paste0(x, ".RDS")))
          }
        }, future.seed = TRUE)
      }
      return(invisible(self))
    }
  ),
  private = list(
    name = NULL,
    group_by = NULL,
    dir = NULL,
    get_check_fun = function(fun) {
      checkmate::assert_character(fun, len = 1L)
      checkmate::assert_subset(fun, self$features)
      readRDS(file.path(private$dir, "rds_files", "features", paste0(fun, ".RDS")))$check_fun
    }
  ),
  active = list(
    error_messages = function() {
      error_df = setNames(data.frame(matrix(ncol = 3, nrow = 0), stringsAsFactors = FALSE), c("feature_function", "id", "error_message"))
      for (feat in self$features) {
        error_feats = list.files(file.path(private$dir, "rds_files", "results", "failed", feat))
        if (length(error_feats) == 0) next
        for (file in error_feats) {
          error_message = readRDS(file.path(private$dir, "rds_files", "results", "failed", feat, file))
          error_df = rbind(error_df, data.frame(feature_function = feat,
            id = gsub(".RDS", "", file), error_message = error_message, stringsAsFactors = FALSE))
        }
      }
      data.table::setDF(error_df)
    },
    ids = function() {
      gsub(".RDS", "", list.files(file.path(private$dir, "rds_files", "data")))
    },
    features = function() {
      gsub(".RDS", "", list.files(file.path(private$dir, "rds_files", "features")))
    },
    results = function() {
      todo_data = gsub(".RDS", "", list.files(file.path(private$dir, "rds_files", "data")))
      final_result = setNames(data.frame(todo_data, stringsAsFactors = FALSE), private$group_by)
      if (nrow(final_result) == 0) return(final_result)

      for (feat in self$features) {
        results_feat = future.apply::future_lapply(list.files(file.path(private$dir, "rds_files", "results", "done", feat), full.names = TRUE), readRDS)
        results_feat = data.table::rbindlist(results_feat, fill = TRUE) %>% data.frame()
        if (nrow(results_feat) == 0) next
        results_feat[, private$group_by] = as.character(results_feat[, private$group_by])
        final_result = dplyr::full_join(final_result, results_feat, by = private$group_by)
      }
      data.table::setDF(final_result)
    },
    status = function() {
      todo_data = gsub(".RDS", "", list.files(file.path(private$dir, "rds_files", "data")))
      todo_feats = gsub(".RDS", "", list.files(file.path(private$dir, "rds_files", "features")))
      status = setNames(data.frame(todo_data, stringsAsFactors = FALSE), private$group_by)
      if (nrow(status) == 0) return(status)
      for (feat in todo_feats) {
        #make done df
        done_feat = gsub(".RDS", "", list.files(file.path(private$dir, "rds_files", "results", "done", feat)))
        if (length(done_feat) >= 1) {
          done_df = setNames(data.frame(done_feat, "done", stringsAsFactors = FALSE), c(private$group_by, feat))
        } else {
          done_df = setNames(data.frame(matrix(ncol = 2, nrow = 0), stringsAsFactors = FALSE), c(private$group_by, feat))
        }
        #make error df
        error_feat = gsub(".RDS", "", list.files(file.path(private$dir, "rds_files", "results", "failed", feat)))
        if (length(error_feat) >= 1) {
          error_df = setNames(data.frame(error_feat, "failed", stringsAsFactors = FALSE), c(private$group_by, feat))
        } else {
          error_df = setNames(data.frame(matrix(ncol = 2, nrow = 0), stringsAsFactors = FALSE), c(private$group_by, feat))
        }
        #make not done df
        not_done_feat = setdiff(todo_data, c(done_feat, error_feat))
        if (length(not_done_feat) >= 1) {
          not_done_df = setNames(data.frame(not_done_feat, "not_done", stringsAsFactors = FALSE), c(private$group_by, feat))
        } else {
          not_done_df = setNames(data.frame(matrix(ncol = 2, nrow = 0), stringsAsFactors = FALSE), c(private$group_by, feat))
        }
        status_feat = rbind(done_df, error_df, not_done_df)
        status = dplyr::left_join(status, status_feat, by = private$group_by)
      }
      status
    }
  )
)

Try the fxtract package in your browser

Any scripts or data that you put into this service are public.

fxtract documentation built on July 8, 2020, 5:43 p.m.