R/stream-duplex.R

Defines functions init_duplex

#' Graft the writeable half onto the `DuplexStream` generator
#'
#' `DuplexStream` inherits from `ReadableStream` only, so the members of the
#' `WriteableStream` generator are copied onto the `DuplexStream` generator
#' here. Members that `DuplexStream` already defines (matched by name) are
#' left untouched.
#'
#' @param ... ignored.
#' @return `NULL`, invisibly; called for its side effect on `DuplexStream`.
#' @noRd
init_duplex <- function(...) {
    # The assignments below reach the real generator only because the
    # generator is an environment and is therefore modified by reference.

    # Keep the writeable initializer and both poll implementations reachable
    # under distinct private names.
    DuplexStream[["private_methods"]][[".initialize_writeable"]] <-
        WriteableStream$public_methods$initialize
    DuplexStream[["private_methods"]][[".writeable_poll"]] <-
        WriteableStream$private_methods$.poll
    DuplexStream[["private_methods"]][[".readable_poll"]] <-
        ReadableStream$private_methods$.poll

    group_names <- c(
        "public_fields",
        "public_methods",
        "private_fields",
        "private_methods",
        "active"
    )

    for (group_name in group_names) {
        donor <- WriteableStream[[group_name]]
        for (name in names(donor)) {
            # Test for presence by name, so a member that DuplexStream
            # defines as NULL is still treated as defined.
            if (name %in% names(DuplexStream[[group_name]])) {
                next
            }
            # `[<-` with a one-element list stores the value as-is, NULL
            # included. `[[<-` would drop a NULL, and wrapping it in
            # `list(NULL)` would store a list instead of NULL.
            DuplexStream[[group_name]][name] <- donor[name]
        }
    }

    invisible(NULL)
}

#' Duplex Stream
#'
#' @description
#' A stream with both a readable and a writeable side. The class inherits
#' from `ReadableStream`; `init_duplex()` copies the members of
#' `WriteableStream` onto this generator.
#'
#' @export
DuplexStream <- R6::R6Class(
    "DuplexStream",
    inherit = ReadableStream,
    public = list(
        #' @description
        #' Create a new DuplexStream object
        #' @param source_object a file path, connection, processx connection, or socket connection
        #' @param readable_highwater_mark maximum buffer size of the readable side
        #' @param writeable_highwater_mark maximum buffer size of the writeable
        #'   side; defaults to `readable_highwater_mark`
        #' @param readable_queue_strategy queue strategy forwarded to the
        #'   readable side's initializer; defaults to `object.size`
        #' @param writeable_queue_strategy queue strategy forwarded to the
        #'   writeable side's initializer; defaults to `readable_queue_strategy`
        #' @param encoding encoding forwarded to both sides of the stream
        #' @return A new `DuplexStream` object.
        initialize = function(source_object,
                              readable_highwater_mark = 16384,
                              writeable_highwater_mark = readable_highwater_mark,
                              readable_queue_strategy = object.size,
                              writeable_queue_strategy = readable_queue_strategy,
                              encoding = "unknown") {
            # readable side: the inherited ReadableStream initializer
            super$initialize(
                source_object,
                readable_highwater_mark,
                readable_queue_strategy,
                encoding
            )
            # writeable side: the WriteableStream initializer installed by
            # init_duplex()
            private$.initialize_writeable(
                source_object,
                writeable_highwater_mark,
                writeable_queue_strategy,
                encoding
            )
            # when working with objects, readable and writeable sides have to be from the same source
            if (!is.character(source_object) && !inherits(source_object, "connection")) {
                private$.readable_source <- PushPullSource$new(source_object)
                private$.writeable_source <- private$.readable_source
            }
            invisible(self)
        },
        #' @description
        #' Destroy the stream. A call coming from the read poll only marks
        #' the readable side as destroyed once the writeable side has been
        #' destroyed; a call coming from the write poll marks the writeable
        #' side; any other caller marks both sides. Once both sides are
        #' destroyed the sources are closed, the buffers are reset and a
        #' `"close"` event is emitted.
        #' @param why optional reason for the destruction; when not `NULL`
        #'   it is reported in a warning once both sides are destroyed
        #' @return The stream, invisibly.
        destroy = function(why = NULL) {
            # Resolve the calling function once; every branch compares
            # against the same value.
            # NOTE(review): init_duplex() installs `.readable_poll` and
            # `.writeable_poll`; confirm that `.read_poll` / `.write_poll`
            # are defined by the parent classes.
            caller <- rlang::caller_fn()

            if (identical(private$.read_poll, caller)) {
                # only destroy if writeable already destroyed
                # (writeable has precedence)
                if (isTRUE(private$.writeable_destroyed)) {
                    private$.readable_destroyed <- TRUE
                    private$.readable_data_flowing <- FALSE
                }
            } else if (identical(private$.write_poll, caller)) {
                private$.writeable_destroyed <- TRUE
                private$.writeable_data_flowing <- FALSE
            } else {
                private$.readable_destroyed <- TRUE
                private$.readable_data_flowing <- FALSE
                private$.writeable_destroyed <- TRUE
                private$.writeable_data_flowing <- FALSE
            }

            # only occurs if writeable & readable have both called destroy
            # or destroy was called manually.
            # isTRUE() keeps the guard from erroring on a flag that is still
            # unset, matching the check in the read-poll branch above.
            both_destroyed <- isTRUE(private$.writeable_destroyed) &&
                isTRUE(private$.readable_destroyed)
            if (both_destroyed) {
                if (!is.null(why)) {
                    store_errors()
                    rlang::warn(
                        c(
                            "<DuplexStream> destroyed with message: ",
                            "x" = as.character(unlist(why))
                        )
                    )
                }
                private$.writeable_source$close()
                private$.readable_source$close()
                private$.read_buffer$reset()
                private$.write_buffer$reset()
                self$emit("close")
            }

            invisible(self)
        },
        #' @description
        #' Print one line per entry of `duplex_state`.
        #' @param ... ignored
        #' @return The stream, invisibly.
        print = function(...) {
            cat("# a <DuplexStream>\n")
            # `duplex_state` is an active binding: read it once instead of
            # rebuilding the whole list for every printed row.
            state <- self$duplex_state
            for (n in seq_along(state)) {
                cat(
                    stringr::str_pad(names(state[n]), 14, side = "left"),
                    ":",
                    format(state[[n]]),
                    "\n"
                )
            }
            invisible(self)
        }
    ),
    active = list(
        #' @field duplex_state A named list describing both sides of the
        #'   stream: settings, source, combined buffer length and data flow
        #'   state.
        duplex_state = function() {
            list(
                # settings (readable first, writeable second)
                encoding = c(private$.readable_encoding, private$.writeable_encoding),
                highwater_mark = c(private$.readable_highwater_mark, private$.writeable_highwater_mark),

                # data len/src
                source = private$.writeable_source$source_object,
                buffer_length = private$.readable_length() + private$.writeable_length(),

                # data flow states
                piped = length(private$.readable_pipes) > 0,
                teed = private$.readable_teed,
                flowing = any(c(private$.readable_data_flowing, private$.writeable_data_flowing)),
                ended = all(c(private$.writeable_ended, private$.readable_ended)),
                destroyed = all(c(private$.writeable_destroyed, private$.readable_destroyed))
            )
        }
    ),
    private = list(
        # Destroys the stream when the object is finalized.
        finalize = function() {
            self$destroy()
        }
    ),
    cloneable = FALSE
)
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.