# tests/testthat/test-parallel.R

suppressPackageStartupMessages({
    library(dplyr)
})


test_that("Parallisation works as expected", {

    # Only executed when the full test suite has been requested
    skip_if_not(is_full_test())

    # Inputs handed to get_sim_data(): a sample size and the object built
    # by as_vcov() from the two numeric vectors below
    n_subjects <- 150
    vcov_mat <- as_vcov(
        c(2, 1, 0.7, 3, 4),
        c(
            0.3,
            0.4, 0.2,
            0.5, 0.3, 0.2,
            0.1, 0.2, 0.3, 0.5
        )
    )

    # Simulate the data and then set the outcome to missing for a random
    # (Bernoulli 0.5) subset of the "visit_3" rows
    sim_data <- get_sim_data(n_subjects, vcov_mat, trt = 8)
    sim_data <- sim_data %>%
        mutate(
            is_miss = rbinom(n(), 1, 0.5),
            outcome = if_else(is_miss == 1 & visit == "visit_3", NA_real_, outcome)
        ) %>%
        select(-is_miss)

    # One row per subject: the first visit (in visit order) with a missing
    # outcome, to which the "JR" strategy is assigned
    first_missing <- sim_data %>%
        filter(is.na(outcome)) %>%
        arrange(id, visit) %>%
        group_by(id) %>%
        slice(1) %>%
        ungroup()
    ice_data <- first_missing %>%
        select(id, visit) %>%
        mutate(strategy = "JR")

    analysis_vars <- set_vars(
        outcome = "outcome",
        group = "group",
        strategy = "strategy",
        subjid = "id",
        visit = "visit",
        covariates = c("age", "sex", "visit * group")
    )

    ###########################
    #
    #  Potential Unit tests
    #

    # Runs draws() twice from the same seed -- once on a single core and once
    # on `ncores` cores -- and expects both runs to give equal results.
    # Returns a list holding both results and both timings.
    test_parallel <- function(method, ncores = 2) {

        # Reference run on a single core
        set.seed(101)
        elapsed_seq <- time_it({
            draws_seq <- draws(
                data = sim_data,
                data_ice = ice_data,
                vars = analysis_vars,
                method = method,
                ncores = 1,
                quiet = TRUE
            )
        })

        # Same seed, but spread over `ncores` cores
        set.seed(101)
        elapsed_par <- time_it({
            draws_par <- draws(
                data = sim_data,
                data_ice = ice_data,
                vars = analysis_vars,
                method = method,
                ncores = ncores,
                quiet = TRUE
            )
        })

        # Overwrite the formula element of both results with the same
        # placeholder so that it cannot contribute to the comparison
        placeholder_formula <- x ~ a + b + c + d
        draws_seq$formula <- placeholder_formula
        draws_par$formula <- placeholder_formula

        # Timing expectation is currently disabled: for some reason no
        # performance gain is seen when run on github actions, although it
        # appears to run fine everywhere else...
        # expect_true(elapsed_seq > (elapsed_par * 1.3))
        expect_equal(draws_seq, draws_par)

        list(
            results_1 = draws_seq,
            results_2 = draws_par,
            time_1 = elapsed_seq,
            time_2 = elapsed_par
        )
    }

    res_approxbayes <- test_parallel(method_approxbayes(n_samples = 120))
    res_bootstrap <- test_parallel(method_condmean(n_samples = 120))
    res_jackknife <- test_parallel(method_condmean(type = "jackknife"))
    res_bmlmi <- test_parallel(method_bmlmi(B = 70, D = 2))

})


test_that("Basic parallisation works as expected", {

    # Fixed seed so that the simulated dataset is reproducible
    set.seed(3812)
    n_subjects <- 100
    vcov_mat <- as_vcov(
        c(2, 1, 0.7),
        c(
            0.3,
            0.4, 0.2
        )
    )

    # Simulate the data and then set the outcome to missing for a random
    # (Bernoulli 0.5) subset of the "visit_3" rows
    sim_data <- get_sim_data(n_subjects, vcov_mat, trt = 8)
    sim_data <- sim_data %>%
        mutate(
            is_miss = rbinom(n(), 1, 0.5),
            outcome = if_else(is_miss == 1 & visit == "visit_3", NA_real_, outcome)
        ) %>%
        select(-is_miss)

    # One row per subject: the first visit (in visit order) with a missing
    # outcome, to which the "JR" strategy is assigned
    first_missing <- sim_data %>%
        filter(is.na(outcome)) %>%
        arrange(id, visit) %>%
        group_by(id) %>%
        slice(1) %>%
        ungroup()
    ice_data <- first_missing %>%
        select(id, visit) %>%
        mutate(strategy = "JR")

    analysis_vars <- set_vars(
        outcome = "outcome",
        group = "group",
        strategy = "strategy",
        subjid = "id",
        visit = "visit",
        covariates = c("age", "sex", "visit * group")
    )

    # Sequential run (ncores left at its default)
    set.seed(3013)
    draws_seq <- draws(
        data = sim_data,
        data_ice = ice_data,
        vars = analysis_vars,
        method = method_approxbayes(n_samples = 10),
        quiet = TRUE
    )

    # Identical call from the same seed, but using two cores
    set.seed(3013)
    draws_par <- draws(
        data = sim_data,
        data_ice = ice_data,
        vars = analysis_vars,
        method = method_approxbayes(n_samples = 10),
        ncores = 2,
        quiet = TRUE
    )

    # A tolerance is used because of an mmrm issue: when run in parallel the
    # first optimiser fails and mmrm moves on to the second one, whereas in
    # sequence the first optimiser works fine. The results therefore differ
    # slightly as different optimisers end up being used.
    # https://github.com/openpharma/mmrm/issues/151
    expect_equal(draws_seq, draws_par, tolerance = 1e-4)

})


###########################
#
#  Manual time testing
#
# method <- method_approxbayes(n_samples = 20)
# method <- method_condmean(n_samples = 80)
# method <- method_condmean(type = "jackknife")
# time_it({
#     results_2 <- draws(
#         data = dat,
#         data_ice = dat_ice,
#         vars = vars,
#         method = method,
#         ncores = 1
#     )
# })
# time_it({
#     results_2 <- draws(
#         data = dat,
#         data_ice = dat_ice,
#         vars = vars,
#         method = method,
#         ncores = 2
#     )
# })




test_that("Creation and management of user defined clusters works as expected", {
    # Setup a function to be run on the parallel cluster that requires
    # global objects (namely the `inner_fun` and environment `e`) as well
    # as a handful of packages to be loaded
    e <- new.env()
    e$x <- 20
    # The `temp*` assignments are never used; they exist only so that the
    # call fails if the corresponding object / package is not available on
    # the worker
    inner_fun <- function(x) {
        local_env <- e
        temp1 <- e$x + 0
        temp2 <- rnorm(2)
        temp3 <- dplyr::as_tibble(dplyr::starwars) # Explicit namespace
        temp4 <- day(x) # lubridate::day()
        e$x
    }
    outer_fun <- function(x) {
        temp1 <- inner_fun(2)
        temp2 <- anova.lme %>% invisible() # nlme::anova.lme()
        temp3 <- iris
        temp1 + x + 10
    }

    # Check that function can be run (e.g. all elements are correctly exported)
    # Expected values: e$x (20) + x + 10, i.e. 34 and 35 for x = 4 and 5
    set.seed(1223)
    cl1 <- make_rbmi_cluster(2, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme", "dplyr"))
    res_1_a <- parallel::clusterCall(cl1, rnorm, 200)
    res_1_b <- parallel::clusterApplyLB(cl1, c(4, 5), outer_fun)
    expect_equal(res_1_b, list(34, 35))

    # Test that re-using an existing cluster is quick to load
    time <- time_it({
        set.seed(1223)
        cl2 <- make_rbmi_cluster(cl1, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme"))
    })
    expect_true(as.numeric(time) <= 2)

    # Should produce identical results as before
    res_2_a <- parallel::clusterCall(cl2, rnorm, 200)
    res_2_b <- parallel::clusterApplyLB(cl2, c(4, 5), outer_fun)
    expect_equal(res_2_a, res_1_a)
    expect_equal(res_2_b, res_1_b)

    # Both clusters should be closed as they point to the same thing
    parallel::stopCluster(cl2)
    expect_true(is_cluster_closed(cl1))
    expect_true(is_cluster_closed(cl2))


    # Check that seed ensures reproducibility: a brand new cluster created
    # from the same seed must reproduce the random numbers of the first one.
    # The new cluster `cl` is queried here; `cl1` has already been stopped
    # above and must not be used any more.
    set.seed(1223)
    cl <- make_rbmi_cluster(2, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme"))
    res_3_a <- parallel::clusterCall(cl, rnorm, 200)
    expect_equal(res_3_a, res_1_a)
    parallel::stopCluster(cl)
})
# insightsengineering/rbmi documentation built on Feb. 28, 2025, 3:34 a.m.