#' R6 Class representing a readable connection
#'
#' Events:
#' * 'data'
#' * 'readable'
#' * 'pause'
#' * 'resume'
#' * 'new_listener'
#' * 'remove_listener'
#' * 'error'
#' * 'close'
#'
#' Two types of data consumption:
#' * flowing mode
#' * readable mode
#'
#' Errors:
#' * Errors are propagated by the event emitter, and will halt execution unless handled appropriately. If an error
#' listener is attached, errors are caught and do not halt execution. Pipes automatically propagate errors to the
#' right-hand-side pipe (if it exists).
#' @export
ReadableStream <- R6::R6Class(
  "ReadableStream",
  inherit = EventEmitter,
  public = list(
    #' @description
    #' Create a new ReadableStream object
    #' @param source_object
    #' file path,
    #' connection,
    #' processx connection,
    #' socket connection,
    #' iterator,
    #' or anything coercible to an iterator
    #' @param highwater_mark maximum buffer size
    #' @param queue_strategy
    #' the function used to determine the maximum size of the internal buffer.
    #' the buffer size is determined by calling queue_strategy(internal_buffer)
    #' @param encoding encoding used when reading from the connection
    #' @return A new `ReadableStream` object.
    initialize = function(source_object,
                          highwater_mark = NULL,
                          queue_strategy = object.size,
                          encoding = "UTF8") {
      pretty_stopifnot(
        "encoding must be a character vector",
        sprintf("encoding is of class %s", class(encoding)),
        is.character(encoding)
      )
      private$.readable_source <- PullSource$new(source_object)
      private$.readable_encoding <- encoding
      private$.readable_queueing_strategy <- queue_strategy
      private$.read_buffer <- Queue$new(guesstimate_size(source_object))
      private$.readable_highwater_mark <- set_highwater(highwater_mark, queue_strategy)
      ## listeners
      # Only the first error is recorded. The error is passed to destroy()
      # (which warns with it) only when there are no pipes attached.
      self$on("error", function(err) {
        if (isFALSE(private$.readable_errored)) {
          private$.readable_errored <- err
          if (length(private$.readable_pipes) == 0) {
            self$destroy(err)
          } else {
            self$destroy()
          }
        }
      })
      ## switch to flowing if there is a data listener
      ## and flow is not explicitly paused
      self$on("new_listener", function(event_name, .fn) {
        # A teed stream refuses new 'data'/'readable' listeners: the assertion
        # below always fails once this branch is entered.
        if (private$.readable_teed && event_name %in% c("data", "readable")) {
          pretty_stopifnot(
            "Cannot add listeners to a teed ReadableStream",
            "ReadableStream has been teed",
            !private$.readable_teed
          )
        }
        if (event_name == "data" && !isFALSE(private$.readable_data_flowing)) {
          private$.readable_resume()
          next_tick(poll_call, private$.read_poll)
        } else if (event_name == "readable") {
          private$.readable_pause()
          next_tick(poll_call, private$.read_poll)
        }
      })
      # if there are no data listeners, nor pipes, stream must be paused
      self$on("remove_listener", function(event_name, .fn) {
        if (event_name == "data" &&
          length(self$raw_listeners(event_name)) == 0 &&
          length(private$.readable_pipes) == 0) {
          private$.readable_pause()
        }
      })
      invisible(self)
    },
    #' @description
    #' Pretty print a <ReadableStream> object
    #' @return The stream itself, invisibly.
    print = function() {
      cat("# a <ReadableStream>\n")
      # `readable_state` is an active binding and is rebuilt on every access,
      # so take a single snapshot instead of re-evaluating it per field.
      state <- self$readable_state
      for (i in seq_along(state)) {
        cat(
          stringr::str_pad(names(state)[[i]], 14, side = "left"),
          ":",
          format(state[[i]]),
          "\n"
        )
      }
      invisible(self)
    },
    #' @description
    #' Read from the internal buffer.
    #' How many lines to read from the internal buffer. If n < 0, read
    #' all lines.
    #' @param n number of items to remove from the internal buffer. `0` returns
    #' an end-of-stream signal without touching the buffer.
    #' @return The extracted chunks, an end-of-stream signal when `n == 0`,
    #' or `NULL` when the internal buffer is empty.
    read = function(n = -1) {
      # can be used to force an end signal
      if (n == 0) {
        return(eos_signal())
      }
      # a read re-arms the one-shot 'readable' notification
      private$.readable_needs_emit <- TRUE
      if (private$.read_buffer$size() > 0) {
        extracted_chunks <- private$.read_buffer$remove(n = n)
        return(extracted_chunks)
      }
      NULL
    },
    #' @description
    #' Pipe to a writeable stream. Registers data/error/end listeners on this
    #' stream, error/close listeners on the destination, switches this stream
    #' to flowing mode and emits 'pipe' on the destination.
    #' @param destination writeable stream
    #' @return The destination, invisibly.
    pipe = function(destination) {
      pretty_stopifnot(
        "Pipe destination must inherit from a Writeable or Duplex stream",
        sprintf("Destination is of class '%s'", class(destination)),
        inherits(destination, c("WriteableStream", "DuplexStream"))
      )
      pretty_stopifnot(
        "Stream cannot pipe when it has been destroyed, or is in the process of ending",
        sprintf(
          "Stream has already %s",
          if (private$.readable_destroyed) "been destroyed" else "ended"
        ),
        !private$.readable_destroyed && !private$.readable_ended
      )
      pretty_stopifnot(
        "ReadableStream cannot be piped when teed",
        "ReadableStream has already been teed",
        !private$.readable_teed
      )
      private$.readable_pipes <- append(destination, private$.readable_pipes)
      # NOTE(review): `set_function()` is defined elsewhere; the handlers it
      # returns presumably resolve `destination` and the sibling `.source_*` /
      # `.destination_*` locals from this frame, so these local names (including
      # the apparently unused `.source_pipe_pause`) are kept exactly as they are.
      .source_pipe_data <- set_function(source_pipe_data)
      .source_pipe_error <- set_function(source_pipe_error)
      .source_pipe_pause <- set_function(source_pipe_pause)
      .source_pipe_end <- set_function(source_pipe_end)
      .destination_pipe_error <- set_function(destination_pipe_error)
      .destination_unpipe <- set_function(destination_unpipe)
      destination$prepend_listener("error", .destination_pipe_error)
      destination$once("close", .destination_unpipe)
      self$on("data", .source_pipe_data)
      self$on("error", .source_pipe_error)
      self$on("end", .source_pipe_end)
      if (!isTRUE(private$.readable_data_flowing)) {
        private$.readable_resume()
        poll_call(private$.read_poll)
      }
      destination$emit("pipe", self)
      invisible(destination)
    },
    #' @description
    #' Remove a pipe pointing to a writeable stream. When `destination` is
    #' omitted, every pipe is removed. 'unpipe' is emitted on each removed
    #' destination, and the stream is paused once no pipes remain.
    #' @param destination writeable stream (optional)
    #' @return The destination, invisibly; the stream itself (invisibly) when
    #' `destination` was omitted.
    unpipe = function(destination) {
      # remove all pipes if unspecified
      if (missing(destination)) {
        lapply(private$.readable_pipes, function(piped) {
          piped$emit("unpipe", self)
        })
        private$.readable_pipes <- list()
        # `destination` must not be evaluated on this path: touching a missing
        # argument raises an error, so return the stream instead.
        result <- self
      } else {
        pretty_stopifnot(
          "Unpipe destination must inherit from a Writeable or Duplex stream",
          sprintf("Destination is of class '%s'", class(destination)),
          inherits(destination, c("WriteableStream", "DuplexStream"))
        )
        private$.readable_pipes <- splice_element(private$.readable_pipes, destination)
        destination$emit("unpipe", self)
        result <- destination
      }
      if (length(private$.readable_pipes) == 0) {
        private$.readable_pause()
      }
      invisible(result)
    },
    # WIP
    #' @description
    #' Tee the stream (work in progress): create two new ReadableStreams that
    #' each receive every 'data' chunk and every 'error' of this stream.
    #' A stream can be teed only once, and not after it has been piped,
    #' destroyed or ended.
    #' @return A list of the two new `ReadableStream` objects, invisibly.
    tee = function() {
      pretty_stopifnot(
        "ReadableStream cannot be teed multiple times",
        "ReadableStream has already been teed",
        !private$.readable_teed
      )
      pretty_stopifnot(
        "Stream cannot tee when it has been destroyed, or is in the process of ending",
        sprintf(
          "Stream has already %s",
          if (private$.readable_destroyed) "been destroyed" else "ended"
        ),
        !private$.readable_destroyed && !private$.readable_ended
      )
      pretty_stopifnot(
        "Stream cannot tee when it has been piped from",
        "Stream has already piped to another stream",
        length(private$.readable_pipes) == 0
      )
      # Both branches start from an empty source and inherit this stream's
      # buffering options.
      left_stream <- ReadableStream$new(
        source_object = list(),
        highwater_mark = private$.readable_highwater_mark,
        queue_strategy = private$.readable_queueing_strategy,
        encoding = private$.readable_encoding
      )
      right_stream <- ReadableStream$new(
        source_object = list(),
        highwater_mark = private$.readable_highwater_mark,
        queue_strategy = private$.readable_queueing_strategy,
        encoding = private$.readable_encoding
      )
      # todo, more logic here!
      # The listeners are attached before the teed flag is set, because the
      # 'new_listener' handler rejects 'data' listeners on a teed stream.
      self$on("data", function(data) {
        left_stream$push(data)
        right_stream$push(data)
      })
      self$on("error", function(err) {
        left_stream$emit("error", err)
        right_stream$emit("error", err)
      })
      private$.readable_teed <- TRUE
      invisible(list(left_stream, right_stream))
    },
    #' @description
    #' Pause the stream, and emit a 'pause' event (if not already paused).
    #' A piped stream cannot be paused manually; a warning is raised instead.
    #' @return The stream itself, invisibly.
    pause = function() {
      if (length(private$.readable_pipes) == 0) {
        private$.readable_pause()
      } else {
        rlang::warn(
          c(
            x = "Cannot manually pause a piped stream.",
            i = "Unpipe the stream if manual control is required."
          )
        )
      }
      invisible(self)
    },
    #' @description
    #' Resume stream, and emit a 'resume' event (if not already flowing).
    #' A piped stream cannot be resumed manually; a warning is raised instead.
    #' Resuming without any 'data' listener also warns, but still resumes.
    #' @return The stream itself, invisibly.
    resume = function() {
      if (length(private$.readable_pipes) == 0) {
        if (length(self$raw_listeners("data")) == 0) {
          rlang::warn(
            c(
              "i" = "Resuming <ReadableStream> without any data listeners attached",
              ">" = "This may result in data being read without a destination"
            )
          )
        }
        private$.readable_resume()
      } else {
        rlang::warn(
          c(
            x = "Cannot manually resume a piped stream.",
            i = "Unpipe the stream if manual control is required."
          )
        )
      }
      invisible(self)
    },
    #' @description
    #' Add a chunk to the internal buffer in non-append mode (the buffer's
    #' `add()` is called with a second argument of `TRUE`).
    #' @param chunk the chunk to add
    #' @return `FALSE`.
    unshift = function(chunk) {
      private$.add_chunk(chunk, append = FALSE)
    },
    #' @description
    #' Append a chunk to the internal buffer. An end-of-stream chunk marks the
    #' stream as ending instead of being buffered.
    #' @param chunk the chunk to add
    #' @return `TRUE` when the buffer had room for the chunk, `FALSE` otherwise
    #' (stream ended, end-of-stream chunk, or buffer at its highwater mark).
    push = function(chunk) {
      private$.add_chunk(chunk, append = TRUE)
    },
    #' @description
    #' Destroy the stream: mark it destroyed and not flowing, reset the
    #' internal buffer, close the underlying source and emit a 'close' event.
    #' @param why a message to print when destroying the stream
    #' @return The stream itself, invisibly.
    destroy = function(why = NULL) {
      # NOTE(review): destroy() is not guarded against repeated calls; the
      # finalizer and `.readable_maybe_end()` can both reach it, so the source
      # may be closed and 'close' emitted more than once -- confirm intended.
      private$.readable_destroyed <- TRUE
      private$.readable_data_flowing <- FALSE
      if (!is.null(why)) {
        store_errors()
        rlang::warn(
          c(
            "<ReadableStream> destroyed with message: ",
            as.character(why)
          )
        )
      }
      # cleanup
      private$.read_buffer$reset()
      private$.readable_source$close()
      self$emit("close")
      invisible(self)
    }
  ),
  private = list(
    # Destroy the stream when the object is garbage collected.
    finalize = function() {
      self$destroy()
    },
    # internal buffer (a Queue) and the wrapped source (a PullSource)
    .read_buffer = NULL,
    .readable_source = NULL,
    # readable options
    .readable_encoding = NULL,
    .readable_queueing_strategy = NULL,
    .readable_highwater_mark = NULL,
    # readable state
    .readable_needs_emit = TRUE,
    .readable_reading = FALSE,
    # NULL = no mode chosen yet, TRUE = flowing, FALSE = paused
    .readable_data_flowing = NULL,
    .readable_destroyed = FALSE,
    .readable_ended = FALSE,
    .readable_ending = FALSE,
    # FALSE until the first error, then holds that error
    .readable_errored = FALSE,
    .readable_teed = FALSE,
    .readable_pipes = list(),
    .readable_pipe_drains = list(),
    # Switch to flowing mode; emits 'resume' only on an actual state change.
    .readable_resume = function() {
      if (!isTRUE(private$.readable_data_flowing)) {
        private$.readable_data_flowing <- TRUE
        self$emit("resume")
      }
      invisible(self)
    },
    # Switch to paused mode; emits 'pause' only on an actual state change.
    .readable_pause = function() {
      if (!isFALSE(private$.readable_data_flowing)) {
        private$.readable_data_flowing <- FALSE
        # NOTE(review): uses super$emit while .readable_resume uses self$emit;
        # kept as-is in case the asymmetry is deliberate -- confirm.
        super$emit("pause")
      }
      invisible(self)
    },
    # One polling step: emit buffered data, maybe schedule a pull, maybe end.
    # Returns FALSE (stop polling) when the stream is destroyed or no mode has
    # been chosen yet, TRUE otherwise.
    .read_poll = function() {
      private$.with_error_handler(
        code = {
          # destroyed means that no data is *ever* passed
          if (private$.readable_destroyed ||
            is.null(private$.readable_data_flowing)
          ) {
            return(FALSE)
          }
          private$.readable_emit_data()
          private$.readable_maybe_pull()
          private$.readable_maybe_end()
          # continue polling
          return(TRUE)
        }
      )
    },
    # Pull from the source and push into the buffer until push() reports FALSE.
    # An empty pull is converted into an end-of-stream signal.
    .read = function() {
      res <- TRUE
      while (res) {
        chunk <- private$.readable_source$pull(private$.readable_amount_to_read())
        if (length(chunk) == 0) {
          chunk <- eos_signal()
        }
        res <- self$push(chunk)
        if (isFALSE(res)) {
          break
        }
      }
    },
    # Shared implementation of push()/unshift().
    # Returns TRUE only on the append path when the buffer has room; every
    # other path (ended stream, end-of-stream chunk, non-append add, buffer at
    # capacity) returns FALSE.
    .add_chunk = function(chunk, append = TRUE) {
      if (private$.readable_ended) {
        return(FALSE)
      } else if (is_eos(chunk)) {
        private$.readable_ending <- TRUE
      } else if (!append) {
        private$.read_buffer$add(chunk, TRUE)
      } else if (private$.readable_amount_to_read() > 0) {
        # the chunk is skipped (but TRUE is still returned) while another add
        # is flagged as in progress
        if (append && isFALSE(private$.readable_reading)) {
          private$.readable_reading <- TRUE
          private$.read_buffer$add(chunk)
          private$.readable_reading <- FALSE
        }
        return(TRUE)
      }
      FALSE
    },
    # Size of the buffered data according to the queueing strategy. For the
    # default `object.size` strategy the size of an empty list is subtracted.
    .readable_length = function() {
      if (identical(private$.readable_queueing_strategy, object.size)) {
        object.size(private$.read_buffer$as_list()) - object.size(list())
      } else {
        private$.readable_queueing_strategy(private$.read_buffer$as_list())
      }
    },
    # Amount computed by amount_to_handle() from the highwater mark and the
    # current buffer length.
    .readable_amount_to_read = function() {
      amount_to_handle(private$.readable_highwater_mark, private$.readable_length())
    },
    # Flowing mode: read one item and emit it as 'data'.
    # Paused mode: emit 'readable' once until the next read().
    .readable_emit_data = function() {
      if (private$.read_buffer$size() > 0) {
        # push data
        if (private$.readable_data_flowing) {
          buffer <- self$read(n = 1)
          if (length(buffer) > 0) {
            self$emit("data", buffer)
          }
        } else if (private$.readable_needs_emit) {
          # emit a readable event once
          private$.readable_needs_emit <- FALSE
          self$emit("readable")
        }
      }
    },
    # Schedule a read on the next tick when the source polls TRUE and the
    # buffer is empty or below capacity.
    .readable_maybe_pull = function() {
      if (private$.readable_source$poll_connection()) {
        if (private$.read_buffer$size() == 0 ||
          private$.readable_amount_to_read() > 0) {
          next_tick(private$.read)
        }
      }
    },
    # Once the source no longer polls TRUE: emit 'end' when the stream is
    # ending and drained; on a later poll, destroy the ended stream.
    .readable_maybe_end = function() {
      if (!private$.readable_source$poll_connection()) {
        # if stream is ending and the buffer has been emptied,
        # emit end
        if (private$.readable_ending && private$.read_buffer$size() == 0) {
          private$.readable_ending <- FALSE
          private$.readable_ended <- TRUE
          self$emit("end")
        } else if (private$.readable_ended) {
          self$destroy()
        }
      }
    },
    .source_pipe_data = NULL,
    .source_pipe_error = NULL,
    .source_pipe_pause = NULL,
    .source_pipe_end = NULL,
    .destination_pipe_error = NULL,
    .destination_unpipe = NULL
  ),
  active = list(
    #' @field readable_state read-only snapshot (a named list) of the stream's
    #' settings, buffer and flow flags.
    readable_state = function() {
      list(
        # settings
        encoding = private$.readable_encoding,
        highwater_mark = private$.readable_highwater_mark,
        # data len/src
        source = private$.readable_source$source_object,
        buffer = private$.read_buffer,
        buffer_length = private$.readable_length(),
        # data flow states
        piped = length(private$.readable_pipes) > 0,
        teed = private$.readable_teed,
        flowing = private$.readable_data_flowing,
        ended = private$.readable_ended,
        destroyed = private$.readable_destroyed,
        # `.readable_errored` is FALSE until an error is recorded; report it
        # as a flag, like the other state entries.
        errored = !isFALSE(private$.readable_errored)
      )
    }
  ),
  cloneable = FALSE
)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.