R/stream-sources.R

Defines functions get_poll_method get_pull_method get_push_method

Source <- R6::R6Class(
    "Source",
    public = list(
        poll_connection = NULL,
        initialize = function(source_object) {
            is_read_source <- "pull" %in% names(rlang::caller_env(n = 2)[["public_fields"]])

            if (is.character(source_object)) {
                if (is_read_source) {
                    self$source_object <- file(source_object, open = "rb", blocking = FALSE)
                } else {
                    self$source_object <- file(source_object, open = "wb", blocking = FALSE)
                }
            }

            if (inherits(self$source_object, "connection")) {
                suppressWarnings(open(self$source_object))
            } else if (!is.function(self$source_object)) {
                self$source_object <- Queue$new()
                if (class(source_object) %in% c("data.frame", "data.table", "tibble")) {
                    for (i in seq_len(nrow(source_object))) {
                        self$source_object$add(source_object[i,])
                    }
                } else {
                    for (i in seq_along(source_object)) {
                        self$source_object$add(source_object[[i]])
                    }
                }
            } else {
                self$source_object <- source_object
            }

            # poll method
            self$poll_connection <- get_poll_method(self$source_object, is_read_source)
            environment(self$poll_connection) <- rlang::current_env()
        },
        close = function() {
            if (inherits(self$source_object, "connection") ||
            inherits(self$source_object, "processx_connection")) {
                try(
                    expr = {
                        close(self$source_object)
                    },
                    silent = TRUE
                )
            } else if ("pull" %in% names(rlang::caller_env(n = 2)[["public_fields"]])) {
                self$source_object <- eos_signal()
            }
        }
    ),
    class = FALSE,
    cloneable = FALSE
)


PullSource <- R6::R6Class(
    classname = "PullSource",
    inherit = Source,
    public = list(
        source_object = NULL,
        pull = NULL,
        initialize = function(source_object) {
            super$initialize(source_object)
            # ~~ Set methods ~~ #
            self$pull <- get_pull_method(self$source_object)
            environment(self$pull) <- rlang::current_env()
            invisible(self)
        }
    ),
    class = FALSE,
    cloneable = FALSE
)

PushSource <- R6::R6Class(
    inherit = Source,
    public = list(
        source_object = NULL,
        push = NULL,
        initialize = function(source_object) {
            super$initialize(source_object)
            # ~~ Set methods ~~ #
            self$push <- get_push_method(self$source_object)
            environment(self$push) <- rlang::current_env()
        }
    ),
    class = FALSE,
    cloneable = FALSE
)

PushPullSource <- R6::R6Class(
    inherit = Source,
    public = list(
        source_object = NULL,
        push = NULL,
        pull = NULL,
        initialize = function(source_object) {
            super$initialize(source_object)
            # ~~ Set methods ~~ #
            self$push <- get_push_method(self$source_object)
            self$pull <- get_pull_method(self$source_object)
            environment(self$pull) <- rlang::current_env()
            environment(self$push) <- rlang::current_env()
        }
    )
)

get_push_method <- function(source_object) {
    if (inherits(source_object, "processx_connection")) {
        function(text, ...) {
            processx::processx_conn_write(
                str = text,
                con = self$source_object
            )
        }
    } else if (inherits(source_object, "connection") ||
        is.character(source_object)) {
        function(text, ...) {
            if (!is.character(text)) {
                text <- as.character(text)
            }
            writeLines(
                con = self$source_object,
                text = text,
                sep = "\n"
            )
        }
    } else {
        function(text, ...) {
            self$source_object$add(text)
        }
    }
}

get_pull_method <- function(source_object) {
       if (inherits(source_object, "processx_connection")) {
           function(n) {
               processx::conn_read_chars(
                   con = self$source_object,
                   n
               )
           }
       } else if (inherits(source_object, "connection")) {
           function(n) {
               readChar(
                   con = self$source_object,
                   useBytes = TRUE,
                   nchars = n
               )
           }
       } else {
           function(...) {
               self$source_object$remove()
           }
       }
}

get_poll_method <- function(source_object, is_read_source) {
    if (is_eos(source_object)) {
        function() FALSE
    } else if (inherits(source_object, "processx_connection")) {
        function() {
            processx::poll(list(self$source_object), 1)[[1]] != "closed"
        }
    } else if (inherits(source_object, "connection")) {
        function() {
            tryCatch(
                expr = {
                    isOpen(self$source_object)
                },
                error = function(e) {
                    FALSE
                }
            )
        }
    } else {
        function() {
            !is_eos(self$source_object$peek())
        }
    }
}
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.