R/Job.R

Defines functions eval_delayed

Documented in eval_delayed

#' Helper Function to Evaluate Delayed
#' @param to_eval a list as generated from Delayed$prepare_eval()
#' @param timeout a timeout indicating when to terminate the job
#' @export
#' @importFrom R.utils withTimeout TimeoutException
#' @importFrom R.oo throw
eval_delayed <- function(to_eval, timeout = Inf) {
  if (timeout < 0) {
    R.oo::throw(R.utils::TimeoutException("time exhausted in other steps"))
  }

  result <- R.utils::withTimeout(
    {
      rlang::eval_bare(
        expr = to_eval$expr,
        env = to_eval$env
      )
    },
    timeout = timeout
  )
  return(result)
}

#' Evaluation of Delayed Objects
#'
#' @description A \code{Job} encapsulates the act of evaluating a given
#' \code{delayed} object in a particular way. \code{SequentialJob}s evaluate
#' immediately, blocking the current process until they are complete.
#' \code{FutureJob}s leverages \pkg{future} to evaluate according to the
#' specified \code{\link[future]{plan}}.
#'
#' @docType class
#'
#' @importFrom R6 R6Class
#'
#' @keywords internal
Job <- R6Class(
  classname = "Job",
  cloneable = FALSE,
  portable = TRUE,
  class = TRUE,
  public = list(
    initialize = function(delayed_object) {
      private$.delayed_object <- delayed_object
      delayed_object$register_job(self)
      invisible(self)
    }
  ),

  active = list(
    finished = function() {
      return(FALSE)
    },

    value = function() {
      return(private$.result)
    },

    runtime = function() {
      return(private$.runtime)
    }
  ),

  private = list(
    .delayed_object = NULL,
    .result = NULL,
    .runtime = NULL
  )
)

################################################################################

#' Sequential Delayed Jobs
#'
#' @description A \code{Job} that will evaluate immediately (i.e., in a
#' sequential fashion), blocking the current process until it completes.
#'
#' @docType class
#'
#' @importFrom R6 R6Class
#'
#' @examples
#' d <- delayed(3 + 4)
#' sched <- Scheduler$new(d, SequentialJob)
#' @export
SequentialJob <- R6Class(
  classname = "SequentialJob",
  cloneable = FALSE,
  portable = TRUE,
  class = TRUE,
  inherit = Job,
  public = list(
    initialize = function(delayed_object) {
      to_eval <- delayed_object$prepare_eval()
      start_time <-proc.time()

      private$.result <- try({
        set.seed(delayed_object$seed)
        eval_delayed(to_eval, delayed_object$timeout)
      })

      private$.runtime <- (proc.time()-start_time)[[3]]
      super$initialize(delayed_object)
    }
  ),

  active = list(
    finished = function() {
      return(TRUE)
    },
    value = function() {
      return(private$.result)
    }
  ),

  private = list(
    .worker = NULL,
    .result_uuid = NULL
  )
)

################################################################################

#' Future Delayed Jobs
#'
#' @description A \code{Job} that leverages the \code{future} framework to
#' evaluate asynchronously.
#'
#' @docType class
#'
#' @importFrom R6 R6Class
#' @importFrom future future resolved value
#'
#' @examples
#' library(future)
#' plan(multicore, workers = 1)
#' d <- delayed(3 + 4)
#' sched <- Scheduler$new(d, FutureJob, nworkers = 1)
#' @export
FutureJob <- R6Class(
  classname = "FutureJob",
  cloneable = FALSE,
  portable = TRUE,
  class = TRUE,
  inherit = Job,
  public = list(
    initialize = function(delayed_object) {
      env <- list(
        eval_delayed = eval_delayed,
        to_eval = delayed_object$prepare_eval(),
        timeout = delayed_object$timeout
      )

      private$.start_time <- proc.time()
      private$.future <- future(
        expr = quote(eval_delayed(to_eval, timeout)),
        # expr = to_eval$expr,
        # env = to_eval$env,
        substitute = FALSE,
        globals = env,
        packages = NULL,
        seed = delayed_object$seed
      )
      super$initialize(delayed_object)
    }
  ),

  active = list(
    finished = function() {
      finished <- resolved(private$.future)
      if (finished) {
        private$.runtime <- (proc.time() - private$.start_time)[[3]]
      }
      return(finished)
    },
    value = function() {
      if (is.null(private$.result)) {
        private$.result <- value(private$.future, signal = FALSE)
      }
      return(private$.result)
    }
  ),

  private = list(
    .future = NULL,
    .start_time = NULL
  )
)
jeremyrcoyle/delayed documentation built on Oct. 26, 2022, 7:24 a.m.