# tests/testthat/test-4-pool.r

# Tests for the worker pool object returned by workers().
context("pool")

# Skip unless has_connectivity() reports that 127.0.0.1 can be used.
skip_if_not(has_connectivity("127.0.0.1"))

test_that("starting and stopping multicore", {
    skip_on_os("windows")

    # One multicore worker is requested: it is counted in the total right
    # away but is not reported as running yet.
    pool <- workers(1, qsys_id="multicore")
    expect_equal(pool$workers_total, 1)
    expect_equal(pool$workers_running, 0)

    # The first receive yields NULL; after it the worker counts as running.
    expect_null(pool$recv(5000L))
    expect_equal(pool$workers_running, 1)

    # Round trip of a simple expression; the running count stays at 1
    # both after sending and after receiving the result.
    pool$send(3 + 4)
    expect_equal(pool$workers_running, 1)
    expect_equal(pool$recv(1000L), 7)
    expect_equal(pool$workers_running, 1)

    # After the shutdown signal both counters drop to zero and the pool
    # refuses further send/recv calls.
    pool$send_shutdown()
    expect_equal(pool$workers_running, 0)
    expect_equal(pool$workers_total, 0)
    expect_error(pool$send(1))
    expect_error(pool$recv(1000L))

    # Cleaning up afterwards leaves the pool in the same state: empty and
    # unusable.
    pool$cleanup()
    expect_equal(pool$workers_running, 0)
    expect_equal(pool$workers_total, 0)
    expect_error(pool$send(2))
    expect_error(pool$recv(1000L))
    expect_equal(pool$workers_running, 0)
    expect_equal(pool$workers_total, 0)
})

test_that("pending workers are cleaned up properly", {
    skip_on_os("windows")

    # Clean up straight after creation, without ever calling recv(); the
    # pool must then report no running and no total workers.
    w = workers(1, qsys_id="multicore")
    w$cleanup()
    expect_equal(w$workers_running, 0)
    expect_equal(w$workers_total, 0)
})

test_that("calculations are really done on the worker", {
    skip_on_os("windows")

    # Local values: evaluating `x + y` here would give 3, not the 7
    # expected below.
    x <- 1
    y <- 2

    pool <- workers(1, qsys_id="multicore")
    expect_null(pool$recv(5000L))

    # y = 3 is supplied through env(), x = 4 together with the call itself;
    # the expected result 7 is 4 + 3 rather than the local 1 + 2.
    pool$env(y = 3)
    pool$send(x + y, x=4)
    expect_equal(pool$recv(1000L), 7)

    pool$send_shutdown()
    pool$cleanup()
})

test_that("call references are matched properly", {
    skip_on_os("windows")
    skip_on_cran()

    pool <- workers(2, qsys_id="multicore")
    expect_null(pool$recv(5000L))

    # First call sleeps for a second before returning 1; the receive that
    # follows yields NULL rather than a result.
    slow_ref <- pool$send({Sys.sleep(1); 1})
    expect_null(pool$recv(1000L))

    # Second call returns immediately; its result arrives first and the
    # current worker must carry the reference of that second call.
    fast_ref <- pool$send(2)
    expect_equal(pool$recv(500L), 2)
    expect_equal(pool$current()$call_ref, fast_ref)

    # The delayed result is still delivered after the shutdown signal and
    # is matched to the reference of the first call.
    pool$send_shutdown()
    expect_equal(pool$recv(2000L), 1)
    expect_equal(pool$current()$call_ref, slow_ref)

    pool$cleanup()
})

test_that("multiprocess", {
    # Unconditionally skipped; the skip message points at the upstream issue.
    skip("https://github.com/r-lib/processx/issues/236")

    pool <- workers(1, qsys_id="multiprocess")

    # First receive yields NULL, then a simple expression makes a round trip.
    expect_null(pool$recv())
    pool$send(3 + 5)
    expect_equal(pool$recv(), 8)

    pool$send_shutdown()
    pool$cleanup()
})

test_that("work_chunk on multiprocess", {
    # Unconditionally skipped; the skip message points at the upstream issue.
    skip("https://github.com/r-lib/processx/issues/236")

    pool <- workers(1, qsys_id="multiprocess")
    expect_null(pool$recv())

    # Run `+` over the chunk (a = 1:3, b = 4:6) via the internal work_chunk().
    pool$send(clustermq:::work_chunk(chunk, `+`), chunk=list(a=1:3, b=4:6))
    reply <- pool$recv()

    # Element-wise sums keyed by index, with no warnings or errors recorded.
    expect_equal(reply$result, list(`1`=5, `2`=7, `3`=9))
    expect_equal(reply$warnings, list())
    expect_equal(reply$errors, list())

    pool$send_shutdown()
    pool$cleanup()
})

test_that("worker creation passes template filling values", {
    # Safety net: the class below is assigned into the global environment, so
    # make sure it is removed even if workers() fails before the explicit rm().
    # All exit handlers use add=TRUE so that none silently replaces another.
    on.exit(
        if (exists("TMPL_FILLER", envir=.GlobalEnv, inherits=FALSE))
            rm(TMPL_FILLER, envir=.GlobalEnv),
        add=TRUE
    )

    # Minimal QSys subclass that records the result of fill_options() so the
    # test can inspect which template values were passed through.
    TMPL_FILLER <<- R6::R6Class("TMPL_FILLER",
        inherit = QSys,
        public = list(
            initialize = function(addr, n_jobs, master, ...) {
                super$initialize(addr=addr, master=master, template="LSF")
                self$filled = private$fill_options(...)
            },
            filled = list()
        )
    )

    # Temporarily override the package defaults; restored when the test exits.
    old_defaults = getOption("clustermq.defaults")
    on.exit(options(clustermq.defaults = old_defaults), add=TRUE)
    options(clustermq.defaults = list(cores="defaults_test", memory="invalid"))

    # NOTE(review): qsys_id "tmpl_filler" presumably resolves to the
    # TMPL_FILLER class by name -- confirm against workers().
    w = workers(1, qsys_id="tmpl_filler", template=list(memory="test"))
    rm(TMPL_FILLER, envir=.GlobalEnv)

    # The explicit template value wins over the default for `memory`, while
    # `cores` is taken from the defaults.
    expect_equal(w$workers$filled$memory, "test")
    expect_equal(w$workers$filled$cores, "defaults_test")
})

# Try the clustermq package in your browser
#
# Any scripts or data that you put into this service are public.
#
# clustermq documentation built on June 8, 2025, 11:24 a.m.