R/stream-functionals.R

Defines functions batch_stream collect_stream walk_stream reduce_stream filter_stream map_stream `%T>%` `%|>%`

Documented in batch_stream collect_stream filter_stream map_stream reduce_stream walk_stream

#' Stream pipe
#'
#' @description
#' Pipe the left-hand side stream into the right-hand side stream by calling
#' `lhs$pipe(rhs)`. The result of that call is returned invisibly so that
#' further pipes can be chained onto it (presumably this is the right-hand
#' side stream; confirm against the `pipe` method of the stream class).
#'
#' @param lhs stream to be piped from; must provide a `pipe` method
#' @param rhs stream to be piped to
#' @rdname stream_pipes
#' @export
`%|>%` <- function(lhs, rhs) {
    piped <- lhs$pipe(rhs)
    invisible(piped)
}

#' Stream tee pipe
#'
#' @description
#' Tees a readable stream by calling `lhs$tee()`, which is expected to yield
#' two branches. The first branch is piped into the right-hand side stream,
#' and the second branch is returned (invisibly) for further use.
#'
#' @examples
#' # create a stream that prints the data sent to it
#' # and also increments each chunk by one
#' stream <- ReadableStream$new(1:100) %T>%
#'      walk_stream(function(data) print(data)) %|>%
#'      map_stream(function(data) data + 1)
#'
#' @param lhs stream to be teed
#' @param rhs stream to be piped to
#' @rdname stream_pipes
#' @export
`%T>%` <- function(lhs, rhs) {
    branches <- lhs$tee()
    # First branch feeds the right-hand side stream ...
    left <- branches[[1]]
    left$pipe(rhs)
    # ... while the second branch is handed back to the caller.
    right <- branches[[2]]
    invisible(right)
}

# ~~~~~~~~~~~~~ Core functionals ~~~~~~~~~~~~~ #

#' Map stream
#'
#' Build a `TransformStream` that uses `fn` directly as its transform, so
#' `fn` is applied to each chunk of data.
#'
#' @param fn function applied to each chunk of stream
#' @rdname functional_streams
#' @export
map_stream <- function(fn) {
    # `fn` is passed through untouched as the stream's transform.
    TransformStream$new(list(), transform = fn)
}

#' Filter stream
#'
#' Build a `TransformStream` that only lets through chunks for which `fn`
#' returns exactly `TRUE` (tested with `isTRUE()`). For every other result the
#' transform returns `ignore_chunk()` instead of the chunk.
#'
#' @param fn function applied to each chunk of stream that returns a boolean value
#' @rdname functional_streams
#' @export
filter_stream <- function(fn) {
    keep_or_drop <- function(chunk) {
        # Anything other than a single TRUE (FALSE, NA, NULL, longer vectors)
        # counts as "do not keep".
        if (!isTRUE(fn(chunk))) {
            ignore_chunk()
        } else {
            chunk
        }
    }
    TransformStream$new(list(), transform = keep_or_drop)
}

#' Reduce stream
#'
#' @description
#' Reduce stream data into one chunk. An accumulator starts at `base_value`
#' and is replaced by `fn(accumulator, chunk)` for every chunk, while the
#' transform returns `ignore_chunk()`. When `is_eos(chunk)` is true, the
#' transform schedules `stream$end` through `next_tick()` and returns the
#' accumulator.
#'
#' See also `base::Reduce`
#'
#' @param fn function of two arguments, called as `fn(accumulator, chunk)`;
#'   its result becomes the new accumulator
#' @param base_value initial value of the accumulator
#' @rdname functional_streams
#' @export
reduce_stream <- function(fn, base_value = 0) {
    running_value <- base_value

    fold_chunk <- function(chunk) {
        if (is_eos(chunk)) {
            # End of input: schedule the stream to end and hand back the
            # accumulated value as the transform result.
            next_tick(stream$end)
            running_value
        } else {
            # Regular chunk: fold it into the accumulator, pass nothing on.
            running_value <<- fn(running_value, chunk)
            ignore_chunk()
        }
    }

    stream <- TransformStream$new(list(), transform = fold_chunk)

    # Drop the accumulated value once the stream has ended.
    stream$on("end", function() {
        running_value <<- NULL
    })

    invisible(stream)
}

# ~~~~~~~~~~~~~ Misc functionals ~~~~~~~~~~~~~ #

# todo
# future_map_stream <- function(fn) {
#     TransformStream$new(
#         transform = function(chunk) {
#             ignore_chunk()
#         }
#     )
# }

#' Walk stream
#'
#' @description
#' Map stream, but for side-effects: `fn` is called on each chunk, its return
#' value is discarded, and the chunk itself is returned from the transform
#' unchanged.
#' @rdname functional_streams
#' @export
walk_stream <- function(fn) {
    tap <- function(chunk) {
        # Run `fn` purely for its side effect; the chunk is what moves on.
        fn(chunk)
        chunk
    }
    TransformStream$new(list(), transform = tap)
}

#' Collect stream
#'
#' @description
#' Form of reduce stream where all data is appended to a list before
#' being emitted. Implemented as `reduce_stream()` with `base::append()` as
#' the reducing function and an empty list as the starting value.
#'
#' @rdname functional_streams
#' @export
collect_stream <- function() {
    reduce_stream(fn = append, base_value = list())
}

#' Batch stream
#'
#' @description
#' Group incoming chunks into batches. Each chunk is added to an internal list
#' with `base::append()`; as soon as that list holds at least `size` elements
#' it is returned from the transform as one batch and a fresh, empty list is
#' started. Until then the transform returns `ignore_chunk()`. When
#' `is_eos(chunk)` is true, the stream is scheduled to end through
#' `next_tick()` and whatever is currently buffered is returned as the final
#' batch.
#'
#' @param size number of buffered elements at which a batch is released
#' @rdname functional_streams
#' @export
batch_stream <- function(size) {
    current_batch <- list()
    # NOTE(review): the other functionals in this file pass `list()` as the
    # first argument to `TransformStream$new`; confirm that omitting it here
    # is equivalent.
    stream <- TransformStream$new(
        transform = function(chunk) {
            if (is_eos(chunk)) {
                # Test for end-of-stream *before* buffering, so the
                # end-of-stream marker itself never becomes part of the final
                # batch (reduce_stream likewise never folds it in).
                next_tick(stream$end)
                return(current_batch)
            }

            current_batch <<- append(current_batch, chunk)
            if (length(current_batch) >= size) {
                # Batch is full: release it and start a new one.
                out <- current_batch
                current_batch <<- list()
                out
            } else {
                ignore_chunk()
            }
        }
    )
    # Drop any buffered data once the stream has ended.
    stream$on("end", function() {
        current_batch <<- NULL
    })

    invisible(stream)
}
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.