
Example Scenario

Let's imagine we need to process a large amount of data: on the one hand there is a substantial number of groups (subsets) that need to be processed separately, and on the other hand processing each group takes a considerable amount of time. Thus, it makes sense to process these subsets of data in parallel.

We're still exploring the logic, so ideally we would like to wrap it all up in a single execution unit and run it in a number of R processes. On Linux this is pretty straightforward with mclapply from the parallel package, as sketched below. But what if we need to do it on Windows? Or if those processes live on another computer? How do we distribute the logic to a remote environment and run it there?
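
For reference, here is a minimal sketch of the Linux-only approach; the names groups (a list of per-group data frames) and process_group (the per-group logic) are hypothetical placeholders.

library(parallel)

# `groups` is assumed to be a list of data frames, one per subset,
# and `process_group()` to contain the per-group processing logic
results <- mclapply(groups, process_group, mc.cores = 4)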

Enter defer. Here's how one can wrap up all the necessary functions, together with information about their dependencies, and prepare an execution package that can then be serialized, sent over to another R environment, deserialized and finally executed - all with minimal bookkeeping overhead.

Data processing code

Here is the code: a simple ETL step, a train-and-test procedure, and some glue code that puts it all together. We're processing time-series data: first we add extra columns (features), then we verify how well those features do in a predictive modelling scenario with a simple linear model, in a series of train/test runs determined by a moving test window.

library(lubridate)
library(dplyr)
library(magrittr)

# data: timestamp and value
etl <- function (data) {
  # replace NAs with the average of surrounding values
  data %<>%
    mutate(lead_lag_mean = (lead(value, 1) + lag(value, 1))/2)
  i <- is.na(data$value)
  data$value[i] <- data$lead_lag_mean[i]

  # cast timestamp to hourly resolution, add lags
  data %>%
    select(-lead_lag_mean) %>%
    mutate(timestamp = floor_date(timestamp, "hour")) %>%
    group_by(timestamp) %>%
    summarise(value = mean(value)) %>%
    ungroup() %>%
    mutate(lag_1 = lag(value, 1),
           lag_2 = lag(value, 2),
           lag_3 = lag(value, 3))
}

score <- function (data, n) {
  # train on everything before the moving test window of size n,
  # then predict the first observation of that window
  train <- head(data, n = -n)
  test  <- tail(data, n = n)[1, ]

  m <- lm(value ~ ., data = train)
  test$value - predict(m, test)
}

glue <- function (data, n = 10) {
  data <- etl(data)
  # one-step-ahead residual for each of the n moving test windows
  ans <- lapply(seq(n), function (k) {
    score(data, k)
  })
  test_residuals <- unlist(ans)
  # root mean squared error of the test residuals
  sqrt(mean(test_residuals ^ 2))
}

Sample data:

# four days of 15-minute samples
n_samples <- 4*96
time_series <- data_frame(timestamp = as.POSIXct("2017-05-01") + seq(n_samples) * 900,
                          value = rnorm(n_samples))

Prepare an execution package

The following call packages the glue() function so that it can be copied (serialized and deserialized) to another R session and run there, provided that the same third-party packages (lubridate, dplyr, magrittr) are installed in that session's library. The helper functions score() and etl() are passed explicitly as dependencies.

library(defer)
deferred_glue <- defer_(glue, score = score, etl = etl)

Here's how one can run it:

deferred_glue(time_series)
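
Since glue() is deterministic once the sample data is generated, the deferred wrapper should return the same value as calling glue() directly on the same data; a quick sanity check, assuming the original glue() is still available in the session:

all.equal(deferred_glue(time_series), glue(time_series))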

And here's how it can be serialized, and then deserialized and still successfully run, even after the original functions have been removed (as they would be absent in a new R session); a sketch that uses a genuinely separate R process follows the code.

saveRDS(deferred_glue, "glue.rds")
rm(glue, etl, score, deferred_glue)
deferred_glue <- readRDS("glue.rds")
deferred_glue(time_series)
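
To demonstrate the same round trip across process boundaries, the serialized file can be handed to a completely separate R process. Below is a minimal sketch that assumes the callr package is installed and that, as described above, defer takes care of the required packages in the child session; callr::r() runs a function in a fresh R session and returns its result.

library(callr)

callr::r(function (path, data) {
  deferred_glue <- readRDS(path)
  deferred_glue(data)
}, args = list("glue.rds", time_series))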

