R/stream-helpers.R

Defines functions destination_unpipe destination_pipe_error source_pipe_end source_pipe_pause source_pipe_error source_pipe_data

# ~~~~~ Piping ~~~~~ #

source_pipe_data <- function(chunk) {
    out <- destination$write(chunk)
    if (isFALSE(out)) {
        private$.source_pipe_pause()
        # add the unwritten data back to the stream
        if (!is_ignore(chunk) && length(chunk) > 0) {
            self$unshift(chunk)
        }
    }
}

source_pipe_error <- function(err) {
    private$.readable_errored <- "Error propogated to RHS of pipe"
    destination$emit(internal_events$pipe_error, err)
    destination$off("error", private$.destination_pipe_error)
    self$unpipe(destination)
    self$off("error", private$.source_pipe_end)
}

source_pipe_pause <- function() {
    private$.readable_pause()
    # only 1 drain per stream
    if (length(Filter(function(x) identical(x, destination), private$.readable_pipe_drains)) == 0) {
        private$.readable_pipe_drains <- append(private$.readable_pipe_drains, destination)
        destination$once("drain", function() {
            private$.readable_pipe_drains <- splice_element(private$.readable_pipe_drains, destination)
            if (length(private$.readable_pipe_drains) == 0) {
                private$.readable_resume()
            }
        })
    }
}

source_pipe_end <- function() {
    # allow data to appropriately flow before signal
    destination$write(eos_signal())
}

destination_pipe_error <- function(err) {
    private$.readable_errored <- "Unpiped due to RHS pipe error"
    self$unpipe(destination)
    destination$off("error", private$.destination_pipe_error)
}

destination_unpipe <- function() {
    self$unpipe(destination)
}

# ~~~~~ Tee ~~~~~ #
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.