Nothing
context("unpack from streams")
#most of these tests to be implemented using a fifo connection.
`%is%` <- expect_equal
msgs <- as.raw(c(0xa5, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
0xa3, 0x61, 0x6e, 0x64,
0xa7, 0x67, 0x6f, 0x6f))
two_ends <- function(f, read_size = 2^16, max_buf = NA) {
fl <- file("", open="w+b", blocking=FALSE)
on.exit(close(fl), add=TRUE)
f(fl, msgConnection(fl))
}
test_that("consume N messages and return remaining data", {
expect_equal(unpackMsgs(as.raw(1:10)),
list(msgs = as.list(1:10),
remaining = raw(0),
status = "end of input",
bytes_read = 10))
expect_equal(unpackMsgs(msgs),
list(msgs = list("hello", "and"),
remaining = as.raw(c(0xa7, 0x67, 0x6f, 0x6f)),
status = "buffer underflow",
bytes_read = 10))
expect_equal(unpackMsgs(as.raw(c(0xa7, 0x6f, 0x6f))),
list(msgs = list(),
remaining = as.raw(c(0xa7, 0x6f, 0x6f)),
status = "buffer underflow",
bytes_read = 0))
expect_equal(unpackMsgs(as.raw(c(1:10)), 3),
list(msgs = as.list(1:3),
remaining = as.raw(4:10),
status = "ok",
bytes_read = 3))
})
test_that("packMsgs round trip", {
unpackMsgs(packMsgs(c(list(1:10), 1:10)))$msgs %is%
c(list(1:10), as.list(1:10))
})
test_that("Errors raised in parsing are caught", {
unpackMsgs(as.raw(c(0xc1)))$status %is% "malformed input"
})
test_that("read from a connection (blocking)", {
conn <- msgConnection(rawConnection(msgs, open="r"))
readMsg(conn) %is% "hello"
readMsg(conn) %is% "and"
expect_error(readMsg(conn))
close(conn)
conn <- msgConnection(rawConnection(msgs, open="r"))
readMsgs(conn) %is% list("hello", "and")
status(conn) %is% "buffer underflow"
close(conn)
})
test_that("write to a connection", {
conn <- rawConnection(raw(), open="w")
writeMsg(1:10, conn)
writeMsgs(1:10, conn)
expect_equal(
unpackMsgs(rawConnectionValue(conn))$msgs,
c(list(1:10), as.list(1:10)))
close(conn)
})
test_that("write to and read from connections", {
con <- msgConnection(rawBuffer(raw(0)))
writeMsgs(1:10, con)
readMsgs(con) %is% as.list(1:10)
close(con)
two_ends(function(A, B) {
writeMsgs(1:10, A)
flush(A)
readMsgs(B)
}) %is% as.list(1:10)
})
test_that("read non-blocking with complete message", {
test <- packMsgs(list("hello", "and"))
conn <- msgConnection(
rawConnection(packMsgs(list("hello", "and", "world")), open="r"),
read_size = length(test))
readMsgs(conn) %is% list("hello", "and", "world")
close(conn)
})
test_that("read non-blocking with incomplete message", {
two_ends(function(endA, endB) {
writeMsg(1, endA)
partial <- packMsgs(list("here is a partial message", 2))
writeBin(partial[1:10], endA)
flush(endA)
readMsgs(endB) %is% list(1)
status(endB) %is% "buffer underflow"
partial(endB) %is% partial[1:10]
writeBin(partial[11:length(partial)], endA)
flush(endA)
Sys.sleep(0.1)
readMsgs(endB) %is% list("here is a partial message", 2)
})
})
test_that("underflow at incomplete message (1-process)", {
buf <- rawBuffer()
partial <- packMsgs(list(1, "here is a partial message", 2))
writeRaw(partial[1:10], buf)
con <- msgConnection(buf)
readMsgs(con) %is% list(1)
writeRaw(partial[11:length(partial)], buf)
readMsgs(con) %is% list("here is a partial message", 2)
readMsgs(con) %is% list()
close(con)
})
test_that("read non-blocking with array breaking over chunks", {
##this should at least trigger the underflow handler, no?
partial <- packMsgs(list("hello", 1:2))
full <- packMsgs(list("hello", 1:10))
conn <- rawConnection(full, open="r")
conn <- msgConnection(conn, read_size = length(partial))
readMsgs(conn) %is% list("hello", 1:10)
close(conn)
})
test_that("rawBuffer", {
x <- rawBuffer(as.raw(1:5))
tryCatch({
readRaw(x, 3) %is% as.raw(1:3)
writeRaw(as.raw(6:10), x)
readRaw(x, 5) %is% as.raw(4:8)
readRaw(x, 10) %is% as.raw(9:10)
readRaw(x, 10) %is% raw(0)
writeRaw(as.raw(11:25), x)
readRaw(x, 100) %is% as.raw(11:25)
writeRaw(as.raw(26:27), x)
readRaw(x, 2) %is% as.raw(26:27)
readRaw(x, 0) %is% raw(0)
readRaw(x, 10) %is% raw(0)
}, finally = {
close(x)
})
})
test_that("read non-blocking and underflow handling when variously interrupted",
{
orig <- list("hello", c("hello", "world"), list("hello", "world", c(1, 2, 3)))
packed <- packMsgs(orig)
cut <- 28
for (cut in 1:(length(packed) - 1)) {
firstChunk <- packed[1:cut]
secondChunk <- packed[ (cut+1) : (length(packed)) ]
con <- msgConnection(rawBuffer(firstChunk))
expect_error(read1 <- readMsgs(con), NA, info = paste0("at cut ", cut))
writeRaw(secondChunk, con)
expect_error(read2 <- readMsgs(con), NA, info = paste0("at cut ", cut))
expect_equal(c(read1, read2), orig, info = paste0("at cut ", cut))
close(con)
}
})
test_that("assembling an array > read_size", {
mess <- (1:25) + rep(0, 1000)
c <- msgConnection(rawBuffer(raw(0)), read_size=32)
writeMsg(mess, c)
readMsgs(c) %is% list(mess)
close(c)
})
test_that("resume from interrupt when message >> read_size",
{
mess <- (1:25) + rep(0, 1000)
con <- msgConnection(rawBuffer(raw(0)), read_size=32)
bin <- packMsg(mess)
writeRaw(bin[1:100], con)
readMsgs(con) %is% list()
writeRaw(bin[101:200], con)
readMsgs(con) %is% list()
writeRaw(bin[201:length(bin)], con)
readMsgs(con) %is% list(mess)
close(con)
})
test_that("Assembling a string >> read size", {
## strings are individual messages that stretch over potentially many reads.
mess <- paste0(letters[sample(26, 1000, replace=TRUE)], collapse="")
con <- msgConnection(rawBuffer(raw(0)), read_size=32)
writeMsgs(list(mess), con)
readMsgs(con) %is% list(mess)
expect_equal(seek(con, rw="r"), 1003)
close(con)
})
test_that("seek method", {
con = msgConnection(rawConnection(msgs, open="r"))
readMsgs(con)
seek(con) %is% 10
close(con)
con = msgConnection(rawConnection(msgs, open="w"))
writeMsg("hello", con)
seek(con) %is% 6
close(con)
con = msgConnection(rawBuffer())
writeMsg("hello", con)
expect_error(seek(con))
seek(con, rw = "w") %is% 6
seek(con, rw = "r") %is% 0
readMsg(con)
seek(con, rw = "r") %is% 6
close(con)
})
`%@%` <- function(x, name) {
attr(x, as.character(substitute(name)))
}
test_that("large blob to force GC?", {
data <- sample(as.raw(0:255), 0x2000000, TRUE)
con <- msgConnection(rawConnection(raw(0), open="wb"))
packet <- packMsg(data)
writeMsg(data, con)
bytes <- rawConnectionValue(con)
close(con)
con2 <- msgConnection(rawConnection(bytes, open="rb"))
as.read <<- readMsg(con2)
close(con2)
expect_identical(as.read, data)
})
test_that("smallish blob under gctorture", {
data <- sample(as.raw(0:255), 0x30000, TRUE)
con <- msgConnection(rawConnection(raw(0), open="wb"))
packet <- packMsg(data)
writeMsg(data, con)
bytes <- rawConnectionValue(con)
close(con)
con2 <- msgConnection(rawConnection(bytes, open="rb"))
as.read <- NULL
local({
gctorture(TRUE)
on.exit(gctorture(FALSE))
# getting Error: RAW() can only be applied to a 'raw', not a 'NULL'
# I think perhaps because the buffer gets collected.
# I should hold an external pointer to the buffer, then?
# gctorture triggers an early error "2" so yeah.
# any way to debug GC, though?
as.read <<- readMsg(con2)
})
close(con2)
expect_identical(as.read, data)
})
# I'd like to have some tests with reading/writing to a separate
# process. I tried with parallel/mcfork, makeForkCluster, but forking
# seems to cause weirdness with nonblocking. (however this was before
# I sorted out partial reads.) Subprocess package seems to not have
# that prob. but the subprocess that is created doesn't have my
# functions loaded.
print.raw <- function(x, max.print = getOption("max.print"), ...) {
# Display raw vectors as hex dumps. Debugging use, not exported.
con <- pipe("hexdump -C", "wb")
if (length(x) > max.print) {
writeBin(x[1:max.print], con)
close(con)
cat(paste0("[ reached getOption(\"max.print\") -- omitted ",
length(x) - max.print,
" entries ] \n"))
} else {
writeBin(x, con[1:max.print])
close(con)
}
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.