R/obj_FeatureExtractor.R

# This file is part of the R package "aifeducation".
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as published by
# the Free Software Foundation.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>

#' @title Feature extractor for reducing the number for dimensions of text embeddings.
#'
#' @description Abstract class for auto encoders with 'pytorch'.
#'
#' Objects of this class are used for reducing the number of dimensions of text embeddings created by an object
#'   of class [TextEmbeddingModel].
#'
#'   For training an object of class [EmbeddedText] or [LargeDataSetForTextEmbeddings] generated by an object of class
#'   [TextEmbeddingModel] is necessary. Passing raw texts is not supported.
#'
#'   For prediction an ob object class [EmbeddedText] or [LargeDataSetForTextEmbeddings] is necessary that was generated
#'   with the same [TextEmbeddingModel] as during training. Prediction outputs a new object of class [EmbeddedText] or
#'   [LargeDataSetForTextEmbeddings] which contains a text embedding with a lower number of dimensions.
#'
#'   All models use tied weights for the encoder and decoder layers (except `method="LSTM"`) and apply the estimation of
#'   orthogonal weights. In addition, training tries to train the model to achieve uncorrelated features.
#'
#'   Objects of class [TEFeatureExtractor] are designed to be used with classifiers such as [TEClassifierRegular] and
#'   [TEClassifierProtoNet].
#'
#' @return A new instances of this class.
#'
#' @family Text Embedding
#' @export
TEFeatureExtractor <- R6::R6Class(
  classname = "TEFeatureExtractor",
  inherit = ModelsBasedOnTextEmbeddings,
  public = list(
    # New-----------------------------------------------------------------------
    #' @description Creating a new instance of this class.
    #' @param name `r get_param_doc_desc("name")`
    #' @param label `r get_param_doc_desc("label")`
    #' @param text_embeddings `r get_param_doc_desc("text_embeddings")`
    #' @param features `r get_param_doc_desc("features")`
    #' @param method `r get_param_doc_desc("method")`
    #' @param orthogonal_method `r get_param_doc_desc("method")`
    #' @param noise_factor `r get_param_doc_desc("noise_factor")`
    #' @note `features` refers to the number of features for the compressed text embeddings.
    #' @note This model requires `pad_value=0`. If this condition is not met the
    #' padding value is switched automatically.
    #' @return Returns an object of class [TEFeatureExtractor] which is ready for training.
    configure = function(name = NULL,
                         label = NULL,
                         text_embeddings = NULL,
                         features = 128L,
                         method = "dense",
                         orthogonal_method = "matrix_exp",
                         noise_factor = 0.2) {
      tmp_args <- get_called_args(n = 1L)
      private$check_config_for_FALSE()

      # Check arguments
      check_all_args(args = tmp_args)
      private$check_embeddings_object_type(tmp_args$text_embeddings, strict = TRUE)

      # Set TextEmbeddingModel
      private$set_text_embedding_model(
        model_info = tmp_args$text_embeddings$get_model_info(),
        feature_extractor_info = tmp_args$text_embeddings$get_feature_extractor_info(),
        times = tmp_args$text_embeddings$get_times(),
        features = tmp_args$text_embeddings$get_features(),
        pad_value = tmp_args$text_embeddings$get_pad_value()
      )

      # save arguments
      private$save_all_args(args = tmp_args, group = "configure")

      # Perform additional checks and adjustments
      # private$check_param_combinations()

      # Set ML framework
      private$ml_framework <- "pytorch"

      # Setting Label and Name
      private$set_model_info(
        model_name = private$generate_model_id(name),
        label = label,
        model_date = get_time_stamp()
      )

      # Set package versions
      private$set_package_versions()

      # Finalize configuration
      private$set_configuration_to_TRUE()

      # Create_Model
      private$create_reset_model()
    },

    #-------------------------------------------------------------------------
    #' @description Method for training a neural net.
    #' @param data_embeddings `r get_param_doc_desc("data_embeddings")`
    #' @param data_val_size `r get_param_doc_desc("data_val_size")`
    #' @param sustain_track `r get_param_doc_desc("sustain_track")`
    #' @param sustain_iso_code `r get_param_doc_desc("sustain_iso_code")`
    #' @param sustain_region `r get_param_doc_desc("sustain_region")`
    #' @param sustain_interval `r get_param_doc_desc("sustain_interval")`
    #' @param sustain_log_level `r get_param_doc_desc("sustain_log_level")`
    #' @param epochs `r get_param_doc_desc("epochs")`
    #' @param batch_size `r get_param_doc_desc("batch_size")`
    #' @param trace `r get_param_doc_desc("trace")`
    #' @param ml_trace `r get_param_doc_desc("ml_trace")`
    #' @param log_dir `r get_param_doc_desc("log_dir")`
    #' @param log_write_interval `r get_param_doc_desc("log_write_interval")`
    #' @param lr_rate `r get_param_doc_desc("lr_rate")`
    #' @param lr_warm_up_ratio `r get_param_doc_desc("lr_warm_up_ratio")`
    #' @param optimizer `r get_param_doc_desc("optimizer")`
    #' @note This model requires that the underlying [TextEmbeddingModel] uses `pad_value=0`. If
    #' this condition is not met the pad value is switched before training.
    #' @return Function does not return a value. It changes the object into a trained classifier.
    train = function(data_embeddings = NULL,
                     data_val_size = 0.25,
                     sustain_track = TRUE,
                     sustain_iso_code = NULL,
                     sustain_region = NULL,
                     sustain_interval = 15L,
                     sustain_log_level = "warning",
                     epochs = 40L,
                     batch_size = 32L,
                     trace = TRUE,
                     ml_trace = 1L,
                     log_dir = NULL,
                     log_write_interval = 10L,
                     lr_rate = 1e-3,
                     lr_warm_up_ratio = 0.02,
                     optimizer = "AdamW") {
      tmp_args <- get_called_args(n = 1L)
      check_all_args(args = tmp_args)
      self$check_embedding_model(data_embeddings)

      # Save args
      private$save_all_args(args = tmp_args, group = "training")

      # Perform additional checks and adjustments
      # private$check_param_combinations()

      # set up logger
      private$set_up_logger(log_dir = log_dir, log_write_interval = log_write_interval)

      # Loading PY Scripts
      private$load_reload_python_scripts()

      # Start-------------------------------------------------------------------
      if (self$last_training$config$trace) {
        message(
          get_time_stamp(),
          " Start"
        )
      }

      # Set up dataset
      if (inherits(data_embeddings, "EmbeddedText")) {
        tmp_data <- data_embeddings$convert_to_LargeDataSetForTextEmbeddings()
        tmp_data <- tmp_data$get_dataset()
      } else {
        tmp_data <- data_embeddings$get_dataset()
      }

      # Reduce to unique cases for training
      tmp_data <- reduce_to_unique(tmp_data, "id")

      # Copy input as label for training
      extractor_dataset <- tmp_data$map(
        py$map_input_to_labels,
        load_from_cache_file = FALSE,
        keep_in_memory = FALSE,
        cache_file_name = file.path(create_and_get_tmp_dir(), generate_id(15L))
      )

      # Check and create temporary directory for checkpoints
      private$create_checkpoint_directory()

      # Set up log file
      log_top_value <- 0L
      log_top_total <- 1L
      log_top_message <- "Overall"

      # Set format
      extractor_dataset$set_format("torch")

      # Split into train and validation data
      extractor_dataset <- extractor_dataset$train_test_split(self$last_training$config$data_val_size)

      # Start Sustainability Tracking-------------------------------------------
      private$init_and_start_sustainability_tracking()

      # Start Training----------------------------------------------------------
      self$last_training$history <- py$AutoencoderTrain_PT_with_Datasets(
        model = private$model,
        optimizer_method = self$last_training$config$optimizer,
        lr_rate = self$last_training$config$lr_rate,
        lr_warm_up_ratio = self$last_training$config$lr_warm_up_ratio,
        epochs = as.integer(self$last_training$config$epochs),
        trace = as.integer(self$last_training$config$ml_trace),
        batch_size = as.integer(self$last_training$config$batch_size),
        train_data = extractor_dataset$train,
        val_data = extractor_dataset$test,
        filepath = file.path(private$dir_checkpoint, "best_weights.pt"),
        use_callback = TRUE,
        log_dir = private$log_config$log_dir,
        log_write_interval = private$log_config$log_write_interval,
        log_top_value = log_top_value,
        log_top_total = log_top_total,
        log_top_message = log_top_message
      )
      rownames(self$last_training$history$loss) <- c("train", "val")

      # Stop sustainability tracking if requested
      private$stop_sustainability_tracking()

      # Set training status value
      private$trained <- TRUE

      # Clean temporary directory
      private$clean_checkpoint_directory()

      if (self$last_training$config$trace) {
        message(get_time_stamp(), " Training finished")
      }
    },
    #---------------------------------------------------------------------------
    #' @description Method for extracting features. Applying this method reduces the number of dimensions of the text
    #'   embeddings. Please note that this method should only be used if a small number of cases should be compressed
    #'   since the data is loaded completely into memory. For a high number of cases please use the method
    #'   `extract_features_large`.
    #' @param data_embeddings Object of class [EmbeddedText],[LargeDataSetForTextEmbeddings],
    #'   `datasets.arrow_dataset.Dataset` or `array` containing the text embeddings which should be reduced in their
    #'   dimensions.
    #' @param batch_size `int` batch size.
    #' @return Returns an object of class [EmbeddedText] containing the compressed embeddings.
    extract_features = function(data_embeddings, batch_size) {
      # Argument checking
      check_type(object = batch_size, type = "int", FALSE)
      # check data_embeddings object
      if (inherits(data_embeddings, "EmbeddedText") | inherits(data_embeddings, "LargeDataSetForTextEmbeddings")) {
        self$check_embedding_model(text_embeddings = data_embeddings)
      } else {
        private$check_embeddings_object_type(data_embeddings, strict = FALSE)
      }

      # Load Custom Model Scripts
      private$load_reload_python_scripts()

      # Check number of cases in the data
      single_prediction <- private$check_single_prediction(data_embeddings)

      # Get current row names/name of the cases
      current_row_names <- private$get_rownames_from_embeddings(data_embeddings)

      # If at least two cases are part of the data set---------------------------
      if (!single_prediction) {
        prepared_embeddings <- private$prepare_embeddings_as_dataset(data_embeddings)

        prepared_embeddings$set_format("torch")
        reduced_tensors <- py$TeFeatureExtractorBatchExtract(
          model = private$model,
          dataset = prepared_embeddings,
          batch_size = as.integer(batch_size)
        )
        reduced_embeddings <- tensor_to_numpy(reduced_tensors)
        #---------------------------------------------------------------------
      } else {
        prepared_embeddings <- private$prepare_embeddings_as_np_array(data_embeddings)

        if (torch$cuda$is_available()) {
          device <- "cuda"
          dtype <- torch$double
          private$model$to(device, dtype = dtype)
          private$model$eval()
          input <- torch$from_numpy(prepared_embeddings)
          reduced_tensors <- private$model(input$to(device, dtype = dtype),
            encoder_mode = TRUE
          )
          reduced_embeddings <- tensor_to_numpy(reduced_tensors)
        } else {
          device <- "cpu"
          dtype <- torch$float
          private$model$to(device, dtype = dtype)
          private$model$eval()
          input <- torch$from_numpy(prepared_embeddings)
          reduced_tensors <- private$model(input$to(device, dtype = dtype),
            encoder_mode = TRUE
          )
          reduced_embeddings <- tensor_to_numpy(reduced_tensors)
        }
      }

      # Prepare output
      rownames(reduced_embeddings) <- current_row_names

      model_info <- self$get_text_embedding_model()

      red_embedded_text <- EmbeddedText$new()
      red_embedded_text$configure(
        model_name = paste0("feature_extracted_", model_info$model_name),
        model_label = model_info$model$model_label,
        model_date = model_info$model$model_date,
        model_method = model_info$model$model_method,
        model_version = model_info$model$model_version,
        model_language = model_info$model$model_language,
        param_seq_length = model_info$model$param_seq_length,
        param_features = dim(reduced_embeddings)[3L],
        param_chunks = model_info$model$param_chunks,
        param_overlap = model_info$model$param_overlap,
        param_emb_layer_min = model_info$model$param_emb_layer_min,
        param_emb_layer_max = model_info$model$param_emb_layer_max,
        param_emb_pool_type = model_info$model$param_emb_pool_type,
        param_aggregation = model_info$model$param_aggregation,
        param_pad_value = private$text_embedding_model$pad_value,
        embeddings = reduced_embeddings
      )

      red_embedded_text$add_feature_extractor_info(
        model_name = private$model_info$model_name,
        model_label = private$model_info$model_label,
        features = private$model_config$features,
        method = private$model_config$method,
        noise_factor = private$model_config$noise_factor,
        optimizer = private$model_config$optimizer
      )

      return(red_embedded_text)
    },
    #--------------------------------------------------------------------------
    #' @description Method for extracting features from a large number of cases. Applying this method reduces the number
    #'   of dimensions of the text embeddings.
    #' @param data_embeddings Object of class [EmbeddedText] or [LargeDataSetForTextEmbeddings] containing the text
    #'   embeddings which should be reduced in their dimensions.
    #' @param batch_size `int` batch size.
    #' @param trace `bool` If `TRUE` information about the progress is printed to the console.
    #' @return Returns an object of class [LargeDataSetForTextEmbeddings] containing the compressed embeddings.
    extract_features_large = function(data_embeddings, batch_size, trace = FALSE) {
      # Argument checking
      check_class(object = data_embeddings, object_name = "data_embeddings", classes = c("EmbeddedText", "LargeDataSetForTextEmbeddings"), allow_NULL = FALSE)
      check_type(object = batch_size, type = "int", FALSE)
      check_type(object = trace, type = "bool", FALSE)

      # Get total number of batches for the loop
      total_number_of_bachtes <- ceiling(data_embeddings$n_rows() / batch_size)

      # Get indices for every batch
      batches_index <- get_batches_index(
        number_rows = data_embeddings$n_rows(),
        batch_size = batch_size,
        zero_based = TRUE
      )
      # Process every batch
      for (i in 1L:total_number_of_bachtes) {
        tmp_subset <- data_embeddings$select(as.integer(batches_index[[i]]))
        embeddings <- self$extract_features(
          data_embeddings = tmp_subset,
          batch_size = batch_size
        )
        if (i == 1L) {
          # Create Large Dataset
          model_info <- self$get_text_embedding_model()

          embedded_texts_large <- LargeDataSetForTextEmbeddings$new()
          embedded_texts_large$configure(
            model_label = model_info$model_label,
            model_date = model_info$model_date,
            model_method = model_info$model_method,
            model_version = model_info$model_version,
            model_language = model_info$model_language,
            param_seq_length = model_info$param_seq_length,
            param_features = dim(embeddings)[3L],
            param_chunks = model_info$model$param_chunks,
            param_overlap = model_info$model$param_overlap,
            param_emb_layer_min = model_info$model$param_emb_layer_min,
            param_emb_layer_max = model_info$model$param_emb_layer_max,
            param_emb_pool_type = model_info$model$param_emb_pool_type,
            param_aggregation = model_info$model$param_aggregation,
            param_pad_value = private$text_embedding_model$pad_value
          )
          embedded_texts_large$add_feature_extractor_info(
            model_name = private$model_info$model_name,
            model_label = private$model_info$model_label,
            features = private$model_config$features,
            method = private$model_config$method,
            noise_factor = private$model_config$noise_factor,
            optimizer = private$model_config$optimizer
          )

          # Add new data
          embedded_texts_large$add_embeddings_from_EmbeddedText(embeddings)
        } else {
          # Add new data
          embedded_texts_large$add_embeddings_from_EmbeddedText(embeddings)
        }
        print_message(
          msg = paste("Compress Embeddings - Batch", i, "/", total_number_of_bachtes, "done"),
          trace = trace
        )
        gc()
      }
      return(embedded_texts_large)
    },
    #' @description Method for requesting a plot of the training history.
    #' This method requires the *R* package 'ggplot2' to work.
    #' @param x_min `r get_param_doc_desc("x_min")`
    #' @param x_max `r get_param_doc_desc("x_max")`
    #' @param y_min `r get_param_doc_desc("y_min")`
    #' @param y_max `r get_param_doc_desc("y_max")`
    #' @param ind_best_model `r get_param_doc_desc("ind_best_model")`
    #' @param text_size `r get_param_doc_desc("text_size")`
    #' @return Returns a plot of class `ggplot` visualizing the training process.
    plot_training_history = function(x_min = NULL, x_max = NULL, y_min = NULL, y_max = NULL, ind_best_model = TRUE, text_size = 10L) {
      tmp_plot <- super$plot_training_history(
        final_training = TRUE,
        pl_step = NULL,
        measure = "loss",
        ind_best_model = ind_best_model,
        ind_selected_model = FALSE,
        x_min = x_min,
        x_max = x_max,
        y_min = y_min,
        y_max = y_max,
        add_min_max = FALSE,
        text_size = text_size
      )
      return(tmp_plot)
    }
  ),
  private = list(
    trained = FALSE,
    #--------------------------------------------------------------------------
    load_reload_python_scripts = function() {
      load_py_scripts(c(
        "pytorch_act_fct.py",
        "pytorch_loss_fct.py",
        "pytorch_layers.py",
        "pytorch_layers_normalization.py",
        "pytorch_stack_layers.py",
        "pytorch_autoencoder.py",
        "py_log.py",
        "py_functions.py",
        "pytorch_classifier_models.py",
        "pytorch_cls_training_loops.py",
        "pytorch_predict_batch.py",
        "pytorch_datacollators.py",
        "pytorch_old_scripts.py"
      ))
    },
    #--------------------------------------------------------------------------
    create_reset_model = function() {
      private$load_reload_python_scripts()
      private$check_config_for_TRUE()

      if (private$model_config$method == "LSTM") {
        private$model <- py$LSTMAutoencoder_with_Mask_PT(
          times = as.integer(private$text_embedding_model["times"]),
          features_in = as.integer(private$text_embedding_model["features"]),
          features_out = as.integer(private$model_config$features),
          noise_factor = private$model_config$noise_factor,
          pad_value = private$text_embedding_model$pad_value
        )
      } else if (private$model_config$method == "Dense") {
        private$model <- feature_extractor <- py$DenseAutoencoder_with_Mask_PT(
          features_in = as.integer(private$text_embedding_model["features"]),
          features_out = as.integer(private$model_config$features),
          noise_factor = private$model_config$noise_factor,
          pad_value = private$text_embedding_model$pad_value,
          orthogonal_method = private$model_config$orthogonal_method
        )
      }
    },
    #--------------------------------------------------------------------------
    generate_model_id = function(name) {
      if (is.null(name)) {
        return(paste0("tefe_", generate_id(16L)))
      } else {
        return(name)
      }
    }
  )
)

# Add the model to the user list
TextEmbeddingObjectsIndex$TEFeatureExtractor <- ("TEFeatureExtractor")

Try the aifeducation package in your browser

Any scripts or data that you put into this service are public.

aifeducation documentation built on Nov. 19, 2025, 5:08 p.m.