R/civis_future.R

Defines functions make_docker_cmd fetch_logs.CivisFuture resolved.CivisFuture resolved cancel.CivisFuture cancel result.CivisFuture run.CivisFuture CivisFuture civis_platform

Documented in cancel cancel.CivisFuture CivisFuture civis_platform fetch_logs.CivisFuture resolved resolved.CivisFuture result.CivisFuture run.CivisFuture

#' @importFrom future run result
NULL

#' Evaluate an expression in Civis Platform
#'
#' This is used as with the \code{\link{future}} API as an argument to \code{\link{plan}}.
#'
#' @param ... Arguments to \code{\link{CivisFuture}} and then \code{\link{scripts_post_containers}}
#' @return The result of evaluating \code{expr}.
#' @examples \dontrun{
#'
#'  plan(civis_platform)
#'
#'  # Specify required resources, image, and tag.
#'  fut <- future({2 + 2},
#'    required_resources = list(cpu = 1024, memory = 2048),
#'    docker_image_name = "civisanalytics/datascience-r",
#'    docker_image_tag = "3")
#'
#'  # Evaluate the future later
#'  fut <- future({2 + 2}, lazy = TRUE)
#'  run(fut)
#'
#'  # check if a future has resolved
#'  resolved(fut)
#'
#'  # block until the future resolves, and return the value or throw error
#'  value(fut)
#'
#'  # cancel the job
#'  cancel(fut)
#'
#'  # grab the run logs
#'  fetch_logs(fut)
#'
#'  # handle errors
#'  fut <- future({stop("Error!")})
#'  e <- tryCatch(value(fut), error = function(e) e)
#'  get_error(e)
#' }
#'
#' @export
civis_platform <- function(...) {
  future <- CivisFuture(...)
  if (!future$lazy) future <- run(future)
  future
}
class(civis_platform) <- c("CivisFuture", "future", "function")

#' Evaluate an expression in Civis Platform
#' @inheritParams future::Future
#' @param local deprecated as of \code{civis} v3.0.1
#' @param required_resources resources, see \code{\link{scripts_post_containers}}
#' @param docker_image_name the image for the container script.
#' @param docker_image_tag the tag for the Docker image.
#' @param ... arguments to \code{\link{scripts_post_containers}}
#'
#' @return A \code{CivisFuture} inheriting from \code{\link{Future}} that evaluates \code{expr} on the given container.
#'
#' @export
CivisFuture <- function(expr = NULL,
                        envir = parent.frame(),
                        substitute = FALSE,
                        globals = TRUE,
                        packages = NULL,
                        lazy = FALSE,
                        local = lifecycle::deprecated(),
                        gc = FALSE,
                        earlySignal = FALSE,
                        label = NULL,
                        required_resources = list(cpu = 1024, memory = 2048, diskSpace = 4),
                        docker_image_name = "civisanalytics/datascience-r",
                        docker_image_tag = "latest",
                         ...) {

  if (lifecycle::is_present(local)) {
    lifecycle::deprecate_soft(when = "3.0.1",
                              what = "civis::CivisFuture(local)")
  }
  
  gp <- future::getGlobalsAndPackages(expr, envir = envir, globals = globals)

  ## if there are globals, assign them in envir
  env <- new.env()
  if (length(gp) > 0) {
    env <- list2env(gp$globals)

  }

  future <- future::Future(expr = expr,
                           envir = env,
                           substitute = substitute,
                           globals = gp$globals,
                           packages = unique(c(packages, gp$packages)),
                           lazy = lazy,
                           gc = gc,
                           earlySignal = earlySignal,
                           label = label,
                           version = "1.8",  # see: https://github.com/civisanalytics/civis-r/issues/168

                           # extra args to scripts_post_containers
                           required_resources = required_resources,
                           docker_image_name = docker_image_name,
                           docker_image_tag = docker_image_tag, ...)
  future$.callResult <- TRUE
  structure(future, class = c("CivisFuture", class(future)))
}

#' @export
#' @param future CivisFuture object.
#' @describeIn CivisFuture Run a CivisFuture
run.CivisFuture <- function(future, ...) {
  if (is.null(future$job$containerId)) {
    cargo <- c(expr = future$expr, envir = future$envir,
               packages = list(future$packages))
    task_file_id <- write_civis_file(cargo)
    runner_file_id <- upload_runner_script()
    cmd <- make_docker_cmd(task_file_id, runner_file_id)
    future$job <- scripts_post_containers(future$required_resources,
                                          docker_command = cmd,
                                          docker_image_name = future$docker_image_name,
                                          docker_image_tag = future$docker_image_tag, ...)
    future$job <- scripts_post_containers_runs(future$job$id)
    future$state <- c("running")
  }
  return(future)
}

#' @export
#' @param future CivisFuture object.
#' @describeIn CivisFuture Return the value of a CivisFuture
result.CivisFuture <- function(future, ...) {
  if (future$state == "created") {
    future <- run(future)
  }

  # if the result isn't collected, try to collect it.
  if (is.null(future$result)) {
    tryCatch({
      future$run <- await(scripts_get_containers_runs, id = future$job$containerId,
                          run_id = future$job$id)
      civis_state <- future$run$state
      
      # `future` > v1.31.0 no longer recognizes "success"/"succeeded" as a resolved state
      # See Issue #245
      if (civis_state %in% c("success", "succeeded")) {
        future$state <- "finished"
      } else {
        future$state <- civis_state
      }
      
      value <- read_civis(civis_script(future$job$containerId),
                          using = readRDS)[[1]]
      future$result <- future::FutureResult(value=value)
      future$logs  <- fetch_logs(future$run)
    }, civis_error = function(e) {
      future$state <- "failed"
      e$message <- paste0(c(e$message, fetch_logs(e)), collapse = "\n")
      stop(e)
    }, error = function(e) stop(e))
  }
  future$result
}

#' Cancel the evaluation of a CivisFuture.
#' @param future CivisFuture object.
#' @param ... unused for CivisFuture.
#' @export
cancel <- function(future, ...) {
  UseMethod("cancel")
}

#' @export
#' @describeIn CivisFuture Cancel a CivisFuture
cancel.CivisFuture <- function(future, ...) {
  scripts_delete_containers_runs(id = future$job$containerId, run_id = future$job$id)
  future$state <- "cancelled"
}

#' Check whether a CivisFuture has resolved.
#' @param future CivisFuture object.
#' @param ... unused for CivisFuture.
#' @export
resolved <- function(future, ...) {
  UseMethod("resolved")
}

#' @export
#' @param future CivisFuture object.
#' @describeIn CivisFuture Check if a CivisFuture has resolved
resolved.CivisFuture <- function(future, ...){
  if (!is.null(future$job$containerId)) {
    civis_state <- scripts_get_containers_runs(id = future$job$containerId,
                                               run_id = future$job$id)$state
    
    # `future` > v1.31.0 no longer recognizes "success"/"succeeded" as a resolved state
    # See Issue #245
    if (civis_state %in% c("success", "succeeded")) {
      future$state <- "finished"
    } else {
      future$state <- civis_state
    }
  }
  future$state %in% c("finished", "failed", "cancelled")
}

#' @export
#' @param object CivisFuture
#' @describeIn CivisFuture Fetch logs from a CivisFuture
fetch_logs.CivisFuture <- function(object, ...){
  object$logs
}

make_docker_cmd <- function(task_file_id, run_script_file_id) {
  # this loads the same initial packages in the same order as default R.
  sprintf("Rscript -e \"civis::download_civis(%i, 'run.R')\" && \
    Rscript --default-packages=methods,datasets,utils,grDevices,graphics,stats run.R %i",
          run_script_file_id, task_file_id)
}

upload_runner_script <- function() {
  path <- system.file("scripts", "r_remote_eval.R", package = "civis")
  write_civis_file(path, name = "r_remote_eval.R")
}

Try the civis package in your browser

Any scripts or data that you put into this service are public.

civis documentation built on April 1, 2023, 12:01 a.m.