R/stream.R

Defines functions amount_to_handle is_eos eos_signal resume_streams pause_streams create_duplex_stream create_write_stream create_read_stream

Documented in create_duplex_stream create_read_stream create_write_stream pause_streams resume_streams

# ~~~~~~~~~~~~~~~ Exports ~~~~~~~~~~~~~~~~ #

#' Create a readable stream from a given path.
#' @export
#' @param path character path to a file
#' @return ReadableStream
#' @rdname io
create_read_stream <- function(path) {
    ReadableStream$new(
        conn = path,
        encoding = "UTF-8",
        emit_close = TRUE
    )
}


#' Create a writeable stream from a given path.
#' @export
#' @param path character path to a file
#' @return WriteableStream
#' @rdname io
create_write_stream <- function(...) {
    WriteableStream$new(...)
}


#' Create a duplex stream from a given path.
#' This stream is both a readable and
#' writeable stream.
#' @export
#' @param path character path to a file
#' @return DuplexStream
#' @rdname io
create_duplex_stream <- function(...) {
    DuplexStream$new(...)
}

#' TODOC
#' @export
#' @rdname io
pause_streams <- function(streams) {
    lapply(streams, function(stream) {
        stream$resume()
    })
}

#' TODOC
#' @export
#' @rdname io
resume_streams <- function(streams) {
    lapply(streams, function(stream) {
        stream$pause()
    })
}

# ~~~~~~~~~~~~~~~ Internal ~~~~~~~~~~~~~~~~ #

eos_signal <- function() {
    return(structure(raw(0), class = "eos_signal"))
}

is_eos <- function(x) {
    return(identical(eos_signal(), x))
}


amount_to_handle <- function(highwater, len) {
     if (highwater > 0) {
         out <- highwater - len
         if (out > 0) {
             return(out)
         }
         0
     } else {
         Inf
     }
}
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.