R/event-loop.R

Defines functions poll_call contains_function .add_to_close .add_to_callback .add_to_poll next_tick set_timeout set_immediate

Documented in next_tick set_immediate set_timeout

# ~~~~~~~~~~~~~~~ Exports ~~~~~~~~~~~~~~~~ #

#' Queue function `fn` execution.
#' Call is added to the current phase check loop.
#' @export
#' @rdname async
set_immediate <- function(fn, ...) {
    if (!missing(...)) {
        func <- fn
        stopifnot(is.function(fn))
        fn <- substitute(do.call(func, list(...)))
    }
    later::later(
        func = function() {
            .event_loop$add_to_queue(fn, "check")
            .event_loop$execute_loop_sync("check")
        },
        delay = 0,
        loop = .event_loop$loops$check
    )
}

#' Waits `x` ms before queueing function `fn` execution.
#' Call is added to the next phase timer loop.
#' @export
#' @rdname async
set_timeout <- function(fn, ms, ...) {
    ms <- ms / 1000
    if (!missing(...)) {
        func <- fn
        stopifnot(is.function(fn))
        fn <- substitute(do.call(func, list(...)))
    }
    later::later(
        func = function() {
            .event_loop$add_to_queue(fn, "timer")
            .event_loop$execute_loop_sync("timer")
        },
        delay = ms,
        loop = .event_loop$loops$timer
    )
}


#' Execute function on next tick/phase of event loop.
#' E.g., if the current phase of the event loop is the
#' timer phase, the function will be queued prior to the callback phase.
#' @export
#' @rdname async
next_tick <- function(fn, ...) {
    next_phase <- .event_loop$next_phase
    if (!missing(...)) {
        stopifnot(is.function(fn))
        func <- fn
        fn <- function() {
            func(...)
        }
    }

    .event_loop$add_to_queue(fn, "next_tick")
    later::later(
        func = function() {
            .event_loop$execute_loop_sync("next_tick")
        },
        delay = 0,
        loop = next_phase
    )
}

# ~~~~~~~~~~~~~~~ Internal ~~~~~~~~~~~~~~~~ #

.event_delay <- 0
.event_loop <- NULL

#
# Current loop structure:
#   Entry  ◄───────────┐
#    │                 │
#  │ ├────► Timer      │
#  │ │                 │
#  │ │                 │
#  │ ├────► Callback   │
#  │ │                 │   event
#  │ │                 │   phase
#  │ ├────► Poll       │  "chain"
#  │ │                 │
#  │ │                 │
#  │ ├────► Check      │
#  │ │                 │
#  ▼ │                 │
#    └────► Close ─────┘
#
# Each phase consists of a FIFO call stack that is executed synchronously.
# When the event loop reaches a phase, all calls in the stack are # executed.
# When functions are queued, e.g. via `set_timeout` or `set_immediate`, they are added
# to the call stack *after* the set duration.
#
# Each time a phase is triggered, all subsequent phases are triggered too.
# i.e., if callback is triggered, then poll, check and close phases are triggered.
#
EventLoopWrapper <- R6::R6Class(
    "EventLoopWrapper",
    public = list(
        last_error = NULL,
        loops = list(),
        queue = list(
            timer = fastmap::fastqueue(),
            callback = fastmap::fastqueue(),
            poll = fastmap::fastqueue(),
            check = fastmap::fastqueue(),
            close = fastmap::fastqueue(),
            next_tick = fastmap::fastqueue()
        ),
        initialize = function() {
            private$.append_loop_list(list(timer = later::create_loop(parent = later::global_loop())))
            private$.append_loop_list(list(callback = later::create_loop(parent = self$loops$timer)))
            private$.append_loop_list(list(poll = later::create_loop(parent = self$loops$callback)))
            private$.append_loop_list(list(check = later::create_loop(parent = self$loops$poll)))
            private$.append_loop_list(list(close = later::create_loop(parent = self$loops$check)))
        },
        execute_loop_sync = function(queue) {
            select_queue <- self$queue[[queue]]
            if (queue == "poll") {
                private$.poll_phase_call()
            }
            # TODO,
            # execute all cbs in next tick
            if (self$queue$next_tick$size() > 0) {
                                while (!is.null(callback <- self$queue$next_tick$remove(NULL))) {
                                    withCallingHandlers(
                                        expr = {
                                            if (is.function(callback)) {
                                                callback()
                                            } else if (is.call(callback)) {
                                                eval(callback)
                                            }
                                        },
                                        error = function(e) {
                                            store_errors()
                                            error_callback_evaluation(
                                                queue,
                                                e
                                            )
                                            break
                                        }
                                    )
                                }
            }
            if (select_queue$size() > 0) {
                while (!is.null(callback <- select_queue$remove(NULL))) {
                    withCallingHandlers(
                        expr = {
                            if (is.function(callback)) {
                                callback()
                            } else if (is.call(callback)) {
                                eval(callback)
                            }
                        },
                        error = function(e) {
                            store_errors()
                            error_callback_evaluation(
                                queue,
                                e
                            )
                            break
                        }
                    )
                }
            }
        },
        add_to_queue = function(fn, queue, force_add = FALSE) {
            queue_list <- self$queue[[queue]]$as_list()
            matches <- contains_function(fn, queue_list)
            if (force_add || !matches) {
                self$queue[[queue]]$add(fn)
            }
            invisible(self)
        }
    ),
    active = list(
        # only return {emitters} package loops
        current_phase = function() {
            curr <- later::current_loop()
            if (!any(unlist(lapply(self$loops, function(x) identical(x, curr))))) {
                return(self$loops$timer)
            } else {
                return(curr)
            }
        },
        next_phase = function() {
            curr_phase <- self$current_phase
            if (identical(curr_phase, self$loops$timer)) {
                next_phase <- self$loops$callback
            } else if (identical(curr_phase, self$loops$callback)) {
                next_phase <- self$loops$poll
            } else if (identical(curr_phase, self$loops$poll)) {
                next_phase <- self$loops$check
            } else if (identical(curr_phase, self$loops$check)) {
                next_phase <- self$loops$close
            } else {
                next_phase <- self$loops$timer
            }
            return(next_phase)
        },
        next_phase_op_ms = function() {
            later::next_op_secs(self$current_phase) / 1000
        },
        current_phase_op_ms = function() {
            later::next_op_secs(self$next_phase) / 1000
        }
    ),
    private = list(
        .append_loop_list = function(loop) {
            self$loops <- append(self$loops, loop)
        },
        .poll_phase_call = function() {
            wait_time <- later::next_op_secs(self$loops$poll)
            if (!is.infinite(wait_time) && wait_time > 0) {
                if (wait_time <= 1) {
                    print(sprintf("Poll phase: waiting %s seconds", wait_time))
                    Sys.sleep(wait_time)
                }
            }
        }
    ),
    class = FALSE,
    cloneable = FALSE
)

.add_to_poll <- function(fn, ...) {
    if (!missing(...)) {
        func <- fn
        stopifnot(is.function(fn))
        fn <- substitute(do.call(func, list(...)))
        print(fn)
    }
    later::later(
        func = function() {
            .event_loop$add_to_queue(fn, "poll")
            .event_loop$execute_loop_sync("poll")
        },
        delay = .event_delay,
        loop = .event_loop$loops$poll
    )
}

.add_to_callback <- function(fn, ...) {
    if (!missing(...)) {
        func <- fn
        stopifnot(is.function(fn))
        fn <- substitute(do.call(func, list(...)))
    }
    later::later(
        func = function() {
            .event_loop$add_to_queue(fn, "callback")
            .event_loop$execute_loop_sync("callback")
        },
        delay = .event_delay,
        loop = .event_loop$loops$callback
    )
}

.add_to_close <- function(fn, ...) {
    if (!missing(...)) {
        func <- fn
        stopifnot(is.function(fn))
        fn <- substitute(do.call(func, list(...)))
    }
    later::later(
        func = function() {
            .event_loop$add_to_queue(fn, "close")
            .event_loop$execute_loop_sync("close")
        },
        delay = .event_delay,
        loop = .event_loop$loops$close
    )
}

## WIP
contains_function <- function(fn, fn_list) {
    res <- lapply(
        fn_list,
        function(x) {
            identical(x, fn)
        }
    )

    any(unlist(res))
}

poll_call <- function(fn) {
    handler <- function() {
        res <- fn()
        if (isTRUE(res)) {
            next_tick(.add_to_poll, handler)
        }
    }
    handler()
}

last_error <- NULL

# used so that events that are visible to users are
internal_events <- list(
    pipe_error = ".EMITTERS_INTERNAL_PIPE_ERROR"
)
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.