R/event-emitter.R

Defines functions on once

Documented in on once

#' Event Emitter
#'
#' @export
EventEmitter <- R6::R6Class(
    "EventEmitter",
    public = list(
        #' @param event_name
        #' @param fn TODOC
        add_listener = function(event_name, fn) {
            pretty_stopifnot(
                "argument `fn` supplied to emitter must be a function",
                sprintf("fn is of class '%s'", class(fn)),
                is.function(fn)
            )
            self$emit("new_listener", event_name, fn)
            private$.check_listener_length(event_name)
            private$.listeners[[event_name]] <- append(private$.listeners[[event_name]], fn)
            invisible(self)
        },
        #' @param event_name TODOC
        #' @param fn TODOC
        on = function(event_name, fn) {
            invisible(self$add_listener(event_name, fn))
        },
        #' @param event_name TODOC
        #' @param fn TODOC
        prepend_listener = function(event_name, fn) {
            pretty_stopifnot(
                "argument `fn` supplied to emitter must be a function",
                sprintf("fn is of class '%s'", class(fn)),
                is.function(fn)
            )
            self$emit("new_listener", event_name, fn)
            private$.check_listener_length(event_name)
            private$.listeners[[event_name]] <- prepend(private$.listeners[[event_name]], fn)
            invisible(self)
        },
        #' @param event_name TODOC
        #' @param fn TODOC
        once = function(event_name, fn) {
            pretty_stopifnot(
                "argument `fn` supplied to emitter must be a function",
                sprintf("fn is of class '%s'", class(fn)),
                is.function(fn)
            )
            self$emit("new_listener", event_name, fn)
            private$.check_listener_length(event_name)
            wrapper <- private$.once_wrapper(fn, event_name, wrapper)
            private$.listeners[[event_name]] <- append(private$.listeners[[event_name]], wrapper)
            invisible(self)
        },
        #' @param event_name TODOC
        #' @param fn TODOC
        prepend_once_listener = function(event_name, fn) {
            pretty_stopifnot(
                "argument `fn` supplied to emitter must be a function",
                sprintf("fn is of class '%s'", class(fn)),
                is.function(fn)
            )
            self$emit("new_listener", event_name, fn)
            wrapper <- private$.once_wrapper(fn, event_name, wrapper)
            private$.check_listener_length(event_name)
            private$.listeners[[event_name]] <- prepend(private$.listeners[[event_name]], wrapper)
            invisible(self)
        },
        #' @param event_name TODOC
        #' @param fn TODOC
        remove_listener = function(event_name, fn) {
            pretty_stopifnot(
                "argument `fn` supplied to emitter must be a function",
                sprintf("fn is of class '%s'", class(fn)),
                is.function(fn)
            )
            self$emit("remove_listener", event_name, fn)
            if (!is.null(private$.listeners[[event_name]])) {
                matches <- unlist(lapply(private$.listeners[[event_name]], function(x) identical(x, fn)))
                private$.listeners[[event_name]][
                    matches
                ] <- NULL
            }
            invisible(self)
        },
        #' @param event_name TODOC
        #' @param fn TODOC
        off = function(event_name, fn) {
            return(self$remove_listener(event_name, fn))
        },
        #' @param event_name TODOC
        remove_all_listeners = function(event_name) {
            private$.listeners[[event_name]] <- NULL
            invisible(self)
        },
        #' @param event_name TODOC
        #' @param ... arguments to pass to functions registered to a given event
        emit = function(event_name, ...) {
            event_listeners <- private$.listeners[[event_name]]
            if (is_falsey(event_listeners)) {
                if (event_name == "error") {
                     error_unhandled_error_event(...)
                } else {
                    return(invisible(FALSE))
                }
            } else {
                fn <- function() callback(...)
                private$.with_error_handler(
                    code = {
                        for (callback in event_listeners) {
                            if (event_name == "close") {
                                .add_to_close(fn)
                            } else {
                                fn()
                            }
                        }
                    }
                )
            }
            return(invisible(TRUE))
        },
        #' @param event_name TODOC
        listener_count = function(event_name) {
            return(length(private$.listeners[[event_name]]))
        },
        #' @param event_name TODOC
        raw_listeners = function(event_name) {
            return(private$.listeners[[event_name]])
        },
        #' @description
        #' TODOC
        event_names = function() {
            ev_names <- names(private$.listeners)
            return(ev_names[ev_names %nin% internal_events])
        },
        #' @param n TODOC
        set_max_listeners = function(n) {
            private$.max_listeners <- n
            invisible(self)
        },
        #' @description
        #' TODOC
        get_max_listeners = function() {
            return(private$.max_listeners)
        }
    ),
    private = list(
        .listeners = list(),
        .max_listeners = 10,
        .check_listener_length = function(event_name) {
            count <- self$listener_count(event_name)
            if (count >= self$get_max_listeners()) {
                rlang::warn(
                    c(
                        "!" = "Possible <EventEmitter> leak detected.",
                        i = sprintf(
                            "There are more listeners than expected for event: '%s'",
                            event_name
                        ),
                        " " = sprintf("Expected maximum: %s", private$.max_listeners)
                    )
                )
            }
        },
        .once_wrapper = function(fn, event_name, this) {
            return(
                function(...) {
                    fn(...)
                    self$off(event_name, this)
                }
            )
        },
        .with_error_handler = function(code) {
            tryCatch(
                expr = {
                    force(code)
                },
                error = function(e) {
                    store_errors()
                    next_tick(
                        self$emit,
                        "error",
                        sprintf(
                            "Error %s: %s",
                            deparse(e[["call"]]),
                            e[["message"]]
                        ))
                }
            )
        }
    ),
    cloneable = FALSE
)

#' Async once-event
#' @param emitter an EventEmitter object
#' @param event character vector
#' @rdname async
#' @export
once <- function(emitter, event) {
    promises::promise(function(resolve, reject) {
        emitter$once(event, function(...) {
            if (missing(...)) {
                resolve(NULL)
            }
            resolve(...)
        })
        emitter$once("error", function(...) {
            if (missing(...)) {
                reject(NULL)
            }
            reject(...)
        })
    })
}

#' Async event iterator
#' @param emitter an EventEmitter object
#' @param event character vector
#' @rdname async
#' @export
on <- function(emitter, event) {
    listeners <- emitter$raw_listeners(event)
    emitter$remove_all_listeners(event)
    p <- once(emitter, event)
    p_listeners <- lapply(listeners, function(listener) {
        promises::promise(function(resolve, reject) {
            promises::then(p, function(...) {
                if (is.null(...)) {
                    resolve(listener())
                } else {
                    resolve(listener(...))
                }
            })
        })
    })
    return(coro::as_iterator(p_listeners))
}
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.