tests/testthat/test_CallbackAsyncFSelect.R

# stages in $optimize() --------------------------------------------------------

test_that("on_optimization_begin works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_optimization_begin = function(callback, context) {
      context$instance$terminator$param_set$values$n_evals = 20
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("regr.mse"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(instance$terminator$param_set$values$n_evals, 20)
  expect_rush_reset(instance$rush, type = "kill")
})

test_that("on_optimization_end works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_optimization_end = function(callback, context) {
      context$instance$terminator$param_set$values$n_evals = 20
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("regr.mse"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(instance$terminator$param_set$values$n_evals, 20)
  expect_rush_reset(instance$rush, type = "kill")
})

# stager in worker_loop() ------------------------------------------------------

test_that("on_worker_begin works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_worker_begin = function(callback, context) {
      instance = context$instance
      mlr3misc::get_private(instance)$.eval_point(list(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE))
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(1)
  rush::rush_plan(n_workers = 1, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("regr.mse"),
    term_evals = 1,
    callbacks = callback)

  expect_equal(c(TRUE, FALSE, TRUE, FALSE), as.logical(instance$archive$data[, c("x1", "x2", "x3", "x4"), with = FALSE]))
  expect_rush_reset(instance$rush, type = "kill")
})

test_that("on_worker_end works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_worker_end = function(callback, context) {
      instance = context$instance
      mlr3misc::get_private(instance)$.eval_point(list(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE))
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(1)
  rush::rush_plan(n_workers = 1, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("regr.mse"),
    term_evals = 2,
    callbacks = callback)

  expect_equal(c(TRUE, FALSE, TRUE, FALSE), as.logical(instance$archive$data[nrow(instance$archive$data), c("x1", "x2", "x3", "x4"), with = FALSE]))
  expect_rush_reset(instance$rush, type = "kill")
})

# stages in $.eval_point() -----------------------------------------------------

test_that("on_optimizer_before_eval and on_optimizer_after_eval works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_optimizer_before_eval = function(callback, context) {
      context$xs = list(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE)
    },

    on_optimizer_after_eval = function(callback, context) {
      context$ys = list(regr.mse = 0)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(1)
  rush::rush_plan(n_workers = 1, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("regr.mse"),
    term_evals = 1,
    callbacks = callback)

  expect_equal(c(TRUE, FALSE, TRUE, FALSE), as.logical(instance$archive$data[, c("x1", "x2", "x3", "x4"), with = FALSE]))
  expect_equal(0, instance$archive$data$regr.mse)
  expect_rush_reset(instance$rush, type = "kill")
})

# stages in $eval() ------------------------------------------------------------

test_that("on_eval_after_xs works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()
  options(bbotk_local = TRUE)

  callback = callback_async_fselect(id = "test",
    on_eval_after_xs = function(callback, context) {
      context$xs_objective = list(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(1)
  rush::rush_plan(n_workers = 1, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("regr.mse"),
    term_evals = 1,
    callbacks = callback)

  expect_equal(instance$archive$benchmark_result$resample_result(1)$learners[[1]]$state$feature_names, c("x1", "x3"))
})

test_that("on_eval_after_resample works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_eval_after_resample = function(callback, context) {
      callback$state$extra_performance = context$resample_result$aggregate(msr("classif.acc"))
    },

    on_eval_before_archive = function(callback, context) {
      context$aggregated_performance$classif.acc = callback$state$extra_performance
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("classif.ce"),
    term_evals = 2,
    callbacks = callback)

  expect_names(names(instance$archive$data), must.include = c("classif.ce", "classif.acc"))
})

# stages in $assign_result() in FSelectInstanceAsyncSingleCrit ------------------

test_that("on_fselect_result_begin in FSelectInstanceSingleCrit works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_fselect_result_begin = function(callback, context) {
      context$result_xdt = data.table(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE)
      context$result_y = c(regr.mse = 0.7)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("regr.mse"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(instance$result_x_search_space, data.table(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE))
  expect_equal(instance$result_y, c(regr.mse = 0.7))
})

test_that("on_result_end in FSelectInstanceSingleCrit works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  callback = callback_async_fselect(id = "test",
    on_result_end = function(callback, context) {
      context$result$classif.ce = 0.7
    }
  )

  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("classif.ce"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(instance$result$classif.ce, 0.7)
})

test_that("on_result in FSelectInstanceSingleCrit works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  expect_warning({callback = callback_async_fselect(id = "test",
    on_result = function(callback, context) {
      context$result$classif.ce = 0.7
    }
  )}, "deprecated")

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("classif.ce"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(instance$result$classif.ce, 0.7)
})

# stages in $assign_result() in FSelectInstanceBatchMultiCrit -------------------

test_that("on_fselect_result_begin in FSelectInstanceBatchMultiCrit works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_fselect_result_begin = function(callback, context) {
      context$result_xdt = data.table(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE)
      context$result_ydt = data.table(regr.mse = 0.7, regr.rmse = 0.8)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = TEST_MAKE_TSK(),
    learner = lrn("regr.rpart"),
    resampling = rsmp("holdout"),
    measures = msrs(c("regr.mse", "regr.rmse")),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(instance$result_x_search_space, data.table(x1 = TRUE, x2 = FALSE, x3 = TRUE, x4 = FALSE))
  expect_equal(instance$result_y, data.table(regr.mse = 0.7, regr.rmse = 0.8))
})

test_that("on_result_end in FSelectInstanceBatchMultiCrit works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect(id = "test",
    on_result_end = function(callback, context) {
      set(context$result, j = "classif.ce", value = 0.7)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msrs(c("classif.ce", "classif.acc")),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(unique(instance$result$classif.ce), 0.7)
})

test_that("on_result in FSelectInstanceBatchMultiCrit works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  expect_warning({callback = callback_async_fselect(id = "test",
    on_result = function(callback, context) {
      set(context$result, j = "classif.ce", value = 0.7)
    }
  )}, "deprecated")

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msrs(c("classif.ce", "classif.acc")),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")
  expect_equal(unique(instance$result$classif.ce), 0.7)
})

# stages in mlr3 workhorse -----------------------------------------------------

test_that("on_resample_begin works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect("test",
    on_resample_begin = function(callback, context) {
      # expect_* does not work
      assert_task(context$task)
      assert_learner(context$learner)
      assert_resampling(context$resampling)
      checkmate::assert_number(context$iteration)
      checkmate::assert_null(context$pdatas)
      context$data_extra = list(success = TRUE)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("classif.ce"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")

  walk(as.data.table(instance$archive$benchmark_result)$data_extra, function(data_extra) {
    expect_true(data_extra$success)
  })
})

test_that("on_resample_before_train works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect("test",
    on_resample_before_train = function(callback, context) {
      assert_task(context$task)
      assert_learner(context$learner)
      assert_resampling(context$resampling)
      checkmate::assert_number(context$iteration)
      checkmate::assert_null(context$pdatas)
      context$data_extra = list(success = TRUE)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("classif.ce"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")

  walk(as.data.table(instance$archive$benchmark_result)$data_extra, function(data_extra) {
    expect_true(data_extra$success)
  })
})

test_that("on_resample_before_predict works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect("test",
    on_resample_before_predict = function(callback, context) {
      assert_task(context$task)
      assert_learner(context$learner)
      assert_resampling(context$resampling)
      checkmate::assert_null(context$pdatas)
      context$data_extra = list(success = TRUE)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("classif.ce"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")

  walk(as.data.table(instance$archive$benchmark_result)$data_extra, function(data_extra) {
    expect_true(data_extra$success)
  })
})

test_that("on_resample_end works", {
  skip_on_cran()
  skip_if_not_installed("rush")
  flush_redis()

  callback = callback_async_fselect("test",
    on_resample_end = function(callback, context) {
      # expect_* does not work
      assert_task(context$task)
      assert_learner(context$learner)
      assert_resampling(context$resampling)
      checkmate::assert_number(context$iteration)
      checkmate::assert_class(context$pdatas$test, "PredictionData")
      context$learner$state = mlr3misc::insert_named(context$learner$state, list(state_success = TRUE))
      context$data_extra = list(success = TRUE)
    }
  )

  on.exit(mirai::daemons(0))
  mirai::daemons(2)
  rush::rush_plan(n_workers = 2, worker_type = "remote")
  instance = fselect(
    fselector = fs("async_random_search"),
    task = tsk("pima"),
    learner = lrn("classif.rpart"),
    resampling = rsmp("holdout"),
    measures = msr("classif.ce"),
    term_evals = 2,
    callbacks = callback)

  expect_class(instance$objective$context, "ContextAsyncFSelect")

  walk(as.data.table(instance$archive$benchmark_result)$data_extra, function(data_extra) {
    expect_true(data_extra$success)
  })

  walk(instance$archive$benchmark_result$score()$learner, function(learner, ...) {
    expect_true(learner$state$state_success)
  })
})
mlr-org/mlr3fselect documentation built on July 5, 2025, 3:22 a.m.