init_duplex <- function(...) {
groups <- list(
public_fields = WriteableStream$public_fields,
public_methods = WriteableStream$public_methods,
private_fields = WriteableStream$private_fields,
private_methods = WriteableStream$private_methods,
active = WriteableStream$active
)
DuplexStream[["private_methods"]][[".initialize_writeable"]] <- WriteableStream$public_methods$initialize
DuplexStream[["private_methods"]][[".writeable_poll"]] <- WriteableStream$private_methods$.poll
DuplexStream[["private_methods"]][[".readable_poll"]] <- ReadableStream$private_methods$.poll
for (i in seq_along(groups)) {
object_group <- groups[i]
for (objects in object_group) {
group_name <- names(object_group)[[1]]
Map(
f = function(fn_body, name) {
if (is.null(fn_body)) fn_body <- list(NULL)
if (is.null(DuplexStream[[group_name]][name][[1]])) {
DuplexStream[[group_name]][[name]] <- fn_body
}
},
objects,
names(objects)
)
}
}
}
#' Duplex Stream
#' @export
DuplexStream <- R6::R6Class(
"DuplexStream",
inherit = ReadableStream,
public = list(
#' @description
#' Create a new DuplexStream object
#' @param source_object a file path, connection, processx connection, or socket connection
#' @param readable_highwater_mark maximum buffer size
#' @param writeable_highwater_mark maximum buffer size
#' @param encoding encoding used when reading from the connection
#' @return A new `DuplexStream` object.
initialize = function(source_object,
readable_highwater_mark = 16384,
writeable_highwater_mark = readable_highwater_mark,
readable_queue_strategy = object.size,
writeable_queue_strategy = readable_queue_strategy,
encoding = "unknown") {
super$initialize(
source_object,
readable_highwater_mark,
readable_queue_strategy,
encoding
)
private$.initialize_writeable(
source_object,
writeable_highwater_mark,
writeable_queue_strategy,
encoding
)
# when working with objects, readable and writeable sides have to be from the same source
if (!is.character(source_object) && !inherits(source_object, "connection")) {
private$.readable_source <- PushPullSource$new(source_object)
private$.writeable_source <- private$.readable_source
}
invisible(self)
},
#' TODOC
destroy = function(why = NULL) {
if (identical(private$.read_poll, rlang::caller_fn())) {
# only destroy if writeable already destroyed
# (writeable has precedence)
if (isTRUE(private$.writeable_destroyed)) {
private$.readable_destroyed <- TRUE
private$.readable_data_flowing <- FALSE
}
} else if (identical(private$.write_poll, rlang::caller_fn())) {
private$.writeable_destroyed <- TRUE
private$.writeable_data_flowing <- FALSE
} else {
private$.readable_destroyed <- TRUE
private$.readable_data_flowing <- FALSE
private$.writeable_destroyed <- TRUE
private$.writeable_data_flowing <- FALSE
}
# only occurs if writeable & readable have both called destroy
# or destroy was called manually
if (private$.writeable_destroyed && private$.readable_destroyed) {
if (!is.null(why)) {
store_errors()
rlang::warn(
c(
"<DuplexStream> destroyed with message: ",
"x" = as.character(unlist(why))
)
)
}
private$.writeable_source$close()
private$.readable_source$close()
private$.read_buffer$reset()
private$.write_buffer$reset()
self$emit("close")
}
invisible(self)
},
print = function() {
cat("# a <DuplexStream>\n")
lapply(seq_along(self$duplex_state), function(n) {
cat(
stringr::str_pad(names(self$duplex_state[n]), 14, side = "left"),
":",
format(self$duplex_state[[n]]),
"\n"
)
})
invisible(self)
}
),
active = list(
duplex_state = function() {
list(
# settings
encoding = c(private$.readable_encoding, private$.writeable_encoding),
highwater_mark = c(private$.readable_highwater_mark, private$.writeable_highwater_mark),
# data len/src
source = private$.writeable_source$source_object,
buffer_length = private$.readable_length() + private$.writeable_length(),
# data flow states
piped = length(private$.readable_pipes) > 0,
teed = private$.readable_teed,
flowing = any(c(private$.readable_data_flowing, private$.writeable_data_flowing)),
ended = all(c(private$.writeable_ended, private$.readable_ended)),
destroyed = all(c(private$.writeable_destroyed, private$.readable_destroyed))
)
}
),
private = list(
finalize = function() {
self$destroy()
}
),
cloneable = FALSE
)
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.