R/stream-transform.R

Defines functions is_ignore ignore_chunk

# TransformStream: a DuplexStream subclass whose public `write` method passes
# every chunk through `private$.transform` before handing the result on to
# `private$.write`.
#
# The default `.transform` only calls error_method_not_implemented(), so a
# transform function has to be supplied (e.g. through the `transform`
# constructor argument) for the stream to function.
TransformStream <- R6::R6Class(
    "TransformStream",
    inherit = DuplexStream,
    public = list(
        # Construct the stream.
        #
        # source_object, readable_highwater_mark, writeable_highwater_mark and
        # encoding are forwarded unchanged to the parent initializer.
        # transform: optional function taking exactly one argument (the
        #   chunk); when supplied it replaces the default `.transform`.
        #
        # Returns the stream itself, invisibly.
        initialize = function(source_object = NULL,
                              readable_highwater_mark = 16384,
                              writeable_highwater_mark = readable_highwater_mark,
                              encoding = "UTF-8",
                              transform = NULL) {
            super$initialize(
                source_object,
                readable_highwater_mark = readable_highwater_mark,
                writeable_highwater_mark = writeable_highwater_mark,
                encoding = encoding
            )

            if (!is.null(transform)) {
                pretty_stopifnot(
                    "transform argument must be a function",
                    # class() may return several names (e.g. for a data
                    # frame) and sprintf() is vectorised, so collapse the
                    # names to keep the detail message a single string.
                    sprintf(
                        "argument is of class '%s'",
                        paste(class(transform), collapse = "/")
                    ),
                    is.function(transform)
                )
                pretty_stopifnot(
                    "transform method must have the correct function signature: one argument for passing chunks",
                    sprintf("length of supplied method arguments is %s", length(formalArgs(transform))),
                    # formalArgs() gives NULL for functions without formals;
                    # length(NULL) is 0, so this also rejects that case.
                    length(formalArgs(transform)) == 1
                )

                # Temporarily unlock the private binding so the user-supplied
                # function can be swapped in, then lock it again.
                unlockBinding(".transform", private)
                private$.transform <- transform
                lockBinding(".transform", private)
            }
            invisible(self)
        },
        # Transform `chunk` and write the result.
        #
        # chunk: the value handed to `private$.transform`.
        # encoding: passed through to `private$.write` as-is.
        #
        # When the transform returns the ignore sentinel (see is_ignore())
        # nothing is written and NULL is returned invisibly. Otherwise the
        # value of `private$.write` is returned invisibly.
        write = function(chunk, encoding) {
            transformed_chunk <- private$.transform(chunk)
            if (is_ignore(transformed_chunk)) {
                # The transform asked for this chunk to be dropped.
                return(invisible(NULL))
            }
            out <- private$.write(transformed_chunk, encoding)
            # Queue poll_call with the write poll via next_tick()
            # (both helpers are defined outside this block).
            next_tick(poll_call, private$.write_poll)
            invisible(out)
        }
    ),
    private = list(
        # Placeholder transform: signals that no implementation was provided.
        .transform = function(chunk) {
            error_method_not_implemented(".transform")
        }
    ),
    cloneable = FALSE
)

# Build the sentinel value a transform can return to indicate that a chunk
# should be dropped: an empty raw vector carrying the class "ignore_signal".
ignore_chunk <- function() {
    signal <- raw(0)
    class(signal) <- "ignore_signal"
    signal
}

# TRUE when `x` is exactly the sentinel produced by ignore_chunk(),
# FALSE for anything else.
is_ignore <- function(x) {
    sentinel <- ignore_chunk()
    identical(x, sentinel)
}
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.