# ~~~~~~~~~~~~~~~ Exports ~~~~~~~~~~~~~~~~ #
#' Queue function `fn` execution.
#' Call is added to the current phase check loop.
#' @export
#' @rdname async
set_immediate <- function(fn, ...) {
if (!missing(...)) {
func <- fn
stopifnot(is.function(fn))
fn <- substitute(do.call(func, list(...)))
}
later::later(
func = function() {
.event_loop$add_to_queue(fn, "check")
.event_loop$execute_loop_sync("check")
},
delay = 0,
loop = .event_loop$loops$check
)
}
#' Waits `x` ms before queueing function `fn` execution.
#' Call is added to the next phase timer loop.
#' @export
#' @rdname async
set_timeout <- function(fn, ms, ...) {
ms <- ms / 1000
if (!missing(...)) {
func <- fn
stopifnot(is.function(fn))
fn <- substitute(do.call(func, list(...)))
}
later::later(
func = function() {
.event_loop$add_to_queue(fn, "timer")
.event_loop$execute_loop_sync("timer")
},
delay = ms,
loop = .event_loop$loops$timer
)
}
#' Execute function on next tick/phase of event loop.
#' E.g., if the current phase of the event loop is the
#' timer phase, the function will be queued prior to the callback phase.
#' @export
#' @rdname async
next_tick <- function(fn, ...) {
next_phase <- .event_loop$next_phase
if (!missing(...)) {
stopifnot(is.function(fn))
func <- fn
fn <- function() {
func(...)
}
}
.event_loop$add_to_queue(fn, "next_tick")
later::later(
func = function() {
.event_loop$execute_loop_sync("next_tick")
},
delay = 0,
loop = next_phase
)
}
# ~~~~~~~~~~~~~~~ Internal ~~~~~~~~~~~~~~~~ #
.event_delay <- 0
.event_loop <- NULL
#
# Current loop structure:
# Entry ◄───────────┐
# │ │
# │ ├────► Timer │
# │ │ │
# │ │ │
# │ ├────► Callback │
# │ │ │ event
# │ │ │ phase
# │ ├────► Poll │ "chain"
# │ │ │
# │ │ │
# │ ├────► Check │
# │ │ │
# ▼ │ │
# └────► Close ─────┘
#
# Each phase consists of a FIFO call stack that is executed synchronously.
# When the event loop reaches a phase, all calls in the stack are # executed.
# When functions are queued, e.g. via `set_timeout` or `set_immediate`, they are added
# to the call stack *after* the set duration.
#
# Each time a phase is triggered, all subsequent phases are triggered too.
# i.e., if callback is triggered, then poll, check and close phases are triggered.
#
EventLoopWrapper <- R6::R6Class(
"EventLoopWrapper",
public = list(
last_error = NULL,
loops = list(),
queue = list(
timer = fastmap::fastqueue(),
callback = fastmap::fastqueue(),
poll = fastmap::fastqueue(),
check = fastmap::fastqueue(),
close = fastmap::fastqueue(),
next_tick = fastmap::fastqueue()
),
initialize = function() {
private$.append_loop_list(list(timer = later::create_loop(parent = later::global_loop())))
private$.append_loop_list(list(callback = later::create_loop(parent = self$loops$timer)))
private$.append_loop_list(list(poll = later::create_loop(parent = self$loops$callback)))
private$.append_loop_list(list(check = later::create_loop(parent = self$loops$poll)))
private$.append_loop_list(list(close = later::create_loop(parent = self$loops$check)))
},
execute_loop_sync = function(queue) {
select_queue <- self$queue[[queue]]
if (queue == "poll") {
private$.poll_phase_call()
}
# TODO,
# execute all cbs in next tick
if (self$queue$next_tick$size() > 0) {
while (!is.null(callback <- self$queue$next_tick$remove(NULL))) {
withCallingHandlers(
expr = {
if (is.function(callback)) {
callback()
} else if (is.call(callback)) {
eval(callback)
}
},
error = function(e) {
store_errors()
error_callback_evaluation(
queue,
e
)
break
}
)
}
}
if (select_queue$size() > 0) {
while (!is.null(callback <- select_queue$remove(NULL))) {
withCallingHandlers(
expr = {
if (is.function(callback)) {
callback()
} else if (is.call(callback)) {
eval(callback)
}
},
error = function(e) {
store_errors()
error_callback_evaluation(
queue,
e
)
break
}
)
}
}
},
add_to_queue = function(fn, queue, force_add = FALSE) {
queue_list <- self$queue[[queue]]$as_list()
matches <- contains_function(fn, queue_list)
if (force_add || !matches) {
self$queue[[queue]]$add(fn)
}
invisible(self)
}
),
active = list(
# only return {emitters} package loops
current_phase = function() {
curr <- later::current_loop()
if (!any(unlist(lapply(self$loops, function(x) identical(x, curr))))) {
return(self$loops$timer)
} else {
return(curr)
}
},
next_phase = function() {
curr_phase <- self$current_phase
if (identical(curr_phase, self$loops$timer)) {
next_phase <- self$loops$callback
} else if (identical(curr_phase, self$loops$callback)) {
next_phase <- self$loops$poll
} else if (identical(curr_phase, self$loops$poll)) {
next_phase <- self$loops$check
} else if (identical(curr_phase, self$loops$check)) {
next_phase <- self$loops$close
} else {
next_phase <- self$loops$timer
}
return(next_phase)
},
next_phase_op_ms = function() {
later::next_op_secs(self$current_phase) / 1000
},
current_phase_op_ms = function() {
later::next_op_secs(self$next_phase) / 1000
}
),
private = list(
.append_loop_list = function(loop) {
self$loops <- append(self$loops, loop)
},
.poll_phase_call = function() {
wait_time <- later::next_op_secs(self$loops$poll)
if (!is.infinite(wait_time) && wait_time > 0) {
if (wait_time <= 1) {
print(sprintf("Poll phase: waiting %s seconds", wait_time))
Sys.sleep(wait_time)
}
}
}
),
class = FALSE,
cloneable = FALSE
)
.add_to_poll <- function(fn, ...) {
if (!missing(...)) {
func <- fn
stopifnot(is.function(fn))
fn <- substitute(do.call(func, list(...)))
print(fn)
}
later::later(
func = function() {
.event_loop$add_to_queue(fn, "poll")
.event_loop$execute_loop_sync("poll")
},
delay = .event_delay,
loop = .event_loop$loops$poll
)
}
.add_to_callback <- function(fn, ...) {
if (!missing(...)) {
func <- fn
stopifnot(is.function(fn))
fn <- substitute(do.call(func, list(...)))
}
later::later(
func = function() {
.event_loop$add_to_queue(fn, "callback")
.event_loop$execute_loop_sync("callback")
},
delay = .event_delay,
loop = .event_loop$loops$callback
)
}
.add_to_close <- function(fn, ...) {
if (!missing(...)) {
func <- fn
stopifnot(is.function(fn))
fn <- substitute(do.call(func, list(...)))
}
later::later(
func = function() {
.event_loop$add_to_queue(fn, "close")
.event_loop$execute_loop_sync("close")
},
delay = .event_delay,
loop = .event_loop$loops$close
)
}
## WIP
contains_function <- function(fn, fn_list) {
res <- lapply(
fn_list,
function(x) {
identical(x, fn)
}
)
any(unlist(res))
}
poll_call <- function(fn) {
handler <- function() {
res <- fn()
if (isTRUE(res)) {
next_tick(.add_to_poll, handler)
}
}
handler()
}
last_error <- NULL
# used so that events that are visible to users are
internal_events <- list(
pipe_error = ".EMITTERS_INTERNAL_PIPE_ERROR"
)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.