Nothing
#' Read and write msgpack formatted messages over R connections.
#'
#' A `msgConnection` object decodes msgpack messages from an
#' underlying R raw connection.
#'
#' @param con A [connection] object open in binary mode.
#' @param max_size The largest partial message to store, in
#' bytes. `NA` means do not enforce a limit.
#' @param read_size How many bytes to read at a time.
#' @param ... Unpacking options (see [unpackMsg]).
#' @return `msgConnection()` returns an object of class
#' `msgConnection`.
#'
#' @examples
#' out <- rawConnection(raw(0), open="wb")
#' apply(quakes, 1, function(x) writeMsg(x, out))
#' length(rawConnectionValue(out))
#' inn <- msgConnection(rawConnection(rawConnectionValue(out), open="rb"))
#' readMsg(inn)
#' readMsgs(inn, 3)
#' @export
msgConnection <- function(con, read_size=2^16, max_size=NA, ...) {
partial <- raw(0)
status <- "ok"
bread <- 0
bwrite <- 0
reader <- function(desired) {
# ignore "desired" and just read non-blockingly.
readRaw(con, read_size)
}
readMsgs <- function(n=NA, read_size = parent.env(environment())$read_size) {
if (is.na(n)) n <- .Machine$integer.max
msgs_bytes <- unpackMsgs(partial, n, max_size = max_size, reader = reader)
partial <<- msgs_bytes$remaining
status <<- msgs_bytes$status
bread <<- bread + msgs_bytes$bytes_read
msgs_bytes$msgs
}
doClose <- function(...) {
close(con, ...)
}
#msgConnection object is just the orig object with this
#environment dangled off it.
structure(addClass(con, "msgConnection"), reader = environment())
}
#' @export
summary.msgConnection <- function(object, ...) {
s <- NextMethod("summary")
c(s, list(status = status(object)))
}
#' @rdname msgConnection
#' @export
close.msgConnection <- function(con, ...) {
attr(con, "reader")$doClose(...)
}
catenator <- function(val=c()) {
# An in-memory FIFO type object.
#tracemem(val)
start <- 0
end <- length(val)
function(x, action="store", ..., opts) {
switch(action,
store = {
lx <- length(x)
l <- length(val)
if (lx > 0) {
#check for overflow
if (end + lx > l && start > 0) {
# rotate back to start
if (start > 0 && end != start) {
val[1:(end-start)] <- val[(start+1):end]
}
end <<- end - start
start <<- 0
}
if (end + lx > l) {
# double array length
length(val) <<- max(end + lx, 2 * l);
}
#inject new values
val[ (end + 1):(end + lx) ] <<- x
end <<- end + lx
}
dbg("lx", lx, "start", start, "end", end, "\n")
x
},
read = {
if (end > start) {
val[(start+1):end]
} else val[c()]
},
buf = {
val
},
start = start,
length = end - start,
end = end,
contents = {
list(val, start, end)
},
reset = {
val <<- x
start <<- 0
end <<- length(x)
},
drop = {
if (x <= end - start && x >= 0) {
start <<- start + x
} else {
stop("have ", end - start, ", can't drop ", x)
}
})
}
}
lister <- function(val = list()) {
n <- length(val)
function(x, action="store") {
switch(action,
store = {
if (n > length(val))
length(val) <<- max(1, 2 * length(val))
n <<- n + 1
val[[n]] <<- x
},
read = {
length(val) <<- n
val
},
length = {
n
},
clear = {
n <<- 0
length(val) <<- 0
}
)
}
}
addClass <- function(x, classes) structure(x, class = c(classes, class(x)))
#' @return `partial(con)` returns any data that has been read ahead of
#' the last decoded message.
#' @rdname msgConnection
#' @export
partial <- function(con) UseMethod("partial")
#' @rdname msgConnection
#' @export
partial.msgConnection <- function(con) {
attr(con, "reader")$partial
}
#' @rdname msgConnection
#' @export
#' @param n The maximum number of messages to read. A value of NA
#' means to parse all available messages until end of input.
#' @return `readMsgs(con, n)` returns a list of up to `n` decoded messages.
readMsgs <- function(con, n = NA, ...) {
UseMethod("readMsgs")
}
#' @export
readMsgs.msgConnection <- function(con, n = NA, ...) {
attr(con, "reader")$readMsgs(n, ...)
}
#' @rdname msgConnection
#' @return `status(con)` returns the status of msgpack decoding on the
#' connection. A value of `"ok"` indicates all requested messages
#' were read, `"buffer underflow"` for a non-blocking connection
#' indicates that only part of a message has been received, and
#' `"end of input"` means the last available message has been read.
#' Other values indicate errors encountered in decoding, which will
#' effectively halt reading.
#' @export
status <- function(con) UseMethod("status")
#' @rdname msgConnection
#' @export
status.msgConnection <- function(con) {
attr(con, "reader")$status
}
#' @rdname msgConnection
#' @return `seek(con)` returns the number of bytes that have been
#' successfully read or written, depending on the mode of the
#' connection. (Repositioning is not supported.)
#' @param rw See [seek()].
#' @export
seek.msgConnection <- function(con, rw = summary(con)$mode, ...) {
rw <- pmatch(rw, c("read", "write"), 0L)
switch(rw,
attr(con, "reader")$bread,
attr(con, "reader")$bwrite,
)
}
#' `readMsg(con)` reads exactly one message from a
#' msgConnection, or throws an error.
#'
#' @rdname msgConnection
#' @return `readMsg(con)` returns one decoded message.
#' @export
readMsg <- function(con, ...) {
UseMethod("readMsg", con)
}
#' @export
readMsg.msgConnection <- function(con, ...) {
x <- readMsgs(con, 1, ...)
if (length(x) < 1) {
stop(status(con))
}
x[[1]]
}
#' `writeMsg(x, con)` writes a single message to a msgConnection.
#'
#' @rdname msgConnection
#' @param obj An R object.
#' @export
writeMsg <- function(obj, con, ...) {
UseMethod("writeMsg", con)
}
#' @export
writeMsg.connection <- function(obj, con, ...) {
writeMsgs(list(obj), con, ...)
}
#' @export
writeMsg.msgConnection <- function(obj, con, ...) {
writeMsgs(list(obj), con, ...)
}
#' `writeMsgs(l, conn)` writes a list of
#' messages to a connection. That is, `writeMsg(1:10, conn)` writes one
#' message containing an array, while `writeMsgs(1:10, conn)` writes
#' ten consecutive messages each containing one integer.
#'
#' `writeMsg` will work with any R connection in raw mode, but reading
#' requires a msgConnection object.
#'
#' Because msgpack messages have unpredictable length, the decoder
#' reads ahead in chunks, then finds the boundaries between messages.
#' Therefore when reading over a socket or a fifo it is best to use a
#' nonblocking connection, and it will not work to mix readMsg and
#' readBin on the same connection.
#'
#' If you are reading data from a not completely trusted source you
#' should specify options `max_size` and `max_depth` (see
#' [unpackOpts]). Without it, some deeply nested or cleverly designed
#' messages can cause a stack overflow or out-of-memory error. With
#' these options set, you will get an R exception instead.
#' @rdname msgConnection
#' @param objs A list of R objects.
#' @export
writeMsgs <- function(objs, con, ...) {
UseMethod("writeMsgs", con)
}
#' @export
writeMsgs.connection <- function(objs, con, ...) {
writeRaw(packMsgs(objs, ...), con)
}
#' @export
writeMsgs.msgConnection <- function(objs, con, ...) {
buf <- packMsgs(objs, ...)
result <- writeRaw(buf, con)
attr(con, "reader")$bwrite <- attr(con, "reader")$bwrite + length(buf)
invisible(result)
}
## To support test harness, we use "readRaw" and "writeRaw"
## internally, instead of "readBin" which is not an S3 method
readRaw <- function(con, n, ...) {
UseMethod("readRaw")
}
writeRaw <- function(object, con, ...) {
UseMethod("writeRaw", con)
}
readRaw.connection <- function(con, n) {
# I get errors thrown (sometimes) when reading at the end of
# a nonblocking fifo.
tryCatch({
readBin(con, 'raw', n)
},
error = function(e) {
# warning("Ignoring ", e)
raw(0)
})
}
writeRaw.connection <- function(object, con, ...) {
writeBin(object, con, ...)
}
## An inefficient double ended byte buffer for test harness purposes
rawBuffer <- function(object = raw(0)) {
open <- "r"
bytes <- length(object)
buf <- rawConnection(object, open = "r")
object <- NULL
write <- function(object) {
switch(open,
"w" = {
writeBin(object, buf)
},
"r" = {
data <- readBin(buf, 'raw', bytes - seek(buf))
close(buf)
buf <<- rawConnection(data,
open = "w")
open <<- "w"
writeBin(data, buf)
write(object)
}
)
}
read <- function(n) {
switch(open,
"r" = {
readBin(buf, 'raw', n)
},
"w" = {
##convert a write buffer into a read buffer
val <- rawConnectionValue(buf)
close(buf)
buf <<- rawConnection(val, open = "r")
bytes <<- length(val)
open <<- "r"
read(n)
}
)
}
doClose <- function(n) {
close(buf)
buf <- NULL
}
structure(list(environment()), class = "rawBuffer")
}
writeRaw.rawBuffer <- function(object, con) {
con[[1]]$write(object)
}
readRaw.rawBuffer <- function(con, n) {
con[[1]]$read(n)
}
close.rawBuffer <- function(con) {
con[[1]]$doClose()
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.