R/dfg_tools.R

# dfg_tools (Dynamic Feature Generator Tools)

# Standard Feature Names:

# event-based features:
# <eventTypes>_<attributes>_<periodicAggregator>_<windowType>_<historicAggregator>_<lag>
# Example:
# transactionReported_amount_SUMG0SUML5

# Returns the last non-NA element of x:
last.narm = function(x){dplyr::last(x[!is.na(x)])}

AGGREGATOR_FUNCTION = list(sum = sum, avg = mean, mean = mean, gavg = rutils::geomean,
                      eavg = rutils::expmean, prd = prod, median = median, med = median,
                      sd = sd, sdv = sd, var = var, max = max, min = min,
                      last = dplyr::last, count = length)

CUMULATIVE_AGGREGATOR_FUNCTION = list(last = last.narm,
                      sum = cumsum, max = cummax, min = cummin, avg = dplyr::cummean,
                      med = cumstats::cummedian, var = cumstats::cumvar,
                      gavg = cumstats::cumgmean, count = seq_along)
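
# Example (sketch): aggregators are looked up by name, e.g. when feature specs
# come from a config file:
#   AGGREGATOR_FUNCTION[['avg']](c(1, 2, 3, 10))           # 4
#   AGGREGATOR_FUNCTION[['count']](c(5, 7))                # 2
#   CUMULATIVE_AGGREGATOR_FUNCTION[['sum']](c(1, 2, 3))    # 1 3 6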

# Tools to map an event-log into a training dataset for machine learning:
# dfg.periodic converts an event-log into a set of multivariate time series; each caseID gets a time series of its own.
dfg.periodic = function(eventlog, features, period = c('hour', 'day', 'week', 'month', 'year'), start = NULL, end = NULL){
  period = match.arg(period)
  eventlog$eventTime %<>% as.POSIXct %>% lubridate::force_tz('GMT')
  if(!is.null(start)){start %<>% as.POSIXct %>% lubridate::force_tz('GMT'); eventlog %<>% filter(eventTime > start - 1)}
  if(!is.null(end))  {end   %<>% as.POSIXct %>% lubridate::force_tz('GMT'); eventlog %<>% filter(eventTime < end + 1)}

  eventlog %<>% mutate(periodStart = eventTime %>% lubridate::floor_date(period), selected = T)
  molten = data.frame()
  for (ft in features){
    if(!is.null(ft$eventTypes)){
      eventlog$selected = eventlog$eventType %in% ft$eventTypes
    } else eventlog$selected = TRUE

    if(!is.null(ft$variables)){
      eventlog$selected = eventlog$selected & (eventlog$variable %in% ft$variables)
    }
    eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>%
      summarise(aggrval = do.call(ft$aggregator, list(value))) %>%
      mutate(featureName = ft$name) %>%
      bind_rows(molten) -> molten
  }

  tt = data.frame(periodStart = seq(from = min(eventlog$periodStart), to = max(eventlog$periodStart), by = period))
  data.frame(caseID = unique(eventlog$caseID)) %>% group_by(caseID) %>% do({data.frame(caseID = .$caseID, periodStart = tt)}) %>%
    left_join(molten %>% reshape2::dcast(caseID + periodStart ~ featureName, sum, value.var = 'aggrval'), by = c('caseID', 'periodStart')) -> molten
  molten[is.na(molten)] <- 0
  names(molten)[2] <- 'time'
  return(molten)
}
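
# Usage sketch for dfg.periodic (hypothetical data; assumes dplyr, magrittr,
# lubridate and reshape2 are available):
#   eventlog <- data.frame(
#     caseID    = c('A', 'A', 'B'),
#     eventType = c('transactionReported', 'transactionReported', 'login'),
#     eventTime = c('2024-01-01 10:00:00', '2024-01-02 09:30:00', '2024-01-01 08:00:00'),
#     variable  = 'amount',
#     value     = c(100, 250, 1))
#   features <- list(
#     list(name = 'txn_amount_sum', eventTypes = 'transactionReported',
#          variables = 'amount', aggregator = 'sum'))
#   dfg.periodic(eventlog, features, period = 'day')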

dfg.periodic.old = function(eventlog, features, period = c('hour', 'day', 'week', 'month', 'year'), start = NULL, end = NULL){
  period = match.arg(period)
  eventlog$eventTime %<>% as.POSIXct %>% lubridate::force_tz('GMT')
  if(!is.null(start)){start %<>% as.POSIXct %>% lubridate::force_tz('GMT'); eventlog %<>% filter(eventTime > start - 1)}
  if(!is.null(end))  {end   %<>% as.POSIXct %>% lubridate::force_tz('GMT'); eventlog %<>% filter(eventTime < end + 1)}

  eventlog %<>% mutate(periodStart = eventTime %>% lubridate::floor_date(period), selected = T)
  molten = data.frame()
  for (ft in features){
    if(!is.null(ft$eventTypes)){
      eventlog$selected = eventlog$eventType %in% ft$eventTypes
    } else eventlog$selected = TRUE

    if(!is.null(ft$variables)){
      eventlog$selected = eventlog$selected & (eventlog$variable %in% ft$variables)
    }
    eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>%
      summarise(aggrval = do.call(ft$aggregator, list(value))) %>%
      mutate(featureName = ft$name) -> mlt

    if(nrow(mlt) > 0){
      molten %<>% bind_rows(mlt)
    }
  }

  tt = data.frame(periodStart = seq(from = min(eventlog$periodStart), to = max(eventlog$periodStart), by = period))
  data.frame(caseID = unique(eventlog$caseID)) %>% group_by(caseID) %>% do({data.frame(caseID = .$caseID, periodStart = tt)}) %>%
    left_join(molten %>% reshape2::dcast(caseID + periodStart ~ featureName, sum, value.var = 'aggrval'), by = c('caseID', 'periodStart')) -> molten
  molten[is.na(molten)] <- 0
  names(molten)[2] <- 'time'
  return(molten)
}

# currently, only works for daily
dfg.periodic.sparklyr = function(eventlog, features, period = 'day', start = NULL, end = NULL){
  period = match.arg(period)
  if(!is.null(start)){start %<>% as.POSIXct %>% lubridate::force_tz('GMT'); eventlog %<>% filter(eventTime > start - 1)}
  if(!is.null(end))  {end   %<>% as.POSIXct %>% lubridate::force_tz('GMT'); eventlog %<>% filter(eventTime < end + 1)}
  eventlog %<>% mutate(periodStart = as.Date(eventTime))
  molten = data.frame()
  for(ft in features){
    cat('Building PAF ', ft$name, '\n')
    if(!is.null(ft$eventTypes)){
      eventlog %<>% mutate(selected = eventType %in% ft$eventTypes)
    } else {
      eventlog %<>% mutate(selected = TRUE)
    }

    if(!is.null(ft$variables)){
      eventlog %<>% mutate(selected = selected & (variable %in% ft$variables))
    }

    switch(ft$aggregator,
           'mean' = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = AVG(value, na.rm = T)) %>% mutate(featureName = ft$name) %>% collect},
           'sum'  = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = SUM(value, na.rm = T)) %>% mutate(featureName = ft$name) %>% collect},
           'first' = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = first_value(value)) %>% mutate(featureName = ft$name) %>% collect},
           'last' = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = last_value(value)) %>% mutate(featureName = ft$name) %>% collect},
           'count'  = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = COUNT(value)) %>% mutate(featureName = ft$name) %>% collect},
           'max'  = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = MAX(value)) %>% mutate(featureName = ft$name) %>% collect},
           'sd'  = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = sd(value)) %>% mutate(featureName = ft$name) %>% collect},
           'min'  = {eventlog %>% filter(selected) %>% group_by(caseID, periodStart) %>% summarise(aggrval = MIN(value)) %>% mutate(featureName = ft$name) %>% collect}
    ) -> mlt

    if(nrow(mlt) > 0){
      molten %<>% bind_rows(mlt)
    }
  }

  tt = data.frame(periodStart = seq(from = min(molten$periodStart), to = max(molten$periodStart), by = 'day'))
  data.frame(caseID = unique(eventlog$caseID)) %>% group_by(caseID) %>% do({data.frame(caseID = .$caseID, periodStart = tt)}) %>%
    left_join(molten %>% reshape2::dcast(caseID + periodStart ~ featureName, sum, value.var = 'aggrval'), by = c('caseID', 'periodStart')) -> molten
  molten[is.na(molten)] <- 0
  names(molten)[2] <- 'time'
  return(molten)
}



# Only for moving (sliding) windows, not growing ones
dfg.historic.old = function(periodic, features, early_call = T){
  drops = character()
  for (ft in features){
    if(is.null(ft$drop_reference)){ft$drop_reference = F}
    periodic %>% nrow %>% sequence %>% 
      sapply(FUN = vect.sliding.backward, vector = periodic[, ft$reference], win_size = ft$win_size, fun = ft$aggregator, early_call = early_call) -> periodic[, ft$name]
    if(ft$drop_reference){drops = c(drops, ft$reference)}
  }

  return(periodic[, colnames(periodic) %>% setdiff(drops)])
}



# This function has been stolen from pracma::movavg with a modification: the aggregator
# function can be passed as an argument (it is only applied for types 's' and 't').
# Types of available moving averages are:
#
# s for "simple": computes the simple moving average. n is the number of previous data points used together with the current data point when calculating the moving average.
#
# t for "triangular": computes the triangular moving average by calculating the first simple moving average with window width ceiling((n + 1)/2), then calculating a second simple moving average on the first one with the same window size.
#
# w for "weighted": calculates the weighted moving average by supplying weights for each element in the moving window; the reduction of weights follows a linear trend.
#
# m for "modified": calculates the modified moving average. The first value is calculated like a simple moving average; subsequent values are calculated by adding the new value and subtracting the last average from the resulting sum.
#
# e for "exponential": computes the exponentially weighted moving average, a weighted moving average that gives more weight to recent data points (reduction factor 2/(n + 1)).
#
# r for "running": an exponential moving average with a reduction factor of 1/n (equivalent to the modified moving average).
#
# If n = Inf, the window is capped at length(x) - 1, so the aggregation is effectively cumulative.
# For types 'r', 'e', 'm' and 'w', the aggregator argument is not used at all.
#' @export
moving_aggregator = function (x, n = Inf, type = c("s", "t", "w", "m", "e", "r"), aggregator = mean){
  stopifnot(is.numeric(x), is.numeric(n), is.character(type))
  type = match.arg(type)
  if (length(n) != 1 || ceiling(n) != floor(n) || n <= 1)
    stop("Window length 'n' must be a single integer greater than 1.")
  nx <- length(x)
  n  <- min(n, nx - 1) %>% max(1)
  y  <- numeric(nx)
  if (type == "s") {
    for (k in 1:(n - 1)) y[k] <- aggregator(x[1:k])
    for (k in n:nx) y[k] <- aggregator(x[(k - n + 1):k])
  }
  else if (type == "t") {
    n <- ceiling((n + 1)/2)
    s <- moving_aggregator(x, n, "s", aggregator = aggregator)
    y <- moving_aggregator(s, n, "s", aggregator = aggregator)
  }
  else if (type == "w") {
    for (k in 1:(n - 1)) y[k] <- 2 * sum((k:1)*x[k:1])/(k*(k + 1))
    for (k in n:nx) y[k] <- 2*sum((n:1)*x[k:(k - n + 1)])/(n*(n + 1))
  }
  else if (type == "m") {
    y[1] <- x[1]
    for (k in 2:nx) y[k] <- y[k - 1] + (x[k] - y[k - 1])/n
  }
  else if (type == "e") {
    a <- 2/(n + 1)
    y[1] <- x[1]
    for (k in 2:nx) y[k] <- a * x[k] + (1 - a) * y[k - 1]
  }
  else if (type == "r") {
    a <- 1/n
    y[1] <- x[1]
    for (k in 2:nx) y[k] <- a * x[k] + (1 - a) * y[k - 1]
  }
  else stop("The type must be one of 's', 't', 'w', 'm', 'e', or 'r'.")
  return(y)
}
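
# Examples (sketch):
#   x <- c(1, 2, 3, 4, 5, 6)
#   moving_aggregator(x, n = 3, type = 's', aggregator = mean)  # simple moving average
#   moving_aggregator(x, n = 3, type = 's', aggregator = max)   # sliding maximum
#   moving_aggregator(x, n = 3, type = 'e')                     # EWMA; aggregator is ignored
#   moving_aggregator(x)                                        # n = Inf: window capped at length(x) - 1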

#' @export
dfg.historic.sliding = function(periodic, features, caseID_col = 'caseID', eventTime_col = 'eventTime'){
  drops = character()
  periodic %<>% mutate(.row = sequence(nrow(.))) %>% arrange(!!sym(caseID_col), !!sym(eventTime_col))
  
  #todo: use parallel computing  
  for (ft in features){
    if(is.null(ft$drop_reference)){ft$drop_reference = F}
    if(is.null(ft$aggregator)){ft$aggregator = mean}
    if(inherits(ft$aggregator, 'character')){ft$aggregator = AGGREGATOR_FUNCTION[[ft$aggregator]]}
    verify(ft$aggregator, 'function')
    
    if(is.null(ft$type)){ft$type = 's'}
    if(ft$win_size < 2){ft$win_size = 2}
    

    ft_fun = function(v) moving_aggregator(x = v, n = ft$win_size, type = ft$type, aggregator = ft$aggregator)

    periodic %>% pull(ft$reference) %>% ave(id = periodic %>% pull(caseID_col), FUN = ft_fun) -> periodic[, ft$name]
      
    if(ft$drop_reference){drops = c(drops, ft$reference)}
  }
  return(periodic[, colnames(periodic) %>% setdiff(drops)] %>% arrange(.row) %>% select(-.row))
}
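
# Usage sketch for dfg.historic.sliding (hypothetical columns; 'value' is the
# reference column; assumes the rutils helpers used by the function are loaded):
#   periodic <- data.frame(caseID = rep('A', 5), eventTime = 1:5,
#                          value = c(1, 3, 2, 5, 4))
#   features <- list(list(name = 'value_avg3', reference = 'value',
#                         win_size = 3, aggregator = 'avg', type = 's'))
#   dfg.historic.sliding(periodic, features)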


#' @export
dfg.lagger = function(input, features, caseID_col = 'caseID', eventTime_col = 'eventTime'){
  drops = character()
  input %<>% mutate(.row = sequence(nrow(.))) %>% arrange(!!sym(caseID_col), !!sym(eventTime_col))
  
  #todo: use parallel computing  
  for (ft in features){
    if(is.null(ft$drop_reference)){ft$drop_reference = F}
    ft$offset = verify(ft$offset, c('integer', 'numeric'), default = 1)

    ft_fun = function(v) lag(v, ft$offset)
    input %>% pull(ft$reference) %>% ave(id = input %>% pull(caseID_col), FUN = ft_fun) -> input[[ft$name]]
    
    if(ft$drop_reference){drops = c(drops, ft$reference)}
  }
  return(input[colnames(input) %>% setdiff(drops)] %>% arrange(.row) %>% select(-.row))
}
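
# Usage sketch for dfg.lagger: a one-period lag of 'value' within each case
# (reusing the hypothetical 'periodic' table from the sketch above):
#   features <- list(list(name = 'value_lag1', reference = 'value', offset = 1))
#   dfg.lagger(periodic, features)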


dfg.labeler = function(periodic, features){
  drops = character()
  for (ft in features){
    if(is.null(ft$drop_reference)){ft$drop_reference = F}
    periodic %>% nrow %>% sequence %>% sapply(FUN = vect.sliding.forward, vector = periodic[, ft$reference], win_size = ft$win_size, fun = ft$aggregator) -> periodic[, ft$name]
    if(ft$drop_reference){drops = c(drops, ft$reference)}
  }
  return(periodic[, colnames(periodic) %>% setdiff(drops)])
}

# This function helps you build the list of features to pass to the 'features' argument of dfg.historic.sliding and dfg.labeler:
add_features = function(flist = list(), actions){
  for (act in actions){
    for (ft in act$features){
      flist %<>% c(act$win_sizes %>% lapply(function(x) list(name = paste(ft, act$label, sep = '.') %>% paste0(x) , aggregator = act$fun, reference = ft, win_size = x)))
    }
  }
  return(flist)
}
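
# Usage sketch for add_features: build sliding-window specs for two reference
# columns and two window sizes in one go:
#   actions <- list(list(features = c('amount', 'count'), label = 'sum',
#                        fun = 'sum', win_sizes = c(7, 30)))
#   flist <- add_features(actions = actions)
#   # -> four specs whose name fields are 'amount.sum7', 'amount.sum30',
#   #    'count.sum7' and 'count.sum30'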


# todo: provide a link to the event-log description.
#' @title Dynamic Feature Generator
#' @description This function generates a package of multiple tables containing dynamic features 
#' extracted from a given eventlog.
#' @param el (data.frame) The input event-log table.
#' The event-log must have these columns, and column names must match exactly:
#' - \code{caseID, eventType, eventTime, attribute, value}
#' \code{eventTime} must be of class Date; otherwise, modify accordingly.
#' @param period (character) The period of time for which the periodic transition system is required.
#' @param sequential (logical) Should the dynamic features be generated for a sequence of periodic times?
#' If set to \code{FALSE}, the dynamic features are generated only for the times at which an event has happened.
#' @param event_funs (character): Which types of features/labels do you want to generate from the occurrence of events?
#' For each event type in the event-log you can generate:
#' - \code{count} Count of events that happened within each period.
#' - \code{count_cumsum} Cumulative count of events that happened in the past (prior to the current date).
#' - \code{elapsed} Time elapsed since the last occurrence of the event.
#' - \code{tte} Time remaining until the next event occurrence.
#' - \code{censored} Flag which is TRUE if the event occurrence after the current time has not been observed.
#' - \code{last} The latest value of the event.
#' - \code{sum} The periodic sum of the values associated with the events.
#' - \code{sum_cumsum} The cumulative sum of the values associated with the events.
#' - \code{max} The periodic maximum of the values associated with the events.
#' - \code{max_cummax} The maximum of the values associated with the events since the start of observation.
#' - \code{min} The periodic minimum of the values associated with the events.
#' - \code{min_cummin} The minimum of the values associated with the events since the start of observation.
#' @param attr_funs (character): Which types of features do you want to generate from event attributes?
#' Options mirror the value-based event functions above: \code{last, sum, sum_cumsum, max, max_cummax, min, min_cummin}.
#' @param var_funs (character): Which types of features do you want to generate from variables
#' (eventType-attribute combinations)? Same options as \code{attr_funs}.
#' @param target_horizon (numeric) If given, an event label table for this horizon (in periods) is added to the pack.
#' @export
generate_dynamic_features_pack = function(el, period = c('day', "week", "month", "quarter", "year", "sec", "min", "hour"), sequential = F,
                   event_funs = c('count', 'elapsed', 'tte', 'censored'), attr_funs = NULL, var_funs = NULL, target_horizon = NULL){
  cat('\n', "Verifications started ... ")
  period = match.arg(period)
  
  el %<>% as.data.frame %>% select(caseID, eventTime, eventType, attribute, value)
  el$value %<>% as.numeric

  wna = which(is.na(el$caseID) | is.na(el$eventTime))
  warnif(length(wna) > 0, paste(length(wna), 'rows', 'with missing caseID or eventTime are removed!'))
  if(length(wna) > 0){el = el[- wna, ]}
  
  wna = which(is.na(el$value))
  warnif(length(wna) > 0, paste(length(wna), 'values missing (or coerced to NA) were replaced by zeros!'))
  if(length(wna) > 0){el$value[wna] <- 0}

  wna = which(is.na(el$attribute))
  warnif(length(wna) > 0, paste(length(wna), "missing attributes replaced by 'occurrence'"))
  if(length(wna) > 0){el$attribute[wna] <- 'occurrence'}
  
  el$timestamp = el$eventTime
  
  cat("Done!", '\n')
  
  cat('\n', "Managing event times ... ")
  if(period %in% c('day', "week", "month", "quarter", "year")){
    el$eventTime = as.Date(el$eventTime)
    if(period != 'day'){
      el$eventTime %<>% cut(breaks = period) %>% as.Date
      if(period == 'week'){
        el$eventTime = el$eventTime + lubridate::weeks(1)
      } else if (period == 'month'){
        el$eventTime = el$eventTime + months(1)
      } else if (period == 'quarter'){
        el$eventTime = el$eventTime + months(3)
      } else {
      el$eventTime = el$eventTime + lubridate::years(1)
      }
    } else {
      el$eventTime = el$eventTime + lubridate::days(1)
    }
  } else {
    el$eventTime %<>% as.POSIXct
    if(period != 'sec'){
      el$eventTime %<>% cut(breaks = period)
      el$eventTime %<>% as.POSIXct
      if(period == 'min'){
        el$eventTime = el$eventTime + lubridate::minutes(1)
      } else{
        el$eventTime = el$eventTime + lubridate::hours(1)
      }
    } else {
      el$eventTime = el$eventTime + 1
    }
  }
  cat("Done!", '\n')

  out = list()
  cat('\n', "Creating time ends and event attribute maps ... ")
  el %>% group_by(caseID) %>% summarise(minTime = min(eventTime), maxTime = max(eventTime)) %>% ungroup -> out$case_timends
  el %>% distinct(eventType, attribute) -> out$event_attr_map
  cat("Done!", '\n')
  
  if(sequential){
    cat('\n', "Adding clock events ... ")
    
    # Option 1:
    # bb = out$case_timends %>% group_by(caseID) %>%
    #   do({data.frame(caseID = .$caseID, eventTime = seq(from = as.Date(.$minTime), to = as.Date(.$maxTime), by = period))})

    # Option 2: (Faster)
    lambda = function(v){seq(from = as.Date(v[2]), to = as.Date(v[3]), by = period)}
    out$case_timends %>% mutate(minTime = cut(minTime, breaks = period), maxTime = cut(maxTime, breaks = period)) %>%
      apply(1, lambda) -> a
    if(inherits(a, 'list')){
      names(a) <- out$case_timends$caseID
      purrr::map_dfr(names(a), .f = function(v) {data.frame(caseID = v, eventTime = a[[v]])}) -> bb
    } else {
      bb = out$case_timends %>% group_by(caseID) %>% 
        do({data.frame(caseID = .$caseID, eventTime = seq(from = as.Date(.$minTime), to = as.Date(.$maxTime), by = period))})
    }

    bb %>% anti_join(el %>% select(caseID, eventTime), by = c('caseID', 'eventTime')) %>%
      mutate(eventType = 'clock', attribute = 'counter', value = 1, timestamp = eventTime - 1) %>%
      select(caseID, eventType, eventTime, attribute, value, timestamp) %>%
      rbind(el) -> el
    
    cat("Done!", '\n')
  }

  if(length(event_funs) > 0){
    cat('\n', "Creating event count table ... ")
    el %>%
      # distinct(eventID, .keep_all = T) %>% 
      reshape2::dcast(caseID + eventTime ~ eventType, value.var = 'value', fun.aggregate = length) %>%
      arrange(caseID, eventTime) -> out$event_count

    cols = 3:ncol(out$event_count)
    cat("Done!", '\n')
  }

  if('count_cumsum' %in% event_funs){
    cat('\n', "Creating event count cumsum table ... ")
    out$event_count_cumsum = out$event_count %>% column.cumulative.forward(col = cols, id_col = 'caseID')
    cat("Done!", '\n')
  }

  if(sum(c('elapsed', 'tte', 'censored') %in% event_funs) > 0){
    cat('\n', "Creating event time table ... ")
    # out$event_time = out$event_count
    # tstr = as.character(out$event_time[, 'eventTime'])
    # out$event_time[, cols] %<>% as.matrix %>% apply(2, function(v) ifelse(v > 0, tstr, NA))
    el %>% mutate(timestamp = as.character(timestamp)) %>%
      reshape2::dcast(caseID + eventTime ~ eventType, value.var = 'timestamp', fun.aggregate = dplyr::first) %>%
      arrange(caseID, eventTime) -> out$event_time
    cat("Done!", '\n')
  }

  #### event elapsed
  if('elapsed' %in% event_funs){
    cat('\n', "Creating elapsed table ... ")
    
    out$event_time_elapsed <- out$event_time %>% column.feed.forward(col = cols, id_col = 'caseID')

    for(i in cols){
      out$event_time_elapsed[, i] = as.numeric(out$event_time_elapsed$eventTime - as.Date(out$event_time_elapsed[, i]))
    }
    cat("Done!", '\n')
  }

  #### event tte, censored
  if(('tte' %in% event_funs) | ('censored' %in% event_funs)){
    cat('\n', "Creating tte table ... ")
    
    out$event_tte <- out$event_time %>% column.feed.backward(col = cols, id_col = 'caseID')
    for(i in cols){
      ind = which(as.Date(out$event_tte[, i]) < out$event_tte$eventTime) %-% nrow(out$event_tte)
      out$event_tte[ind, i] <- out$event_tte[ind + 1, i]
      out$event_tte[, i]    <- as.numeric(as.Date(out$event_tte[, i]) - out$event_tte$eventTime) %>% {.[.<0]<-NA;.}
    }
    cat("Done!", '\n')
  }

  if('censored' %in% event_funs){
    cat('\n', "Creating censored table ... ")
    
    out$event_censored = out$event_tte
    out$event_censored[, cols] <- is.na(out$event_tte[, cols]) %>% as.numeric

    out$event_tte %<>% left_join(out$case_timends, by = 'caseID') %>% mutate(ttc = as.numeric(maxTime - eventTime))
    for(i in cols){
      wna = which(out$event_censored[, i] == 1)
      out$event_tte[wna, i] <- out$event_tte[wna, 'ttc']
    }
    out$event_tte %<>%  select(- minTime, - maxTime, -ttc)
    cat("Done!", '\n')
  }
  
  #### Periodic(PAFs) and Historic Aggregated Features (HAFs) for events:
  if('last' %in% event_funs){
    cat('\n', "Creating event last value table ... ")
    
    el %>% reshape2::dcast(caseID + eventTime ~ eventType, value.var = 'value', fun.aggregate = dplyr::last) %>%
      column.feed.forward(col = 3:ncol(.), id_col = 'caseID') -> out$event_last

    cat("Done!", '\n')
  }
  
  if(sum(c('sum', 'sum_cumsum') %in% event_funs) > 0){
    cat('\n', "Creating event sum value table ... ")
    
    el %>% reshape2::dcast(caseID + eventTime ~ eventType, value.var = 'value', fun.aggregate = sum) -> out$event_sum
    cat("Done!", '\n')
  }
  
  if('sum_cumsum' %in% event_funs){
    cat('\n', "Creating event cumulative sum value table ... ")
    
    out$event_sum %>% column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID') -> out$event_sum_cumsum
    cat("Done!", '\n')
  }
  
  if(sum(c('max', 'max_cummax') %in% event_funs) > 0){
    cat('\n', "Creating event max value table ... ")
    
    el %>% reshape2::dcast(caseID + eventTime ~ eventType, value.var = 'value', fun.aggregate = max) -> out$event_max
    cat("Done!", '\n')
  }
  
  if('max_cummax' %in% event_funs){
    cat('\n', "Creating event cumulative max value table ... ")
    out$event_max %>%
      column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID', aggregator = cummax) %>%
      {.[. == -Inf]<-NA;.} -> out$event_max_cummax
    cat("Done!", '\n')
  }
  
  if(sum(c('min', 'min_cummin') %in% event_funs) > 0){
    cat('\n', "Creating event min value table ... ")
    el %>% reshape2::dcast(caseID + eventTime ~ eventType, value.var = 'value', fun.aggregate = min) -> out$event_min
    cat("Done!", '\n')
  }
  
  if('min_cummin' %in% event_funs){
    cat('\n', "Creating event cumulative min value table ... ")
    
    out$event_min %>%
      column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID', aggregator = cummin) %>%
      {.[. == Inf]<-NA;.} -> out$event_min_cummin
    cat("Done!", '\n')
  }
  
  #### Periodic(PAFs) and Historic Aggregated Features (HAFs) for attributes:
  
  if('last' %in% attr_funs){
    cat('\n', "Creating attribute last value table ... ")
    
    el %>% reshape2::dcast(caseID + eventTime ~ attribute, value.var = 'value', fun.aggregate = dplyr::last) %>%
      column.feed.forward(col = 3:ncol(.), id_col = 'caseID') -> out$attr_last
    cat("Done!", '\n')
  }

  if(sum(c('sum', 'sum_cumsum') %in% attr_funs) > 0){
    cat('\n', "Creating attribute sum value table ... ")
    el %>% reshape2::dcast(caseID + eventTime ~ attribute, value.var = 'value', fun.aggregate = sum) -> out$attr_sum
    cat("Done!", '\n')
  }

  if('sum_cumsum' %in% attr_funs){
    cat('\n', "Creating attribute cumulative sum value table ... ")
    out$attr_sum %>% column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID') -> out$attr_sum_cumsum
    cat("Done!", '\n')
  }

  if(sum(c('max', 'max_cummax') %in% attr_funs) > 0){
    cat('\n', "Creating attribute max value table ... ")
    el %>% reshape2::dcast(caseID + eventTime ~ attribute, value.var = 'value', fun.aggregate = max) -> out$attr_max
    cat("Done!", '\n')
  }

  if('max_cummax' %in% attr_funs){
    cat('\n', "Creating attribute cumulative max value table ... ")
    out$attr_max %>%
      column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID', aggregator = cummax) %>%
      {.[. == -Inf]<-NA;.} -> out$attr_max_cummax
    cat("Done!", '\n')
  }

  if(sum(c('min', 'min_cummin') %in% attr_funs) > 0){
    cat('\n', "Creating attribute min value table ... ")
    
    el %>% reshape2::dcast(caseID + eventTime ~ attribute, value.var = 'value', fun.aggregate = min) -> out$attr_min
    cat("Done!", '\n')
  }

  if('min_cummin' %in% attr_funs){
    cat('\n', "Creating attribute cumulative min value table ... ")
    out$attr_min %>%
      column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID', aggregator = cummin) %>%
      {.[. == Inf]<-NA;.} -> out$attr_min_cummin
    cat("Done!", '\n')
  }

  #### Periodic(PAFs) and Historic Aggregated Features (HAFs) for Variable:
  
  if(length(var_funs) > 0){
    el %<>% mutate(variable = paste(eventType, attribute, sep = '_'))
  }

  if('last' %in% var_funs){
    cat('\n', "Creating variable last value table ... ")
    el %>% reshape2::dcast(caseID + eventTime ~ variable, value.var = 'value', fun.aggregate = dplyr::last) %>%
      column.feed.forward(col = 3:ncol(.), id_col = 'caseID') -> out$var_last
    cat("Done!", '\n')
  }

  if(sum(c('sum', 'sum_cumsum') %in% var_funs) > 0){
    cat('\n', "Creating variable sum value table ... ")
    el %>% reshape2::dcast(caseID + eventTime ~ variable, value.var = 'value', fun.aggregate = sum) -> out$var_sum
    cat("Done!", '\n')
  }

  if('sum_cumsum' %in% var_funs){
    cat('\n', "Creating variable cumulative sum value table ... ")
    out$var_sum %>% column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID') -> out$var_sum_cumsum
    cat("Done!", '\n')
  }

  if(sum(c('max', 'max_cummax') %in% var_funs) > 0){
    cat('\n', "Creating variable max value table ... ")
    el %>% reshape2::dcast(caseID + eventTime ~ variable, value.var = 'value', fun.aggregate = max) -> out$var_max
    cat("Done!", '\n')
  }

  if('max_cummax' %in% var_funs){
    cat('\n', "Creating variable cumulative max value table ... ")
    out$var_max %>%
      column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID', aggregator = cummax) %>%
      {.[. == -Inf]<-NA;.} -> out$var_max_cummax
    cat("Done!", '\n')
  }

  if(sum(c('min', 'min_cummin') %in% var_funs) > 0){
    cat('\n', "Creating variable min value table ... ")
    
    el %>% reshape2::dcast(caseID + eventTime ~ variable, value.var = 'value', fun.aggregate = min) -> out$var_min
    cat("Done!", '\n')
  }

  if('min_cummin' %in% var_funs){
    cat('\n', "Creating variable cumulative min value table ... ")
    
    out$var_min %>%
      column.cumulative.forward(col = 3:ncol(.), id_col = 'caseID', aggregator = cummin) %>%
      {.[. == Inf]<-NA;.} -> out$var_min_cummin
    cat("Done!", '\n')
  }
  
  if(!is.null(target_horizon)){
    cat('\n', "Creating event label table ... ")
    out %<>% add_event_labels(target_horizon %>% verify(c('integer', 'numeric'), lengths = 1, domain = c(1, Inf)))
    cat("Done!", '\n')
  }
  
  return(out %>% standard_dfgpack_feature_names)
}
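
# Usage sketch for generate_dynamic_features_pack (hypothetical event-log;
# assumes the rutils helpers used above, e.g. warnif and column.feed.forward,
# are available):
#   el <- data.frame(
#     caseID    = c('A', 'A', 'B'),
#     eventType = c('purchase', 'refund', 'purchase'),
#     eventTime = as.Date(c('2024-01-01', '2024-01-15', '2024-01-03')),
#     attribute = 'amount',
#     value     = c(120, -120, 60))
#   pack <- generate_dynamic_features_pack(el, period = 'week', sequential = TRUE,
#             event_funs = c('count', 'count_cumsum', 'elapsed', 'tte', 'censored'))
#   names(pack)   # case_timends, event_attr_map, event_count, ...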

standard_dfgpack_feature_names = function(dfgpack){
  for(tn in names(dfgpack) %-% c('extras', 'case_timends', 'event_attr_map')){
    suffix = strsplit(tn, '_')[[1]] %-% c('event', 'attr', 'var') %>% paste(collapse = '_')
    colnames(dfgpack[[tn]]) <- c('caseID', 'eventTime', colnames(dfgpack[[tn]])[-c(1,2)] %>% paste(suffix, sep = '_'))
  }
  return(dfgpack)
}

add_event_labels = function(dfgpack, horizon){
  cols = 3:ncol(dfgpack$event_tte)
  event_label = dfgpack$event_tte
  # I don't recall why I added extra condition: (dfgpack$event_tte[, cols] > 0). It fails when tte = 0 
  # event_label[, cols] <- as.numeric((dfgpack$event_censored[, cols] == 0) & (dfgpack$event_tte[, cols] < horizon) & (dfgpack$event_tte[, cols] > 0))
  event_label[, cols] <- as.numeric((dfgpack$event_censored[, cols] == 0) & (dfgpack$event_tte[, cols] < horizon))
  dfgpack$event_label = event_label
  names(dfgpack$event_label) %<>% stringr::str_replace(pattern = '_tte', '_label') 
  return(dfgpack)
}
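
# Example: with horizon = 30, a row gets label 1 iff the event is not censored
# and occurs strictly within the next 30 periods (tte < 30):
#   dfgpack <- add_event_labels(dfgpack, horizon = 30)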


# The event-based dynamic features are generated for atomic events.
# For an interval event-type with a start and a stop, use this mapper.
# This function works only for one type of interval event, which has a start and an end,
# so the start and end of one event must have identical event IDs.
# Column eventType of the event-log must indicate whether the event has started or ended.
# (Default tags are 'started' and 'ended'. Specify if different tags are used in the dataset.)
ml_event_interval = function(
  eventlog, period = 'day', sequential = F, caseID_col = 'caseID', eventID_col = 'eventID', eventTime_col = 'eventTime', eventType_col = 'eventType',
  startTags = 'started', endTags = 'ended'){

  eventlog %<>% nameColumns(columns = c(eventID = eventID_col, caseID = caseID_col, eventTime = eventTime_col, eventType = eventType_col))
  startTags %<>% verify('character', default = 'started')
  endTags   %<>% verify('character', default = 'ended')
  standardEventType <- c(rep('started', length(startTags)), rep('ended', length(endTags)))
  names(standardEventType) <- c(startTags, endTags) %>% tolower
  eventlog$eventType       <- standardEventType[eventlog$eventType %>% tolower]

  sna = sum(is.na(eventlog$eventType))
  warnif(sna > 0, paste('eventType tags do not match for', sna, 'rows!'))

  eventlog %>% group_by(caseID) %>%
    mutate(eventNo = paste0('E', eventID %>% as.character %>% as.factor %>% as.integer)) %>%
    mutate(eventType = paste0(eventNo, eventType)) %>%
    select(-eventID, -eventNo) %>%
    dfg_pack(period = period, event_funs = c('count_cumsum', 'elapsed'), sequential = sequential) -> pack

  colnames(pack$event_count_cumsum) %>% charFilter('started') -> start_events
  start_events %>% gsub(pattern = 'started', replacement = 'ended') -> end_events

  # current count of active events distributed by event number:
  en_active_count  = pack$event_count_cumsum[start_events] - pack$event_count_cumsum[end_events]
  # en_active_status = (en_active_count)

  # Current count of events active for each case (similar to count of active tasks):
  countActiveEvents = pack$event_count_cumsum %>% select(caseID, eventTime) %>%
    cbind(numEventsActive = rowSums(en_active_count, na.rm = T))


  # Current duration of active events (age of active events)
  en_active_age = pack$event_elapsed[start_events]
  en_active_age[en_active_count == 0] <- 0
  en_active_age[en_active_age < 0] <- 0 # There should be no rows with negative age, but this is handled just in case
  caseActiveAge = pack$event_count_cumsum %>% select(caseID, eventTime) %>%
    cbind(activeAge = en_active_age %>% apply(1, max, na.rm = T))

  # a case is active when at least one event belonging to that case is active:

  countActiveEvents %>% left_join(caseActiveAge, by = c('caseID', 'eventTime')) %>% mutate(status = numEventsActive > 0)
}
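
# Usage sketch for ml_event_interval (hypothetical data; each interval event has
# matching 'started'/'ended' rows sharing an eventID; assumes the dfg_pack()
# helper called above is available):
#   intervals <- data.frame(
#     caseID = 'A', eventID = c(1, 1), eventType = c('started', 'ended'),
#     eventTime = as.Date(c('2024-01-01', '2024-01-05')))
#   ml_event_interval(intervals, period = 'day', sequential = TRUE)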

# ml_sliding = function(dfg_pack, table_funs = list(event_count = c('sum90', 'mean120'), attr_max = c('mean15', 'max180'))){
#   funs = table_funs$event_count %>% gsub(pattern = "[0-9]", replacement = "")
#   nums = table_funs$event_count %>% gsub(pattern = "[a-z, A-Z]", replacement = "")
#   cols = 3:ncol(dfg_pack$event_count)
#   
#   tn = 'event_count' %>% paste(table_funs$event_count[1], sep = '_')
#   out[[tn]] <- dfg_pack$event_count
#   for(i in 1:cols){
#     which(dfg_pack$event_elapsed[, i] < nums[1]) -> ind
#     dfg_pack$event_elapsed[ind, i] %>% ave(id = dfg_pack$event_elapsed[ind, 'caseID'], FUN = validfun[funs[1]])    
#   }
# }

# This module runs a predictive model using each history table in the dfg-pack as training data,
# ranks features by importance, and measures model performance. Features with importance greater
# than argument 'importance_threshold', from models with performance greater than
# 'performance_threshold', are selected and a single dataset is returned.
extract_features_from_dfgpack = function(dfgpack, train_from, train_to, test_from, test_to, label_event, importance_threshold, performance_threshold, performance_metric = 'gini', silent = T){
  models = list()
  
  ind_train = which((dfgpack$event_label$eventTime >= train_from) & (dfgpack$event_label$eventTime <= train_to))
  ind_test  = which((dfgpack$event_label$eventTime >= test_from) & (dfgpack$event_label$eventTime <= test_to))
  
  label_col = label_event %>% paste('label', sep = '_') 
  y_train   = dfgpack$event_label[ind_train,] %>% pull(label_col)
  y_test    = dfgpack$event_label[ind_test,]  %>% pull(label_col)
  
  history_tables = names(dfgpack) %-% c('case_timends', 'event_attr_map', 'event_time', 'event_tte', 'event_censored', 'event_label')

  ## Train models and measure performances:
  perf = numeric()
  for(tn in history_tables){
    X_train = dfgpack[[tn]][ind_train,] %>% select(-caseID, -eventTime)
    X_test  = dfgpack[[tn]][ind_test,]  %>% select(-caseID, -eventTime)
    
    mdl = new('CLS.XGBOOST', nthread = as.integer(4), rfe.enabled = T, name = tn)
    mdl$fit(X_train, y_train)
    
    perf[tn]     <- mdl$performance(X_test, y_test, metric = performance_metric)
    models[[tn]] <- mdl
  }
  
  ## Extract important features from well-performed models:
  # fl: feature list
  fl = NULL
  for(tn in names(perf[perf > performance_threshold])){
    bfet = models[[tn]]$objects$features %>% 
      filter(importance > importance_threshold) %>% 
      mutate(performance = perf[tn]) %>% 
      select(fname, importance, performance)
    if(!silent){
      cat('\n', 'Table ',tn, ':', '\n')
      print(bfet)
    }
    fl %<>% rbind(bfet)
  }
  
  tn = history_tables[1]
  mldata <- dfgpack[[tn]][fl$fname %^% colnames(dfgpack[[tn]])]
  for(tn in history_tables[-1]){
    cn = fl$fname %^% colnames(dfgpack[[tn]])
    dfgpack[[tn]][cn] %>% cbind(mldata) -> mldata
  }
  
  return(mldata)
}

# This function enriches the given dfg package by adding various sliding-window features to it.
#' @export add_swf
add_swf = function(dfgpack, tables = NULL, aggregators = NULL, win_sizes = NULL, types = 's'){
  if(is.empty(win_sizes)) return(dfgpack)
  
  aggr_list = list(sum = sum, avg = mean, med = median, min = min, max = max, sdv = sd)
  
  tables %<>% verify('character', domain = names(dfgpack), default = 
                       names(dfgpack) %-% c('case_timends', 'event_attr_map', 'event_time', 'event_tte', 'event_censored', 'event_label'))

  aggregators <- verify(aggregators, 'character', domain = names(aggr_list), default = 'sum')

  for(tn in tables){
    cat('\n', 'Adding sliding window features for table: ', tn, ' ... ')
    features = list()
    for(en in colnames(dfgpack[[tn]]) %-% c('caseID', 'eventTime', 'clock')){
      for(tp in intersect(types, c('s', 't'))){
        for(ag in aggregators){
          for(i in win_sizes){
            fn = sprintf('%s_%s%s%s', en, ag, chif(tp == 's', '', tp), i)
            features[[fn]] <- list(name = fn, reference = en, win_size = i, aggregator = aggr_list[[ag]], type = tp)
          }
        }
      }
      for(tp in intersect(types, c('w', 'm', 'e', 'r'))){
        for(i in win_sizes){
          fn = sprintf('%s_avg%s%s', en, tp, i)
          features[[fn]] <- list(name = fn, reference = en, win_size = i, type = tp)
        }
      }
    }
    dfgpack[[tn]] %<>% dfg.historic.sliding(features)
    cat('Done!', '\n')
  }
  
  return(dfgpack)
}
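
# Usage sketch for add_swf: 7- and 30-period sliding sums and means on every
# history table in the pack:
#   pack <- add_swf(pack, aggregators = c('sum', 'avg'), win_sizes = c(7, 30), types = 's')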

create_feature_trend = function(dfgpack, table, features, label_event, aggregator = mean, remove_censored = F){
  if(remove_censored){
    tbr = which(dfgpack$event_censored[, label_event] == 0)
    dfgpack[[table]] = dfgpack[[table]][tbr,]
    dfgpack[['event_label']] = dfgpack[['event_label']][tbr,]
  }
  stopifnot(table %in% names(dfgpack))
  val = dfgpack[[table]][features]
  if(length(features) > 1) val %<>% rowSums(na.rm = T)
  dfgpack[[table]][, 'value'] = val
  dfgpack[[table]][, 'label'] = ifelse(dfgpack$event_label[, label_event] == 1, 'Yes', 'No')
  
  dfgpack[[table]] %>% group_by(eventTime, label) %>%
    summarise(value = aggregator(value)) %>%
    reshape2::dcast(eventTime ~ label) %>% na2zero %>%
    arrange(eventTime)
}

dfg_pack.remove_clock_events = function(pack){
  for(tn in names(pack)){
    cns = colnames(pack[[tn]])
    pack[[tn]] <- pack[[tn]][cns %-% charFilter(cns, 'clock')]
  }
  return(pack)
}


## This function generates a sql query that gives you the periodic aggregated features from an event-log
# todo: this function is under construction and not developed yet.
dfg.periodic.sql = function(eventlog_table_name, features, period = c('hour', 'day', 'week', 'month', 'year'), start = NULL, end = NULL){
  period = match.arg(period)

  sql_filter = ''
  
  if(!is.null(start)){start %<>% as.POSIXct %>% lubridate::force_tz('GMT'); sql_filter %<>% paste("WHERE eventTime > CAST(%s, DATE)" %>% sprintf(as.character(start - 1)))}
  if(!is.null(end))  {end   %<>% as.POSIXct %>% lubridate::force_tz('GMT'); sql_filter %<>% paste("WHERE eventTime < CAST(%s, DATE)" %>% sprintf(as.character(end + 1)))}
  
  EL %<>% mutate(periodStart = eventTime %>% lubridate::floor_date(period), selected = T)
  molten = data.frame()
  for (ft in features){
    if(!is.null(ft$eventTypes)){
      EL$selected = EL$eventType %in% ft$eventTypes
    }
    
    if(!is.null(ft$variables)){
      EL$selected = EL$selected & (EL$variable %in% ft$variables)
    }
    EL %>% filter(selected) %>% group_by(caseID, periodStart) %>%
      summarise(aggrval = do.call(ft$aggregator, list(value))) %>%
      mutate(featureName = ft$name) %>%
      bind_rows(molten) -> molten
  }
  
  tt = data.frame(periodStart = seq(from = min(EL$periodStart), to = max(EL$periodStart), by = 'day'))
  data.frame(caseID = unique(eventlog$caseID)) %>% group_by(caseID) %>% do({data.frame(caseID = .$caseID, periodStart = tt)}) %>%
    left_join(molten %>% reshape2::dcast(caseID + periodStart ~ featureName, sum, value.var = 'aggrval'), by = c('caseID', 'periodStart')) -> molten
  molten[is.na(molten)] <- 0
  names(molten)[2] <- 'time'
  return(molten)
}