R/main.R

Defines functions run_sim predicting_sim svt_predicting_sim predict_model

Documented in predicting_sim predict_model run_sim svt_predicting_sim

#' @include model_helper.R plotting_tools.R
NULL


#' Schedule Jobs Using Predictions On Given Test Set
#'
#' Sequantially schedule jobs using predictions on provided test set.
#'
#' @param object A S4 sim object.
#' @param trained_result A trained object depending on the model used for training.
#' @param test_x A numeric vector representing the test set.
#' @param test_xreg A numeric vector representing the dataset that target dataset depends on for scheduling and evaluations.
#' @param predict_info A dataframe containing all the past predicted information.
#' @param switch_status A list containing all the information about current switches and identifiers.
#' @return A dataframe containing the past predicted information and the current predicted information.
#' @keywords internal
predict_model <- function(object, trained_result, test_x, test_xreg, predict_info, switch_status) {
  adjust_switch <- switch_status$adjust_switch
  react_counter <- switch_status$react_counter

  test_predicted_quantiles <- data.frame(stringsAsFactors = FALSE)
  test_predicted_params <- data.frame(stringsAsFactors = FALSE)

  last_time_schedule <- length(test_x) - object@extrap_step * object@window_size + 1
  predict_iter <- 0
  current_end <- 1
  while (current_end <= last_time_schedule) {
    for (i in 1:object@extrap_step) {
      identifier_info <- data.frame("train_iter" = switch_status$train_iter, "test_iter" = switch_status$test_iter, "predict_iter" = predict_iter + 1)
    }

    start_time <- current_end
    end_time <- start_time + object@window_size * object@extrap_step - 1

    predict_iter <- predict_iter + 1
    current_test_x <- matrix(test_x[0:(start_time - 1),], ncol = 1, dimnames = list(rownames(test_x)[0:(start_time - 1)]))
    if (!is.null(test_xreg) & (is.matrix(test_xreg) | is.data.frame(test_xreg))) {
      current_test_xreg <- matrix(test_xreg[0:(start_time - 1),], ncol = 1, dimnames = list(rownames(test_xreg)[0:(start_time - 1)]))
    } else if (!is.null(test_xreg) & is.list(test_xreg)) {
      current_test_xreg <- lapply(1:length(test_xreg), function(reg) {
        matrix(test_xreg[[reg]][0:(start_time - 1),], ncol = 1, dimnames = list(rownames(test_xreg)[0:(start_time - 1)]))
      })
    } else {
      current_test_xreg <- NULL
    }
    predictor_info_lst <- do_prediction(object, trained_result, test_predicted_quantiles, current_test_x, current_test_xreg)

    actual_obs <- stats::setNames(test_x[start_time:end_time], rownames(test_x)[start_time:end_time])
    scoring_info <- check_score_pred(object, predictor_info_lst$predicted_quantiles, actual_obs, adjust_switch)

    ## Update step based on adjustment policy
    update_info <- get_adjust_switch(scoring_info[nrow(scoring_info), grep("score_pred_1_*", colnames(scoring_info), value = TRUE)], react_counter, adjust_switch, object@react_speed)

    adjust_switch <- update_info$adjust_switch
    react_counter <- update_info$react_counter

    ## Write to predict info dataframe
    test_predicted_quantiles <- rbind(test_predicted_quantiles, cbind(identifier_info, predictor_info_lst$predicted_quantiles, scoring_info))
    test_predicted_params <- rbind(test_predicted_params, cbind(identifier_info, predictor_info_lst$predicted_params))

    current_end <- current_end + object@window_size * object@extrap_step
  }

  predict_info$predicted_quantiles <- rbind(predict_info$predicted_quantiles, test_predicted_quantiles)
  predict_info$predicted_params <- rbind(predict_info$predicted_params, test_predicted_params)
  switch_status <- list("train_iter" = switch_status$train_iter, "test_iter" = switch_status$test_iter + 1, "react_counter" = react_counter, "adjust_switch" = adjust_switch)
  return(list("switch_status" = switch_status, "predict_info" = predict_info))
}


#' Simulation of Scheduling Jobs Based On Predictions On A Single Trace.
#'
#' Sequantially training and testing by scheduling jobs based on predictions on a single trace.
#'
#' @param ts_num The corresponding trace/column in \code{dataset}.
#' @param x A numeric vector of length n representing the target dataset for scheduling and evaluations.
#' @param xreg A numeric vector of length n representing the external regressor.
#' @param start_point A numeric number that represents the starting point of the simulation. Default value is \code{1}.
#' @param wait_time A numeric number that represents the time between testing and next training. Default value is \code{0}.
#' @param write_type A character that represents how to write the result of simulation, can be one of "charwise", "tracewise", "paramwise" or "none".
#' @param plot_type A character that can be one of "charwise", "tracewise", "paramwise" or "none".
#' @param ... Characters that represent the name of parent directories that will be passed to \code{write_location_check}.
#' @return A list containing the resulting prediction informations.
#' @keywords internal
svt_predicting_sim <- function(ts_num, object, x, xreg=NULL, start_point=1, wait_time=0, write_type, plot_type, ...) {
  trace_name <- colnames(x)[ts_num]

  predict_info <- list("predicted_quantiles" = data.frame(stringsAsFactors = FALSE), "predicted_params" = data.frame(stringsAsFactors = FALSE))

  current <- start_point
  last_time_update <- nrow(x) - object@train_size - object@extrap_step * object@window_size + 1

  train_models <- NULL
  train_sig <- TRUE
  switch_status <- list("train_iter" = 0, "test_iter" = 1, "react_counter" = rep(0, length(object@cut_off_prob)), "adjust_switch" = rep(FALSE, length(object@cut_off_prob)))
  while (current <= last_time_update) {
    if (train_sig) {
      # Get training set
      train_start <- current
      train_end <- current + object@train_size - 1
      train_x <- matrix(x[train_start:train_end, ts_num], ncol = 1, dimnames = list(rownames(x)[train_start:train_end], colnames(x)[ts_num]))
      if (!is.null(xreg) & (is.matrix(xreg) | is.data.frame(xreg))) {
        train_xreg <- matrix(xreg[train_start:train_end, ts_num], ncol = 1, dimnames = list(rownames(xreg)[train_start:train_end], colnames(xreg)[ts_num]))
      } else if (!is.null(xreg) & is.list(xreg)) {
        train_xreg <- stats::setNames(lapply(1:length(xreg), function(reg) {
          matrix(xreg[[reg]][train_start:train_end, ts_num], ncol = 1, dimnames = list(rownames(xreg[[reg]])[train_start:train_end], colnames(xreg[[reg]])[ts_num]))
        }), names(xreg))
      } else {
        train_xreg <- NULL
      }

      train_models <- train_model(object, train_x, train_xreg, ifelse(is.list(train_models), train_models, list(train_models)))

      switch_status$train_iter <- switch_status$train_iter + 1

      if (object@train_policy == "offline") {
        train_sig <- FALSE
      }
    }

    ## Get test set
    test_start <- current + object@train_size
    test_end <- min(current + object@train_size + object@update_freq * object@extrap_step * object@window_size - 1, nrow(x))
    test_x <- matrix(x[test_start:test_end, ts_num], ncol = 1, dimnames = list(rownames(x)[test_start:test_end], colnames(x)[ts_num]))
    if (!is.null(xreg) & (is.matrix(xreg) | is.data.frame(xreg))) {
      test_xreg <- matrix(xreg[test_start:test_end, ts_num], ncol = 1, dimnames = list(rownames(xreg)[test_start:test_end], colnames(xreg)[ts_num]))
    } else if (!is.null(xreg) & is.list(xreg)) {
      test_xreg <- stats::setNames(lapply(1:length(xreg), function(reg) {
        matrix(xreg[[reg]][test_start:test_end, ts_num], ncol = 1, dimnames = list(rownames(xreg)[test_start:test_end], colnames(xreg)[ts_num]))
      }), names(xreg))
    } else {
      test_xreg <- NULL
    }

    ## Test Model
    score_switch_info <- predict_model(object, train_models, test_x, test_xreg, predict_info, switch_status)
    switch_status <- score_switch_info$switch_status
    predict_info <- score_switch_info$predict_info

    ## Update Step
    current <- current + object@update_freq * object@extrap_step * object@window_size + wait_time
  }

  if ("tracewise" %in% write_type & !("none" %in% write_type)) {
    write_sim_result(predict_info$predicted_quantiles, "tracewise", trace_name, ...)
    write_sim_result(predict_info$predicted_params, "other", paste("Tracewise Parameters With trace", trace_name), ...)
  }
  if ("tracewise" %in% plot_type & !("none" %in% plot_type)) {
    plot_sim_tracewise(predict_info$predicted_quantiles, trace_name, ...)
  }
  trace_score <- check_score_trace(object@cut_off_prob, predict_info$predicted_quantiles)
  trace_pred_stats <- check_summary_statistics_trace(predict_info$predicted_quantiles, object@granularity)
  return(list("trace_score" = trace_score, "trace_pred_stats" = trace_pred_stats, "predict_info" = predict_info))
}


#' Simulation of Scheduling Jobs Based On Predictions.
#'
#' Sequantially training and testing by scheduling a job.
#'
#' @param object A uni-length sim object that represents a specific parameter setting.
#' @param x A matrix of size n by m representing the target dataset for scheduling and evaluations.
#' @param xreg A matrix of length n by m representing the dataset that target dataset depends on for scheduling and evaluations.
#' @param start_point A numeric number that represents the starting point of the simulation. Default value is \code{1}.
#' @param wait_time A numeric number that represents the time between training and testing. Default value is \code{0}.
#' @param cores The number of threads for parallel programming for multiple traces, not supported for windows users.
#' @param write_type A character that represents how to write the result of simulation, can be one of "charwise", "tracewise", "paramwise" or "none".
#' @param plot_type A character that represents how to plot the result of simulation can be one of "charwise", "tracewise", "paramwise" or "none".
#' @param ... Characters that represent the name of parent directories that will be passed to \code{write_location_check}.
#' @return An S4 sim result object.
#' @import foreach
#' @keywords internal
predicting_sim <- function(object, x, xreg, start_point=1, wait_time=0, cores, write_type, plot_type, ...) {
  ## Do Simulation
  print(get_representation(object, "char_con"))
  print(get_representation(object, "param_con"))
  start_time <- proc.time()

  pb <- progress::progress_bar$new(
    format = "Simulating [:bar] :elapsedfull :percent | ETA :eta",
    total = ncol(x),
    width = getOption("width"),
    clear = FALSE
  )
  cl <- snow::makeCluster(cores)
  doSNOW::registerDoSNOW(cl)
  trace_score <- foreach::foreach(ts_num = 1:ncol(x), .combine = "c", .errorhandling = "remove", .inorder = TRUE,
                                  .options.snow = list(progress = function(n) pb$tick())) %dopar% {
    list(svt_predicting_sim(ts_num, object, x, xreg, start_point, wait_time, write_type, plot_type, ..., get_representation(object, "param_con")))
  }
  snow::stopCluster(cl)

  end_time <- proc.time()
  print(end_time - start_time)

  ## Reformat Results
  trace_score_info <- data.frame()
  trace_pred_stats_info <- data.frame()
  trace_names <- c()
  for (ts_num in 1:ncol(x)) {
    #trace_score_info <- rbind(trace_score_info, trace_score[[ts_num]][["trace_score"]])
    trace_score_info <- rbind(trace_score_info, trace_score[[ts_num]][["trace_score"]])
    trace_pred_stats_info <- rbind(trace_pred_stats_info, trace_score[[ts_num]][["trace_pred_stats"]])
    trace_names <- c(trace_names, colnames(x)[ts_num])
  }
  trace_score_info$trace_name <- trace_names
  trace_pred_stats_info$trace_name <- trace_names

  trace_score_info <- normalize_predict_info(object@cut_off_prob, trace_score_info)
  if ("paramwise" %in% write_type & !("none" %in% write_type)) {
    write_sim_result(trace_score_info, "paramwise", as.character(Sys.time()), ..., get_representation(object, "param_con"))
    write_sim_result(trace_pred_stats_info, "other", paste("Paramwise Prediction Statistics at", as.character(Sys.time())), ..., get_representation(object, "param_con"))
  }
  if ("paramwise" %in% plot_type & !("none" %in% plot_type)) {
    plot_sim_paramwise(trace_score_info, object@target, as.character(Sys.time()), ..., get_representation(object, "param_con"))
  }

  param_score <- check_score_param(object@cut_off_prob, trace_score_info)
  show_result(param_score)
  return(param_score)
}


#' Predictions of Foreground Jobs.
#'
#' Sequentially training and testing by predicting the availability of CPU resource at next windows.
#'
#' @param epoch_setting A dataframe representing a specific parameter setting.
#' @param additional_setting A list containing additional vector-like parameter settings.
#' @param x A matrix of size n by m representing the target dataset for scheduling and evaluations.
#' @param xreg A matrix or a list of matrices of length n by m representing the dataset that target dataset depends on for scheduling and evaluations, or \code{NULL}.
#' @param start_point A numeric number that represents the starting point of the simulation. Default value is \code{1}.
#' @param wait_time A numeric number that represents the time between training and testing. Default value is \code{0}.
#' @param cores A numeric numb representing the number of threads for parallel programming for multiple traces, not supported for windows users.
#' @param write_type A character that represents how to write the result of simulation, can be one of "charwise", "tracewise", "paramwise" or "none".
#' @param plot_type A character that represents how to plot the result of simulation can be one of "charwise", "tracewise", "paramwise" or "none".
#' @param result_loc A character that specify the path to which the result of simulations will be saved to. Default is your work directory.
#' @return A list of S4 sim result object.
#' @export
run_sim <- function(epoch_setting, additional_setting = list(), x, xreg, start_point=1, wait_time=0, cores=parallel::detectCores(), write_type, plot_type, result_loc=getwd()) {
  if (!(any(c(write_type, plot_type) %in% c("charwise", "paramwise", "tracewise", "none")))) {
    stop("plot_type must be one of charwise, tracewise, paramwise and none.")
  }

  name_epoch_setting <- dplyr::group_by_at(epoch_setting, "class")
  score_all_lst <- dplyr::group_map(name_epoch_setting,
                   function(other, class) {
                     defau <- methods::new(paste0(tolower(as.character(class)), "_sim"))
                     char_defau <- names(get_representation(defau, "char_raw"))
                     char_epoch_setting <- dplyr::group_by_at(epoch_setting, c("class", colnames(other)[which(colnames(other) %in% char_defau)]))
                     score_char_lst <- dplyr::group_map(char_epoch_setting,
                                                      function(other, char) {
                                                        param_uni_lst <- methods::as(cbind(char, other), "sim")
                                                        if (!is.null(xreg)) {
                                                          if (ifelse(is.null(additional_setting[["include_response_window_size"]]), FALSE, additional_setting[["include_response_window_size"]])) {
                                                            window_size_for_reg <- c(param_uni_lst[[1]]@window_size, additional_setting[["window_size_for_reg"]])
                                                          } else {
                                                            window_size_for_reg <- additional_setting[["window_size_for_reg"]]
                                                          }

                                                          if (ifelse(is.null(additional_setting[["include_response_window_type"]]), FALSE, additional_setting[["include_response_window_type"]])) {
                                                            window_type_for_reg <- c(param_uni_lst[[1]]@response, additional_setting[["window_type_for_reg"]])
                                                          } else {
                                                            window_type_for_reg <- additional_setting[["window_type_for_reg"]]
                                                          }

                                                          if (is.matrix(xreg) | is.data.frame(xreg)) {
                                                            reg_indicator <- 1
                                                          } else {
                                                            reg_indicator <- 1:length(xreg)
                                                          }

                                                          reg_lengths <- c(length(window_size_for_reg), length(window_type_for_reg), length(reg_indicator))
                                                          if (all(reg_lengths == 1 | reg_lengths == max(reg_lengths))) {
                                                            reg_param_setting <- data.frame("window_size_for_reg" = window_size_for_reg, "window_type_for_reg" = window_type_for_reg, "reg_indicator" = reg_indicator, stringsAsFactors = FALSE)
                                                          } else {
                                                            reg_param_setting <- expand.grid("window_size_for_reg" = window_size_for_reg, "window_type_for_reg" = window_type_for_reg, "reg_indicator" = reg_indicator, stringsAsFactors = FALSE)
                                                          }
                                                        }
                                                        param_uni_lst <- lapply(param_uni_lst, function(param_uni) {
                                                          for (j in names(additional_setting)[!(names(additional_setting) %in% c("include_response_window_size", "include_response_window_type", "window_type_for_reg", "window_size_for_reg"))]) {
                                                            methods::slot(param_uni, j) <- additional_setting[[j]]
                                                            if ((j %in% colnames(char)) | (j %in% colnames(other))) {
                                                              methods::slot(param_uni, j) <- c(methods::slot(param_uni, j), additional_setting[[j]])
                                                            } else {
                                                              methods::slot(param_uni, j) <- additional_setting[[j]]
                                                            }
                                                          }

                                                          if (!is.null(xreg)) {
                                                            param_uni@window_size_for_reg <- reg_param_setting[, "window_size_for_reg"]
                                                            param_uni@window_type_for_reg <- reg_param_setting[, "window_type_for_reg"]
                                                          }
                                                          return(param_uni)
                                                        })

                                                        if (!is.null(xreg)) {
                                                          if (is.matrix(xreg) | is.data.frame(xreg)) {
                                                            mapped_xreg <- stats::setNames(lapply(reg_param_setting[, "reg_indicator"], function(k) {
                                                              if (k == 0) {
                                                                x
                                                              } else {
                                                                xreg
                                                              }
                                                            }), paste(reg_param_setting[, "window_type_for_reg"], reg_param_setting[, "window_size_for_reg"], sep = "_"))
                                                          } else {
                                                            mapped_xreg <- stats::setNames(lapply(reg_param_setting[, "reg_indicator"], function(k) {
                                                              if (k == 0) {
                                                                x
                                                              } else {
                                                                xreg[[k]]
                                                              }
                                                            }), paste(reg_param_setting[, "window_type_for_reg"], reg_param_setting[, "window_size_for_reg"], sep = "_"))
                                                          }
                                                        } else {
                                                          mapped_xreg <- NULL
                                                        }

                                                        score_param_lst <- lapply(param_uni_lst, predicting_sim, x, mapped_xreg, start_point, wait_time, cores, write_type, plot_type, result_loc, as.character(class), get_representation(param_uni_lst[[1]], "char_con"))
                                                        final_result_df <- data.frame()
                                                        for (i in 1:length(score_param_lst)) {
                                                          param_uni_df <- methods::as(param_uni_lst[[i]], "data.frame")
                                                          score_param_df <- as.data.frame(score_param_lst[[i]])
                                                          score_param_df <- normalize_predict_info(param_uni_lst[[i]]@cut_off_prob, score_param_df)
                                                          final_result_df <- rbind(final_result_df, cbind(param_uni_df, score_param_df))
                                                        }

                                                        file_name <- as.character(Sys.time())
                                                        if ("charwise" %in% write_type & !("none" %in% write_type)) {
                                                          write_sim_result(final_result_df, "charwise", file_name, result_loc, as.character(class), get_representation(param_uni_lst[[1]], "char_con"))
                                                        }
                                                        if ("charwise" %in% plot_type & !("none" %in% plot_type)) {
                                                          plot_sim_charwise(final_result_df, file_name, result_loc, as.character(class), get_representation(param_uni_lst[[1]], "char_con"))
                                                        }
                                                        return(score_param_lst)
                                                      })
                     return(score_char_lst)
                     })
  return(score_all_lst)
}
carlonlv/DataCenterSim documentation built on Jan. 9, 2022, 3:26 p.m.