# ~~~~~ Piping ~~~~~ #
source_pipe_data <- function(chunk) {
out <- destination$write(chunk)
if (isFALSE(out)) {
private$.source_pipe_pause()
# add the unwritten data back to the stream
if (!is_ignore(chunk) && length(chunk) > 0) {
self$unshift(chunk)
}
}
}
source_pipe_error <- function(err) {
private$.readable_errored <- "Error propogated to RHS of pipe"
destination$emit(internal_events$pipe_error, err)
destination$off("error", private$.destination_pipe_error)
self$unpipe(destination)
self$off("error", private$.source_pipe_end)
}
source_pipe_pause <- function() {
private$.readable_pause()
# only 1 drain per stream
if (length(Filter(function(x) identical(x, destination), private$.readable_pipe_drains)) == 0) {
private$.readable_pipe_drains <- append(private$.readable_pipe_drains, destination)
destination$once("drain", function() {
private$.readable_pipe_drains <- splice_element(private$.readable_pipe_drains, destination)
if (length(private$.readable_pipe_drains) == 0) {
private$.readable_resume()
}
})
}
}
source_pipe_end <- function() {
# allow data to appropriately flow before signal
destination$write(eos_signal())
}
destination_pipe_error <- function(err) {
private$.readable_errored <- "Unpiped due to RHS pipe error"
self$unpipe(destination)
destination$off("error", private$.destination_pipe_error)
}
destination_unpipe <- function() {
self$unpipe(destination)
}
# ~~~~~ Tee ~~~~~ #
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.