R/react.R

Defines functions flushReact getCurrentContext hasCurrentContext getDummyContext wrapForContext reactivePromiseDomain

processId <- local({
  # pid is not sufficient to uniquely identify a process, because
  # distributed futures span machines which could introduce pid
  # collisions.
  cached <- NULL
  function() {
    if (is.null(cached)) {
      cached <<- digest::digest(list(
        Sys.info(),
        Sys.time()
      ))
    }
    # Sys.getpid() cannot be cached because forked children will
    # then have the same processId as their parents.
    paste(cached, Sys.getpid())
  }
})

#' @include graph.R
Context <- R6Class(
  'Context',
  portable = FALSE,
  class = FALSE,
  public = list(
    id = character(0),
    .reactId = character(0),
    .reactType = "other",
    .label = character(0),      # For debug purposes
    .invalidated = FALSE,
    .invalidateCallbacks = list(),
    .flushCallbacks = list(),
    .domain = NULL,
    .pid = NULL,
    .weak = NULL,

    initialize = function(
      domain, label='', type='other', prevId='',
      reactId = rLog$noReactId,
      id = .getReactiveEnvironment()$nextId(), # For dummy context
      weak = FALSE
    ) {
      id <<- id
      .label <<- label
      .domain <<- domain
      .pid <<- processId()
      .reactId <<- reactId
      .reactType <<- type
      .weak <<- weak
      rLog$createContext(id, label, type, prevId, domain)
    },
    run = function(func) {
      "Run the provided function under this context."

      promises::with_promise_domain(reactivePromiseDomain(), {
        withReactiveDomain(.domain, {
          env <- .getReactiveEnvironment()
          rLog$enter(.reactId, id, .reactType, .domain)
          on.exit(rLog$exit(.reactId, id, .reactType, .domain), add = TRUE)
          env$runWith(self, func)
        })
      })
    },
    invalidate = function() {
      "Invalidate this context. It will immediately call the callbacks
        that have been registered with onInvalidate()."

      if (!identical(.pid, processId())) {
        stop("Reactive context was created in one process and invalidated from another")
      }

      if (.invalidated)
        return()
      .invalidated <<- TRUE

      rLog$invalidateStart(.reactId, id, .reactType, .domain)
      on.exit(rLog$invalidateEnd(.reactId, id, .reactType, .domain), add = TRUE)

      lapply(.invalidateCallbacks, function(func) {
        func()
      })
      .invalidateCallbacks <<- list()
      NULL
    },
    onInvalidate = function(func) {
      "Register a function to be called when this context is invalidated.
        If this context is already invalidated, the function is called
        immediately."

      if (!identical(.pid, processId())) {
        stop("Reactive context was created in one process and accessed from another")
      }

      if (.invalidated)
        func()
      else
        .invalidateCallbacks <<- c(.invalidateCallbacks, func)
      NULL
    },
    addPendingFlush = function(priority) {
      "Tell the reactive environment that this context should be flushed the
        next time flushReact() called."
      .getReactiveEnvironment()$addPendingFlush(self, priority)
    },
    onFlush = function(func) {
      "Register a function to be called when this context is flushed."
      .flushCallbacks <<- c(.flushCallbacks, func)
    },
    executeFlushCallbacks = function() {
      "For internal use only."

      lapply(.flushCallbacks, function(flushCallback) {
        flushCallback()
      })
    },
    isWeak = function() {
      .weak
    }
  )
)

ReactiveEnvironment <- R6Class(
  'ReactiveEnvironment',
  portable = FALSE,
  class = FALSE,
  public = list(
    .currentContext = NULL,
    .nextId = 0L,
    .pendingFlush = 'PriorityQueue',
    .inFlush = FALSE,

    initialize = function() {
      .pendingFlush <<- PriorityQueue$new()
    },
    nextId = function() {
      .nextId <<- .nextId + 1L
      return(as.character(.nextId))
    },
    currentContext = function() {
      if (is.null(.currentContext)) {
        if (isTRUE(getOption('shiny.suppressMissingContextError'))) {
          return(getDummyContext())
        } else {
          stop('Operation not allowed without an active reactive context. ',
               '(You tried to do something that can only be done from inside a ',
               'reactive expression or observer.)')
        }
      }
      return(.currentContext)
    },
    runWith = function(ctx, contextFunc) {
      old.ctx <- .currentContext
      .currentContext <<- ctx
      on.exit(.currentContext <<- old.ctx)
      contextFunc()
    },
    addPendingFlush = function(ctx, priority) {
      .pendingFlush$enqueue(ctx, priority)
    },
    hasPendingFlush = function() {
      return(!.pendingFlush$isEmpty())
    },
    # Returns TRUE if anything was actually called
    flush = function() {
      # If nothing to flush, exit early
      if (!hasPendingFlush()) return(invisible(FALSE))
      # If already in a flush, don't start another one
      if (.inFlush) return(invisible(FALSE))
      .inFlush <<- TRUE
      on.exit({
        .inFlush <<- FALSE
        rLog$idle(domain = NULL)
      })

      while (hasPendingFlush()) {
        ctx <- .pendingFlush$dequeue()
        ctx$executeFlushCallbacks()
      }

      invisible(TRUE)
    }
  )
)

.getReactiveEnvironment <- local({
  reactiveEnvironment <- NULL
  function() {
    if (is.null(reactiveEnvironment))
      reactiveEnvironment <<- ReactiveEnvironment$new()
    return(reactiveEnvironment)
  }
})

# Causes any pending invalidations to run. Returns TRUE if any invalidations
# were pending (i.e. if work was actually done).
flushReact <- function() {
  return(.getReactiveEnvironment()$flush())
}

# Retrieves the current reactive context, or errors if there is no reactive
# context active at the moment.
getCurrentContext <- function() {
  .getReactiveEnvironment()$currentContext()
}
hasCurrentContext <- function() {
  !is.null(.getReactiveEnvironment()$.currentContext)
}

getDummyContext <- function() {
  Context$new(
    getDefaultReactiveDomain(), '[none]', type = 'isolate',
    id = "Dummy", reactId = rLog$dummyReactId
  )
}

wrapForContext <- function(func, ctx) {
  force(func)
  force(ctx)

  function(...) {
    ctx$run(function() {
      captureStackTraces(
        func(...)
      )
    })
  }
}

reactivePromiseDomain <- function() {
  promises::new_promise_domain(
    wrapOnFulfilled = function(onFulfilled) {
      force(onFulfilled)
      ctx <- getCurrentContext()
      wrapForContext(onFulfilled, ctx)
    },
    wrapOnRejected = function(onRejected) {
      force(onRejected)
      ctx <- getCurrentContext()
      wrapForContext(onRejected, ctx)
    }
  )
}
tomkuipers1402/shiny documentation built on Feb. 13, 2020, 7:22 p.m.