Nothing
context("worker usage")
test_that("connect to invalid endpoint errors", {
w = methods::new(CMQWorker)
expect_error(w$connect("tcp://localhost:12345", 0L))
w$close()
})
test_that("recv without pending workers errors before timeout", {
m = methods::new(CMQMaster)
addr = m$listen("inproc://endpoint")
expect_error(m$recv(-1L))
m$close(500L)
})
test_that("recv timeout works", {
m = methods::new(CMQMaster)
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
expect_error(m$recv(0L))
m$close(500L)
})
test_that("worker evaluation", {
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 500L)
m$recv(500L)
m$send(expression(5 * 2))
status = w$process_one()
result = m$recv(500L)
expect_true(status)
expect_equal(result, 10)
w$close()
m$close(500L)
})
test_that("export variable to worker", {
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 500L)
m$add_env("x", 3)
m$recv(500L)
m$send(expression(5 + x))
status = w$process_one()
result = m$recv(500L)
expect_true(status)
expect_equal(result, 8)
m$add_env("x", 5)
m$send(expression(5 + x))
status = w$process_one()
result = m$recv(500L)
expect_true(status)
expect_equal(result, 10)
w$close()
m$close(500L)
})
test_that("load package on worker", {
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 500L)
m$add_pkg("parallel")
m$recv(500L)
m$send(expression(splitIndices(1, 1)[[1]]))
status = w$process_one()
result = m$recv(500L)
expect_true(status)
expect_equal(result, 1)
w$close()
m$close(500L)
})
test_that("errors are sent back to master", {
skip("this works interactively but evaluates the error on testthat")
m = methods::new(CMQMaster)
w = methods::new(CMQWorker, m$context())
addr = m$listen("inproc://endpoint")
m$add_pending_workers(1L)
w$connect(addr, 500L)
m$recv(500L)
m$send(expression(stop("errmsg")))
status = w$process_one()
result = m$recv(500L)
expect_true(status)
expect_true(inherits(result, c("condition", "worker_error")))
w$close()
m$close(500L)
})
test_that("worker R API", {
skip_on_os("windows")
skip_if_not(has_connectivity("127.0.0.1")) # -> this or inproc w/ passing context
m = methods::new(CMQMaster)
addr = m$listen("tcp://127.0.0.1:*")
m$add_pending_workers(1L)
# addr = m$listen("inproc://endpoint") # mailbox.cpp assertion error
p = parallel::mcparallel(worker(addr))
expect_null(m$recv(1000L))
m$send(expression(5 + 1))
res = m$recv(500L)
expect_equal(res[[1]], 6)
m$send_shutdown()
pc = parallel::mccollect(p, wait=TRUE, timeout=0.5)
expect_equal(pc[[1]], NULL)
m$close(500L)
})
test_that("communication with two workers", {
skip_on_os("windows")
skip_if_not(has_connectivity("127.0.0.1"))
m = methods::new(CMQMaster)
addr = m$listen("tcp://127.0.0.1:*")
m$add_pending_workers(2L)
w1 = parallel::mcparallel(worker(addr))
w2 = parallel::mcparallel(worker(addr))
expect_null(m$recv(1000L)) # worker 1 up
m$send(expression({ Sys.sleep(0.5); 5 + 2 }))
expect_null(m$recv(500L)) # worker 2 up
m$send(expression({ Sys.sleep(0.5); 3 + 1 }))
r1 = m$recv(1000L)
m$send_shutdown()
r2 = m$recv(1000L)
m$send_shutdown()
expect_equal(sort(c(r1, r2)), c(4,7))
coll1 = parallel::mccollect(w1, wait=TRUE, timeout=0.5)
expect_equal(names(coll1), as.character(w1$pid))
coll2 = parallel::mccollect(w2, wait=TRUE, timeout=0.5)
expect_equal(names(coll2), as.character(w2$pid))
m$close(500L)
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.