R/obj_Tokenizer.R

# This file is part of the R package "aifeducation".
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as published by
# the Free Software Foundation.
#
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>

#' @title Base class for tokenizers
#' @description Base class for tokenizers containing all methods shared by the sub-classes.
#' @return `r get_description("return_object")`
#' @family R6 Classes for Developers
#' @export
TokenizerBase <- R6::R6Class(
  classname = "TokenizerBase",
  inherit = AIFEMaster,
  private = list(
    tokenizer_statistics = data.frame(),
    configured = FALSE,
    #-------------------------------------------------------------------------
    load_reload_python_scripts = function() {
      load_py_scripts("py_log.py")
    },
    #------------------------------------------------------------------------
    # Method for loading tokenizer statistics
    load_tokenizer_statistics = function(model_dir) {
      path <- file.path(model_dir, "tokenizer_statistics.csv")
      if (file.exists(path)) {
        private$tokenizer_statistics <- utils::read.csv(file = path)
      } else {
        private$tokenizer_statistics <- NA
      }
    },
    #------------------------------------------------------------------------
    # Method for saving tokenizer statistics
    save_tokenizer_statistics = function(dir_path, folder_name) {
      if (!is.null_or_na(private$tokenizer_statistics)) {
        save_location <- file.path(dir_path, folder_name)
        create_dir(dir_path, trace = TRUE, msg_fun = FALSE)
        write.csv(
          x = private$tokenizer_statistics,
          file = file.path(save_location, "tokenizer_statistics.csv"),
          row.names = FALSE,
          quote = FALSE
        )
      }
    }
  ),
  public = list(
    #--------------------------------------------------------------------------
    #' @description Method for saving a model on disk.
    #' @param dir_path `r get_description("save_dir")`
    #' @param folder_name `r get_param_doc_desc("folder_name")`
    #' @return `r get_description("return_save_on_disk")`
    #'
    #' @importFrom utils write.csv
    save = function(dir_path, folder_name) {
      check_type(object = dir_path, type = "string", FALSE)
      check_type(object = folder_name, type = "string", FALSE)

      # Create Directory and Folder
      save_location <- file.path(dir_path, folder_name)
      create_dir(dir_path, trace = TRUE, msg_fun = FALSE)
      create_dir(save_location, trace = TRUE, msg_fun = FALSE)

      # Save tokenizer statistics
      private$save_tokenizer_statistics(
        dir_path = dir_path,
        folder_name = folder_name
      )

      # Save Sustainability Data
      private$save_sustainability_data(dir_path = dir_path, folder_name = folder_name)

      # Write vocab txt
      # special_tokens <- self$get_special_tokens()
      # special_tokens <- special_tokens[order(x = special_tokens[, "id"]), ]
      # special_tokens <- unique(special_tokens[, "token"])
      # write(
      #  x = c(special_tokens, names(private$model$get_vocab())),
      #  file = paste0(save_location, "/", "vocab.txt")
      # )

      # Save Tokenizer
      file_paths <- private$model$save_pretrained(save_location)
    },
    #--------------------------------------------------------------------------
    #' @description Loads an object from disk
    #' and updates the object to the current version of the package.
    #' @param dir_path `r get_description("load_dir")`
    #' @return `r get_description("return_load_on_disk")`
    load_from_disk = function(dir_path) {
      # Load or reload python scripts
      private$load_reload_python_scripts()

      # Load private and public config files
      private$load_config_file(dir_path)

      # Load the tokenizer model
      private$model <- transformers$PreTrainedTokenizerFast$from_pretrained(dir_path)

      # Load Tokenizer Statistics
      private$load_tokenizer_statistics(dir_path)

      # Load Sustainability Data
      private$load_sustainability_data(model_dir = dir_path)

      # Prevent modification
      private$set_configuration_to_TRUE()
      private$trained <- TRUE
    },
    # ------------------------------------------------------------------------
    #' @description Tokenizer statistics
    #' @return Returns a `data.frame` containing the tokenizer's statistics.
    get_tokenizer_statistics = function() {
      return(private$tokenizer_statistics)
    },
    #--------------------------------------------------------------------------
    #' @description Python tokenizer
    #' @return Returns the python tokenizer within the model.
    get_tokenizer = function() {
      return(private$model)
    },
    #-------------------------------------------------------------------------
    #' @description Method for encoding words of raw texts into integers.
    #' @param raw_text `r get_param_doc_desc("raw_text")`
    #' @param token_overlap `r get_param_doc_desc("token_overlap")`
    #' @param max_token_sequence_length `r get_param_doc_desc("max_token_sequence_length")`
    #' @param n_chunks `r get_param_doc_desc("n_chunks")`
    #' @param token_encodings_only `r get_param_doc_desc("token_encodings_only")`
    #' @param return_token_type_ids `r get_param_doc_desc("return_token_type_ids")`
    #' @param token_to_int `r get_param_doc_desc("token_to_int")`
    #' @param trace `r get_param_doc_desc("trace")`
    #' @return `list` containing the integer or token sequences of the raw texts with
    #' special tokens.
    encode = function(raw_text,
                      token_overlap = 0L,
                      max_token_sequence_length = 512L,
                      n_chunks = 1L,
                      token_encodings_only = FALSE,
                      token_to_int = TRUE,
                      return_token_type_ids = TRUE,
                      trace = FALSE) {
      # Checking
      check_type(object = raw_text, type = "vector", FALSE)
      check_type(object = token_encodings_only, type = "bool", FALSE)
      check_type(object = token_to_int, type = "bool", FALSE)
      check_type(object = trace, type = "bool", FALSE)

      # Start
      n_units <- length(raw_text)
      #---------------------------------------------------------------------
      if (token_encodings_only) {
        encodings <- NULL
        encodings_only <- NULL
        for (i in 1L:n_units) {
          tokens_unit <- NULL

          tokens <- private$model(
            raw_text[i],
            stride = as.integer(token_overlap),
            padding = "max_length",
            truncation = TRUE,
            return_overflowing_tokens = TRUE,
            return_length = FALSE,
            return_offsets_mapping = FALSE,
            return_attention_mask = FALSE,
            max_length = as.integer(max_token_sequence_length),
            return_tensors = "np"
          )

          tmp_seq_len <- nrow(tokens[["input_ids"]])

          chunks <- min(tmp_seq_len, n_chunks)

          for (j in 1L:chunks) {
            tokens_unit[j] <- list(tokens["input_ids"][j, ])
            if (trace) {
              cat(paste(get_time_stamp(), i, "/", n_units, "block", j, "/", chunks, "\n"))
            }
          }
          encodings_only[i] <- list(tokens_unit)
        }
        if (token_to_int) {
          return(encodings_only)
        } else {
          # Convert ids to tokens

          token_seq_list <- NULL
          for (i in seq_len(length(encodings_only))) {
            tmp_sequence <- encodings_only[[i]]
            tmp_seqeunce_tok <- NULL
            for (j in seq_len(length(tmp_sequence))) {
              tmp_seqeunce_tok[length(tmp_seqeunce_tok) + 1L] <- list(
                private$model$convert_ids_to_tokens(
                  ids = as.integer(tmp_sequence[[j]]), skip_special_tokens = FALSE
                )
              )
            }
            token_seq_list[length(token_seq_list) + 1L] <- list(tmp_seqeunce_tok)
          }
          return(token_seq_list)
        }

        #--------------------------------------------------------------------
      } else {
        encodings <- NULL
        chunk_list <- vector(length = n_units)
        total_chunk_list <- vector(length = n_units)
        for (i in 1L:n_units) {
          tokens <- private$model(
            raw_text[i],
            stride = as.integer(token_overlap),
            padding = "max_length",
            truncation = TRUE,
            max_length = as.integer(max_token_sequence_length),
            return_overflowing_tokens = TRUE,
            return_length = FALSE,
            return_offsets_mapping = FALSE,
            return_attention_mask = TRUE,
            return_token_type_ids = return_token_type_ids,
            return_tensors = "pt"
          )

          tmp_dataset <- datasets$Dataset$from_dict(tokens)

          tmp_seq_len <- tmp_dataset$num_rows
          chunk_list[i] <- min(tmp_seq_len, n_chunks)
          total_chunk_list[i] <- tmp_seq_len
          if (chunk_list[i] == 1L) {
            tmp_dataset <- tmp_dataset$select(list(as.integer((1L:chunk_list[[i]]) - 1L)))
          } else {
            tmp_dataset <- tmp_dataset$select(as.integer((1L:chunk_list[[i]]) - 1L))
          }
          encodings <- datasets$concatenate_datasets(c(encodings, tmp_dataset))
        }
        return(encodings_list = list(
          encodings = encodings,
          chunks = chunk_list,
          total_chunks = total_chunk_list
        ))
      }
    },
    #--------------------------------------------------------------------------
    #' @description Method for decoding a sequence of integers into tokens
    #' @param int_seqence `r get_param_doc_desc("int_seqence")`
    #' @param to_token `r get_param_doc_desc("to_token")`
    #' @return `list` of token sequences
    decode = function(int_seqence, to_token = FALSE) {
      # Check
      check_type(object = int_seqence, type = "list", FALSE)
      check_type(object = to_token, type = "bool", FALSE)

      # Start
      tmp_token_list <- NULL
      for (i in seq_len(length(int_seqence))) {
        tmp_seq_token_list <- NULL
        for (j in seq_len(length(int_seqence[[i]]))) {
          tmp_vector <- int_seqence[[i]][[j]]
          mode(tmp_vector) <- "integer"
          if (!to_token) {
            tmp_seq_token_list[j] <- list(private$model$decode(
              token_ids = tmp_vector,
              skip_special_tokens = TRUE
            ))
          } else {
            tmp_seq_token_list[j] <- list(paste(private$model$convert_ids_to_tokens(tmp_vector), collapse = " "))
          }
        }
        tmp_token_list[i] <- list(tmp_seq_token_list)
      }
      return(tmp_token_list)
    },
    #---------------------------------------------------------------------------
    #' @description Method for receiving the special tokens of the model
    #' @return Returns a `matrix` containing the special tokens in the rows
    #' and their type, token, and id in the columns.
    get_special_tokens = function() {
      special_tokens <- c(
        "bos_token",
        "eos_token",
        "unk_token",
        "sep_token",
        "pad_token",
        "cls_token",
        "mask_token"
      )
      tokens_map <- matrix(
        nrow = length(special_tokens),
        ncol = 3L,
        data = NA
      )
      colnames(tokens_map) <- c("type", "token", "id")
      rownames(tokens_map) <- special_tokens

      for (i in seq_len(length(special_tokens))) {
        tokens_map[i, 1L] <- special_tokens[i]
        tokens_map[i, 2L] <- replace_null_with_na(private$model[special_tokens[i]])
        tokens_map[i, 3L] <- replace_null_with_na(
          private$model[paste0(special_tokens[i], "_id")]
        )
      }
      return(tokens_map)
    },
    #--------------------------------------------------------------------------
    #' @description Method for receiving the special tokens of the model
    #' @return Returns an 'int' counting the number of special tokens.
    n_special_tokens = function() {
      special_tokens <- self$get_special_tokens()
      return(
        length(
          unique(
            special_tokens[, "token"]
          )
        )
      )
    },
    #--------------------------------------------------------------------------
    #' @description Method for calculating tokenizer statistics as suggested by
    #' Kaya and Tantuğ (2024).
    #'
    #' Kaya, Y. B., & Tantuğ, A. C. (2024). Effect of tokenization granularity
    #' for Turkish large language models. Intelligent Systems with
    #' Applications, 21, 200335.
    #' \ifelse{text}{\doi{doi:10.1016/j.iswa.2024.200335}}{<https://doi.org/10.1016/j.iswa.2024.200335>}
    #'
    #' @param text_dataset `r get_param_doc_desc("text_dataset")`
    #' @param statistics_max_tokens_length `r get_param_doc_desc("statistics_max_tokens_length")`
    #' @param step `string` describing the context of the estimation.
    #' @returns Returns a `data.frame` containing the estimates.

    #' @return Returns an 'int' counting the number of special tokens.
    calculate_statistics = function(text_dataset,
                                    statistics_max_tokens_length,
                                    step = "creation") {
      # Calculate tokenizer statistics
      tokenized_texts_raw <- tokenize_dataset(
        dataset = text_dataset$get_dataset(),
        tokenizer = private$model,
        max_length = statistics_max_tokens_length,
        add_special_tokens = FALSE,
        log_file = NULL,
        write_interval = 2L,
        value_top = 1L,
        total_top = 1L,
        message_top = "NA"
      )

      statistics <- as.data.frame(
        calc_tokenizer_statistics(
          dataset = tokenized_texts_raw,
          statistics_max_tokens_length = statistics_max_tokens_length,
          step = step
        )
      )
      return(statistics)
    }
  )
)


#' @title WordPieceTokenizer
#' @description Tokenizer based on the WordPiece model (Wu et al. 2016).
#' @return `r get_description("return_object")`
#' @references Wu, Y., Schuster, M., Chen, Z., Le, Q. V., Norouzi, M., Macherey, W.,
#' Krikun, M., Cao, Y., Gao, Q., Macherey, K., Klingner, J., Shah, A.,
#' Johnson, M., Liu, X., Kaiser, Ł., Gouws, S., Kato, Y., Kudo, T., Kazawa,
#' H., . . . Dean, J. (2016). Google's Neural Machine Translation System:
#' Bridging the Gap between Human and Machine Translation.
#' \ifelse{text}{\doi{doi:10.48550/arXiv.1609.08144}}{<https://doi.org/10.48550/arXiv.1609.08144>}
#'
#' @family Tokenizer
#' @export
WordPieceTokenizer <- R6::R6Class(
  classname = "WordPieceTokenizer",
  inherit = TokenizerBase,
  private = list(),
  public = list(
    #--------------------------------------------------------------------------
    #' @description Configures a new object of this class.
    #' @param vocab_size `r get_param_doc_desc("vocab_size")`
    #' @param vocab_do_lower_case `r get_param_doc_desc("vocab_do_lower_case")`
    #' @return `r get_description("return_nothing")`
    configure = function(vocab_size = 10000L,
                         vocab_do_lower_case = FALSE) {
      private$load_reload_python_scripts()
      private$check_config_for_FALSE()

      private$save_all_args(
        args = get_called_args(n = 1L),
        group = "configure"
      )

      # Set package versions
      private$set_package_versions()

      # Set configured to TRUE to avoid changes in the model
      private$set_configuration_to_TRUE()
    },
    #--------------------------------------------------------------------------
    #' @description Trains a new object of this class
    #' @param text_dataset `r get_param_doc_desc("text_dataset")`
    #' @param statistics_max_tokens_length `r get_param_doc_desc("statistics_max_tokens_length")`
    #' @param sustain_track `r get_param_doc_desc("sustain_track")`
    #' @param sustain_iso_code `r get_param_doc_desc("sustain_iso_code")`
    #' @param sustain_region `r get_param_doc_desc("sustain_region")`
    #' @param sustain_interval `r get_param_doc_desc("sustain_interval")`
    #' @param sustain_log_level `r get_param_doc_desc("sustain_log_level")`
    #' @param trace `r get_param_doc_desc("trace")`
    #' @return `r get_description("return_nothing")`
    train = function(text_dataset,
                     statistics_max_tokens_length = 512L,
                     sustain_track = FALSE,
                     sustain_iso_code = NULL,
                     sustain_region = NULL,
                     sustain_interval = 15L,
                     sustain_log_level = "warning",
                     trace = FALSE) {
      private$check_config_for_TRUE()
      private$check_for_untrained()

      private$save_all_args(
        args = get_called_args(n = 1L),
        group = "training"
      )

      private$init_and_start_sustainability_tracking()

      # Define tokens
      sep_token <- "[SEP]"
      sep_id <- 1L
      cls_token <- "[CLS]"
      cls_id <- 0L
      unk_token <- "[UNK]"
      pad_token <- "[PAD]"
      mask_token <- "[MASK]"
      bos_token <- "[CLS]"
      eos_token <- "[SEP]"

      special_tokens <- c(
        cls_token,
        sep_token,
        unk_token,
        pad_token,
        mask_token,
        bos_token,
        eos_token
      )

      # Create model
      tok_new <- tok$Tokenizer(tok$models$WordPiece(unk_token = unk_token))
      tok_new$normalizer <- tok$normalizers$BertNormalizer(
        lowercase = private$model_config$vocab_do_lower_case,
        clean_text = TRUE,
        handle_chinese_chars = TRUE,
        strip_accents = private$model_config$vocab_do_lower_case
      )
      tok_new$pre_tokenizer <- tok$pre_tokenizers$BertPreTokenizer()
      tok_new$post_processor <- tok$processors$BertProcessing(
        sep = reticulate::tuple(list(sep_token, as.integer(sep_id))),
        cls = reticulate::tuple(list(cls_token, as.integer(cls_id)))
      )

      tok_new$decoder <- tok$decoders$WordPiece()

      # configurate training
      trainer <- tok$trainers$WordPieceTrainer(
        vocab_size = as.integer(private$model_config$vocab_size),
        special_tokens = special_tokens,
        show_progress = trace
      )

      # calculate the model
      run_py_file("datasets_transformer_compute_vocabulary.py")

      tok_new$train_from_iterator(
        iterator = py$batch_iterator(
          batch_size = 200L,
          dataset = text_dataset$get_dataset(),
          log_file = NULL,
          write_interval = 2L,
          value_top = 0L,
          total_top = 1L,
          message_top = "NA"
        ),
        trainer = trainer,
        length = as.integer(text_dataset$n_rows())
      )

      # Create the complete and final model
      private$model <- transformers$PreTrainedTokenizerFast(
        tokenizer_object = tok_new,
        unk_token = unk_token,
        sep_token = sep_token,
        pad_token = pad_token,
        cls_token = cls_token,
        mask_token = mask_token,
        bos_token = bos_token,
        eos_token = eos_token
      )

      # Calculate tokenizer statistics
      private$tokenizer_statistics <- self$calculate_statistics(
        text_dataset = text_dataset,
        statistics_max_tokens_length = statistics_max_tokens_length,
        step = "creation"
      )

      # Update
      private$model_config$vocab_size <- length(private$model$get_vocab())

      # Set trained field
      private$trained <- TRUE

      private$stop_sustainability_tracking("Create tokenizer")
    }
  )
)
# Add the model to the user list
TokenizerIndex$WordPieceTokenizer <- ("WordPieceTokenizer")


# ===============================================================================

#' @title HuggingFaceTokenizer
#' @description Abstract class for all tokenizers used with the 'transformers' library.
#' @return `r get_description("return_object")`
#' @family Tokenizer
#' @export
HuggingFaceTokenizer <- R6::R6Class(
  classname = "HuggingFaceTokenizer",
  inherit = TokenizerBase,
  private = list(
    load_config_file = function(dir_path) {}
  ),
  public = list(
    #--------------------------------------------------------------------------
    #' @description Creates a tokenizer from a pretrained model
    #' @param model_dir `r get_description("model_dir")`
    #' @return `r get_description("return_object")`
    create_from_hf = function(model_dir) {
      # Load the model
      private$model <- transformers$AutoTokenizer$from_pretrained(model_dir)

      # Set configured to TRUE to avoid changes in the model
      private$set_configuration_to_TRUE()

      # Set package versions
      private$set_package_versions()

      # Set trained field
      private$trained <- TRUE
    }
  )
)
# Add the model to the user list
TokenizerIndex$HuggingFaceTokenizer <- ("HuggingFaceTokenizer")

Try the aifeducation package in your browser

Any scripts or data that you put into this service are public.

aifeducation documentation built on Nov. 19, 2025, 5:08 p.m.