R/task_bundle.R

##' Create a task bundle.  Generally these are not created manually,
##' but this page serves to document what task bundles are and the
##' methods that they have.
##'
##' A task bundle exists to group together tasks that are related.  It
##' is possible for a task to belong to multiple bundles.
##'
##' @template task_bundle_methods
##'
##' @title Create a task bundle
##' @param obj An observer or queue object
##' @param tasks A list of tasks
##' @param groups Optional vector of groups.  If given, then additional
##'    tasks can be added to the bundle if they share the same group names.
##' @param names Optional vector of names to label output with.
##' @export
task_bundle <- function(obj, tasks, groups=NULL, names=NULL) {
  ## TODO: What is groups used for here?  Seems no longer needed?
  .R6_task_bundle$new(obj, tasks, groups, names)
}

## TODO: Next, make an automatically updating version.
.R6_task_bundle <- R6::R6Class(
  "task_bundle",

  public=
    list(
      obj=NULL,
      tasks=NULL,
      key_complete=NULL,
      groups=NULL,
      names=NULL,
      con=NULL,
      keys=NULL,

      initialize=function(obj, tasks, groups, names) {
        self$con <- obj$con
        self$keys <- obj$keys
        self$obj <- obj
        self$tasks <- setNames(tasks, vcapply(tasks, "[[", "id"))

        self$key_complete <- unique(vcapply(tasks, "[[", "key_complete"))
        self$groups <- groups

        if (!is.null(names) && length(names) != length(tasks)) {
          stop("Incorrect length names")
        }
        self$names <- names
      },

      ids=function() {
        names(self$tasks)
      },

      update_groups=function() {
        task_ids <- setdiff(tasks_in_groups(self$con, self$keys, self$groups),
                            self$ids())
        if (length(task_ids)) {
          tasks <- setNames(lapply(task_ids, self$obj$task_get), task_ids)
          self$tasks <- c(self$tasks, tasks)
          self$key_complete <- union(self$key_complete,
                                     unique(vcapply(tasks, "[[", "key_complete")))
          ## Can't deal with this for now :(
          self$names <- NULL
        }
        invisible(task_ids)
      },

      overview=function() {
        tasks_overview(self$con, self$keys, self$ids())
      },
      status=function(follow_redirect=FALSE) {
        self$obj$tasks_status(self$ids(), follow_redirect=follow_redirect)
      },
      results=function(follow_redirect=FALSE) {
        self$wait(0, 0, FALSE, follow_redirect)
      },
      wait=function(timeout=60, time_poll=1, progress_bar=TRUE, follow_redirect=FALSE) {
        task_bundle_wait(self, timeout, time_poll, progress_bar, follow_redirect)
      },
      wait1=function(timeout=60, time_poll=1, follow_redirect=FALSE) {
        task_bundle_wait1(self, timeout, time_poll, follow_redirect)
      },
      times=function(unit_elapsed="secs") {
        tasks_times(self$con, self$keys, self$ids(), unit_elapsed)
      },


      delete_tasks=function() {
        invisible(self$obj$tasks_drop(self$ids()))
      }))


## There are a bunch of ways of getting appropriate things here:
task_bundle_get <- function(obj, groups=NULL, task_ids=NULL) {
  if (!xor(is.null(task_ids), is.null(groups))) {
    stop("Exactly one of task_ids or groups must be given")
  }
  if (is.null(groups)) {
    groups <- obj$tasks_lookup_group(task_ids)
  } else {
    task_ids <- obj$tasks_in_groups(groups)
  }

  tasks <- lapply(task_ids, obj$task_get)
  names(tasks) <- task_ids
  task_bundle(obj, tasks, groups)
}


task_bundle_wait <- function(obj, timeout, time_poll, progress_bar, follow_redirect) {
  assert_integer_like(time_poll)
  task_ids <- obj$ids()
  status <- obj$status()
  done <- !(status == TASK_PENDING | status == TASK_RUNNING |
              status == TASK_ORPHAN)

  ## Immediately collect all results:
  results <- named_list(task_ids)
  if (any(done)) {
    results[done] <- lapply(obj$tasks[done],
                            function(t) t$result(follow_redirect))
  }

  cleanup <- function(results, names) {
    if (!is.null(names)) {
      names(results) <- names
    }
    results
  }
  if (all(done)) {
    return(cleanup(results, obj$names))
  } else if (timeout == 0) {
    stop("Tasks not yet completed; can't be immediately returned")
  }

  p <- progress(total=length(obj$tasks), show=progress_bar)
  t0 <- Sys.time()
  timeout <- as.difftime(timeout, units="secs")

  p(sum(done))
  i <- 1L
  while (!all(done)) {
    if (Sys.time() - t0 > timeout) {
      stop(sprintf("Exceeded maximum time (%d / %d tasks pending)",
                   sum(!done), length(done)))
    }
    res <- task_bundle_fetch1(obj, time_poll, follow_redirect)
    if (is.null(res)) {
      p(0)
    } else {
      p(1)
      task_id <- res[[1]]
      result <- res[[2]]
      done[[task_id]] <- TRUE
      ## NOTE: This conditional is needed to avoid deleting the
      ## element in results if we get a NULL result.
      if (!is.null(result)) {
        results[[task_id]] <- result
      }
    }

  }
  cleanup(results, obj$names)
}

task_bundle_wait1 <- function(obj, timeout, time_poll, follow_redirect) {
  status <- obj$status(follow_redirect)
  done <- !(status == TASK_PENDING | status == TASK_RUNNING |
              status == TASK_ORPHAN)
  if (all(done)) {
    done <- vnapply(obj$key_complete, obj$obj$con$LLEN) == 0
  }
  if (all(done)) {
    return(NULL)
  }
  times_up <- time_checker(timeout)
  repeat {
    if (times_up()) {
      return(NULL)
    }
    res <- task_bundle_fetch1(obj, time_poll, follow_redirect)
    if (!is.null(res)) {
      return(res)
    }
  }
}

task_bundle_fetch1 <- function(bundle, timeout, follow_redirect) {
  if (as.integer(timeout) > 0) {
    res <- bundle$con$BLPOP(bundle$key_complete, timeout)
  } else {
    res <- lpop_mult(bundle$con, bundle$key_complete)
  }
  if (!is.null(res)) {
    id <- res[[2]]
    list(id=id,
         result=bundle$obj$task_result(id, follow_redirect=follow_redirect))

  } else {
    NULL
  }
}
traitecoevo/rrqueue documentation built on May 31, 2019, 7:44 p.m.