R/ts_cluster.R

Defines functions tidy.PartitionalTSClusters_exploratory exp_ts_cluster

#' Time series clustering by dtwclust.
#' @export
exp_ts_cluster <- function(df, time, value, category, time_unit = "day", fun.aggregate = sum, na_fill_type = "previous", na_fill_value = 0, max_category_na_ratio = 0.5,
                           variables = NULL, funs.aggregate.variables = NULL,
                           centers = 3L, with_centroids = FALSE, distance = "sdtw", centroid = "sdtw_cent",
                           roll_mean_window = NULL,
                           normalize = "none",
                           seed = 1,
                           output = "data",
                           stop_for_no_data = TRUE,
                           elbow_method_mode=FALSE,
                           max_centers = 10) {
  if(!is.null(seed)) {
    set.seed(seed)
  }

  if (centroid == "sdtw_cent" && distance == "dtw2") {
    # This combination throws error "x and y must be of same type". TODO: Look into it.
    stop("'Soft-DTW Centroids' is not supported with 'DTW with L2 Norm'. Please select other methods.")
  }

  time_col <- tidyselect::vars_select(names(df), !! rlang::enquo(time))
  value_col <- if (missing(value)) {
    # Using empty string instead of NULL, because using NULL here would cause error from UQ(rlang::sym(value_col)),
    # which seems to be evaluated as soon as the parent function is called, which is before if-condition for it to be not NULL is evaluated.
    # (rlang::sym("") does not seem to throw error unlike rlang::sym(NULL).)
    ""
  }
  else {
    tidyselect::vars_select(names(df), !! rlang::enquo(value))
  }
  # Handle the case where NULL was specified for value argument. We handle this case this way because is.null(value) throws error when value actuall has value.
  if (length(value_col) == 0) {
    value_col = ""
  }
  category_col <- tidyselect::vars_select(names(df), !! rlang::enquo(category))

  # Copied from do_prophet.
  if (time_unit == "min") {
    time_unit <- "minute"
  }
  else if (time_unit == "sec") {
    time_unit <- "second"
  }

  # remove rows with NA time
  df <- df[!is.na(df[[time_col]]), ]

  # Compose arguments to pass to dplyr::summarise.
  summarise_args <- list() # default empty list
  if (!is.null(variables) && !is.null(funs.aggregate.variables)) {
    summarise_args <- purrr::map2(funs.aggregate.variables, variables, function(func, cname) {
      # For common functions that require na.rm=TRUE to handle NA, add it.
      if (is_na_rm_func(func)) {
        quo(UQ(func)(UQ(rlang::sym(cname)), na.rm=TRUE))
      }
      else {
        quo(UQ(func)(UQ(rlang::sym(cname))))
      }
    })

    # Set final output column names.
    if (!is.null(names(variables))) {
      names(summarise_args) <- names(variables)
    }
    else {
      names(summarise_args) <- variables
    }
  }

  model_df <- df %>% dplyr::nest_by() %>% dplyr::ungroup() %>%
    dplyr::mutate(model = purrr::map(data, function(df) {
      # Floor date. The code is copied form do_prophet.
      df[[time_col]] <- if (time_unit %in% c("day", "week", "month", "quarter", "year")) {
        # Take care of issue that happened in anomaly detection here for prophet too.
        # In this case, convert (possibly) from POSIXct to Date first.
        # If we did this without converting POSIXct to Date, floor_date works, but later at complete stage,
        # data on day-light-saving days would be skipped, since the times seq.POSIXt gives and floor_date does not match.
        # We give the time column's timezone to as.Date, so that the POSIXct to Date conversion is done
        # based on that timezone.
        lubridate::floor_date(as.Date(df[[time_col]], tz = lubridate::tz(df[[time_col]])), unit = time_unit)
      } else {
        lubridate::floor_date(df[[time_col]], unit = time_unit)
      }
      renamed_df <- if (value_col != "") {
        df %>% dplyr::rename(
            time = UQ(rlang::sym(time_col)),
            value = UQ(rlang::sym(value_col)),
            category = UQ(rlang::sym(category_col))
          )
      }
      else {
        df %>% dplyr::rename(
            time = UQ(rlang::sym(time_col)),
            category = UQ(rlang::sym(category_col))
          )
      }

      # Summarize
      grouped_df <- renamed_df %>% dplyr::group_by(category, time)
      if (value_col == "") {
        df <- grouped_df %>% 
          dplyr::summarise(value = n(), !!!summarise_args)
      }
      else if (is_na_rm_func(fun.aggregate)) {
        df <- grouped_df %>% 
          dplyr::summarise(value = fun.aggregate(value, na.rm=TRUE), !!!summarise_args)
      }
      else {
        df <- grouped_df %>% 
          dplyr::summarise(value = fun.aggregate(value), !!!summarise_args)
      }
      df <- df %>% dplyr::ungroup()
      df_summarised <- df
      df <- df %>% dplyr::select(time, value, category)
      # Pivot wider
      df <- df %>% tidyr::pivot_wider(names_from="category", values_from="value")

      # If there is columns like "Centroid 1" in the input, which would mess up the process from here, silently delete them.
      # TODO: When we have the ability to show warning without stopping the entire process, show this.
      df <- df %>% dplyr::select(-matches("^Centroid [0-9]+$"))

      # Complete the time column.
      df <- df %>% complete_date("time", time_unit = time_unit)
      orig_n_categories <- length(colnames(df)) - 1 # -1 for time column.
      # Drop columns (represents category) that has more NAs than max_category_na_ratio, considering them to have not enough data.
      df <- df %>% dplyr::select_if(function(x){sum(is.na(x))/length(x) < max_category_na_ratio})
      if (length(colnames(df)) <= centers) {
        if (stop_for_no_data) {
          stop("EXP-ANA-2 :: [] :: There is not enough data left after removing high NA ratio data.")
        }
        # For Analytics View, keep going to show at least the diagnostic chart to show where NAs are.
        model <- list()
        # Pass original data.
        # - So that we can generate diagnostic chart about where NAs are.
        attr(model, "aggregated_data") <- df_summarised
        attr(model, "error") <- "There is not enough data left after removing high NA ratio data."
        class(model) <- "PartitionalTSClusters_exploratory"
        return(model)
      }
      # Fill NAs in time series
      df <- df %>% dplyr::mutate(across(-time, ~fill_ts_na(.x, time, type = na_fill_type, val = na_fill_value)))

      if (!is.null(roll_mean_window) && roll_mean_window > 1) {
        if (nrow(df) - (roll_mean_window - 1) < 1) {
          stop("EXP-ANA-4 :: [] :: The window size of moving average is too long for this data.")
        }
        df <- df %>% dplyr::mutate(across(-time, ~RcppRoll::roll_mean(.x, n = roll_mean_window, fill = NA, align="right")))
        df <- df %>% tail(-(roll_mean_window - 1)) # Remove NA rows created at the beginning of df.
      }

      time_values <- df$time
      df <- df %>% dplyr::select(-time)
      if (normalize != "none") {
        df_before_normalize <- df
      }
      switch (normalize,
        center_and_scale = {
          df <- df %>% dplyr::mutate(across(everything(), ~normalize(.x, center=TRUE, scale=TRUE)))
        },
        center = {
          df <- df %>% dplyr::mutate(across(everything(), ~normalize(.x, center=TRUE, scale=FALSE)))
        },
        scale = {
          df <- df %>% dplyr::mutate(across(everything(), ~normalize(.x, center=FALSE, scale=TRUE)))
        }
      )
      if (distance %in% c("dtw_lb", "lbk", "lbi")) { # Distance algorithms that require window.size.
        window_size <- as.integer(nrow(df) * 0.1) # Let's use 10% of length of data as the default window size.
        if (window_size < 1) { # window.size should be at least 1.
          window_size <- 1L
        }
        if (!elbow_method_mode) {
          model <- dtwclust::tsclust(t(as.matrix(df)), k = centers, distance = distance, centroid = centroid,
                                     args = dtwclust::tsclust_args(dist = list(window.size = window_size)))
          model <- list(model = model) # Since the original model is S4 object, we create an S3 object that wraps it.
        }
        else { # Elbow method mode. Create a list of models.
          n_centers <- 2:max_centers
          models <- n_centers %>% purrr::map(function(n_center) {
            dtwclust::tsclust(t(as.matrix(df)), k = centers, distance = distance, centroid = centroid,
                              args = dtwclust::tsclust_args(dist = list(window.size = window_size)))
          })
          model <- list(models = models, n_centers = n_centers) # Since the original model is S4 object, we create an S3 object that wraps it.
        }
      }
      else {
        if (!elbow_method_mode) {
          model <- dtwclust::tsclust(t(as.matrix(df)), k = centers, distance = distance, centroid = centroid)
          model <- list(model = model) # Since the original model is S4 object, we create an S3 object that wraps it.
        }
        else { # Elbow method mode. Create a list of models.
          n_centers <- 2:max_centers
          models <- n_centers %>% purrr::map(function(n_center) {
            dtwclust::tsclust(t(as.matrix(df)), k = n_center, distance = distance, centroid = centroid)
          })
          model <- list(models = models, n_centers = n_centers) # Since the original model is S4 object, we create an S3 object that wraps it.
        }
      }
      attr(model, "time_col") <- time_col
      attr(model, "value_col") <- value_col
      attr(model, "category_col") <- category_col
      attr(model, "orig_n_categories") <- orig_n_categories
      attr(model, "time_values") <- time_values
      if (!is.null(variables)) {
        attr(model, "variable_cols") <- variables 
      }
      # Pass original data.
      # - So that the output has other variables too.
      # - So that we can generate diagnostic chart about where NAs are.
      attr(model, "aggregated_data") <- df_summarised
      if (normalize != "none") {
        attr(model, "before_normalize_data") <- df_before_normalize
      }
      class(model) <- "PartitionalTSClusters_exploratory"
      model
    }))
  model_df <- model_df %>% rowwise()
  if (output == "data") {
    model_df %>% tidy_rowwise(model, with_centroids = with_centroids)
  }
  else { # output == "model"
    model_df
  }
}

#' Extracts results from the model as a data frame.
#' The output is original long-format set of time series with Cluster column.
#' @export
tidy.PartitionalTSClusters_exploratory <- function(x, with_centroids = TRUE, type = "result", with_before_normalize_data = TRUE) {
  model <- x$model
  switch(type,
    result = {
      if (is.null(model)) {
        return(tibble::tibble()) # Return empty data frame to show "No Data" screen.
      }
      # Create map of time series names to clustering results
      cluster_map <- model@cluster
      cluster_map_names <- names(model@datalist)
      if (with_centroids) {
        for (i in 1:(model@k)) {
          cluster_map <- c(cluster_map, i)
          cluster_map_names <- c(cluster_map_names, paste0("Centroid ",i))
        }
      }
      names(cluster_map) <- cluster_map_names

      res <- tibble::as_tibble(model@datalist)
      res <- res %>% dplyr::mutate(time=!!attr(x,"time_values"))
      # Add centroids data
      if (with_centroids) {
        for (i in 1:(model@k)) {
          res <- res %>% dplyr::mutate(!!rlang::sym(paste0("Centroid ",i)):=model@centroids[[i]])
        }
      }
      res <- res %>% tidyr::pivot_longer(cols = -time)

      orig_df <- attr(x, "before_normalize_data")
      if (!is.null(orig_df) && with_before_normalize_data) { # If normalization was done and we want to show the result with before-normalize data.
        orig_df <- orig_df %>% dplyr::mutate(time=!!attr(x,"time_values"))
        orig_df <- orig_df %>% tidyr::pivot_longer(cols = -time)
        res <- res %>% dplyr::rename(value_normalized=value) # The value we have now in res is normalized one. Rename it, and get the one without normalization from orig_df.
        res <- res %>% dplyr::left_join(orig_df, by=c("time"="time", "name"="name"))
        res <- res %>% dplyr::relocate(value, .before=value_normalized) # Adjust column order.
      }

      if (!is.null(attr(x, "variable_cols"))) { # If there are other columns to keep in the result, join them.
        aggregated_data <- attr(x, "aggregated_data")
        aggregated_data <- aggregated_data %>% dplyr::select(-value) # Drop value column from aggregated_data since res already has it.
        res <- res %>% dplyr::left_join(aggregated_data, by=c("time"="time", "name"="category"))
      }
      res <- res %>% dplyr::mutate(Cluster = cluster_map[name])
      value_col <- attr(x, "value_col")
      if (value_col == "") {
        res <- res %>% dplyr::rename(!!rlang::sym(attr(x,"time_col")):=time,
                                     Number_of_Rows=value,
                                     !!rlang::sym(attr(x,"category_col")):=name)
        if (!is.null(orig_df) && with_before_normalize_data) { # If normalization was done and we want to show the result with before-normalize data.
          res <- res %>% dplyr::rename(Number_of_Rows_normalized=value_normalized)
        }
      }
      else {
        res <- res %>% dplyr::rename(!!rlang::sym(attr(x,"time_col")):=time,
                                     !!rlang::sym(value_col):=value,
                                     !!rlang::sym(attr(x,"category_col")):=name)
        if (!is.null(orig_df) && with_before_normalize_data) { # If normalization was done and we want to show the result with before-normalize data.
          res <- res %>% dplyr::rename(!!rlang::sym(paste0(value_col,"_normalized")):=value_normalized)
        }
      }
    },
    aggregated = { # Return raw aggretated time series data before filling NAs and feeding to the clustering algorithm. This is for Data Validation tab.
      res <- attr(x, "aggregated_data")
    },
    summary = {
      if (is.null(model)) {
        return(tibble::tibble()) # Return empty data frame to show "No Data" screen.
      }
      res <- model@clusinfo
      res <- res %>% dplyr::mutate(cluster = row_number()) %>% select(cluster, everything())
      orig_n_categories <- attr(x, "orig_n_categories")
      n_categories <- sum(res$size, na.rm=TRUE)
      if (!is.null(orig_n_categories) && orig_n_categories > n_categories) { # Add a row for categories removed at preprocessing.
        tmp_df <- tibble::tibble(size = orig_n_categories - n_categories, Note = "Removed due to high NA ratio.")
        res <- res %>% dplyr::bind_rows(tmp_df)
      }
    },
    elbow_method = {
      if (is.null(model)) {
        return(tibble::tibble()) # Return empty data frame to show "No Data" screen.
      }
      res <- purrr::map(x$models, function(model) {
        df <- model@clusinfo
        df <- df %>% dplyr::summarize(av_dist=sum(size*av_dist)/sum(size))
        df <- df %>% dplyr::mutate(iter=model@iter, converged=model@converged)
        df
      })
      res <- tibble::tibble(n_center=x$n_centers, data=res)
      res <- res %>% tidyr::unnest(data)
    }
  )
  res
}
exploratory-io/exploratory_func documentation built on April 23, 2024, 9:15 p.m.