Nothing
## NOTE: while tests should not be ordered, I've put these last (ish)
## because they take a bit longer than the others to run because of
## firing off the publisher instance. So I only want to run these if
## everything else seems OK.
context("subscription")
test_that("low level", {
ch <- "foo"
dat <- start_publisher(ch)
on.exit(file.remove(dat$filename))
## This is the sort of headache that the higher level interface is
## meant to remove:
vals <- list()
callback <- function(x) {
vals <<- c(vals, list(x))
if (as.numeric(x[[3]]) > 0.8) {
TRUE
} else {
FALSE
}
}
con <- test_hiredis_connection()
env <- environment()
subscribe <- con[[".__enclos_env__"]]$private$.subscribe
subscribe(ch, FALSE, callback, env)
expect_gt(length(vals), 0)
expect_true(all(vcapply(vals, "[[", 1L) == "message"))
expect_true(all(vcapply(vals, "[[", 2L) == ch))
## The payload:
v <- as.numeric(vcapply(vals, "[[", 3L))
expect_true(v[[length(v)]] > 0.8)
expect_true(all(v[-length(v)] < 0.8))
expect_equal(names(vals[[1]]), c("type", "channel", "value"))
expect_equal(length(unique(lapply(vals, names))), 1)
})
test_that("higher level", {
ch <- "foo"
dat <- start_publisher(ch)
on.exit(file.remove(dat$filename))
transform <- function(x) {
message(sprintf("[%s] %s: %s", Sys.time(), x$channel, x$value))
x
}
terminate <- function(x) {
x$value > 0.8
}
con <- test_hiredis_connection()
vals <- con$subscribe(ch, transform = transform, terminate = terminate)
expect_gt(length(vals), 0)
expect_true(all(vcapply(vals, "[[", 1L) == "message"))
expect_true(all(vcapply(vals, "[[", 2L) == ch))
## The payload:
v <- as.numeric(vcapply(vals, "[[", 3L))
expect_true(v[[length(v)]] > 0.8)
expect_true(all(v[-length(v)] < 0.8))
})
test_that("higher level: collect n", {
ch <- "foo"
dat <- start_publisher(ch)
on.exit(file.remove(dat$filename))
con <- test_hiredis_connection()
vals <- con$subscribe(ch, collect = TRUE, n = 5)
expect_equal(length(vals), 5)
## Collect nothing n times:
val <- con$subscribe(ch, collect = FALSE, n = 5)
expect_null(val)
})
test_that("pattern", {
ch <- c("foo1", "foo2")
dat <- lapply(ch, start_publisher)
filename <- vcapply(dat, "[[", "filename")
on.exit(file.remove(filename))
con <- test_hiredis_connection()
vals <- con$subscribe("foo*", pattern = TRUE, collect = TRUE, n = 20)
expect_equal(length(vals), 20)
expect_equal(length(vals[[1]]), 4)
expect_equal(names(vals[[1]]),
c("type", "pattern", "channel", "value"))
expect_true(all(vcapply(vals, "[[", "type") == "pmessage"))
expect_true(all(vcapply(vals, "[[", "pattern") == "foo*"))
chs <- vcapply(vals, "[[", "channel")
expect_true(all(chs %in% ch))
expect_true(all(ch %in% chs))
v <- as.numeric(vcapply(vals, "[[", "value"))
expect_true(all(v >= 0.0 & v <= 1.0))
})
## Flood and recover. This is a low-level regression test for a a
## nasty bug I managed to create earlier where we have queued messages
## *before* the UNSUBSCRIBE was sent but after we detatch from the
## SUBSCRIBE call.
##
## This might happen where we're just too slow to handle them
## efficiently) and issuing of an UNSUBSCRIBE command, here being done
## manually. Normally, this is not able to be done explicitly because
## the UNSUBSCRIBE is handled in an on.exit call, and because that
## leaves the n_discarded attribute non-fetchable.
test_that("flood and recover", {
terminate <- function(x) {
if (as.numeric(x$value) > 0.8) {
Sys.sleep(.5)
stop("Detected a disturbance in the force")
}
FALSE
}
display <- function(x) {
message(sprintf("[%s] %s: %s", Sys.time(), x$channel, x$value))
x
}
ch <- "foo"
dat <- start_publisher(ch)
on.exit(file.remove(dat$filename))
col <- make_collector()
fn <- make_callback(display, terminate, col$add, Inf)
con <- redis_connection()
ptr <- environment(con$command)$ptr
## Directly from the lower-level things
pattern <- FALSE
res1 <- try(.Call(redux:::Credux_redis_subscribe, ptr, ch, pattern,
fn, .GlobalEnv),
silent = TRUE)
res2 <- .Call(redux:::Credux_redis_unsubscribe, ptr, ch, pattern)
expect_is(res1, "try-error")
expect_equivalent(res2, list("unsubscribe", ch, 0L))
expect_gt(attr(res2, "n_discarded"), 0)
## This one is important:
expect_equal(con$command("PING"), redis_status("PONG"))
## Again with the higher level interface:
expect_error(res <- con$subscribe(ch, pattern, fn, .GlobalEnv),
"Detected a disturbance in the force")
expect_equal(con$command("PING"), redis_status("PONG"))
})
test_that("error cases", {
ch <- "foo"
dat <- start_publisher(ch)
on.exit(file.remove(dat$filename))
## This is the sort of headache that the higher level interface is
## meant to remove:
vals <- list()
callback <- function(x) {
vals <<- c(vals, list(x))
if (as.numeric(x[[3]]) > 0.8) {
TRUE
} else {
FALSE
}
}
con <- test_hiredis_connection()
env <- environment()
subscribe <- con[[".__enclos_env__"]]$private$.subscribe
expect_error(subscribe(NULL, FALSE, callback, env),
"channel must be character")
expect_error(subscribe(character(), FALSE, callback, env),
"At least one channel must be given")
expect_error(subscribe(ch, NULL, callback, env),
"pattern must be a scalar")
expect_error(subscribe(ch, FALSE, NULL, env),
"callback must be a function")
expect_error(subscribe(ch, FALSE, callback, NULL),
"envir must be a environment")
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.