#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# author: Reza Hosseini (reza1317@gmail.com)
## functions to perform forecasting of time series in R
# the time series can be of any granularity (e.g. daily, hourly, ...)
# the emphasis is on capturing growth and seasonality
# and the main use case is long-term forecasting
## packages this file appears to rely on (assumed here; load as needed):
# data.table (data manipulation), forecast (fourier, Arima, Holt-Winters),
# lubridate (round_date, duration), plyr (mapvalues), ggplot2 (ggsave),
# Cairo (png devices), xtable (latex tables)
library(data.table)
library(forecast)
library(lubridate)
library(plyr)
library(ggplot2)
library(Cairo)
library(xtable)
## this function generates two seasonal (Fourier) data frames,
# one for the observed period and one for the future,
# via stats::ts and forecast::fourier,
# with the periods and Fourier orders prescribed in periodOrdDf
SeasDf_viaTs = function(y, periodOrdDf, futLength) {
  fourierDf = NULL
  fourierDf_fut = NULL
  for (i in 1:nrow(periodOrdDf)) {
    period = periodOrdDf[i, "period"]
    ord = periodOrdDf[i, "ord"]
    yTs = stats::ts(na.omit(y), frequency=period)
    fDf = forecast::fourier(yTs, K=ord)
    fDf_fut = forecast::fourier(yTs, K=ord, h=futLength)
    fourierDf = cbind(fourierDf, fDf)
    fourierDf_fut = cbind(fourierDf_fut, fDf_fut)
  }
  return(list(
      "fourierDf"=fourierDf,
      "fourierDf_fut"=fourierDf_fut))
}
TestSeasDf_viaTs = function() {
  seasDfList = SeasDf_viaTs(
      y=1:100,
      periodOrdDf=data.frame("period"=c(24, 24*7), "ord"=c(1, 1)),
      futLength=24)
  fourierDf = seasDfList[["fourierDf"]]
  fourierDf_fut = seasDfList[["fourierDf_fut"]]
  head(fourierDf)
}
## forecast with an ARIMA model (plus a Holt-Winters baseline) via the forecast package
Forecast_viaArima = function(
    dt,
    valueCol,
    periodOrdDf, # e.g. data.frame("period"=c(24, 24*7), "ord"=c(1, 1))
    futLength, # e.g. 24*30
    weekLen, # e.g. 24*7
    arimaOrder,
    addGrowth=TRUE) {
  # extract the value series from the input data
  y = dt[[valueCol]]
  tsWeek = stats::ts(na.omit(y), frequency=weekLen)
  decomp = stl(tsWeek, s.window="periodic")
  # deseasonalized series (kept for inspection)
  yDeseas = seasadj(decomp)
  PltDecomp = function() {
    plot(decomp, main="seasonal (week period) decomp")
  }
  ## Holt-Winters model
  mod = HoltWinters(tsWeek)
  hwForecast = forecast(mod, h=futLength)
  PltHW_forecast = function() {
    plot(hwForecast)
    med = median(y, na.rm=TRUE)
    abline(h=med)
    grid(col="red")
  }
  ## arima model with seasonal (Fourier) regressors
  seasDfList = SeasDf_viaTs(
      y=y,
      periodOrdDf=periodOrdDf,
      futLength=futLength)
  fourierDf = data.frame(seasDfList[["fourierDf"]])
  fourierDf_fut = data.frame(seasDfList[["fourierDf_fut"]])
  n = nrow(fourierDf)
  m = nrow(fourierDf_fut)
  if (addGrowth) {
    fourierDf[ , "growth"] = (1:n)/(n+m)
    fourierDf_fut[ , "growth"] = ((n+1):(n+m))/(n+m)
  }
  xreg = as.matrix(fourierDf)
  xreg_fut = as.matrix(fourierDf_fut)
  yTs = stats::ts(na.omit(y), frequency=weekLen)
  fit = Arima(yTs, order=arimaOrder, xreg=xreg)
  arimaFc = forecast(fit, h=futLength, xreg=xreg_fut)
  PltArima_forecast = function() {
    plot(arimaFc)
  }
  return(list(
      "PltDecomp"=PltDecomp,
      "seasDfList"=seasDfList,
      "PltArima_forecast"=PltArima_forecast,
      "PltHW_forecast"=PltHW_forecast))
}
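## a minimal usage sketch for Forecast_viaArima (not from the original
# codebase): it builds a synthetic hourly series with daily and weekly
# cycles; assumes the forecast package is attached
TestForecast_viaArima = function() {
  n = 24 * 7 * 8  # eight weeks of hourly data
  y = (
      sin(2 * pi * (1:n) / 24) +
      sin(2 * pi * (1:n) / (24 * 7)) +
      rnorm(n, sd=0.1))
  dt = data.table("y"=y)
  res = Forecast_viaArima(
      dt=dt,
      valueCol="y",
      periodOrdDf=data.frame("period"=c(24, 24*7), "ord"=c(1, 1)),
      futLength=24*7,
      weekLen=24*7,
      arimaOrder=c(1, 0, 1),
      addGrowth=TRUE)
  res[["PltArima_forecast"]]()
  res[["PltHW_forecast"]]()
}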
## this function fits fast (long-term) forecast models
# to time series of arbitrary time scale (e.g. hourly, daily, ...)
# it captures seasonality and growth
# the function requires at least 1.1 years of data to produce a reasonable forecast
Forecast_viaSeas_andGrowth = function(
    dt,
    timeCol,
    valueCol,
    futTimeLength_secs,
    gridLength_secs,
    trainTest_thresh=2019.45,
    origin_forTimeVars=2018,
    toyK=10,
    predCols0=c("ct1", "dow_hr"),
    writePath=NULL,
    figsPath=NULL,
    dataName="") {
  dt0 = SubsetCols(
      dt=dt, keepCols=c(timeCol, valueCol))
  dtAvail = na.omit(dt0)
  minTs = min(dtAvail[ , get(timeCol)])
  maxTs = max(dtAvail[ , get(timeCol)])
  dt = dt[get(timeCol) >= minTs, ]
  ts = Col(dt=dt, timeCol)
  t1 = max(ts) + gridLength_secs
  t2 = t1 + futTimeLength_secs
  futGrid = seq(t1, t2, by=gridLength_secs)
  timeDf = BuildTimeDf(
      ts=ts,
      format="%Y-%m-%d %H:%M:%S",
      tz="GMT",
      origin_forTimeVars=origin_forTimeVars)
  timeDt = data.table(timeDf)
  # keep one-column tables so cbind preserves the columns
  obsDt = cbind(
      dt[ , timeCol, with=FALSE],
      timeDt,
      dt[ , valueCol, with=FALSE])
  colnames(obsDt) = c(timeCol, colnames(timeDt), valueCol)
  timeDf_fut = BuildTimeDf(
      ts=futGrid,
      format="%Y-%m-%d %H:%M:%S",
      tz="GMT",
      origin_forTimeVars=origin_forTimeVars)
  timeDt_fut = data.table(timeDf_fut)
  futDt = cbind(futGrid, timeDt_fut)
  futDt[ , valueCol] = NA
  colnames(futDt) = c(timeCol, colnames(timeDt_fut), valueCol)
  trainDt = obsDt[contiTime < trainTest_thresh, ]
  testDt = obsDt[contiTime >= trainTest_thresh, ]
  ## only the time-of-year Fourier terms (toyK, from the arguments) are used;
  # the other seasonal/lag parameters are switched off
  longtIndex = NULL
  lagK = NULL
  todK = NULL
  todK_wknd = NULL
  towK = NULL
  imputeWrtCols = "tow"
  res = BuildTsModDf_lagFourierParams(
      df=data.frame(trainDt),
      todK=todK,
      todK_wknd=todK_wknd,
      towK=towK,
      toyK=toyK,
      lagK=lagK,
      longtIndex=longtIndex,
      lagTrans=function(x)x,
      imputeWrtCols=imputeWrtCols)
  modDf_train = res[["modDf"]]
  fsFcnList = res[["fsFcnList"]]
  LagFcn = res[["LagFcn"]]
  res = BuildTsModDf_lagFourierParams(
      df=data.frame(testDt),
      todK=todK,
      todK_wknd=todK_wknd,
      towK=towK,
      toyK=toyK,
      lagK=lagK,
      longtIndex=longtIndex,
      lagTrans=function(x)x,
      imputeWrtCols=imputeWrtCols)
  modDf_test = res[["modDf"]]
  fsFcnList_test = res[["fsFcnList"]]
  res = BuildTsModDf_lagFourierParams(
      df=data.frame(futDt),
      todK=todK,
      todK_wknd=todK_wknd,
      towK=towK,
      toyK=toyK,
      lagK=lagK,
      longtIndex=longtIndex,
      lagTrans=function(x)x,
      imputeWrtCols=NULL,
      omitNa=FALSE)
  modDf_fut = res[["modDf"]]
  fsFcnList_fut = res[["fsFcnList"]]
modDf_train[ , "modeling_period"] = "train"
modDf_test[ , "modeling_period"] = "test"
modDf_fut[ , "modeling_period"] = "future"
modDf_testFut = rbind(modDf_test, modDf_fut)
predCols = setdiff(
setdiff(colnames(modDf_train), "modeling_period"),
c(timeCol, valueCol, colnames(timeDf)))
predCols = c(
predCols,
predCols0)
res = FitPred(
df=modDf_train,
newDf=modDf_testFut,
valueCol=valueCol,
predCols=predCols,
family="gaussian",
predQuantLimits=c(0, 1),
iqrDeltaCoef_right=2/10,
iqrDeltaCoef_left=0)
modDf = rbind(modDf_train, modDf_testFut)
modDt = data.table(modDf)
modDf_testFut[ , valueCol] = res[["yPred"]]
mod = res[["mod"]]
dt2 = modDt
dt2[ , "value"] = dt2[ , get(valueCol)]
dt2[ , is_max := (value == max(value)) , by="ywoy"]
dt3 = dt2[is_max == TRUE, ]
dim(dt2)
dim(dt3)
tail(dt3[ , mget(c(timeCol, "dow", valueCol))], 20)
  Plt = function(lwd=0.5) {
    ind = 1:nrow(modDf)
    # the white base curve only sets up the plotting frame
    plot(
        modDf[ind, "contiTime"],
        modDf[ind, valueCol],
        col=ColAlpha("white", 0.5),
        type="l",
        lwd=lwd,
        ylab=valueCol,
        xlab="time")
    lines(
        modDf_train[ , "contiTime"],
        modDf_train[ , valueCol],
        col=ColAlpha("blue", 0.3),
        lwd=lwd)
    lines(
        modDf_test[ , "contiTime"],
        modDf_test[ , valueCol],
        col=ColAlpha("green", 0.3),
        lwd=lwd)
    lines(
        modDf_testFut[ , "contiTime"],
        modDf_testFut[ , valueCol],
        col=ColAlpha("red", 0.3),
        lwd=lwd)
    abline(h=0, lty=3, col=ColAlpha("grey", 0.8), lwd=lwd)
    legend(
        "left",
        legend=c("training", "test", "future"),
        col=c("blue", "green", "red"),
        lwd=c(2, 2, 2))
  }
figFn0 = ""
if (!is.null(figsPath)) {
figFn0 = paste0(
"forecast_for_",
valueCol,
"_with_",
timeCol,
"_granularity",
dataName,
".png")
figFn = paste0(figsPath, figFn0)
# Cairo(file=fn, width=15, height=8, dpi=400, type="png")
Cairo(
file=figFn,
width=640*10,
height=480*5,
dpi=600,
pointsize=5,
type="png",
bg=ColAlpha("white", 1/3))
Plt()
dev.off()
}
modDf = rbind(modDf_train, modDf_testFut)
forecastDf = modDf[ , c(timeCol, "modeling_period", valueCol)]
dataFn0 = NULL
if (!is.null(writePath)) {
dataFn0 = paste0(
"forecast_for_",
valueCol,
"_with_",
timeCol,
"_granularity",
dataName,
".csv")
dataFn = paste0(writePath, dataFn0)
write.csv(
file=dataFn,
x=forecastDf,
row.names=FALSE)
}
return(list(
"forecastDf"=forecastDf,
"Plt"=Plt,
"figFileName"=figFn0,
"dataFileName"=dataFn0,
"avail_period"=c(minTs, maxTs)))
}
TestForecast_viaSeas_andGrowth = function() {
  ts = seq(
      from=as.POSIXlt("2018-04-01 23:53"),
      to=as.POSIXlt("2019-07-02 23:53"),
      by=60*60)
  timeDf = BuildTimeDf(ts)
  df = data.frame("ts"=ts)
  df[ , "contiTime"] = timeDf[ , "contiTime"]
  n = nrow(df)
  df[ , "y"] = (
      sin(2 * pi * df[ , "contiTime"]) +
      sin(4 * pi * df[ , "contiTime"]) +
      1 * (1:n)/n)
  plot(df[ , "ts"], df[ , "y"], col=ColAlpha("blue"))
  dt = data.table(df)
  res = Forecast_viaSeas_andGrowth(
      dt=dt,
      timeCol="ts",
      valueCol="y",
      futTimeLength_secs=365*24*3600,
      gridLength_secs=60*60,
      trainTest_thresh=2019.45,
      toyK=10,
      writePath=NULL,
      figsPath=NULL,
      dataName="_v1")
  Plt = res[["Plt"]]
  Plt(lwd=2)
}
## calculates available time for each metric in a long data format
AvailTime_perMetric = function(
    longDt,
    timeCol,
    metricCol,
    valueCol,
    tablesPath=NULL) {
  dt2 = SubsetCols(df=longDt, keepCols=c(timeCol, metricCol, valueCol))
  # rename before filtering so the "value" column is guaranteed to exist
  colnames(dt2) = c("ts", "metric", "value")
  dt2 = dt2[!is.na(value), ]
  timeDt = dt2[ ,
      .(min_timestamp=min(ts), max_timestamp=max(ts)),
      by=metric]
  timeDf = data.frame(timeDt)
  for (col in c("min_timestamp", "max_timestamp")) {
    timeDf[ , col] = as.character(timeDf[ , col])
  }
  colnames(timeDf) = gsub("_", "-", colnames(timeDf))
  timeDf[ , "metric"] = gsub("_", "-", timeDf[ , "metric"])
  x = xtable2(
      timeDf,
      caption="time df",
      label="time df",
      digits=3)
  if (!is.null(tablesPath)) {
    fn = paste0(
        tablesPath,
        "available_time_for_granularity_of_",
        timeCol,
        "_for_each_series.tex")
    print(
        x=x,
        file=fn,
        include.rownames=FALSE,
        hline.after=c(-1, 0, 1:nrow(timeDf)),
        size="tiny",
        sanitize.text.function=identity)
  }
  return(list("timeDf"=timeDf, "latexTab"=x))
}
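## a minimal usage sketch for AvailTime_perMetric (not from the original
# codebase), on synthetic long-format data where the two metrics are
# available over different windows; SubsetCols and xtable2 are assumed
# to be available from elsewhere in this codebase
TestAvailTime_perMetric = function() {
  ts = seq(
      from=as.POSIXct("2011-01-01 00:00:00"),
      to=as.POSIXct("2011-01-10 00:00:00"),
      by=3600)
  n = length(ts)
  longDt = data.table(
      "ts"=rep(ts, 2),
      "metric"=rep(c("m1", "m2"), each=n),
      "value"=rnorm(2 * n))
  # make m1 unavailable before Jan 3
  longDt[metric == "m1" & ts < as.POSIXct("2011-01-03 00:00:00"), value := NA]
  res = AvailTime_perMetric(
      longDt=longDt,
      timeCol="ts",
      metricCol="metric",
      valueCol="value",
      tablesPath=NULL)
  res[["timeDf"]]
}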
# this function drops columns with zero variance from a data table
DropCols_withZeroVar = function(dt, cols=NULL, thresh=0) {
  dt = data.table(dt)
  if (is.null(cols)) {
    cols = colnames(dt)
  }
  # compute sd only over the requested columns, so indices line up with cols
  sds = dt[ , lapply(.SD, function(x) sd(x, na.rm=TRUE)), .SDcols=cols]
  ind = which(unlist(sds) > thresh)
  cols1 = cols[ind]
  cols2 = setdiff(cols, cols1)
  dt = SubsetCols(dt, keepCols=cols1)
  return(list(
      "dt"=dt,
      "cols_nonzero_sd"=cols1,
      "cols_zero_sd"=cols2))
}
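## a minimal sketch exercising DropCols_withZeroVar (not from the original
# codebase): x2 is constant, so it should be reported in cols_zero_sd and
# dropped; SubsetCols is assumed available from elsewhere in this codebase
TestDropCols_withZeroVar = function() {
  dt = data.table(
      "x1"=rnorm(100),
      "x2"=rep(1, 100),
      "x3"=runif(100))
  res = DropCols_withZeroVar(dt=dt, cols=NULL, thresh=0)
  print(res[["cols_zero_sd"]])  # expected: "x2"
  res[["dt"]]
}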
## interpolates a time series given in a data table
# by using nearby values
# the nearby values are found via a timeRounding;
# we do not attempt to look for the nearest available value,
# for computational efficiency reasons
InterpolateSeries_nearTimes = function(
    dt,
    timeCol,
    valueCols,
    timeRounding,
    idCols=NULL,
    AggFunc=function(x)mean(x, na.rm=TRUE)) {
  dt = data.table(dt)
  newTimeCol = paste0("ts", gsub(" ", "", timeRounding))
  dt[ , newTimeCol] = round_date(Col(dt=dt, col=timeCol), timeRounding)
  aggDt = DtSimpleAgg(
      dt=dt,
      gbCols=c(newTimeCol, idCols),
      valueCols=valueCols,
      AggFunc=AggFunc)
  colnames(aggDt) = plyr::mapvalues(
      colnames(aggDt), valueCols, paste0(valueCols, "_interp"))
  dt2 = merge(dt, aggDt, by=c(idCols, newTimeCol))
  for (col in valueCols) {
    col2 = paste0(col, "_interp")
    col3 = paste0(col, "_final")
    dt2[ , col3] = dt2[ , get(col)]
    # fill missing values from the rounded-time aggregates
    ind = which(is.na(dt2[ , get(col)]))
    if (length(ind) > 0) {
      set(dt2, i=ind, j=col3, value=dt2[[col2]][ind])
    }
  }
  return(list("aggDt"=aggDt, "dt"=dt2, "newTimeCol"=newTimeCol))
}
TestInterpolateSeries_nearTimes = function() {
  ts = seq(
      from=as.POSIXlt("2011-01-01 23:53"),
      to=as.POSIXlt("2011-01-02 23:53"),
      by=60)
  n = length(ts)
  df0 = data.frame(
      "ts"=ts,
      "x1"=log((1:n)/n) + rnorm(n, sd=1/50),
      "x2"=log((2:(n+1))/n) + rnorm(n, sd=1/50))
  m = round(5 * n / 6)
  ind1 = sort(sample(1:n, m, replace=FALSE))
  df0[ind1, "x1"] = NA
  ind2 = sort(sample(1:n, m, replace=FALSE))
  df0[ind2, "x2"] = NA
  res = InterpolateSeries_nearTimes(
      dt=df0,
      timeCol="ts",
      valueCols=c("x1", "x2"),
      timeRounding="20 min",
      idCols=NULL,
      AggFunc=function(x)mean(x, na.rm=TRUE))
  dt2 = res[["dt"]]
  df2 = data.frame(dt2)
  plot(df2[ , "ts"], df2[ , "x1"], col=ColAlpha("blue", 0.4))
  lines(df2[ , "ts"], df2[ , "x1_final"], col=ColAlpha("red", 0.4))
}
## puts a time series on a regular time grid with a specified target granularity
# the granularity could be FINER than the data's
RegGrid_timeSeries = function(
    dt,
    timeCol,
    timeRounding, # 1 min, 5 min, 1 hour, 2 hour, 1 week
    valueCols,
    idCols=NULL,
    startTime=NULL,
    endTime=NULL,
    AggFunc=function(x)mean(x, na.rm=TRUE)) {
  if (is.null(startTime)) {
    startTime = min(dt[ , get(timeCol)], na.rm=TRUE)
  }
  if (is.null(endTime)) {
    endTime = max(dt[ , get(timeCol)], na.rm=TRUE)
  }
  startTime = round_date(startTime, timeRounding)
  endTime = round_date(endTime, timeRounding)
  gran = as.numeric(lubridate::duration(timeRounding))
  ts = seq(
      from=startTime,
      to=endTime,
      by=gran)
  # round the data timestamps to the prescribed granularity
  newTimeCol = paste0("ts", gsub(" ", "", timeRounding))
  df0 = data.frame(newTimeCol=ts)
  dt0 = data.table(df0)
  colnames(dt0) = newTimeCol
  dt[ , newTimeCol] = round_date(Col(dt=dt, col=timeCol), timeRounding)
  gridDt = merge(dt0, dt, all.x=TRUE, all=FALSE, by=c(newTimeCol, idCols))
  ## aggregation is necessary if the grid is coarser than the original series
  # at some intervals
  gridDt = DtSimpleAgg(
      dt=gridDt,
      gbCols=c(newTimeCol, idCols),
      valueCols=valueCols,
      AggFunc=AggFunc)
  return(list(
      "gridDt"=gridDt,
      "newTimeCol"=newTimeCol,
      "startTime"=startTime,
      "endTime"=endTime))
}
TestRegGrid_timeSeries = function() {
  ts = seq(
      from=as.POSIXlt("2011-01-01 01:53"),
      to=as.POSIXlt("2011-01-01 05:53"),
      by=60)
  n = length(ts)
  df = data.frame(
      "ts"=ts,
      "x1"=log((1:n)/n) + rnorm(n, sd=1/50),
      "x2"=log((2:(n+1))/n) + rnorm(n, sd=1/50))
  m = round(2 * n / 3)
  ind1 = sort(sample(1:n, m, replace=FALSE))
  df[ind1, "x1"] = NA
  ind2 = sort(sample(1:n, m, replace=FALSE))
  df[ind2, "x2"] = NA
  ## drop some timestamps
  ind3 = sort(sample(1:n, n-m, replace=FALSE))
  df = df[ind3, ]
  dt = data.table(df)
  res = RegGrid_timeSeries(
      dt=dt,
      timeCol="ts",
      timeRounding="5 min", # 1 min, 5 min, 1 hour, 2 hour, 1 week
      valueCols=c("x1", "x2"),
      idCols=NULL,
      startTime=NULL,
      endTime=NULL,
      AggFunc=function(x)mean(x, na.rm=TRUE))
  gridDt = res[["gridDt"]]
  plot(dt[ , ts], dt[ , x1], col=ColAlpha("blue", 1/3), pch=10)
  points(gridDt[ , ts5min], gridDt[ , x1], col=ColAlpha("red", 1/3), pch=10)
  for (v in gridDt[ , ts5min]) {
    abline(v=v)
  }
  points(
      gridDt[ , ts5min], rep(-1, nrow(gridDt)),
      col=ColAlpha("black", 1/3),
      pch=10)
}
## creates a regular grid for a multivar (or univar) time series
# and interpolates values
RegGrid_timeSeries_andInterpolate = function(
    dt,
    timeCol,
    timeRoundingGrid, # 1 min, 5 min, 1 hour, 2 hour, 1 week
    timeRoundingInterp,
    valueCols,
    idCols=NULL,
    startTime=NULL,
    endTime=NULL,
    AggFunc=function(x)mean(x, na.rm=TRUE)) {
  res = RegGrid_timeSeries(
      dt=dt,
      timeCol=timeCol,
      timeRounding=timeRoundingGrid,
      valueCols=valueCols,
      idCols=idCols,
      startTime=startTime,
      endTime=endTime)
  gridDt = res[["gridDt"]]
  newTimeCol_grid = res[["newTimeCol"]]
  startTime = res[["startTime"]]
  endTime = res[["endTime"]]
  res = InterpolateSeries_nearTimes(
      dt=gridDt,
      timeCol=newTimeCol_grid,
      valueCols=valueCols,
      timeRounding=timeRoundingInterp,
      idCols=NULL,
      AggFunc=AggFunc)
  gridDt_interp = res[["dt"]]
  newTimeCol_interp = res[["newTimeCol"]]
  return(list(
      "gridDt"=gridDt,
      "gridDt_interp"=gridDt_interp,
      "startTime"=startTime,
      "endTime"=endTime,
      "newTimeCol_grid"=newTimeCol_grid,
      "newTimeCol_interp"=newTimeCol_interp))
}
TestRegGrid_timeSeries_andInterpolate = function() {
  ts = seq(
      from=as.POSIXlt("2011-01-01 01:53:00"),
      to=as.POSIXlt("2011-01-01 03:53:00"),
      by=60)
  n = length(ts)
  df = data.frame(
      "ts"=ts,
      "x1"=log((1:n)/n) + rnorm(n, sd=1/50),
      "x2"=log((2:(n+1))/n) + rnorm(n, sd=1/50))
  m = round(n / 3)
  ind1 = sort(sample(1:n, m, replace=FALSE))
  df[ind1, "x1"] = NA
  ind2 = sort(sample(1:n, m, replace=FALSE))
  df[ind2, "x2"] = NA
  ## drop some timestamps
  ind3 = sort(sample(1:n, n-m, replace=FALSE))
  df = df[ind3, ]
  dt = data.table(df)
  res = RegGrid_timeSeries_andInterpolate(
      dt=dt,
      timeCol="ts",
      timeRoundingGrid="1 min",
      timeRoundingInterp="5 min", # 1 min, 5 min, 1 hour, 2 hour, 1 week
      valueCols=c("x1", "x2"),
      idCols=NULL,
      startTime=NULL,
      endTime=NULL,
      AggFunc=function(x)median(x, na.rm=TRUE))
  gridDt = res[["gridDt"]]
  gridDt_interp = res[["gridDt_interp"]]
  newTimeCol_grid = res[["newTimeCol_grid"]]
  newTimeCol_interp = res[["newTimeCol_interp"]]
  plot(dt[ , ts], dt[ , x1], col=ColAlpha("blue", 1/3), pch=20)
  points(
      gridDt[ , get(newTimeCol_grid)],
      gridDt[ , x1],
      col=ColAlpha("red", 1/3), pch=21)
  points(
      gridDt_interp[ , get(newTimeCol_grid)],
      gridDt_interp[ , x1_final],
      col=ColAlpha("black", 1/2), pch=22)
  for (v in gridDt[ , get(newTimeCol_grid)]) {
    abline(v=v, lty=3, col=ColAlpha("grey", 1/2))
  }
}
## a quick visual inspection of the gridding
# for an interpolated series
InspectGridding_forInterpolatedSeries = function(
    dt,
    newTimeCol_grid,
    valueCol,
    valueCol_final) {
  ind = 1:nrow(dt)
  plot(
      dt[ind, get(newTimeCol_grid)],
      dt[ind, get(valueCol)],
      col=ColAlpha("red", 1/2), pch=20)
  lines(
      dt[ind, get(newTimeCol_grid)],
      dt[ind, get(valueCol_final)],
      col=ColAlpha("blue", 1/2))
}
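## a minimal usage sketch for InspectGridding_forInterpolatedSeries (not
# from the original codebase), built on RegGrid_timeSeries_andInterpolate;
# the "_final" column name follows the convention of InterpolateSeries_nearTimes
TestInspectGridding_forInterpolatedSeries = function() {
  ts = seq(
      from=as.POSIXct("2011-01-01 01:00:00"),
      to=as.POSIXct("2011-01-01 03:00:00"),
      by=60)
  n = length(ts)
  df = data.frame("ts"=ts, "x1"=sin((1:n)/10) + rnorm(n, sd=1/50))
  df[sort(sample(1:n, round(n/3))), "x1"] = NA
  res = RegGrid_timeSeries_andInterpolate(
      dt=data.table(df),
      timeCol="ts",
      timeRoundingGrid="1 min",
      timeRoundingInterp="5 min",
      valueCols="x1")
  InspectGridding_forInterpolatedSeries(
      dt=res[["gridDt_interp"]],
      newTimeCol_grid=res[["newTimeCol_grid"]],
      valueCol="x1",
      valueCol_final="x1_final")
}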
## make a correlation plot and save it after removing
# variables with zero variance
Plt_tsCor_dropZeroVar = function(
    wideDt,
    timeCol,
    idCols=NULL,
    dataName="",
    valueCols=NULL,
    figsPath=NULL,
    latexFn=NULL) {
  if (is.null(valueCols)) {
    valueCols = setdiff(colnames(wideDt), c(timeCol, idCols))
  }
  df = data.frame(wideDt)
  colnames(df) = colnames(wideDt)
  res = DropCols_withZeroVar(dt=df[ , valueCols], cols=NULL, thresh=0)
  dt = res[["dt"]]
  valueCols2 = res[["cols_nonzero_sd"]]
  cols_zero_sd = res[["cols_zero_sd"]]
  df2 = df[ , valueCols2]
  corMat = cor(df2, use="pairwise.complete.obs")
  p = CorPlt(df=df2)
  fn0 = NULL
  if (!is.null(figsPath)) {
    fn0 = paste0(
        "correlation_among_metrics_for_granularity_of_",
        timeCol,
        "_for_",
        dataName,
        ".png")
    fn = paste0(figsPath, fn0)
    ggsave(p, filename=fn, width=12, height=10, dpi=600, type="cairo")
  }
  if (!is.null(figsPath) && !is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=fn0,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }
  return(list(
      "cols_nonzero_sd"=valueCols2,
      "cols_zero_sd"=cols_zero_sd,
      "p"=p,
      "corMat"=corMat))
}
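## a minimal usage sketch for Plt_tsCor_dropZeroVar (not from the original
# codebase) on a synthetic wide table; x3 has zero variance, so it should
# land in cols_zero_sd; CorPlt is assumed available elsewhere in this codebase
TestPlt_tsCor_dropZeroVar = function() {
  n = 200
  wideDt = data.table(
      "ts"=seq(as.POSIXct("2011-01-01 00:00:00"), by=3600, length.out=n),
      "x1"=rnorm(n),
      "x2"=rnorm(n),
      "x3"=rep(0, n))
  res = Plt_tsCor_dropZeroVar(
      wideDt=wideDt,
      timeCol="ts",
      figsPath=NULL)
  print(res[["cols_zero_sd"]])  # expected: "x3"
  res[["corMat"]]
}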
## clustering correlated variables
Cluster_correlatedVars = function(
    df, cols, clusterNum, absValue=TRUE, figFn=NULL) {
  res = DropCols_withZeroVar(dt=df[ , cols], cols=NULL, thresh=0)
  dt = res[["dt"]]
  cols_nonzero_sd = res[["cols_nonzero_sd"]]
  cols_zero_sd = res[["cols_zero_sd"]]
  x = data.frame(dt)
  # calculate pairwise correlation between all variables
  corMat = cor(x, use="pairwise.complete.obs")
  if (absValue) {
    corMat = abs(corMat)
  }
  ## calculate dissimilarity matrix
  # this will have choose(n, 2) elements, where n = length(cols)
  distMat = dist(corMat)
  ## possible methods: "ward.D", "ward.D2",
  # "single", "complete", "average", "mcquitty", "median", "centroid"
  ## here we cluster the variables
  hc = hclust(distMat, method="ward.D")
  grp = cutree(hc, k=clusterNum)
  Plt = function() {
    plot(hc, cex=1)
    rect.hclust(hc, k=clusterNum, border=2:5)
  }
  if (!is.null(figFn)) {
    r = 3
    png(figFn, width=480*r, height=480*r, pointsize=14, res=200)
    Plt()
    dev.off()
  }
  return(list(
      "hc"=hc,
      "grp"=grp,
      "Plt"=Plt,
      "cols_nonzero_sd"=cols_nonzero_sd,
      "cols_zero_sd"=cols_zero_sd))
}
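## a minimal sketch exercising Cluster_correlatedVars (not from the
# original codebase): two blocks of mutually correlated variables
# should be recovered as two clusters
TestCluster_correlatedVars = function() {
  n = 300
  u = rnorm(n)
  v = rnorm(n)
  df = data.frame(
      "a1"=u + rnorm(n, sd=0.1),
      "a2"=u + rnorm(n, sd=0.1),
      "b1"=v + rnorm(n, sd=0.1),
      "b2"=v + rnorm(n, sd=0.1))
  res = Cluster_correlatedVars(
      df=df, cols=colnames(df), clusterNum=2, absValue=TRUE, figFn=NULL)
  print(res[["grp"]])  # expected: a1, a2 in one group; b1, b2 in the other
  res[["Plt"]]()
}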
## explores the correlation between multiple value columns in a data table
Explore_CorDt = function(
    wideDt,
    valueCols,
    clusterNum,
    figsPath=NULL,
    fnSuffix="",
    latexFn=NULL) {
  corMat = cor(
      SubsetCols(df=wideDt, keepCols=valueCols),
      use="pairwise.complete.obs")
  corPlt = CorPlt(
      df=data.frame(SubsetCols(df=wideDt, keepCols=valueCols)),
      fontSizeAlpha=0.3,
      fontFace="plain")
  corPlt_fn0 = NULL
  if (!is.null(figsPath)) {
    corPlt_fn0 = paste0(
        "correlation_among_metrics_for_",
        fnSuffix,
        ".png")
    fn = paste0(figsPath, corPlt_fn0)
    ggsave(corPlt, filename=fn, width=12, height=10, dpi=600, type="cairo")
  }
  if (!is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=corPlt_fn0,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }
  df = data.frame(wideDt)
  colnames(df) = colnames(wideDt)
  res = Cluster_correlatedVars(
      df=df,
      cols=valueCols,
      clusterNum=clusterNum,
      absValue=TRUE,
      figFn=if (!is.null(figsPath)) {
        paste0(figsPath, "variable_clusters.png")
      } else {
        NULL
      })
  clusterPlt = res[["Plt"]]
  clusterPlt_fn0 = NULL
  if (!is.null(figsPath)) {
    clusterPlt_fn0 = paste0(
        "variable_clustering_based_on_absolute_value_of_correlation_for_",
        fnSuffix,
        ".png")
    fn = paste0(figsPath, clusterPlt_fn0)
    Cairo(file=fn, type="png")
    clusterPlt()
    dev.off()
  }
  if (!is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=clusterPlt_fn0,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }
  return(list(
      "corPlt"=corPlt,
      "clusterPlt"=clusterPlt,
      "corPlt_fileName"=corPlt_fn0,
      "clusterPlt_fileName"=clusterPlt_fn0))
}
TestExplore_CorDt = function() {
  library("MASS")
  k = 10
  mat = mvrnorm(n=100, mu=rep(0, k), Sigma=diag(k))
  wideDt = data.table(mat)
  valueCols = colnames(wideDt)
  res = Explore_CorDt(
      wideDt=wideDt,
      valueCols=valueCols,
      clusterNum=2,
      figsPath=NULL,
      fnSuffix="",
      latexFn=NULL)
}
## function to explore multivariate time series data
# it abbreviates column names
# it can read different timestamp columns via parse_date
# it rounds the timestamps and aggregates using AggFunc, which can be passed in
# (e.g. AggFunc can be mean or sum)
# it creates overlay plots
Explore_multivarTimeSeries = function(
    df,
    timeCol,
    dataName,
    timeRounding,
    AggFunc=function(x)mean(x, na.rm=TRUE),
    colsReplaceStrings=c("prod", "\\.", "/", "&", " and ", "-", "_", ",", ";"),
    idCols=NULL,
    valueCols=NULL,
    figsPath=NULL,
    csvPath=NULL,
    tablesPath=NULL,
    latexFn=NULL) {
  df[ , "ts"] = parse_date(df[ , timeCol])
  if (timeCol != "ts") {
    df = SubsetCols(df, keepCols=NULL, dropCols=timeCol)
  }
  Func = function(s) {
    AbbrString(
        s,
        wordLength=5,
        replaceStrings=colsReplaceStrings,
        sep="_",
        totalLength=NULL,
        wordNumLimit=NULL)
  }
  colnames(df) = tolower(sapply(FUN=Func, X=colnames(df)))
  if (is.null(valueCols)) {
    valueCols = setdiff(colnames(df), c("ts", timeCol, idCols))
  }
  dt = data.table(df)
  newTimeCol = paste0("ts", gsub(" ", "", timeRounding))
  dt[ , newTimeCol] = round_date(Col(dt=dt, "ts"), timeRounding)
  dt = SubsetCols(dt, keepCols=NULL, dropCols="ts")
  # use the user-supplied aggregation function, not a hard-coded mean
  wideDt = DtSimpleAgg(
      dt=dt,
      gbCols=c(newTimeCol, idCols),
      valueCols=valueCols,
      AggFunc=AggFunc)
  longDt = melt(
      wideDt,
      id.vars=c(newTimeCol, idCols),
      variable.name="metric",
      value.name="value")
  p = Plt_splitCurve_splitPanel(
      df=longDt,
      xCol=newTimeCol,
      valueCol="value",
      splitCurveCol="metric",
      splitPanelCol=NULL,
      alpha=0.3,
      size=0.5)
  overlayTs_fn0 = NULL
  if (!is.null(figsPath)) {
    overlayTs_fn0 = paste0(
        dataName,
        "_raw_",
        newTimeCol,
        "_average_time_series",
        ".png")
    fn = paste0(figsPath, overlayTs_fn0)
    print(fn)
    ggsave(p, filename=fn, width=12, height=10, dpi=600, type="cairo")
  }
  rawSeriesPlt = p
  if (!is.null(latexFn)) {
    LatexFig(
        latexFn=latexFn,
        figFn=overlayTs_fn0,
        figLabel=NULL,
        figCaption=NULL,
        scale=0.5)
  }
  res = Plt_tsCor_dropZeroVar(
      wideDt=wideDt,
      timeCol=newTimeCol,
      dataName=dataName,
      valueCols=NULL,
      figsPath=figsPath,
      latexFn=latexFn)
  cols_nonzero_sd = res[["cols_nonzero_sd"]]
  cols_zero_sd = res[["cols_zero_sd"]]
  corPlt = res[["p"]]
  corMat = res[["corMat"]]
  res = AvailTime_perMetric(
      longDt=longDt,
      timeCol=newTimeCol,
      metricCol="metric",
      valueCol="value",
      tablesPath=NULL)
  latexTab = res[["latexTab"]]
  timeDf = res[["timeDf"]]
  availTime_fn0 = NULL
  if (!is.null(csvPath)) {
    availTime_fn0 = paste0("avail_time_interval_", dataName, ".csv")
    fn = paste0(csvPath, availTime_fn0)
    print(fn)
    write.csv(file=fn, x=timeDf, row.names=FALSE)
  }
  availTime_latexTab_fn0 = NULL
  if (!is.null(tablesPath)) {
    x = xtable2(
        timeDf,
        caption="time df",
        label="time df",
        digits=3)
    availTime_latexTab_fn0 = paste0("avail_time_interval_", dataName, ".tex")
    fn = paste0(tablesPath, availTime_latexTab_fn0)
    print(
        x=x, file=fn, include.rownames=FALSE,
        hline.after=c(-1, 0, 1:nrow(timeDf)),
        size="tiny",
        sanitize.text.function=identity)
    print("this table file was created:")
    print(fn)
  }
  return(list(
      "longDt"=longDt,
      "wideDt"=wideDt,
      "timeDf"=timeDf,
      "newTimeCol"=newTimeCol,
      "rawSeriesPlt"=rawSeriesPlt,
      "corPlt"=corPlt,
      "availTime_latexTab"=latexTab,
      "cols_nonzero_sd"=cols_nonzero_sd,
      "cols_zero_sd"=cols_zero_sd,
      "overlay_fileName"=overlayTs_fn0,
      "availTime_csv_fileName"=availTime_fn0,
      "availTime_latexTab_fileName"=availTime_latexTab_fn0))
}
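## a minimal usage sketch for Explore_multivarTimeSeries (not from the
# original codebase) on synthetic hourly data; it relies on helpers assumed
# to exist elsewhere in this codebase (parse_date, AbbrString, Col,
# DtSimpleAgg, Plt_splitCurve_splitPanel, CorPlt)
TestExplore_multivarTimeSeries = function() {
  ts = seq(as.POSIXct("2018-01-01 00:00:00"), by=3600, length.out=24*30)
  n = length(ts)
  df = data.frame(
      "timestamp"=as.character(ts),
      "metric one"=sin(2 * pi * (1:n) / 24) + rnorm(n, sd=0.1),
      "metric two"=cos(2 * pi * (1:n) / 24) + rnorm(n, sd=0.1),
      check.names=FALSE)
  res = Explore_multivarTimeSeries(
      df=df,
      timeCol="timestamp",
      dataName="synthetic",
      timeRounding="1 hour",
      figsPath=NULL,
      csvPath=NULL,
      tablesPath=NULL,
      latexFn=NULL)
  res[["rawSeriesPlt"]]
}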
## explores a list of data frames, each being
# a multivar time series given in wide format
# it also merges them together
Explore_multivarTimeSeries_fromList = function(
    dfList,
    timeRounding,
    AggFunc,
    figsPath="",
    tablesPath="",
    latexFn=NULL) {
  k = length(dfList)
  dataList = list()
  for (i in 1:k) {
    df = dfList[[i]]
    dataName = names(dfList)[i]
    print(paste("processing:", dataName))
    dataName = AbbrString(
        dataName,
        wordLength=5,
        replaceStrings=c(
            "csv", "prod", "\\.", "/", "&", " and ", "-", "_", ",", ";"),
        sep="_")
    print(paste("new data name is:", dataName))
    colnames(df)[1] = "timestamp"
    res = Explore_multivarTimeSeries(
        df=df,
        timeCol="timestamp",
        dataName=dataName,
        timeRounding=timeRounding,
        AggFunc=AggFunc,
        valueCols=NULL,
        figsPath=figsPath,
        tablesPath=tablesPath,
        latexFn=latexFn)
    longDt0 = res[["longDt"]]
    wideDt0 = res[["wideDt"]]
    longDt0[ , "group"] = dataName
    timeCol = res[["newTimeCol"]]
    if (i == 1) {
      longDt = longDt0
      wideDt = wideDt0
    } else {
      longDt = rbind(longDt, longDt0)
      wideDt = merge(wideDt, wideDt0, by=timeCol, all=TRUE)
    }
    dataList[[dataName]] = res
  }
  return(list(
      "dataList"=dataList,
      "longDt"=longDt,
      "wideDt"=wideDt,
      "timeCol"=paste0("ts", gsub(" ", "", timeRounding))))
}
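## a minimal usage sketch for Explore_multivarTimeSeries_fromList (not from
# the original codebase) with two synthetic wide data frames; the first
# column of each data frame must be the timestamp, and the same helpers as
# in Explore_multivarTimeSeries are assumed to exist
TestExplore_multivarTimeSeries_fromList = function() {
  MakeDf = function(shift) {
    ts = seq(as.POSIXct("2018-01-01 00:00:00"), by=3600, length.out=24*30)
    n = length(ts)
    data.frame(
        "timestamp"=as.character(ts),
        "x"=sin(2 * pi * ((1:n) + shift) / 24) + rnorm(n, sd=0.1))
  }
  dfList = list("data one"=MakeDf(0), "data two"=MakeDf(6))
  res = Explore_multivarTimeSeries_fromList(
      dfList=dfList,
      timeRounding="1 hour",
      AggFunc=function(x) mean(x, na.rm=TRUE),
      figsPath=NULL,
      tablesPath=NULL,
      latexFn=NULL)
  head(res[["longDt"]])
}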