R/datashield.aggregate.R

Defines functions datashield.aggregate

Documented in datashield.aggregate

#' Data aggregation
#'
#' Aggregates the expression result using the specified aggregation method in the current DataSHIELD session.
#'
#' @param conns \code{\link{DSConnection-class}} object or a list of \code{\link{DSConnection-class}}s.
#' @param expr Expression to evaluate. It can also be a named list of expressions keyed by connection name:
#'   only the connections named in the list are then called, each one with its own expression.
#' @param async Whether the result of the call should be retrieved asynchronously. When TRUE (default) the calls are parallelized over
#'   the connections, when the connection supports that feature, with an extra overhead of requests.
#' @param success Callback function that will be called each time an aggregation result is received from a connection.
#'   The expected function signature is the connection/study name and the result value. Default is NULL (no callback).
#' @param error Callback function that will be called each time the aggregation request has failed.
#'   The expected function signature is the connection/study name and the error message. Default is NULL (no callback).
#'
#' @return The result of the aggregation. When \code{conns} is a list: a list named by connection, with one
#'   entry per called connection; an entry is NULL when the request to that connection failed.
#'   When \code{conns} is a single connection: the result value, or NULL if the request failed.
#'
#' @examples
#'\dontrun{
#' # call aggregate function on server side asynchronously
#' # i.e. each study connection will process the request in parallel
#' result <- datashield.aggregate(conns, expr = quote(someFunction(D, 123)))
#'
#' # call aggregate function on server side synchronously, i.e. each study
#' # connection will be called, one after the other, in a blocking way
#' result <- datashield.aggregate(conns, expr = quote(someFunction(D, 123)), async = FALSE)
#'
#' # call aggregate functions that are defined in the provided named list.
#' # Connections are filtered by the list names.
#' result <- datashield.aggregate(conns,
#'   list(server1=quote(someFunction(D, 123)), server2=quote(someFunction(G, 456))))
#'
#' # call aggregate function with callback functions
#' result <- datashield.aggregate(conns, expr = quote(someFunction(D, 123)),
#'   success = function(server, res) {
#'     # do something with server's result value
#'   },
#'   error = function(server, error) {
#'     # do something with server's error message
#'   })
#' }
#'
#' @export
datashield.aggregate <- function(conns, expr, async=TRUE, success=NULL, error=NULL) {
  .clearLastErrors()
  rval <- NULL

  # Records a failure for the connection named `name` and forwards the message
  # to the user's error callback, when one was provided.
  notifyError <- function(name, message) {
    .appendError(name, message)
    if (.is.callback(error)) {
      error(name, message)
    }
  }

  # prepare expressions as a named list
  exprs <- .asNamedListOfValues(conns, expr)

  if (is.list(conns)) {
    # filter connections to aggregate
    fconns <- .filterConnectionsByName(conns, names(exprs))
    # label of the whole expression, used in the global progress messages
    exprLabel <- if (is.vector(expr)) "..." else .deparse(expr)

    results <- list()
    # a connection is called asynchronously only if requested AND it reports support for it
    useAsync <- vapply(fconns, function(conn) {
      if (async) isTRUE(dsIsAsync(conn)$aggregate) else FALSE
    }, logical(1))
    pb <- .newProgress(total = 1 + length(fconns))

    # async first: submit the non-blocking requests
    for (n in names(fconns)) {
      if (useAsync[[n]]) {
        tryCatch({
          results[[n]] <- dsAggregate(fconns[[n]], exprs[[n]], async=TRUE)
        }, error = function(e) {
          notifyError(n, e$message)
        })
      }
    }
    # then the blocking calls, one connection after the other
    for (n in names(fconns)) {
      if (!useAsync[[n]]) {
        tryCatch({
          .tickProgress(pb, tokens = list(what = paste0("Aggregating ", fconns[[n]]@name, " (", .deparse(exprs[[n]]), ")")))
          results[[n]] <- dsAggregate(fconns[[n]], exprs[[n]], async=FALSE)
        }, error = function(e) {
          notifyError(n, e$message)
        })
      }
    }

    # polling: rval keeps one entry per connection (NULL until fetched, or on failure)
    rval <- vector("list", length(fconns))
    names(rval) <- names(fconns)
    completed <- rep(FALSE, length(fconns))
    names(completed) <- names(fconns)
    checks <- 1
    while (!all(completed)) {
      for (n in names(fconns)) {
        if (completed[[n]]) {
          # heart beat request, sent while other connections are still pending
          dsKeepAlive(fconns[[n]])
          next
        }
        dexpr <- .deparse(exprs[[n]])
        # An error handler runs in its own frame and cannot update `completed`/`rval`,
        # so the tryCatch only reports whether this connection failed.
        failed <- tryCatch({
          if (.hasLastErrors(n)) {
            # the request could not be submitted: nothing to fetch
            TRUE
          } else {
            .updateProgress(pb, step = sum(completed), total = length(fconns), tokens = list(what = paste0("Checking ", fconns[[n]]@name, " (", dexpr, ")")))
            # non-async results are fetched right away; async ones only once dsIsCompleted() reports them done
            completed[[n]] <- !useAsync[[n]] || dsIsCompleted(results[[n]])
            if (completed[[n]]) {
              .tickProgress(pb, tokens = list(what = paste0("Getting aggregate ", fconns[[n]]@name, " (", dexpr, ")")))
              # single-bracket assignment keeps the entry even when the fetched value is NULL
              rval[n] <- list(dsFetch(results[[n]]))
              if (.is.callback(success)) {
                success(n, rval[[n]])
              }
            }
            FALSE
          }
        }, error = function(e) {
          notifyError(n, e$message)
          TRUE
        })
        if (failed) {
          # stop polling this connection and report NULL for it (entry is kept, not removed)
          completed[[n]] <- TRUE
          rval[n] <- list(NULL)
        }
      }
      if (!all(completed)) {
        .updateProgress(pb, step = sum(completed), total = length(fconns), tokens = list(what = paste0("Waiting... ", " (", exprLabel, ")")))
        Sys.sleep(.getSleepTime(checks))
        checks <- checks + 1
      }
    }
    .tickProgress(pb, tokens = list(what = paste0("Aggregated (", exprLabel, ")")))
  } else {
    rval <- tryCatch({
      res <- dsAggregate(conns, exprs[[conns@name]])
      dsFetch(res)
    }, error = function(e) {
      notifyError(conns@name, e$message)
      NULL
    })
    if (.is.callback(success) && !.hasLastErrors(conns@name)) {
      success(conns@name, rval)
    }
  }
  .checkLastErrors()
  rval
}

Try the DSI package in your browser

Any scripts or data that you put into this service are public.

DSI documentation built on Oct. 6, 2022, 9:06 a.m.