R/forecasting.R

Defines functions SeasDf_viaTs TestSeasDf_viaTs Forecast_viaArima Forecast_viaSeas_andGrowth TestForecast_viaSeas_andGrowth AvailTime_perMetric AggFunc AggFunc AggFunc AggFunc AggFunc AggFunc AggFunc

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# author: Reza Hosseini (reza1317@gmail.com)

## functions to perform forecasting of time series in R
# the time series could be of any given granularity (e.g. daily, hourly, ...)
# the emphasis is to capture growth and seasonality
# and the main use-case is for longterm forecast


## this function generates two seasonal data frames
# it uses ts from the stats package, function ts
# one for observed and one for the future
# via the ts function and given prescribed periods in
# periodOrdDf
SeasDf_viaTs = function(y, periodOrdDf, futLength) {

  fourierDf = NULL
  fourierDf_fut = NULL

  for (i in 1:nrow(periodOrdDf)) {
    period = periodOrdDf[i, "period"]
    ord = periodOrdDf[i, "ord"]

    yTs = stats::ts(na.omit(y), frequency=period)

    fDf = fourier(yTs, K=ord)
    fDf_fut = fourier(yTs, K=ord, h=futLength)

    fourierDf = cbind(fourierDf, fDf)
    fourierDf_fut = cbind(fourierDf_fut, fDf_fut)
  }

  return(list(
    "fourierDf"=fourierDf,
    "fourierDf_fut"=fourierDf_fut))
}

TestSeasDf_viaTs = function() {
  seasDfList = SeasDf_viaTs(
      1:100,
      data.frame("period"=c(24, 24*7), "ord"=c(1, 1)),
      futLength=24)

  fourierDf = seasDfList[["fourierDf"]]
  fourierDf_fut = seasDfList[["fourierDf_fut"]]

  head(fourierDf)
}

## forecast with arima model and via the forecast package
Forecast_viaArima = function(
    dt,
    valueCol,
    periodOrdDf, # eg data.frame("period"=c(24, 24*7), "ord"=c(1, 1)),
    futLength, # eg 24*30,
    weekLen, # eg 24*7,
    arimaOrder,
    addGrowth=TRUE) {

  tsWeek = ts(na.omit(y), frequency=weekLen)

  decomp = stl(tsWeek, s.window="periodic")
  yDeseas = seasadj(decomp)

  PltDecomp = function() {
    plot(decomp, main=paste("seasonal (week period) decomp"))
  }

  ## HW model
  mod = HoltWinters(tsWeek)
  hwForecast = forecast(mod, h=futLength)

  PltHW_forecast = function() {
    plot(hwForecast)
    med = median(y, na.rm=TRUE)
    abline(h=med)
    grid(col="red")
  }

  ## arima model with seasonal
  seasDfList = SeasDf_viaTs(
      y=y,
      periodOrdDf=periodOrdDf,
      futLength=futLength)

  fourierDf = seasDfList[["fourierDf"]]
  fourierDf_fut = seasDfList[["fourierDf_fut"]]

  fourierDf = data.frame(fourierDf)
  fourierDf_fut = data.frame(fourierDf_fut)

  n = nrow(fourierDf)
  m = nrow(fourierDf_fut)

  if (addGrowth) {
    fourierDf[ , "growth"] = (1:n)/(n+m)
    fourierDf_fut[ , "growth"] = ((n+1):(n+m))/(n+m)
  }

  xreg = fourierDf
  xreg_fut = fourierDf_fut
  xreg = as.matrix(xreg)
  xreg_fut = as.matrix(xreg_fut)

  yTs = ts(na.omit(y), frequency=weekLen)
  fit = Arima(yTs, order=arimaOrder, xreg=xreg)

  arimaFc = forecast(
      fit, h=futLength, xreg=xreg_fut)

  PltArima_forecast = function() {
    plot(arimaFc)
  }

  return(list(
      "PltDecomp"=PltDecomp,
      "seasDfList"=seasDfList,
      "PltArima_forecast"=PltArima_forecast,
      "PltHW_forecast"=PltHW_forecast
      ))
}

## this function fits fast (longterm) forecast models
# to time series of arbitrary time scale (e.g. hourly, daily, ...)
# it captures seasonality and growth
# function requires at least 1.1 year of data to produce reasonable forecast
Forecast_viaSeas_andGrowth = function(
    dt,
    timeCol,
    valueCol,
    futTimeLength_secs,
    gridLength_secs,
    trainTest_thresh=2019.45,
    origin_forTimeVars=2018,
    toyK=10,
    predCols0=c("ct1", "dow_hr"),
    writePath=NULL,
    figsPath=NULL,
    dataName="") {

  dt0 = SubsetCols(
      dt=dt, keepCols=c(timeCol, valueCol))

  dtAvail = na.omit(dt0)
  minTs = min(dtAvail[ , get(timeCol)])
  maxTs = max(dtAvail[ , get(timeCol)])
  dt = dt[get(timeCol) >= minTs, ]
  ts = Col(dt=dt, timeCol)

  t1 = max(ts) + gridLength_secs
  t2 = t1 + futTimeLength_secs
  futGrid = seq(t1, t2, by=gridLength_secs)

  timeDf = BuildTimeDf(
      ts=ts,
      format="%Y-%m-%d %H:%M:%S",
      tz="GMT",
      origin_forTimeVars=origin_forTimeVars)

  timeDt = data.table(timeDf)
  obsDt = cbind(
      dt[ , get(timeCol), drop=FALSE],
      timeDt, dt[ , get(valueCol), drop=FALSE])

  colnames(obsDt) = c(timeCol, colnames(timeDt), valueCol)

  timeDf_fut = BuildTimeDf(
      ts=futGrid,
      format="%Y-%m-%d %H:%M:%S",
      tz="GMT",
      origin_forTimeVars=origin_forTimeVars)

  timeDt_fut = data.table(timeDf_fut)
  futDt = cbind(futGrid, timeDt_fut)
  futDt[ , valueCol] = NA
  colnames(futDt) = c(timeCol, colnames(timeDt_fut), valueCol)

  trainDt = obsDt[contiTime < trainTest_thresh, ]
  testDt = obsDt[contiTime >= trainTest_thresh, ]

  longtIndex = NULL
  lagK = NULL
  todK = NULL
  todK_wknd = NULL
  towK = NULL
  toyK = toyK

  imputeWrtCols = "tow"

  res = BuildTsModDf_lagFourierParams(
      df=data.frame(trainDt),
      todK=todK,
      todK_wknd=todK_wknd,
      towK=towK,
      toyK=toyK,
      lagK=lagK,
      longtIndex=longtIndex,
      lagTrans=function(x)x,
      imputeWrtCols=imputeWrtCols
  )

  modDf_train = res[["modDf"]]
  fsFcnList = res[["fsFcnList"]]
  LagFcn = res[["LagFcn"]]

  res = BuildTsModDf_lagFourierParams(
      df=data.frame(testDt),
      todK=todK,
      todK_wknd=todK_wknd,
      towK=towK,
      toyK=toyK,
      lagK=lagK,
      longtIndex=longtIndex,
      lagTrans=function(x)x,
      imputeWrtCols=imputeWrtCols
  )

  modDf_test = res[["modDf"]]
  fsFcnList_test = res[["fsFcnList"]]
  LagFcn = res[["LagFcn"]]

  res = BuildTsModDf_lagFourierParams(
      df=data.frame(futDt),
      todK=todK,
      todK_wknd=todK_wknd,
      towK=towK,
      toyK=toyK,
      lagK=lagK,
      longtIndex=longtIndex,
      lagTrans=function(x)x,
      imputeWrtCols=NULL,
      omitNa=FALSE
  )

  modDf_fut = res[["modDf"]]
  fsFcnList_fut = res[["fsFcnList"]]
  LagFcn = res[["LagFcn"]]
  modDf_fut

  modDf_train[ , "modeling_period"] = "train"
  modDf_test[ , "modeling_period"] = "test"
  modDf_fut[ , "modeling_period"] = "future"

  modDf_testFut = rbind(modDf_test, modDf_fut)

  predCols = setdiff(
      setdiff(colnames(modDf_train), "modeling_period"),
      c(timeCol, valueCol, colnames(timeDf)))

  predCols = c(
      predCols,
      predCols0)

  res = FitPred(
      df=modDf_train,
      newDf=modDf_testFut,
      valueCol=valueCol,
      predCols=predCols,
      family="gaussian",
      predQuantLimits=c(0, 1),
      iqrDeltaCoef_right=2/10,
      iqrDeltaCoef_left=0)

  modDf = rbind(modDf_train, modDf_testFut)
  modDt = data.table(modDf)
  modDf_testFut[ , valueCol] = res[["yPred"]]
  mod = res[["mod"]]

  dt2 = modDt
  dt2[ , "value"] = dt2[ , get(valueCol)]

  dt2[ , is_max := (value == max(value)) , by="ywoy"]

  dt3 = dt2[is_max == TRUE, ]

  dim(dt2)
  dim(dt3)
  tail(dt3[ , mget(c(timeCol, "dow", valueCol))], 20)

  Plt = function(lwd=0.5) {
    ind = 1:nrow(modDf)
    #ind = (nrow(modDf)-1000):nrow(modDf)
    plot(
        modDf[ind , "contiTime"],
        modDf[ind , valueCol],
        col=ColAlpha("white", 0.5),
        type="l",
        lwd=lwd,
        ylab=valueCol,
        xlab="time")

    lines(
        modDf_train[ , "contiTime"],
        modDf_train[, valueCol],
        col=ColAlpha("blue", 0.3),
        lwd=lwd)

    lines(
        modDf_test[ , "contiTime"],
        modDf_test[, valueCol],
        col=ColAlpha("green", 0.3),
        lwd=lwd)

    lines(
        modDf_testFut[ind , "contiTime"],
        modDf_testFut[ind, valueCol],
        col=ColAlpha("red", 0.3),
        lwd=lwd)

    abline(h=0, lty=3, col=ColAlpha("grey", 0.8), lwd=lwd)

    legend(
        "left",
        legend=c("training", "test", "future"),
        col=c("blue", "green", "red"),
        lwd=c(2, 2, 2))
  }

  figFn0 = ""
  if (!is.null(figsPath)) {

    figFn0 = paste0(
        "forecast_for_",
        valueCol,
        "_with_",
        timeCol,
        "_granularity",
        dataName,
        ".png")

    figFn = paste0(figsPath, figFn0)
    # Cairo(file=fn, width=15, height=8, dpi=400, type="png")
    Cairo(
        file=figFn,
        width=640*10,
        height=480*5,
        dpi=600,
        pointsize=5,
        type="png",
        bg=ColAlpha("white", 1/3))
    Plt()
    dev.off()

  }

  modDf = rbind(modDf_train, modDf_testFut)
  forecastDf = modDf[ , c(timeCol, "modeling_period", valueCol)]

  dataFn0 = NULL
  if (!is.null(writePath)) {

    dataFn0 = paste0(
        "forecast_for_",
        valueCol,
        "_with_",
        timeCol,
        "_granularity",
        dataName,
        ".csv")

    dataFn = paste0(writePath, dataFn0)
    write.csv(
        file=dataFn,
        x=forecastDf,
        row.names=FALSE)
  }

  return(list(
      "forecastDf"=forecastDf,
      "Plt"=Plt,
      "figFileName"=figFn0,
      "dataFileName"=dataFn0,
      "avail_period"=c(minTs, maxTs)))
}


TestForecast_viaSeas_andGrowth = function() {

  ts = seq(
      from=as.POSIXlt("2018-04-01 23:53"),
      to=as.POSIXlt("2019-07-02 23:53"),
      by=60*60)

  timeDf = BuildTimeDf(ts)

  df = data.frame(
      "ts"=ts)

  df[ , "contiTime"] = timeDf[ , "contiTime"]

  n = nrow(df)
  df[ , "y"] = (
      sin(2 * pi * df[ , "contiTime"]) +
      sin(4 * pi * df[ , "contiTime"]) +
      1 * (1:n)/n)

  plot(df[ , "ts"], df[ , "y"], col=ColAlpha("blue"))

  dt = data.table(df)

  res = Forecast_viaSeas_andGrowth(
      dt=dt,
      timeCol="ts",
      valueCol="y",
      futTimeLength_secs=365*24*3600,
      gridLength_secs=60*60,
      trainTest_thresh=2019.45,
      toyK=10,
      writePath=NULL,
      figsPath=NULL,
      dataName="_v1")

  Plt = res[["Plt"]]

  Plt(lwd=2)
}

## calculates available time for each metric in a long data format
AvailTime_perMetric = function(
    longDt,
    timeCol,
    metricCol,
    valueCol,
    tablesPath=NULL) {

  dt2 = SubsetCols(df=longDt, keepCols=c(timeCol, metricCol, valueCol))
  dt2 = dt2[!is.na(value), ]

  colnames(dt2) = c("ts", "metric", "value")

  timeDt = dt2[ ,
      .(min_timestamp=min(ts), max_timestamp=max(ts)),
      by=metric]

  timeDf = data.frame(timeDt)
  for (col in c("min_timestamp", "max_timestamp")) {
    timeDf[ , col] = as.character(timeDf[ , col])
  }

  colnames(timeDf) = gsub("_", "-", colnames(timeDf))
  timeDf[ , "metric"] = gsub("_", "-", timeDf[ , "metric"])

  x = xtable2(
      timeDf,
      caption="time df",
      label="time df",
      digits=3)

  if (!is.null(tablesPath)) {

    fn = paste0(
        tablesPath,
        "available_time_for_granularity_of_",
        timeCol,
        "_for_each_series.tex")

    print(
        x=x,
        file=fn,
        include.rownames=FALSE,
        hline.after=c(-1, 0, 1:nrow(timeDf)),
        size="tiny",
        sanitize.text.function=identity)
  }

  return(list("timeDf"=timeDf, "latexTab"=x))
}

# this function drops columns with zero variance from a data table
DropCols_withZeroVar = function(dt, cols=NULL, thresh=0) {

  dt = data.table(dt)
  if (is.null(cols)) {
    cols = colnames(dt)
  }

  sds = dt[ , lapply(.SD, function(x)sd(x, na.rm=TRUE)), ]

  ind = which(sds  > thresh)

  cols1 = cols[ind]
  cols2 = setdiff(cols, cols1)
  dt = SubsetCols(dt, keepCols=cols1)

  return(list(
      "dt"=dt,
      "cols_nonzero_sd"= cols1,
      "cols_zero_sd"=cols2))
}

## it interpolates a time series given in a data table
# by using near by values
# the near by values are found by a timeRounding
# we do not attempt to look for the nearest available value
# for computational efficiency reasons
InterpolateSeries_nearTimes = function(
    dt,
    timeCol,
    valueCols,
    timeRounding,
    idCols=NULL,
    AggFunc=function(x)mean(x, na.rm=TRUE)) {

  dt = data.table(dt)

  newTimeCol = paste0("ts", gsub(" ", "", timeRounding))

  dt[ , newTimeCol] = round_date(Col(dt=dt , col=timeCol), timeRounding)

  aggDt = DtSimpleAgg(
      dt=dt,
      gbCols=c(newTimeCol, idCols),
      valueCols=valueCols,
      AggFunc=AggFunc)

  colnames(aggDt) = plyr::mapvalues(
      colnames(aggDt), valueCols, paste0(valueCols, "_interp"))

  dt2 = merge(dt, aggDt, by=c(idCols, newTimeCol))


  for (col in valueCols) {
    col2 = paste0(col, "_interp")
    col3 = paste0(col, "_final")
    dt2[ , col3] = dt2[ , get(col)]
    ind = which(is.na(dt2[ , get(col)]))
    if (length(ind) > 0) {
      str = paste0("dt2[ind,", col3, ":=", col2, "]")
      eval(parse(text=str))
    }
  }

  return(list("aggDt"=aggDt, "dt"=dt2, "newTimeCol"=newTimeCol))
}

TestInterpolateSeries_nearTimes = function() {

  ts = seq(
      from=as.POSIXlt("2011-01-01 23:53"),
      to=as.POSIXlt("2011-01-02 23:53"),
      by=60)

  n = length(ts)

  df0 = data.frame(
      "ts"=ts,
      "x1"=log((1:n)/n) + rnorm(n, sd=1/50),
      "x2"=log((2:(n+1))/n) + rnorm(n, sd=1/50))

  m = round(5 * n / 6)
  ind1 = sort(sample(1:n, m, replace=FALSE))
  df0[ind1, "x1"] = NA
  ind2 = sort(sample(1:n, m, replace=FALSE))
  df0[ind2, "x2"] = NA

  res = InterpolateSeries_nearTimes(
    dt=df0,
    timeCol="ts",
    valueCols=c("x1", "x2"),
    timeRounding="20 min",
    idCols=NULL,
    AggFunc=function(x)mean(x, na.rm=TRUE))

  dt2 = res[["dt"]]

  df2 = data.frame(dt2)

  plot(df2[ , "ts"], df2[ , "x1"], col=ColAlpha("blue", 0.4))
  lines(df2[ , "ts"], df2[ , "x1_final"], col=ColAlpha("red", 0.4))
}

## specify a target granularity
# the granularity could be FINER than the data
RegGrid_timeSeries = function(
    dt,
    timeCol,
    timeRounding, # 1 min, 5 min, 1 hour, 2 hour, 1 week
    valueCols,
    idCols=NULL,
    startTime=NULL,
    endTime=NULL,
    AggFunc=function(x)mean(x, na.rm=TRUE)) {

  if (is.null(startTime)) {
    startTime = min(dt[ , get(timeCol)], na.rm=TRUE)
  }

  if (is.null(endTime)) {
    endTime = max(dt[ , get(timeCol)], na.rm=TRUE)
  }

  startTime =  round_date(startTime, timeRounding)
  endTime = round_date(endTime, timeRounding)

  gran = as.numeric(lubridate::duration(timeRounding))

  ts = seq(
      from=startTime,
      to=endTime,
      by=gran)

  # lets round the data up to the prescribed granularity
  newTimeCol = paste0("ts", gsub(" ", "", timeRounding))

  df0 = data.frame(newTimeCol=ts)
  dt0 = data.table(df0)
  colnames(dt0) = newTimeCol
  dt[ , newTimeCol] = round_date(Col(dt=dt , col=timeCol), timeRounding)

  gridDt = merge(dt0, dt, all.x=TRUE, all=FALSE, by=c(newTimeCol, idCols))

  ## aggregate is necessary if the data is coarser than the original series
  # at some intervals
  gridDt = DtSimpleAgg(
      dt=gridDt,
      gbCols=c(newTimeCol, idCols),
      valueCols=valueCols,
      AggFunc=AggFunc)

  return(list(
      "gridDt"=gridDt,
      "newTimeCol"=newTimeCol,
      "startTime"=startTime,
      "endTime"=endTime))
}


TestRegGrid_timeSeries = function() {

  ts = seq(
      from=as.POSIXlt("2011-01-01 1:53"),
      to=as.POSIXlt("2011-01-01 05:53"),
      by=60)

  n = length(ts)

  df = data.frame(
      "ts"=ts,
      "x1"=log((1:n)/n) + rnorm(n, sd=1/50),
      "x2"=log((2:(n+1))/n) + rnorm(n, sd=1/50))

  m = round(2 * n / 3)
  ind1 = sort(sample(1:n, m, replace=FALSE))
  df[ind1, "x1"] = NA
  ind2 = sort(sample(1:n, m, replace=FALSE))
  df[ind2, "x2"] = NA

  ## drop some time stamps
  ind3 = sort(sample(1:n, n-m, replace=FALSE))
  df = df[ind3, ]

  dt = data.table(df)

  timeCol = "ts"
  timeRounding = "5 min"
  startTime = NULL
  endTime = NULL
  valueCols = c("x1", "x2")
  idCols = NULL

  res = RegGrid_timeSeries(
      dt=dt,
      timeCol=timeCol,
      timeRounding=timeRounding, # 1 min, 5 min, 1 hour, 2 hour, 1 week
      valueCols=valueCols,
      idCols=NULL,
      startTime=NULL,
      endTime=NULL,
      AggFunc=AggFunc)

  gridDt = res[["gridDt"]]

  plot(dt[ , ts], dt[ , x1], col=ColAlpha("blue", 1/3), pch=10)
  points(gridDt[ , ts5min], gridDt[ , x1], col=ColAlpha("red", 1/3), pch=10)
  for (v in gridDt[ , ts5min]) {
    abline(v=v)
  }
  points(
      gridDt[ , ts5min], rep(-1, nrow(gridDt)),
      col=ColAlpha("black", 1/3),
      pch=10)
}

## create a regular grid for a multivar (or univar) time series
# and interpolating values
RegGrid_timeSeries_andInterpolate = function(
    dt,
    timeCol,
    timeRoundingGrid, # 1 min, 5 min, 1 hour, 2 hour, 1 week
    timeRoundingInterp,
    valueCols,
    idCols=NULL,
    startTime=NULL,
    endTime=NULL,
    AggFunc=function(x)mean(x, na.rm=TRUE)) {

  res = RegGrid_timeSeries(
      dt=dt,
      timeCol=timeCol,
      timeRounding=timeRoundingGrid, # 1 min, 5 min, 1 hour, 2 hour, 1 week
      valueCols=valueCols,
      idCols=idCols,
      startTime=startTime,
      endTime=endTime)

  gridDt = res[["gridDt"]]
  newTimeCol_grid = res[["newTimeCol"]]
  startTime = res[["startTime"]]
  endTime = res[["endTime"]]

  res = InterpolateSeries_nearTimes(
    dt=gridDt,
    timeCol=newTimeCol_grid,
    valueCols=valueCols,
    timeRounding=timeRoundingInterp,
    idCols=NULL,
    AggFunc=AggFunc)

  gridDt_interp = res[["dt"]]
  newTimeCol_interp = res[["newTimeCol"]]

  return(list(
      "gridDt"=gridDt,
      "gridDt_interp"=gridDt_interp,
      "startTime"=startTime,
      "endTime"=endTime,
      "newTimeCol_grid"=newTimeCol_grid,
      "newTimeCol_interp"=newTimeCol_interp))
}

TestRegGrid_timeSeries_andInterpolate = function() {

  ts = seq(
      from=as.POSIXlt("2011-01-01 01:53:00"),
      to=as.POSIXlt("2011-01-01 03:53:00"),
      by=60)

  n = length(ts)

  df = data.frame(
      "ts"=ts,
      "x1"=log((1:n)/n) + rnorm(n, sd=1/50),
      "x2"=log((2:(n+1))/n) + rnorm(n, sd=1/50))

  m = round(n / 3)
  ind1 = sort(sample(1:n, m, replace=FALSE))
  df[ind1, "x1"] = NA
  ind2 = sort(sample(1:n, m, replace=FALSE))
  df[ind2, "x2"] = NA

  ## drop some timestamps
  ind3 = sort(sample(1:n, n-m, replace=FALSE))
  df = df[ind3, ]

  dt = data.table(df)

  timeCol = "ts"
  timeRoundingGrid = "1 min"
  timeRoundingInterp = "5 min"
  startTime = NULL
  endTime = NULL
  valueCols = c("x1", "x2")
  idCols = NULL

  res = RegGrid_timeSeries_andInterpolate(
      dt=dt,
      timeCol=timeCol,
      timeRoundingGrid=timeRoundingGrid,
      timeRoundingInterp=timeRoundingInterp, # 1 min, 5 min, 1 hour, 2 hour, 1 week
      valueCols=valueCols,
      idCols=NULL,
      startTime=NULL,
      endTime=NULL,
      AggFunc=function(x)median(x, na.rm=TRUE))

  gridDt = res[["gridDt"]]
  gridDt_interp = res[["gridDt_interp"]]
  newTimeCol_grid = res[["newTimeCol_grid"]]
  newTimeCol_interp = res[["newTimeCol_interp"]]

  plot(dt[ , ts], dt[ , x1], col=ColAlpha("blue", 1/3), pch=20)
  points(
      gridDt[ , get(newTimeCol_grid)],
      gridDt[ , x1],
      col=ColAlpha("red", 1/3), pch=21)

  points(
      gridDt_interp[ , get(newTimeCol_grid)],
      gridDt_interp[ , x1_final],
      col=ColAlpha("black", 1/2), pch=22)


  for (v in gridDt[ , get(newTimeCol_grid)]) {
    abline(v = v, lty=3, col=ColAlpha("grey", 1/2))
  }
}

## just an inspection of the gridding
# for interpolated series
InspectGridding_forInterpolatedSeries = function(
    dt,
    newTimeCol_grid,
    valueCol,
    valueCol_final) {

  n = 1
  ind = (n+1):(n+500)

  ind = 1:nrow(dt)
  plot(
      dt[ind , get(newTimeCol_grid)],
      dt[ind , get(valueCol)],
      col=ColAlpha("red", 1/2), pch=20)

  lines(
      dt[ind , get(newTimeCol_grid)],
      dt[ind , get(valueCol_final)],
      col=ColAlpha("blue", 1/2), pch=22)
}

## make a correlation plot and save it after removing
# variables with zero variace
Plt_tsCor_dropZeroVar = function(
    wideDt,
    timeCol,
    idCols=NULL,
    dataName="",
    valueCols=NULL,
    figsPath=NULL,
    latexFn=NULL) {

  if (is.null(valueCols)) {
    valueCols = setdiff(colnames(wideDt), c(timeCol, idCols))
  }

  df = data.frame(wideDt)
  colnames(df) = colnames(wideDt)

  res = DropCols_withZeroVar(dt=df[ , valueCols], cols=NULL, thresh=0)
  dt = res[["dt"]]
  valueCols2 = res[["cols_nonzero_sd"]]
  cols_zero_sd = res[["cols_zero_sd"]]

  df2 = df[ , valueCols2]

  corMat = cor(df2, use="pairwise.complete.obs")
  p = CorPlt(df=df2)


  if (!is.null(figsPath)) {
    fn0 = paste0(
        "correlation_among_metrics_for_granularity_of_",
        timeCol,
        "_for_",
        dataName,
        ".png")

    fn = paste0(figsPath, fn0)
    ggsave(p, filename=fn, width=12, height=10, dpi=600, type="cairo")
  }

  if (!is.null(figsPath) && !is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=fn0,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }

  return(list(
      "cols_nonzero_sd"=valueCols2,
      "cols_zero_sd"=cols_zero_sd,
      "p"=p,
      "corMat"=corMat))
}

## clustering correlated variables
Cluster_correlatedVars = function(
    df, cols, clusterNum, absValue=TRUE, figFn=NULL) {

  res = DropCols_withZeroVar(dt=df[ , cols], cols=NULL, thresh=0)
  dt = res[["dt"]]
  cols_nonzero_sd = res[["cols_nonzero_sd"]]
  cols_zero_sd = res[["cols_zero_sd"]]

  x = data.frame(dt)
  # calculate pairwise correlation between all variables
  corMat = cor(x, use="pairwise.complete.obs")

  if (absValue) {
    corMat = abs(corMat)
  }

  ## calculate dissimilarity matrix
  # this will have choose(n, 2) elements, where n = length(cols)
  distMat = dist(corMat)

  ## possible methods: "ward.D", "ward.D2",
  # "single", "complete", "average"’, "mcquitty", "median", "centroid"
  ## here we cluster the data
  hc = hclust(distMat, method="ward.D")
  grp = cutree(hc, k=clusterNum)
  table(grp)
  rownames(df)[grp == 1]

  Plt = function() {
    plot(hc, cex=1)
    rect.hclust(hc, k=clusterNum, border=2:5)
  }

  if (!is.null(figFn)) {
    r = 3
    png(figFn,  width=480*r, height=480*r, pointsize=14, res=200)
    Plt()
    dev.off()
  }

  return(list(
      "hc"=hc,
      "grp"=grp,
      "Plt"=Plt,
      "cols_nonzero_sd"=cols_nonzero_sd,
      "cols_zero_sd"=cols_zero_sd))
}

## explores the correlation between multiple value columns in a data table
Explore_CorDt = function(
    wideDt,
    valueCol,
    clusterNum,
    figsPath=NULL,
    fnSuffix="",
    latexFn=NULL) {

  corMat = cor(SubsetCols(df=wideDt, keepCols=valueCols), use="pairwise.complete.obs")
  corPlt = CorPlt(
      df=data.frame(SubsetCols(df=wideDt, keepCols=valueCols)),
      fontSizeAlpha=0.3,
      fontFace="plain")

  corPlt_fn0 = NULL

  if (!is.null(figsPath)) {
    corPlt_fn0 = paste0(
        "correlation_among_metrics_for_",
        fnSuffix,
        ".png")

    fn = paste0(figsPath, corPlt_fn0)
    ggsave(corPlt, filename=fn, width=12, height=10, dpi=600, type="cairo")
  }

  if (!is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=corPlt_fn0,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }

  df = data.frame(wideDt)
  colnames(df) = colnames(dt)

  res = Cluster_correlatedVars(
      df=df,
      cols=valueCols,
      clusterNum=clusterNum,
      absValue=TRUE,
      figFn=paste0(figsPath, "variable_clusters.png"))

  clusterPlt = res[["Plt"]]

  clusterPlt_fn0 = NULL

  if (!is.null(figsPath)) {
    clusterPlt_fn0 = paste0(
        "variable_clustering_based_on_absolute_value_of_correlation_for_",
        fnSuffix,
        ".png")
    fn = paste0(figsPath, clusterPlt_fn0)
    #Cairo(file=fn, width=15, height=8, dpi=400, type="png")
    Cairo(file=fn,  type="png")
    clusterPlt()
    dev.off()
  }

  if (!is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }

  return(list(
      "corPlt"=corPlt,
      "clusterPlt"=clusterPlt,
      "corPlt_fileName"=corPlt_fn0,
      "clusterPlt_fileName"=clusterPlt_fn0
      ))

}


TestExplore_CorDt = function() {

  library("MASS")
  k = 10
  mat = mvrnorm(n=100, mu=rep(0, k), Sigma=diag(k))

  wideDt = data.table(mat)
  valueCols = colnames(wideDt)

  res = Explore_CorDt(
      wideDt=wideDt,
      valueCol=valueCols,
      clusterNum=2,
      figsPath=NULL,
      fnSuffix="",
      latexFn=NULL)

}

## function to explore multivariate time series data
# it abbreviates column name
# it can read different timestamp columns via parse_date
# it rounds the timestamp and aggregates using AggFunc which can be passed
# eg AggFunc can be mean or sum
# it creates overlay plots
Explore_multivarTimeSeries = function(
    df,
    timeCol,
    dataName,
    timeRounding,
    AggFunc=function(x)mean(x, na.rm=TRUE),
    colsReplaceStrings=c("prod", "\\.", "/", "&", " and ", "-", "_", ",", ";"),
    idCols=NULL,
    valueCols=NULL,
    figsPath=NULL,
    csvPath=NULL,
    tablesPath=NULL,
    latexFn=NULL) {

  df[ , "ts"] = parse_date(df[ , timeCol])
  if (timeCol != "ts") {
    df = SubsetCols(df, keepCols=NULL, dropCols=timeCol)
  }

  Func = function(s) {

    AbbrString(
        s,
        wordLength=5,
        replaceStrings=colsReplaceStrings,
        sep="_",
        totalLength=NULL,
        wordNumLimit=NULL)

  }

  colnames(df) = tolower(sapply(FUN=Func, X=colnames(df)))

  if (is.null(valueCols)) {
    valueCols = setdiff(colnames(df), c("ts", timeCol, idCols))
  }

  #df[1:10, c("ts", "timestamp")]
  dt = data.table(df)
  newTimeCol = paste0("ts", gsub(" ", "", timeRounding))

  dt[ , newTimeCol] = round_date(Col(dt=dt , "ts"), timeRounding)
  dt = SubsetCols(dt, keepCols=NULL, dropCols="ts")

  wideDt = DtSimpleAgg(
      dt=dt,
      gbCols=c(newTimeCol, idCols),
      valueCols=valueCols,
      AggFunc=function(x){mean(x, na.rm=TRUE)})

  longDt = melt(
      wideDt,
      id.vars=c(newTimeCol, idCols),
      variable.name="metric",
      value.name="value")

  p = Plt_splitCurve_splitPanel(
      df=longDt,
      xCol=newTimeCol,
      valueCol="value",
      splitCurveCol="metric",
      splitPanelCol=NULL,
      alpha=0.3,
      size=0.5)

  overlayTs_fn0 = NULL

  if (!is.null(figsPath)) {
    overlayTs_fn0 = paste0(
        dataName,
        "_raw_",
        newTimeCol,
        "_average_time_series", ".png")
    fn = paste0(figsPath, overlayTs_fn0)
    print(fn)
    ggsave(p, filename=fn, width=12, height=10, dpi=600, type="cairo")
  }

  rawSeriesPlt = p

  if (!is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=overlayTs_fn0,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }

  res = Plt_tsCor_dropZeroVar(
    wideDt=wideDt,
    timeCol=newTimeCol,
    dataName=dataName,
    valueCols=NULL,
    figsPath=figsPath,
    latexFn=latexFn)

  cols_nonzero_sd = res[["cols_nonzero_sd"]]
  cols_zero_sd = res[["cols_zero_sd"]]
  corPlt = res[["p"]]
  corMat = res[["corMat"]]

  res = AvailTime_perMetric(
      longDt=longDt,
      timeCol=newTimeCol,
      metricCol="metric",
      valueCol="value",
      tablesPath=NULL)

  latexTab = res[["latexTab"]]
  timeDf = res[["timeDf"]]

  availTime_fn0 = NULL
  if (!is.null(csvPath)) {
    availTime_fn0 = paste0("avail_time_interval_", dataName, ".csv")
    fn = paste0(csvPath, availTime_fn0)
    print(fn)
    write.csv(file=fn, x=timeDf, rownames=FALSE)
  }

  availTime_latexTab_fn0 = NULL
  if (!is.null(tablesPath)) {
    x = xtable2(
      timeDf,
      caption="time df",
      label="time df",
      digits=3)

    availTime_latexTab_fn0 = paste0("avail_time_interval_", dataName, ".tex")
    fn = paste0(tablesPath, availTime_latexTab_fn0)

    print(
        x=x, file=fn, include.rownames=FALSE,
        hline.after=c(-1, 0, 1:nrow(timeDf)),
        size="tiny",
        sanitize.text.function=identity)

    print("this table file was created:")
    print(fn)
  }


  return(list(
      "longDt"=longDt,
      "wideDt"=wideDt,
      "timeDf"=timeDf,
      "newTimeCol"=newTimeCol,
      "rawSeriesPlt"=rawSeriesPlt,
      "corPlt"=corPlt,
      "availTime_latexTab"=latexTab,
      "cols_nonzero_sd"=cols_nonzero_sd,
      "cols_zero_sd"=cols_zero_sd,
      "overlay_fileName"=overlayTs_fn0,
      "availTime_csv_fileName"=availTime_fn0,
      "availTime_latexTab_fileName"=availTime_latexTab_fn0))
}


## this explores a list of each being
# a multivar time series given in wide format
# it also merges them together
Explore_multivarTimeSeries_fromList = function(
    dfList,
    timeRounding,
    AggFunc,
    figsPath="",
    tablesPath="",
    latexFn=NULL) {

  k = length(dfList)
  dataList = list()
  for (i in 1:k) {
    df = dfList[[i]]
    dataName = names(dfList)[i]
    print(paste("processing:", dataName))
    dataName = AbbrString(
        dataName,
        wordLength=5,
        replaceStrings=c(
            "csv", "prod", "\\.", "/", "&", " and ", "-", "_", ",",
            ";"),
        sep="_")

    print(paste("new data name is: ", dataName))
    colnames(df)[1] = "timestamp"

    res = Explore_multivarTimeSeries(
        df=df,
        timeCol="timestamp",
        dataName=dataName,
        timeRounding=timeRounding,
        AggFunc=AggFunc,
        valueCols=NULL,
        figsPath=figsPath,
        tablesPath=tablesPath,
        latexFn=latexFn)

    longDt0 = res[["longDt"]]
    wideDt0 = res[["wideDt"]]
    longDt0[ , "group"] = dataName
    timeCol = res[["newTimeCol"]]

    if (i == 1) {
      longDt = longDt0
      wideDt = wideDt0

    } else {
      longDt = rbind(longDt, longDt0)
      wideDt = merge(wideDt, wideDt0, by=timeCol, all=TRUE)
    }

    dataList[[dataName]] = res

  }

  return(list(
      "dataList"=dataList,
      "longDt"=longDt,
      "wideDt"=wideDt,
      "timeCol"=paste0("ts", gsub(" ", "", timeRounding))))
}
Reza1317/funcly documentation built on Feb. 5, 2020, 4:06 a.m.