# Base R6 class for the source wrappers. It normalises `source_object` into a
# backend the push/pull/poll closures understand and stores it in
# `self$source_object`. That field is not declared here; the classes that
# inherit from `Source` declare it.
Source <- R6::R6Class(
  "Source",
  public = list(
    # Zero-argument closure produced by `get_poll_method()`; assigned in
    # `initialize()`.
    poll_connection = NULL,

    # Normalise `source_object` and install the poll closure.
    #
    # @param source_object One of:
    #   * a character file path: opened as a non-blocking binary file
    #     connection ("rb" for read sources, "wb" otherwise);
    #   * a connection: stored as is and opened if it is not open yet;
    #   * a `processx_connection` or a function: stored unchanged;
    #   * anything else: copied element by element (row by row for data
    #     frames) into a new `Queue`.
    initialize = function(source_object) {
      # A read source is detected by looking for a `pull` entry among the
      # `public_fields` found two frames up the call stack.
      # NOTE(review): this depends on the call stack at construction time
      # (presumably the R6 `new()` frame) -- confirm it still holds if another
      # inheritance level or wrapper is introduced.
      caller_fields <- rlang::caller_env(n = 2)[["public_fields"]]
      is_read_source <- "pull" %in% names(caller_fields)

      # Resolve a path into a connection first, so that every check below
      # looks at the object that will actually be stored.
      if (is.character(source_object)) {
        open_mode <- if (is_read_source) "rb" else "wb"
        source_object <- file(source_object, open = open_mode, blocking = FALSE)
      }

      # Dispatch on the argument itself. `self$source_object` is still NULL
      # here unless a path was given, so testing the field would send
      # caller-supplied connections and functions into the queue branch.
      if (inherits(source_object, "connection")) {
        self$source_object <- source_object
        # Connections created above are already open; only open the ones a
        # caller handed over closed.
        if (!isOpen(self$source_object)) {
          open(self$source_object)
        }
      } else if (is.function(source_object) ||
                 inherits(source_object, "processx_connection")) {
        self$source_object <- source_object
      } else {
        self$source_object <- Queue$new()
        # `inherits()` yields a single TRUE/FALSE even for multi-class objects
        # such as data.table or tibble, which also inherit from "data.frame".
        if (inherits(source_object, "data.frame")) {
          for (i in seq_len(nrow(source_object))) {
            self$source_object$add(source_object[i, ])
          }
        } else {
          for (i in seq_along(source_object)) {
            self$source_object$add(source_object[[i]])
          }
        }
      }

      # poll method
      self$poll_connection <- get_poll_method(self$source_object, is_read_source)
      # Re-home the closure in this frame so that the free variable `self`
      # used inside it resolves from this method's scope.
      environment(self$poll_connection) <- rlang::current_env()
    },

    # Release the underlying source. Connections (base or processx) are
    # closed on a best-effort basis: errors raised by `close()` are swallowed
    # on purpose via `try(silent = TRUE)`.
    close = function() {
      if (inherits(self$source_object, "connection") ||
          inherits(self$source_object, "processx_connection")) {
        try(
          expr = {
            close(self$source_object)
          },
          silent = TRUE
        )
      } else if ("pull" %in% names(rlang::caller_env(n = 2)[["public_fields"]])) {
        # NOTE(review): the frame two levels above an ordinary `close()` call
        # is unlikely to define `public_fields`, so this branch may never run
        # outside of construction -- confirm the intended trigger.
        self$source_object <- eos_signal()
      }
    }
  ),
  class = FALSE,
  cloneable = FALSE
)
# Read-only source: inherits the backend set-up from `Source` and adds a
# `pull` closure chosen by `get_pull_method()`.
PullSource <- R6::R6Class(
  classname = "PullSource",
  inherit = Source,
  cloneable = FALSE,
  class = FALSE,
  public = list(
    # Backend object; filled in by `Source`'s `initialize()`.
    source_object = NULL,
    # Reader closure; assigned in `initialize()`.
    pull = NULL,

    # Set up the backend through the parent class, then attach the reader.
    # Returns the object invisibly.
    initialize = function(source_object) {
      super$initialize(source_object)

      # ~~ Set methods ~~ #
      reader <- get_pull_method(self$source_object)
      # The closure refers to `self`; re-home it in this frame so the name
      # resolves from this method's scope.
      environment(reader) <- rlang::current_env()
      self$pull <- reader

      invisible(self)
    }
  )
)
# Write-only source: inherits the backend set-up from `Source` and adds a
# `push` closure chosen by `get_push_method()`.
PushSource <- R6::R6Class(
  inherit = Source,
  cloneable = FALSE,
  class = FALSE,
  public = list(
    # Backend object; filled in by `Source`'s `initialize()`.
    source_object = NULL,
    # Writer closure; assigned in `initialize()`.
    push = NULL,

    # Set up the backend through the parent class, then attach the writer.
    initialize = function(source_object) {
      super$initialize(source_object)

      # ~~ Set methods ~~ #
      writer <- get_push_method(self$source_object)
      self$push <- writer
      # The closure refers to `self`; re-home it in this frame so the name
      # resolves from this method's scope.
      environment(self$push) <- rlang::current_env()
    }
  )
)
# Bidirectional source: inherits the backend set-up from `Source` and adds
# both a `push` and a `pull` closure.
# NOTE(review): unlike `Source`, `PullSource` and `PushSource`, this generator
# does not set `class = FALSE`, so its instances keep a class attribute.
# Left unchanged here because callers may rely on it -- confirm the intent.
PushPullSource <- R6::R6Class(
  inherit = Source,
  public = list(
    # Backend object; filled in by `Source`'s `initialize()`.
    source_object = NULL,
    # Writer closure; assigned in `initialize()`.
    push = NULL,
    # Reader closure; assigned in `initialize()`.
    pull = NULL,

    # Set up the backend through the parent class, then attach both closures.
    initialize = function(source_object) {
      super$initialize(source_object)
      # ~~ Set methods ~~ #
      self$push <- get_push_method(self$source_object)
      self$pull <- get_pull_method(self$source_object)
      # Both closures refer to `self`; re-home them in this frame so the name
      # resolves from this method's scope.
      environment(self$pull) <- rlang::current_env()
      environment(self$push) <- rlang::current_env()
    }
  ),
  # The parent class is declared with `cloneable = FALSE`, and R6 does not
  # allow a subclass of a non-cloneable class to be cloned. State it
  # explicitly, matching the sibling classes.
  cloneable = FALSE
)
# Pick the writer closure that matches the backend type of `source_object`.
#
# The returned function has the signature `function(text, ...)` and refers to
# a free variable `self`; callers are expected to re-home the closure in an
# environment where `self$source_object` is available.
#
# @param source_object Backend object used only to choose the implementation.
# @return A function writing `text` to `self$source_object`.
get_push_method <- function(source_object) {
  # processx pipes: hand the text to processx unchanged.
  if (inherits(source_object, "processx_connection")) {
    return(function(text, ...) {
      processx::processx_conn_write(
        str = text,
        con = self$source_object
      )
    })
  }

  # Base connections (or a path): written line by line.
  line_based <- inherits(source_object, "connection") ||
    is.character(source_object)
  if (line_based) {
    return(function(text, ...) {
      # `writeLines()` needs character input; coerce anything else.
      lines <- if (is.character(text)) text else as.character(text)
      writeLines(
        text = lines,
        con = self$source_object,
        sep = "\n"
      )
    })
  }

  # Everything else is treated as a queue-like object exposing `add()`.
  function(text, ...) {
    self$source_object$add(text)
  }
}
# Pick the reader closure that matches the backend type of `source_object`.
#
# The returned function refers to a free variable `self`; callers are expected
# to re-home the closure in an environment where `self$source_object` is
# available. Connection-backed readers take `n`, the number of characters to
# read; the queue-backed reader accepts and ignores any arguments.
#
# @param source_object Backend object used only to choose the implementation.
# @return A function reading from `self$source_object`.
get_pull_method <- function(source_object) {
  # processx pipes: read up to `n` characters through processx.
  if (inherits(source_object, "processx_connection")) {
    return(function(n) {
      processx::conn_read_chars(
        con = self$source_object,
        n
      )
    })
  }

  # Base connections: read `n` characters with `useBytes = TRUE`.
  if (inherits(source_object, "connection")) {
    return(function(n) {
      readChar(
        con = self$source_object,
        nchars = n,
        useBytes = TRUE
      )
    })
  }

  # Everything else is treated as a queue-like object exposing `remove()`.
  function(...) {
    self$source_object$remove()
  }
}
# Pick the zero-argument "is this source still usable?" closure that matches
# the backend type of `source_object`.
#
# Except for the end-of-stream case, the returned function refers to a free
# variable `self`; callers are expected to re-home the closure in an
# environment where `self$source_object` is available.
#
# @param source_object Backend object used only to choose the implementation.
# @param is_read_source Accepted for the callers' benefit; not used here.
# @return A function returning a single logical.
get_poll_method <- function(source_object, is_read_source) {
  # An end-of-stream marker can never deliver anything again.
  if (is_eos(source_object)) {
    return(function() FALSE)
  }

  # processx pipes: poll the connection and report anything that is not
  # "closed".
  if (inherits(source_object, "processx_connection")) {
    return(function() {
      processx::poll(list(self$source_object), 1)[[1]] != "closed"
    })
  }

  # Base connections: `isOpen()` raises on an invalid connection, which is
  # reported as FALSE.
  if (inherits(source_object, "connection")) {
    return(function() {
      tryCatch(
        expr = isOpen(self$source_object),
        error = function(e) FALSE
      )
    })
  }

  # Queue-like objects: usable until the next element is the end-of-stream
  # marker.
  function() {
    !is_eos(self$source_object$peek())
  }
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.