#' Apply a function over a rolling window of rows.
#'
#' Slides a window of \code{width} consecutive rows over \code{data}, one row
#' at a time, and calls \code{.fun} on every window, either sequentially or on
#' a cluster of worker processes.
#'
#' @details
#' Each window is extracted with \code{data[i:(i + width - 1), ]}, so the usual
#' dimension dropping of \code{[} applies: a one-column input, or a matrix
#' window made of a single row, is passed to \code{.fun} as a plain vector.
#'
#' When \code{progress} is a file name, a directory named \code{progress} is
#' created in the current working directory (if missing) and any existing file
#' of that name inside it is removed first. In sequential mode one line per
#' window is written to the file. In parallel mode the file is used as the
#' \code{outfile} of the cluster, so it receives the progress lines together
#' with anything else the workers print. In parallel mode the percentage is
#' based on the index of the window being processed, so it is only a rough
#' indication.
#'
#' The cluster and the progress file connection are released even if
#' \code{.fun} raises an error.
#'
#' @param data A data frame or a matrix.
#' @param width The rolling window size: a single positive whole number not
#' larger than \code{nrow(data)}.
#' @param .fun A function (or the name of one) whose first parameter receives
#' the window of \code{data}.
#' @param ... Additional parameters passed to \code{.fun}.
#' @param parallel Parallel computing. Default is \code{FALSE} (sequential).
#' If \code{TRUE}, all detected cores are used. If positive, \code{parallel}
#' is the number of cores. If negative, the number of cores is
#' \code{detectCores() + parallel}.
#' @param .export Character vector of variables to export to the workers with
#' \code{clusterExport} (looked up in the global environment). Default is
#' \code{NULL}. Only used in parallel mode.
#' @param .packages Character vector of packages that tasks depend on.
#' @param progress The name of the file the progress information is written
#' to (inside the \code{progress} directory). Default \code{FALSE}, meaning no
#' progress information is written.
#' @return A list with one element per window, in window order.
#' @export
#' @importFrom foreach foreach %do% %dopar%
#' @importFrom parallel detectCores clusterExport makeCluster stopCluster
#' @importFrom doParallel registerDoParallel
#' @examples
#' \dontrun{
#' res <- roll_apply(
#'   matrix(seq(10^4 * 3), ncol = 3), 100,
#'   function(m) eigen(cov(m))$values,
#'   parallel = -1, progress = "progress.txt"
#' )
#' }
roll_apply <- function(data, width, .fun, ..., parallel = FALSE, .export = NULL,
                       .packages = NULL, progress = FALSE) {
  .fun <- match.fun(.fun)

  # ---- argument validation ----
  # nrow() is NULL for plain vectors; fail with a clear message instead of the
  # cryptic "argument is of length zero" raised by the comparison below.
  nobs <- nrow(data)
  if (is.null(nobs)) {
    stop("`data` must be a data frame or a matrix.", call. = FALSE)
  }
  if (!is.numeric(width) || length(width) != 1L || is.na(width) ||
      width < 1 || width != round(width)) {
    stop("`width` must be a single positive whole number.", call. = FALSE)
  }
  if (nobs < width) {
    stop("The size of window is larger than row number of the data.",
         call. = FALSE)
  }
  if (length(parallel) != 1L || is.na(parallel) ||
      !(is.logical(parallel) || is.numeric(parallel))) {
    stop("`parallel` must be a single logical or numeric value.",
         call. = FALSE)
  }
  # Any non-zero number (including negative ones) requests a cluster.
  use_parallel <- as.logical(parallel)

  num_wins <- nobs - width + 1
  vec <- seq_len(num_wins)

  # Build one progress line for window `i`. The remaining time is extrapolated
  # from the raw counts rather than from the rounded percentage, which would
  # round to 0 (and yield Inf) when there are many windows.
  progress_messages <- function(i, start_time) {
    elapsed <- Sys.time() - start_time
    time_unit <- units(elapsed)
    elapsed <- as.numeric(elapsed)
    perc_done <- round(i * 100 / num_wins, 2)
    remaining <- (num_wins - i) / i * elapsed
    paste(paste0(perc_done, "% done!"), elapsed, time_unit,
          "used! Expected", remaining, time_unit, "left")
  }

  # ---- progress file setup ----
  log_progress <- is.character(progress)
  if (log_progress) {
    if (length(progress) != 1L || is.na(progress)) {
      stop("`progress` must be FALSE or a single file name.", call. = FALSE)
    }
    if (!dir.exists("progress")) {
      dir.create("progress")
    }
    progress <- file.path("progress", progress)
    # Start from an empty file so old runs do not pollute the log.
    if (file.exists(progress)) {
      file.remove(progress)
    }
  }

  if (use_parallel) {
    # ---- resolve the number of worker processes ----
    if (isTRUE(parallel) || parallel < 0) {
      n_detected <- detectCores()
      if (is.na(n_detected)) {
        stop("Unable to detect the number of cores; ",
             "pass a positive number as `parallel`.", call. = FALSE)
      }
      if (isTRUE(parallel)) {
        cores <- n_detected
      } else if (abs(parallel) < n_detected) {
        cores <- n_detected + parallel
      } else {
        stop("The reduced number is larger than the number of the detected cores.",
             call. = FALSE)
      }
    } else {
      cores <- parallel
    }

    # With a progress file, worker output is redirected into it.
    cl <- if (log_progress) {
      makeCluster(cores, outfile = progress)
    } else {
      makeCluster(cores)
    }
    # Always shut the workers down, even when a task fails.
    on.exit(stopCluster(cl), add = TRUE)

    # clusterExport() looks the names up in the global environment.
    if (length(.export) > 0L) {
      clusterExport(cl, .export)
    }
    registerDoParallel(cl)

    # The two loops are kept separate so that the non-logging path does not
    # reference (and therefore ship to the workers) the progress helper.
    if (log_progress) {
      start_time <- Sys.time()
      res <- foreach(i = vec, .packages = .packages) %dopar% {
        # Printed on the worker; ends up in the cluster's outfile.
        writeLines(progress_messages(i, start_time))
        .fun(data[i:(i + width - 1), ], ...)
      }
    } else {
      res <- foreach(i = vec, .packages = .packages) %dopar% {
        .fun(data[i:(i + width - 1), ], ...)
      }
    }
  } else {
    con <- NULL
    if (log_progress) {
      con <- file(progress, "w+")
      # Close the file even when `.fun` raises.
      on.exit(close(con), add = TRUE)
    }
    start_time <- Sys.time()
    res <- foreach(i = vec, .packages = .packages) %do% {
      if (log_progress) {
        writeLines(progress_messages(i, start_time), con)
        # Flush so the file can be followed while the loop is running.
        flush(con)
      }
      .fun(data[i:(i + width - 1), ], ...)
    }
  }

  res
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.