#' Stream pipe
#'
#' @description
#' Pipe the left-hand side stream into the right-hand side stream by calling
#' `lhs$pipe(rhs)`. The value of that call (intended to be the right-hand side
#' stream) is returned invisibly so that further pipes can be chained.
#'
#' @param lhs stream to pipe from; it must provide a `pipe()` method
#' @param rhs stream to be piped to
#' @rdname stream_pipes
#' @export
`%|>%` <- function(lhs, rhs) {
  downstream <- lhs$pipe(rhs)
  invisible(downstream)
}
#' Stream tee pipe
#'
#' @description
#' Tees the left-hand side stream with `lhs$tee()`, pipes the first (left)
#' branch into the right-hand side stream, and invisibly returns the second
#' (right) branch for further use.
#'
#' @examples
#' # create a stream that prints the data sent to it
#' # and also increments each chunk by one
#' stream <- ReadableStream$new(1:100) %T>%
#'   walk_stream(function(data) print(data)) %|>%
#'   map_stream(function(data) data + 1)
#'
#' @param lhs stream to be teed
#' @param rhs stream to be piped to
#' @rdname stream_pipes
#' @export
`%T>%` <- function(lhs, rhs) {
  branches <- lhs$tee()
  left_branch <- branches[[1]]
  # The left branch feeds `rhs`; the result of that pipe call is discarded.
  left_branch$pipe(rhs)
  invisible(branches[[2]])
}
# ~~~~~~~~~~~~~ Core functionals ~~~~~~~~~~~~~ #
#' Map stream
#'
#' Build a `TransformStream` that uses `fn` as its `transform` callback, so
#' that `fn` is applied to each chunk of data.
#'
#' @param fn function applied to each chunk of stream
#' @rdname functional_streams
#' @export
map_stream <- function(fn) {
  # The leading empty list is the constructor's first positional argument
  # (presumably stream options -- TODO confirm against TransformStream).
  TransformStream$new(list(), transform = fn)
}
#' Filter stream
#'
#' Filter a stream with a predicate. A chunk is passed on unchanged only when
#' `isTRUE(fn(chunk))` holds; for every other chunk the transform returns
#' `ignore_chunk()` instead.
#'
#' @param fn function applied to each chunk of stream that returns a boolean value
#' @rdname functional_streams
#' @export
filter_stream <- function(fn) {
  keep_or_drop <- function(chunk) {
    # Anything other than a single TRUE (FALSE, NA, longer vectors, ...) drops
    # the chunk.
    if (!isTRUE(fn(chunk))) {
      return(ignore_chunk())
    }
    chunk
  }
  TransformStream$new(list(), transform = keep_or_drop)
}
#' Reduce stream
#'
#' @description
#' Reduce stream data into one chunk via the iterative application of function
#' `fn` to an accumulator and each chunk. Nothing is emitted while chunks are
#' being folded; the accumulator is emitted when the end-of-stream chunk
#' arrives, and the stream is then scheduled to end.
#'
#' See also `base::Reduce`
#'
#' @param fn function of two arguments, the current accumulator and the
#'   incoming chunk, returning the new accumulator
#' @param base_value initial value of the accumulator
#' @rdname functional_streams
#' @export
reduce_stream <- function(fn, base_value = 0) {
  acc <- base_value

  fold_chunk <- function(chunk) {
    if (!is_eos(chunk)) {
      acc <<- fn(acc, chunk)
      return(ignore_chunk())
    }
    # End of stream: schedule the stream's own end and emit the final result.
    next_tick(stream$end)
    acc
  }

  stream <- TransformStream$new(list(), transform = fold_chunk)

  # Drop the accumulated value once the stream has ended.
  stream$on("end", function() {
    acc <<- NULL
  })

  invisible(stream)
}
# ~~~~~~~~~~~~~ Misc functionals ~~~~~~~~~~~~~ #
# todo
# future_map_stream <- function(fn) {
# TransformStream$new(
# transform = function(chunk) {
# ignore_chunk()
# }
# )
# }
#' Walk stream
#'
#' @description
#' Map stream, but for side-effects: `fn` is called on each chunk, its return
#' value is discarded, and the chunk itself is passed on unchanged.
#' @rdname functional_streams
#' @export
walk_stream <- function(fn) {
  observe_chunk <- function(chunk) {
    fn(chunk)
    chunk
  }
  TransformStream$new(list(), transform = observe_chunk)
}
#' Collect stream
#'
#' @description
#' Form of reduce stream where all data is appended to a list before
#' being emitted. Implemented as `reduce_stream()` with `base::append` as the
#' reducer and an empty list as the starting value.
#'
#' @rdname functional_streams
#' @export
collect_stream <- function() {
  # NOTE(review): `append` splices a multi-element chunk into the list element
  # by element rather than storing it as a single entry -- confirm that this
  # is the intended behaviour for vector or list chunks.
  reduce_stream(fn = append, base_value = list())
}
#' Batch stream
#'
#' Accumulate incoming chunks into a list and emit that list as a single chunk
#' once it holds at least `size` elements. Whatever is still pending when the
#' end-of-stream chunk arrives is emitted as a final (possibly shorter, possibly
#' empty) batch, after which the stream is scheduled to end.
#'
#' @param size single positive number; a batch is emitted as soon as it holds
#'   at least this many elements
#' @rdname functional_streams
#' @export
batch_stream <- function(size) {
  # Validate eagerly: `size` would otherwise only be evaluated when the first
  # chunk arrives, far away from the call that supplied it.
  stopifnot(
    is.numeric(size),
    length(size) == 1L,
    !is.na(size),
    size > 0
  )

  current_batch <- list()
  stream <- TransformStream$new(
    # Same leading argument as the other transform-stream constructors in this
    # file.
    list(),
    transform = function(chunk) {
      # Check for end-of-stream BEFORE appending, so the end-of-stream marker
      # never ends up inside the final batch.
      if (is_eos(chunk)) {
        next_tick(stream$end)
        return(current_batch)
      }

      # NOTE(review): `append` splices multi-element chunks element by element,
      # so a batch can exceed `size` -- confirm this is intended.
      current_batch <<- append(current_batch, chunk)
      if (length(current_batch) >= size) {
        out <- current_batch
        current_batch <<- list()
        out
      } else {
        ignore_chunk()
      }
    }
  )

  # Drop any buffered data once the stream has ended.
  stream$on("end", function() {
    current_batch <<- NULL
  })

  invisible(stream)
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.