tests/testthat/test-2-worker.r

context("worker usage")

test_that("connect to invalid endpoint errors", {
    w = methods::new(CMQWorker)
    expect_error(w$connect("tcp://localhost:12345", 0L))
    w$close()
})

test_that("recv without pending workers errors before timeout", {
    m = methods::new(CMQMaster)
    addr = m$listen("inproc://endpoint")
    expect_error(m$recv(-1L))
    m$close(500L)
})

test_that("recv timeout works", {
    m = methods::new(CMQMaster)
    addr = m$listen("inproc://endpoint")
    m$add_pending_workers(1L)
    expect_error(m$recv(0L))
    m$close(500L)
})

test_that("worker evaluation", {
    m = methods::new(CMQMaster)
    w = methods::new(CMQWorker, m$context())
    addr = m$listen("inproc://endpoint")
    m$add_pending_workers(1L)
    w$connect(addr, 500L)

    m$recv(500L)
    m$send(expression(5 * 2))
    status = w$process_one()
    result = m$recv(500L)

    expect_true(status)
    expect_equal(result, 10)

    w$close()
    m$close(500L)
})

test_that("export variable to worker", {
    m = methods::new(CMQMaster)
    w = methods::new(CMQWorker, m$context())
    addr = m$listen("inproc://endpoint")
    m$add_pending_workers(1L)
    w$connect(addr, 500L)

    m$add_env("x", 3)
    m$recv(500L)
    m$send(expression(5 + x))
    status = w$process_one()
    result = m$recv(500L)
    expect_true(status)
    expect_equal(result, 8)

    m$add_env("x", 5)
    m$send(expression(5 + x))
    status = w$process_one()
    result = m$recv(500L)
    expect_true(status)
    expect_equal(result, 10)

    w$close()
    m$close(500L)
})

test_that("load package on worker", {
    m = methods::new(CMQMaster)
    w = methods::new(CMQWorker, m$context())
    addr = m$listen("inproc://endpoint")
    m$add_pending_workers(1L)
    w$connect(addr, 500L)

    m$add_pkg("parallel")

    m$recv(500L)
    m$send(expression(splitIndices(1, 1)[[1]]))
    status = w$process_one()
    result = m$recv(500L)

    expect_true(status)
    expect_equal(result, 1)

    w$close()
    m$close(500L)
})

test_that("errors are sent back to master", {
    skip("this works interactively but evaluates the error on testthat")

    m = methods::new(CMQMaster)
    w = methods::new(CMQWorker, m$context())
    addr = m$listen("inproc://endpoint")
    m$add_pending_workers(1L)
    w$connect(addr, 500L)

    m$recv(500L)
    m$send(expression(stop("errmsg")))
    status = w$process_one()
    result = m$recv(500L)

    expect_true(status)
    expect_true(inherits(result, c("condition", "worker_error")))

    w$close()
    m$close(500L)
})

test_that("worker R API", {
    skip_on_os("windows")
    skip_if_not(has_connectivity("127.0.0.1")) # -> this or inproc w/ passing context

    m = methods::new(CMQMaster)
    addr = m$listen("tcp://127.0.0.1:*")
    m$add_pending_workers(1L)
#    addr = m$listen("inproc://endpoint") # mailbox.cpp assertion error

    p = parallel::mcparallel(worker(addr))
    expect_null(m$recv(1000L))
    m$send(expression(5 + 1))
    res = m$recv(500L)
    expect_equal(res[[1]], 6)

    m$send_shutdown()
    pc = parallel::mccollect(p, wait=TRUE, timeout=0.5)
    expect_equal(pc[[1]], NULL)
    m$close(500L)
})

test_that("communication with two workers", {
    skip_on_os("windows")
    skip_if_not(has_connectivity("127.0.0.1"))

    m = methods::new(CMQMaster)
    addr = m$listen("tcp://127.0.0.1:*")
    m$add_pending_workers(2L)
    w1 = parallel::mcparallel(worker(addr))
    w2 = parallel::mcparallel(worker(addr))

    expect_null(m$recv(1000L)) # worker 1 up
    m$send(expression({ Sys.sleep(0.5); 5 + 2 }))
    expect_null(m$recv(500L)) # worker 2 up
    m$send(expression({ Sys.sleep(0.5); 3 + 1 }))
    r1 = m$recv(1000L)
    m$send_shutdown()
    r2 = m$recv(1000L)
    m$send_shutdown()
    expect_equal(sort(c(r1, r2)), c(4,7))

    coll1 = parallel::mccollect(w1, wait=TRUE, timeout=0.5)
    expect_equal(names(coll1), as.character(w1$pid))
    coll2 = parallel::mccollect(w2, wait=TRUE, timeout=0.5)
    expect_equal(names(coll2), as.character(w2$pid))

    m$close(500L)
})

Try the clustermq package in your browser

Any scripts or data that you put into this service are public.

clustermq documentation built on Nov. 21, 2023, 5:06 p.m.