R/stream-readable.R

#' R6 Class representing a readable connection
#'
#' Events:
#'   * 'data'
#'   * 'readable'
#'   * 'end'
#'   * 'pause'
#'   * 'resume'
#'   * 'new_listener'
#'   * 'remove_listener'
#'   * 'error'
#'   * 'close'
#'
#' Two types of data consumption:
#'    * flowing mode: buffered chunks are emitted one at a time via 'data' events
#'    * readable mode: a single 'readable' event is emitted and chunks are
#'      pulled from the buffer with `$read()`
#'
#' Errors:
#'    * Errors are propagated by the event emitter, and will halt execution unless handled appropriately. If an error
#'      listener is attached, errors are caught and do not halt execution. Pipes automatically propagate errors to the
#'      right-hand-side pipe (if it exists).
#' @export
ReadableStream <- R6::R6Class(
    "ReadableStream",
    inherit = EventEmitter,
    public = list(
        #' @description
        #' Create a new ReadableStream object
        #' @param source_object
        #' file path,
        #' connection,
        #' processx connection,
        #' socket connection,
        #' iterator,
        #' or anything coercible to an iterator
        #' @param highwater_mark maximum buffer size
        #' @param queue_strategy
        #' the function used to measure the size of the internal buffer; the
        #' result of queue_strategy(internal_buffer) is compared against the
        #' highwater mark to decide whether more data may be buffered.
        #' @param encoding encoding used when reading from the connection
        #' @return A new `ReadableStream` object.
        initialize = function(source_object,
                              highwater_mark = NULL,
                              queue_strategy = object.size,
                              encoding = "UTF8") {
            pretty_stopifnot(
                "encoding must be a character vector",
                # collapse so multi-class objects produce a single detail line
                sprintf("encoding is of class %s", toString(class(encoding))),
                is.character(encoding)
            )

            private$.readable_source <- PullSource$new(source_object)
            private$.readable_encoding <- encoding
            private$.readable_queueing_strategy <- queue_strategy
            private$.read_buffer <- Queue$new(guesstimate_size(source_object))
            private$.readable_highwater_mark <- set_highwater(highwater_mark, queue_strategy)

            ## listeners
            # the first error is recorded and destroys the stream; when piped,
            # the error is not forwarded to destroy() (no warning is raised)
            self$on("error", function(err) {
                if (isFALSE(private$.readable_errored)) {
                    private$.readable_errored <- err
                    if (length(private$.readable_pipes) == 0) {
                        self$destroy(err)
                    } else {
                        self$destroy()
                    }
                }
            })


            ## switch to flowing if there is a data listener
            ## and flow is not explicitly paused
            self$on("new_listener", function(event_name, .fn) {
                if (private$.readable_teed && event_name %in% c("data", "readable")) {
                    pretty_stopifnot(
                        "Cannot add listeners to a teed ReadableStream",
                        "ReadableStream has been teed",
                        !private$.readable_teed
                    )
                }
                if (event_name == "data" && !isFALSE(private$.readable_data_flowing)) {
                    private$.readable_resume()
                    next_tick(poll_call, private$.read_poll)
                } else if (event_name == "readable") {
                    private$.readable_pause()
                    next_tick(poll_call, private$.read_poll)
                }
            })

            # if there are no data listeners, nor pipes, stream must be paused
            self$on("remove_listener", function(event_name, .fn) {
                if (event_name == "data" &&
                    length(self$raw_listeners(event_name)) == 0 &&
                    length(private$.readable_pipes) == 0) {
                    private$.readable_pause()
                }
            })

            invisible(self)
        },
        #' @description
        #' Pretty print a <ReadableStream> object
        print = function() {
            # evaluate the active binding once rather than twice per field
            state <- self$readable_state
            cat("# a <ReadableStream>\n")
            for (field in names(state)) {
                cat(
                    stringr::str_pad(field, 14, side = "left"),
                    ":",
                    format(state[[field]]),
                    "\n"
                )
            }
            invisible(self)
        },
        #' @description
        #' Remove chunks from the internal buffer. Calling this also re-arms the
        #' 'readable' event so it is emitted again once data is buffered.
        #' @param n number of chunks to read. If n < 0, read all buffered
        #' chunks. If n == 0, return an end-of-stream signal instead.
        #' @return The extracted chunks, an end-of-stream signal when n == 0,
        #' or NULL when the buffer is empty.
        read = function(n = -1) {
            # can be used to force an end signal
            if (n == 0) {
                return(eos_signal())
            }
            private$.readable_needs_emit <- TRUE

            if (private$.read_buffer$size() > 0) {
                extracted_chunks <- private$.read_buffer$remove(n = n)
                return(extracted_chunks)
            }
            NULL
        },
        #' @description
        #' Pipe this stream into a writeable (or duplex) stream. Data, error and
        #' end events are forwarded to the destination and the stream is
        #' switched to flowing mode.
        #' @param destination writeable stream
        #' @return (invisibly) `destination`, so pipes can be chained.
        pipe = function(destination) {
            pretty_stopifnot(
                "Pipe destination must inherit from a Writeable or Duplex stream",
                sprintf("Destination is of class '%s'", toString(class(destination))),
                inherits(destination, c("WriteableStream", "DuplexStream"))
            )
            pretty_stopifnot(
                "Stream cannot pipe when it has been destroyed, or is in the process of ending",
                sprintf(
                    "Stream has already %s",
                    if (private$.readable_destroyed) "been destroyed" else "ended"
                ),
                !private$.readable_destroyed && !private$.readable_ended
            )
            pretty_stopifnot(
                "ReadableStream cannot be piped when teed",
                "ReadableStream has already been teed",
                !private$.readable_teed
            )

            # newest destination goes first
            private$.readable_pipes <- c(list(destination), private$.readable_pipes)

            .source_pipe_data <- set_function(source_pipe_data)
            .source_pipe_error <- set_function(source_pipe_error)
            .source_pipe_pause <- set_function(source_pipe_pause)
            .source_pipe_end <- set_function(source_pipe_end)
            .destination_pipe_error <- set_function(destination_pipe_error)
            .destination_unpipe <- set_function(destination_unpipe)

            destination$prepend_listener("error", .destination_pipe_error)
            destination$once("close", .destination_unpipe)
            self$on("data", .source_pipe_data)
            self$on("error", .source_pipe_error)
            self$on("end", .source_pipe_end)

            if (!isTRUE(private$.readable_data_flowing)) {
                private$.readable_resume()
                poll_call(private$.read_poll)
            }

            destination$emit("pipe", self)
            invisible(destination)
        },
        #' @description
        #' Remove a pipe pointing to a writeable stream. When called without a
        #' destination, every pipe is removed. The stream is paused once no
        #' pipes remain.
        #' @param destination writeable stream (optional)
        #' @return (invisibly) `destination`, or the stream itself when no
        #' destination was given.
        unpipe = function(destination) {
            if (missing(destination)) {
                # remove all pipes if unspecified
                for (pipe_destination in private$.readable_pipes) {
                    pipe_destination$emit("unpipe", self)
                }
                private$.readable_pipes <- list()
                # `destination` is missing here, so it cannot be returned
                result <- self
            } else {
                pretty_stopifnot(
                    "Unpipe destination must inherit from a Writeable or Duplex stream",
                    sprintf("Destination is of class '%s'", toString(class(destination))),
                    inherits(destination, c("WriteableStream", "DuplexStream"))
                )
                private$.readable_pipes <- splice_element(private$.readable_pipes, destination)
                destination$emit("unpipe", self)
                result <- destination
            }
            if (length(private$.readable_pipes) == 0) {
                private$.readable_pause()
            }
            invisible(result)
        },
        #' @description
        #' Split the stream into two new ReadableStreams which each receive
        #' every chunk and error emitted by this stream. Work in progress.
        #' @return (invisibly) a list of two `ReadableStream` objects.
        tee = function() {
            pretty_stopifnot(
                "ReadableStream cannot be teed multiple times",
                "ReadableStream has already been teed",
                !private$.readable_teed
            )
            pretty_stopifnot(
                "Stream cannot tee when it has been destroyed, or is in the process of ending",
                sprintf(
                    "Stream has already %s",
                    if (private$.readable_destroyed) "been destroyed" else "ended"
                ),
                !private$.readable_destroyed && !private$.readable_ended
            )
            pretty_stopifnot(
                "Stream cannot tee when it has been piped from",
                "Stream has already piped to another stream",
                length(private$.readable_pipes) == 0
            )

            left_stream <- ReadableStream$new(
                source_object = list(),
                highwater_mark = private$.readable_highwater_mark,
                queue_strategy = private$.readable_queueing_strategy,
                encoding = private$.readable_encoding
            )
            right_stream <- ReadableStream$new(
                source_object = list(),
                highwater_mark = private$.readable_highwater_mark,
                queue_strategy = private$.readable_queueing_strategy,
                encoding = private$.readable_encoding
            )

            # todo, more logic here!

            self$on("data", function(data) {
                left_stream$push(data)
                right_stream$push(data)
            })

            self$on("error", function(err) {
                left_stream$emit("error", err)
                right_stream$emit("error", err)
            })

            # set last: the listeners above must be added before the
            # new_listener guard starts rejecting data/readable listeners
            private$.readable_teed <- TRUE

            invisible(list(left_stream, right_stream))
        },
        #' @description
        #' Pause the stream, and emit a 'pause' event (if not already paused)
        pause = function() {
            if (length(private$.readable_pipes) == 0) {
                private$.readable_pause()
            } else {
                rlang::warn(
                    c(
                        x = "Cannot manually pause a piped stream.",
                        i = "Unpipe the stream if manual control is required."
                    )
                )
            }
            invisible(self)
        },
        #' @description
        #' Resume stream, and emit a 'resume' event (if not already flowing)
        resume = function() {
            if (length(private$.readable_pipes) == 0) {
                if (length(self$raw_listeners("data")) == 0) {
                    rlang::warn(
                        c(
                            "i" = "Resuming <ReadableStream> without any data listeners attached",
                            ">" = "This may result in data being read without a destination"
                        )
                    )
                }
                private$.readable_resume()
            } else {
                rlang::warn(
                    c(
                        x = "Cannot manually resume a piped stream.",
                        i = "Unpipe the stream if manual control is required."
                    )
                )
            }
            invisible(self)
        },
        #' @description
        #' Add a chunk to the front of the internal buffer, regardless of the
        #' highwater mark. An end-of-stream signal marks the stream as ending.
        #' @param chunk the chunk to add
        #' @return `FALSE`
        unshift = function(chunk) {
            private$.add_chunk(chunk, append = FALSE)
        },
        #' @description
        #' Add a chunk to the back of the internal buffer. An end-of-stream
        #' signal marks the stream as ending.
        #' @param chunk the chunk to add
        #' @return `TRUE` if there was room below the highwater mark, `FALSE`
        #' otherwise (stream ended, end-of-stream signal, or buffer full; in
        #' those cases the chunk is not buffered).
        push = function(chunk) {
            private$.add_chunk(chunk, append = TRUE)
        },
        #' @description
        #' Destroy the stream, clearing its buffer, closing its source and
        #' emitting a 'close' event. Calling it again has no effect.
        #' @param why a message (or condition) to report when destroying the
        #' stream
        destroy = function(why = NULL) {
            # idempotent: the finalizer, the error listener and the end logic
            # can all call destroy(); the source must only be closed once
            if (private$.readable_destroyed) {
                return(invisible(self))
            }
            private$.readable_destroyed <- TRUE
            private$.readable_data_flowing <- FALSE

            if (!is.null(why)) {
                store_errors()
                # as.character() on a condition would also emit its call
                reason <- if (inherits(why, "condition")) {
                    conditionMessage(why)
                } else {
                    as.character(why)
                }
                rlang::warn(
                    c(
                        "<ReadableStream> destroyed with message: ",
                        reason
                    )
                )
            }

            # cleanup
            private$.read_buffer$reset()
            private$.readable_source$close()
            self$emit("close")

            invisible(self)
        }
    ),
    private = list(
        finalize = function() {
            self$destroy()
        },
        .read_buffer = NULL,
        .readable_source = NULL,
        # readable options
        .readable_encoding = NULL,
        .readable_queueing_strategy = NULL,
        .readable_highwater_mark = NULL,
        # readable state
        .readable_needs_emit = TRUE,
        .readable_reading = FALSE,
        # NULL = never started, TRUE = flowing, FALSE = paused
        .readable_data_flowing = NULL,
        .readable_destroyed = FALSE,
        .readable_ended = FALSE,
        .readable_ending = FALSE,
        # FALSE until the first error, then the error object itself
        .readable_errored = FALSE,
        .readable_teed = FALSE,
        .readable_pipes = list(),
        .readable_pipe_drains = list(),
        .readable_resume = function() {
            if (!isTRUE(private$.readable_data_flowing)) {
                private$.readable_data_flowing <- TRUE
                self$emit("resume")
            }
            invisible(self)
        },
        .readable_pause = function() {
            if (!isFALSE(private$.readable_data_flowing)) {
                private$.readable_data_flowing <- FALSE
                # emit through self, matching .readable_resume
                self$emit("pause")
            }
            invisible(self)
        },
        # one polling step: emit buffered data, schedule more reads, and
        # detect the end of the stream. Returns FALSE to stop polling.
        .read_poll = function() {
            private$.with_error_handler(
                code = {
                    # destroyed means that no data is *ever* passed
                    if (private$.readable_destroyed ||
                        is.null(private$.readable_data_flowing)
                    ) {
                        return(FALSE)
                    }

                    private$.readable_emit_data()
                    private$.readable_maybe_pull()
                    private$.readable_maybe_end()

                    # continue polling
                    return(TRUE)
                }
            )
        },
        # pull chunks from the source until push() refuses one
        .read = function() {
            keep_reading <- TRUE
            while (keep_reading) {
                chunk <- private$.readable_source$pull(private$.readable_amount_to_read())
                # an empty pull is treated as the end of the source
                if (length(chunk) == 0) {
                    chunk <- eos_signal()
                }
                keep_reading <- self$push(chunk)
            }
        },
        .add_chunk = function(chunk, append = TRUE) {
            if (private$.readable_ended) {
                return(FALSE)
            } else if (is_eos(chunk)) {
                private$.readable_ending <- TRUE
            } else if (!append) {
                private$.read_buffer$add(chunk, TRUE)
            } else if (private$.readable_amount_to_read() > 0) {
                if (isFALSE(private$.readable_reading)) {
                    private$.readable_reading <- TRUE
                    # reset even if add() errors, otherwise every later push
                    # would be silently skipped
                    on.exit(private$.readable_reading <- FALSE, add = TRUE)
                    private$.read_buffer$add(chunk)
                }
                return(TRUE)
            }
            FALSE
        },
        # current buffer size as measured by the queueing strategy
        .readable_length = function() {
            if (identical(private$.readable_queueing_strategy, object.size)) {
                # subtract the overhead of an empty list
                object.size(private$.read_buffer$as_list()) - object.size(list())
            } else {
                private$.readable_queueing_strategy(private$.read_buffer$as_list())
            }
        },
        .readable_amount_to_read = function() {
            amount_to_handle(private$.readable_highwater_mark, private$.readable_length())
        },
        .readable_emit_data = function() {
            if (private$.read_buffer$size() > 0) {
                # push data
                if (private$.readable_data_flowing) {
                    buffer <- self$read(n = 1)
                    if (length(buffer) > 0) {
                        self$emit("data", buffer)
                    }
                } else if (private$.readable_needs_emit) {
                    # emit a readable event once (re-armed by read())
                    private$.readable_needs_emit <- FALSE
                    self$emit("readable")
                }
            }
        },
        .readable_maybe_pull = function() {
            if (private$.readable_source$poll_connection()) {
                if (private$.read_buffer$size() == 0 ||
                    private$.readable_amount_to_read() > 0) {
                    next_tick(private$.read)
                }
            }
        },
        .readable_maybe_end = function() {
            if (!private$.readable_source$poll_connection()) {
                # if stream is ending and the buffer has been emptied,
                # emit end
                if (private$.readable_ending && private$.read_buffer$size() == 0) {
                    private$.readable_ending <- FALSE
                    private$.readable_ended <- TRUE
                    self$emit("end")
                } else if (private$.readable_ended) {
                    self$destroy()
                }
            }
        },
        # NOTE(review): not assigned anywhere in this class; pipe() uses
        # locals of the same names instead — confirm whether these are needed
        .source_pipe_data = NULL,
        .source_pipe_error = NULL,
        .source_pipe_pause = NULL,
        .source_pipe_end = NULL,
        .destination_pipe_error = NULL,
        .destination_unpipe = NULL
    ),
    active = list(
        readable_state = function() {
            list(
                # settings
                encoding = private$.readable_encoding,
                highwater_mark = private$.readable_highwater_mark,

                # data len/src
                source = private$.readable_source$source_object,
                buffer = private$.read_buffer,
                buffer_length = private$.readable_length(),

                # data flow states
                piped = length(private$.readable_pipes) > 0,
                teed = private$.readable_teed,
                flowing = private$.readable_data_flowing,
                ended = private$.readable_ended,
                destroyed = private$.readable_destroyed,
                # .readable_errored holds FALSE or the error object
                errored = !isFALSE(private$.readable_errored)
            )
        }
    ),
    cloneable = FALSE
)
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.