processId <- local({
  # Returns a string identifying the current R process.
  #
  # A bare pid is not enough: distributed futures can span several machines,
  # and pids may collide across them. The pid is therefore combined with a
  # digest of Sys.info() and the time of the first call.
  hostToken <- NULL

  function() {
    if (is.null(hostToken)) {
      # Computed once, on first use, and reused by every later call.
      seed <- list(
        Sys.info(),
        Sys.time()
      )
      hostToken <<- digest::digest(seed)
    }
    # The pid is looked up on every call rather than stored: a forked child
    # would otherwise report the same processId as its parent.
    paste(hostToken, Sys.getpid())
  }
})
#' @include graph.R
# A reactive context. It stores the callbacks to run on invalidation and on
# flush, together with the domain, process id and logging identifiers it was
# created with. The class is non-portable, so methods read fields directly and
# write them with `<<-`.
Context <- R6Class(
  'Context',
  portable = FALSE,
  class = FALSE,
  public = list(
    id = character(0),
    .reactId = character(0),
    .reactType = "other",
    .label = character(0), # For debug purposes
    .invalidated = FALSE,
    .invalidateCallbacks = list(),
    .flushCallbacks = list(),
    .domain = NULL,
    .pid = NULL,
    .weak = NULL,

    # Stores the constructor arguments on the object, records the creating
    # process via processId(), and reports the new context to rLog.
    initialize = function(
      domain, label='', type='other', prevId='',
      reactId = rLog$noReactId,
      id = .getReactiveEnvironment()$nextId(), # For dummy context
      weak = FALSE
    ) {
      id <<- id
      .label <<- label
      .domain <<- domain
      .pid <<- processId()
      .reactId <<- reactId
      .reactType <<- type
      .weak <<- weak
      rLog$createContext(id, label, type, prevId, domain)
    },

    # Calls `func` with this context installed as the current one, inside the
    # reactive promise domain and this context's reactive domain. rLog is
    # told on entry and, through on.exit, on the way out.
    run = function(func) {
      "Run the provided function under this context."
      promises::with_promise_domain(reactivePromiseDomain(), {
        withReactiveDomain(.domain, {
          reactiveEnv <- .getReactiveEnvironment()
          rLog$enter(.reactId, id, .reactType, .domain)
          on.exit(rLog$exit(.reactId, id, .reactType, .domain), add = TRUE)
          reactiveEnv$runWith(self, func)
        })
      })
    },

    # Marks the context invalidated and runs each registered invalidation
    # callback once. Errors if called from a process other than the creating
    # one; a repeat call on an already invalidated context does nothing.
    # Returns NULL.
    invalidate = function() {
      "Invalidate this context. It will immediately call the callbacks
that have been registered with onInvalidate()."
      sameProcess <- identical(.pid, processId())
      if (!sameProcess) {
        stop("Reactive context was created in one process and invalidated from another")
      }

      if (.invalidated) {
        return()
      }
      .invalidated <<- TRUE

      rLog$invalidateStart(.reactId, id, .reactType, .domain)
      on.exit(rLog$invalidateEnd(.reactId, id, .reactType, .domain), add = TRUE)

      for (callback in .invalidateCallbacks) {
        callback()
      }
      # The callbacks have run; drop them.
      .invalidateCallbacks <<- list()
      NULL
    },

    # Registers `func` as an invalidation callback, or calls it right away
    # when the context is already invalidated. Errors if called from a
    # process other than the creating one. Returns NULL.
    onInvalidate = function(func) {
      "Register a function to be called when this context is invalidated.
If this context is already invalidated, the function is called
immediately."
      sameProcess <- identical(.pid, processId())
      if (!sameProcess) {
        stop("Reactive context was created in one process and accessed from another")
      }

      if (.invalidated) {
        func()
      } else {
        .invalidateCallbacks <<- c(.invalidateCallbacks, func)
      }
      NULL
    },

    # Queues this context, at the given priority, with the reactive
    # environment's pending-flush queue.
    addPendingFlush = function(priority) {
      "Tell the reactive environment that this context should be flushed the
next time flushReact() called."
      .getReactiveEnvironment()$addPendingFlush(self, priority)
    },

    # Appends `func` to the callbacks run by executeFlushCallbacks().
    onFlush = function(func) {
      "Register a function to be called when this context is flushed."
      .flushCallbacks <<- c(.flushCallbacks, func)
    },

    # Calls every flush callback in registration order and returns the list
    # of their results.
    executeFlushCallbacks = function() {
      "For internal use only."
      lapply(.flushCallbacks, function(callback) callback())
    },

    # Returns the `weak` value supplied at construction.
    isWeak = function() {
      .weak
    }
  )
)
# Bookkeeping for reactive execution: the context that is currently running,
# a counter used to hand out context ids, and the queue of contexts waiting
# to be flushed. Non-portable R6 class, so fields are written with `<<-`.
ReactiveEnvironment <- R6Class(
  'ReactiveEnvironment',
  portable = FALSE,
  class = FALSE,
  public = list(
    .currentContext = NULL,
    .nextId = 0L,
    .pendingFlush = 'PriorityQueue',
    .inFlush = FALSE,

    # Replaces the placeholder in .pendingFlush with a real queue.
    initialize = function() {
      .pendingFlush <<- PriorityQueue$new()
    },

    # Increments the id counter and returns the new value as a string.
    nextId = function() {
      .nextId <<- .nextId + 1L
      as.character(.nextId)
    },

    # Returns the active context. With none active, returns a dummy context
    # when option `shiny.suppressMissingContextError` is TRUE and raises an
    # error otherwise.
    currentContext = function() {
      if (!is.null(.currentContext)) {
        return(.currentContext)
      }
      if (isTRUE(getOption('shiny.suppressMissingContextError'))) {
        return(getDummyContext())
      }
      stop('Operation not allowed without an active reactive context. ',
        '(You tried to do something that can only be done from inside a ',
        'reactive expression or observer.)')
    },

    # Calls contextFunc() with `ctx` as the current context; the previous
    # context is put back when the call exits.
    runWith = function(ctx, contextFunc) {
      previousContext <- .currentContext
      .currentContext <<- ctx
      on.exit(.currentContext <<- previousContext)
      contextFunc()
    },

    # Enqueues `ctx` with the given priority.
    addPendingFlush = function(ctx, priority) {
      .pendingFlush$enqueue(ctx, priority)
    },

    # TRUE when the pending-flush queue is not empty.
    hasPendingFlush = function() {
      !.pendingFlush$isEmpty()
    },

    # Drains the pending-flush queue, running each context's flush callbacks.
    # Returns TRUE if anything was actually called
    flush = function() {
      # An empty queue means there is no work to do.
      if (!hasPendingFlush()) {
        return(invisible(FALSE))
      }
      # A flush already in progress must not be re-entered.
      if (.inFlush) {
        return(invisible(FALSE))
      }

      .inFlush <<- TRUE
      on.exit({
        .inFlush <<- FALSE
        rLog$idle(domain = NULL)
      })

      # Callbacks may enqueue further contexts, so re-check on every pass.
      while (hasPendingFlush()) {
        .pendingFlush$dequeue()$executeFlushCallbacks()
      }

      invisible(TRUE)
    }
  )
)
# Accessor for the single shared ReactiveEnvironment object. The object is
# built on the first call and the same one is returned thereafter.
.getReactiveEnvironment <- local({
  singleton <- NULL

  function() {
    if (is.null(singleton)) {
      singleton <<- ReactiveEnvironment$new()
    }
    singleton
  }
})
# Runs any pending invalidations by flushing the reactive environment. The
# result is TRUE if any invalidations were pending (i.e. work was actually
# done).
flushReact <- function() {
  reactiveEnv <- .getReactiveEnvironment()
  reactiveEnv$flush()
}
# Returns the reactive context that is active right now; raises an error
# when no reactive context is active.
getCurrentContext <- function() {
  reactiveEnv <- .getReactiveEnvironment()
  reactiveEnv$currentContext()
}
# TRUE when the reactive environment has a current context set. Reads the
# field directly, so a missing context never raises an error here.
hasCurrentContext <- function() {
  activeContext <- .getReactiveEnvironment()$.currentContext
  !is.null(activeContext)
}
# Builds a placeholder Context with the fixed id "Dummy", label '[none]' and
# type 'isolate', attached to the default reactive domain.
getDummyContext <- function() {
  Context$new(
    domain = getDefaultReactiveDomain(),
    label = '[none]',
    type = 'isolate',
    id = "Dummy",
    reactId = rLog$dummyReactId
  )
}
# Returns a function that forwards its arguments to `func`, running the call
# inside `ctx` (via ctx$run) and under captureStackTraces().
wrapForContext <- function(func, ctx) {
  # Evaluate both arguments now so the returned closure holds their values
  # rather than unevaluated promises.
  force(func)
  force(ctx)

  function(...) {
    invokeWithTraces <- function() {
      captureStackTraces(func(...))
    }
    ctx$run(invokeWithTraces)
  }
}
# Creates a promise domain whose fulfilled and rejected handlers are each
# wrapped so that they later run in the reactive context that was current
# when the handler was wrapped.
reactivePromiseDomain <- function() {
  # Ties a handler to the context that is current at wrap time.
  bindToCurrentContext <- function(handler) {
    force(handler)
    ctx <- getCurrentContext()
    wrapForContext(handler, ctx)
  }

  promises::new_promise_domain(
    wrapOnFulfilled = function(onFulfilled) {
      bindToCurrentContext(onFulfilled)
    },
    wrapOnRejected = function(onRejected) {
      bindToCurrentContext(onRejected)
    }
  )
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.