tests/testthat/test-tracking-runs.R

context("Tracking")

teardown({
  mlflow_clear_test_dir("mlruns")
  options(MLflowObservers = NULL)
})

test_that("mlflow_start_run()/mlflow_get_run() work properly", {
  mlflow_clear_test_dir("mlruns")
  client <- mlflow_client()
  run <- mlflow_start_run(
    client = client,
    experiment_id = "0",
    tags = list(foo = "bar", foz = "baz", mlflow.user = "user1", mlflow.runName = "my_run")
  )

  run <- mlflow_get_run(client = client, run$run_uuid)

  expect_identical(run$user_id, "user1")

  expect_true(
    all(purrr::transpose(run$tags[[1]]) %in%
      list(
        list(key = "foz", value = "baz"),
        list(key = "foo", value = "bar"),
        list(key = "mlflow.user", value = "user1"),
        list(key = "mlflow.runName", value = run$run_name)
      )
    )
  )
})

test_that("a run can be started properly if MLFLOW_RUN_ID is set", {
  mlflow_clear_test_dir("mlruns")
  # Typical use case: Invoke an R script that interacts with the MLflow API from
  # outside of R, e.g. MLproject, Python, CLI
  start_get_id_stop <- function() {
    tryCatch(mlflow_id(mlflow_start_run()), finally = {
      mlflow_end_run()
    })
  }
  id <- start_get_id_stop()
  withr::with_envvar(list(MLFLOW_RUN_ID = id), {
    expect_equal(start_get_id_stop(), id)
  })
})

test_that("mlflow_end_run() works properly", {
  mlflow_clear_test_dir("mlruns")
  mlflow_start_run()
  killed_time <- mlflow:::current_time()
  client <- mlflow_client()
  run <- mlflow_end_run(
    client = client, run_id = mlflow:::mlflow_get_active_run_id(),
    status = "KILLED", end_time = killed_time
  )
  expect_identical(run$status, "KILLED")
  expect_identical(run$end_time, as.POSIXct(as.double(c(killed_time)) / 1000, origin = "1970-01-01"))

  # Verify that only expected run field names are present and that all run info fields are set
  # (not NA).
  run_info_names <- c("run_uuid", "experiment_id", "user_id", "run_name", "status", "start_time",
  "artifact_uri", "lifecycle_stage", "run_id", "end_time")
  run_data_names <- c("metrics", "params", "tags")
  expect_setequal(c(run_info_names, run_data_names), names(run))
  expect_true(!anyNA(run[run_info_names]))
})

test_that("mlflow_start_run()/mlflow_end_run() works properly with nested runs", {
  mlflow_clear_test_dir("mlruns")
  runs <- list(
    mlflow_start_run(),
    mlflow_start_run(nested = TRUE),
    mlflow_start_run(nested = TRUE)
  )
  client <- mlflow_client()
  for (i in seq(3, 1, -1)) {
    expect_equal(mlflow:::mlflow_get_active_run_id(), runs[[i]]$run_uuid)
    run <- mlflow_end_run(client = client, run_id = runs[[i]]$run_uuid)
    expect_identical(run$run_uuid, runs[[i]]$run_uuid)
    if (i > 1) {
      tags <- run$tags[[1]]
      expect_equal(
        tags[tags$key == "mlflow.parentRunId", ]$value,
        runs[[i - 1]]$run_uuid
      )
    }
  }
  expect_null(mlflow:::mlflow_get_active_run_id())
})

test_that("mlflow_restore_run() work properly", {
  mlflow_clear_test_dir("mlruns")
  client <- mlflow_client()
  run1 <- mlflow_start_run(
    client = client,
    experiment_id = "0",
    tags = list(foo = "bar", foz = "baz", mlflow.user = "user1", mlflow.runName = "my_run")
  )

  run2 <- mlflow_get_run(client = client, run1$run_uuid)
  mlflow_delete_run(client = client, run_id = run1$run_uuid)
  run3 <- mlflow_restore_run(client = client, run_id = run1$run_uuid)

  for (run in list(run1, run2, run3)) {
    expect_identical(run$user_id, "user1")

    expect_true(
      all(purrr::transpose(run$tags[[1]]) %in%
        list(
          list(key = "foz", value = "baz"),
          list(key = "foo", value = "bar"),
          list(key = "mlflow.user", value = "user1"),
          list(key = "mlflow.runName", value = run$run_name)
        )
      )
    )
  }
})

test_that("mlflow_set_tag() should return NULL invisibly", {
  mlflow_clear_test_dir("mlruns")
  value <- mlflow_set_tag("foo", "bar")
  expect_null(value)
})

test_that("logging functionality", {
  mlflow_clear_test_dir("mlruns")

  start_time_lower_bound <- Sys.time()
  mlflow_start_run()

  mlflow_log_metric("mse", 24)
  mlflow_log_metric("mse", 25)
  mlflow_log_metric("nan", NaN)
  mlflow_log_metric("inf", Inf)
  mlflow_log_metric("-inf", -Inf)

  mlflow_set_tag("tag_key", "tag_value")
  mlflow_log_param("param_key", "param_value")
  mlflow_log_param("na", NA)
  mlflow_log_param("nan", NaN)

  run <- mlflow_get_run()
  metrics <- run$metrics[[1]]
  nan_value <- metrics$value[metrics$key == "nan"]
  expect_true(is.nan(nan_value))
  pos_inf_value <- metrics$value[metrics$key == "inf"]
  expect_true(pos_inf_value >= 1.7976931348623157e308)
  neg_inf_value <- metrics$value[metrics$key == "-inf"]
  expect_true(neg_inf_value <= -1.7976931348623157e308)
  run_id <- run$run_uuid
  tags <- run$tags[[1]]
  expect_identical("tag_value", tags$value[tags$key == "tag_key"])
  expect_setequal(run$params[[1]]$key, c("na", "nan", "param_key"))
  expect_setequal(run$params[[1]]$value, c("NA", "NaN", "param_value"))

  mlflow_delete_tag("tag_key", run_id)
  run <- mlflow_get_run()
  tags <- run$tags[[1]]
  expect_false("tag_key" %in% tags$key)

  mlflow_end_run()
  end_time_upper_bound <- Sys.time()
  ended_run <- mlflow_get_run(run_id = run_id)
  run_start_time <- ended_run$start_time
  run_end_time <- ended_run$end_time
  expect_true(difftime(run_start_time, start_time_lower_bound) >= 0)
  expect_true(difftime(run_end_time, end_time_upper_bound) <= 0)
  metric_history <- mlflow_get_metric_history("mse", ended_run$run_uuid)
  expect_identical(metric_history$key, c("mse", "mse"))
  expect_identical(metric_history$value, c(24, 25))
  expect_identical(metric_history$step, c(0, 0))
  expect_true(all(difftime(metric_history$timestamp, run_start_time) >= 0))
  expect_true(all(difftime(metric_history$timestamp, run_end_time) <= 0))

  expect_error(
    mlflow_get_run(),
    "`run_id` must be specified when there is no active run\\."
  )
})

test_that("mlflow_log_metric() rounds step and timestamp inputs", {
  mlflow_clear_test_dir("mlruns")
  mlflow_start_run()

  step_inputs <- runif(10, min = -20, max = 100)
  for (step_input in step_inputs) {
    mlflow_log_metric(key = "step_metric",
                      value = runif(1),
                      step = step_input,
                      timestamp = 100)
  }
  expect_setequal(
    mlflow_get_metric_history("step_metric")$step,
    round(step_inputs)
  )

  timestamp_inputs <- runif(10, 1000, 100000)
  for (timestamp_input in timestamp_inputs) {
    mlflow_log_metric(key = "timestamp_metric",
                      value = runif(1),
                      step = 0,
                      timestamp = timestamp_input)
  }
  expect_setequal(
    mlflow_get_metric_history("timestamp_metric")$timestamp,
    purrr::map_vec(round(timestamp_inputs), mlflow:::milliseconds_to_date)
  )
})

test_that("mlflow_log_metric() with step produces expected metric data", {
  mlflow_clear_test_dir("mlruns")
  mlflow_start_run()

  metric_key_1 <- "test_metric_1"
  mlflow_log_metric(key = metric_key_1,
                    value = 1.2,
                    step = -2,
                    timestamp = 300)
  mlflow_log_metric(key = metric_key_1,
                    value = 137.4,
                    timestamp = 100)
  mlflow_log_metric(key = metric_key_1,
                    value = -20,
                    timestamp = 200)

  metric_key_2 <- "test_metric_2"
  mlflow_log_metric(key = metric_key_2,
                    value = 14,
                    step = 120)
  mlflow_log_metric(key = metric_key_2,
                    value = 56,
                    step = 137)
  mlflow_log_metric(key = metric_key_2,
                    value = 20,
                    step = -5)

  run <- mlflow_get_run()
  metrics <- run$metrics[[1]]
  expect_setequal(
    metrics$key,
    c("test_metric_1", "test_metric_2")
  )
  expect_setequal(
    metrics$value,
    c(-20, 56)
  )
  expect_setequal(
    metrics$step,
    c(0, 137)
  )

  metric_history_1 <- mlflow_get_metric_history("test_metric_1")
  expect_setequal(
    metric_history_1$value,
    c(1.2, 137.4, -20)
  )
  expect_setequal(
    metric_history_1$timestamp,
    purrr::map_vec(c(300, 100, 200), mlflow:::milliseconds_to_date)
  )
  expect_setequal(
    metric_history_1$step,
    c(-2, 0, 0)
  )

  metric_history_2 <- mlflow_get_metric_history("test_metric_2")
  expect_setequal(
    metric_history_2$value,
    c(14, 56, 20)
  )
  expect_setequal(
    metric_history_2$step,
    c(120, 137, -5)
  )
})

test_that("mlflow_end_run() behavior", {
  mlflow_clear_test_dir("mlruns")
  expect_error(
    mlflow_end_run(),
    "There is no active run to end\\."
  )

  run <- mlflow_start_run()
  run_id <- mlflow_id(run)
  mlflow_end_run(run_id = run_id)
  expect_error(
    mlflow_get_run(),
    "`run_id` must be specified when there is no active run\\."
  )

  run <- mlflow_start_run()
  run_id <- mlflow_id(run)
  client <- mlflow_client()
  expect_error(
    mlflow_end_run(client = client),
    "`run_id` must be specified when `client` is specified\\."
  )
  mlflow_end_run(client = client, run_id = run_id)
  expect_error(
    mlflow_get_run(),
    "`run_id` must be specified when there is no active run\\."
  )

  mlflow_start_run()
  run <- mlflow_end_run(status = "KILLED")
  expect_identical(
    run$status,
    "KILLED"
  )
})

test_that("with() errors when not passed active run", {
  mlflow_clear_test_dir("mlruns")
  client <- mlflow_client()
  mlflow_set_experiment("exp1")
  run <- mlflow_start_run(client = client)
  expect_error(
    with(run, {
      mlflow_log_metric("mse", 25)
    }),
    # TODO: somehow this isn't matching "`with()` should only be used with `mlflow_start_run()`\\."
    NULL
  )
})

test_that("mlflow_search_runs() works", {
  mlflow_clear_test_dir("mlruns")
  with(mlflow_start_run(), {
    mlflow_log_metric("test", 10)
  })
  with(mlflow_start_run(), {
    mlflow_log_metric("test", 20)
  })
  expect_equal(nrow(mlflow_search_runs(experiment_ids = list("0"))), 2)
  expect_equal(nrow(mlflow_search_runs(experiment_ids = "0")), 2)
  expect_equal(nrow(mlflow_search_runs(filter = "metrics.test > 10", experiment_ids = list("0"))), 1)
  expect_equal(nrow(mlflow_search_runs(filter = "metrics.test < 20", experiment_ids = list("0"))), 1)
  expect_equal(nrow(mlflow_search_runs(filter = "metrics.test > 20", experiment_ids = list("0"))), 0)

  search <- mlflow_search_runs(order_by = "metrics.test", experiment_ids = list("0"))
  expect_equal(search$metrics[[1]]$value[1], 10)
  expect_equal(search$metrics[[2]]$value[1], 20)

  search <- mlflow_search_runs(order_by = list("metrics.test DESC"), experiment_ids = list("0"))
  expect_equal(search$metrics[[1]]$value[1], 20)
  expect_equal(search$metrics[[2]]$value[1], 10)

  mlflow_set_experiment("new-experiment")
  expect_equal(nrow(mlflow_search_runs()), 0)
  with(mlflow_start_run(), {
    mlflow_log_metric("new_experiment_metric", 30)
  })
  expect_equal(nrow(mlflow_search_runs(filter = "metrics.new_experiment_metric = 30")), 1)
})

test_that("mlflow_log_artifact and mlflow_list_artifacts work", {
  with(mlflow_start_run(), {
    # List artifacts for run without artifacts, result should be empty
    empty_artifact_list <- mlflow_list_artifacts()
    # Create some dummy artifact files/directories
    expect_equal(nrow(empty_artifact_list), 0)
    source_dir <- file.path(tempdir(), "temp-directory")
    dir.create(source_dir)
    ## file 1
    file_path1 <- file.path(source_dir, "a-my-file")
    contents <- "File contents\n"
    cat(contents, file = file_path1, sep = "")
    ## file 2
    file_path2 <- file.path(source_dir, "a-my-file-2")
    contents <- "File contents\n"
    cat(contents, file = file_path2, sep = "")

    # Log file, file with path, directory with path argument
    mlflow_log_artifact(file_path1)
    mlflow_log_artifact(file_path2)
    mlflow_log_artifact(file_path1, "directory_for_file")
    mlflow_log_artifact(source_dir, "artifact_subdirectory")
    # Verify logged files
    artifact_list0 <- mlflow_list_artifacts()
    expect_equal(nrow(artifact_list0), 4)
    logged_file0 <- artifact_list0[artifact_list0$path == "a-my-file", ]
    expect_equal(nrow(logged_file0), 1)
    expect_equal(logged_file0$is_dir, FALSE)
    logged_file1 <- artifact_list0[artifact_list0$path == "directory_for_file", ]
    expect_equal(nrow(logged_file1), 1)
    expect_equal(logged_file1$is_dir, TRUE)
    logged_dir0 <- artifact_list0[artifact_list0$path == "artifact_subdirectory", ]
    expect_equal(nrow(logged_dir0), 1)
    expect_equal(logged_dir0$is_dir, TRUE)
    # Verify contents of logged directory
    artifact_list1 <- mlflow_list_artifacts("artifact_subdirectory")
    expect_equal(nrow(artifact_list1), 2)
    logged_file2 <- artifact_list1[artifact_list1$path ==
      paste("artifact_subdirectory", "a-my-file", sep = "/"), ]
    expect_equal(nrow(logged_file2), 1)
    expect_equal(logged_file2$is_dir, FALSE)
    expect_equal(strtoi(logged_file2$file_size), nchar(contents))
    # Verify contents of file logged under directory
    artifact_list2 <- mlflow_list_artifacts("directory_for_file")
    expect_equal(nrow(artifact_list2), 1)
    logged_file3 <- artifact_list2[artifact_list2$path ==
    paste("directory_for_file", "a-my-file", sep = "/"), ]
    expect_equal(nrow(logged_file3), 1)
    expect_equal(logged_file3$is_dir, FALSE)
    expect_equal(strtoi(logged_file3$file_size), nchar(contents))
  })
})

test_that("mlflow_log_batch() works", {
  mlflow_clear_test_dir("mlruns")
  mlflow_start_run()
  mlflow_log_batch(
    metrics = data.frame(key = c("mse", "mse", "rmse", "rmse", "nan", "Inf", "-Inf"),
                         value = c(21, 23, 42, 36, NaN, Inf, -Inf),
                         timestamp = c(100, 200, 300, 300, 400, 500, 600),
                         step = c(-4, 1, 7, 3, 8, 9, 10)),
    params = data.frame(key = c("l1", "optimizer"), value = c(0.01, "adam")),
    tags = data.frame(key = c("model_type", "data_year"),
                      value = c("regression", "2015"))
  )

  run <- mlflow_get_run()
  metrics <- run$metrics[[1]]
  params <- run$params[[1]]
  tags <- run$tags[[1]]

  expect_setequal(
    metrics$key,
    c("mse", "rmse", "nan", "Inf", "-Inf")
  )
  expect_equal(23, metrics$value[metrics$key == "mse"])
  expect_equal(42, metrics$value[metrics$key == "rmse"])
  expect_true(all(is.nan(metrics$value[metrics$key == "nan"])))
  expect_true(all(1.7976931348623157e308 <= (metrics$value[metrics$key == "Inf"])))
  expect_true(all(-1.7976931348623157e308 >= (metrics$value[metrics$key == "-Inf"])))
  expect_setequal(
    metrics$timestamp,
    purrr::map_vec(c(200, 300, 400, 500, 600), mlflow:::milliseconds_to_date)
  )
  expect_setequal(
    metrics$step,
    c(1, 7, 8, 9, 10)
  )

  metric_history <- mlflow_get_metric_history("mse")
  expect_setequal(
    metric_history$value,
    c(21, 23)
  )
  expect_setequal(
    metric_history$timestamp,
    purrr::map_vec(c(100, 200), mlflow:::milliseconds_to_date)
  )
  expect_setequal(
    metric_history$step,
    c(-4, 1)
  )

  expect_setequal(
    params$key,
    c("optimizer", "l1")
  )

  expect_setequal(
    params$value,
    c("adam", "0.01")
  )

  expect_identical("regression", tags$value[tags$key == "model_type"])
  expect_identical("2015", tags$value[tags$key == "data_year"])
})

test_that("mlflow_log_batch() throws for mismatched input data columns", {
  mlflow_clear_test_dir("mlruns")
  mlflow_start_run()
  error_text_regexp <- "*input dataframe must contain exactly the following columns*"

  expect_error(
    mlflow_log_batch(
      metrics = data.frame(key = c("mse"), value = c(10))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      metrics = data.frame(bad_column = c("bad"))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      metrics = data.frame(
       key = c("mse"),
       value = c(10),
       timestamp = c(100),
       step = c(1),
       bad_column = c("bad")
      )
    ),
    regexp = error_text_regexp
  )

  expect_error(
    mlflow_log_batch(
      params = data.frame(key = c("alpha"))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      params = data.frame(bad_column = c("bad"))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      params = data.frame(
       key = c("alpha"),
       value = c(10),
       bad_column = c("bad")
      )
    ),
    regexp = error_text_regexp
  )

  expect_error(
    mlflow_log_batch(
      tags = data.frame(key = c("my_tag"))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      tags = data.frame(bad_column = c("bad"))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      tags = data.frame(
       key = c("my_tag"),
       value = c("some tag info"),
       bad_column = c("bad")
      )
    ),
    regexp = error_text_regexp
  )

  expect_error(
    mlflow_log_batch(
      metrics = data.frame(
       bad_column = c("bad1")
      ),
      params = data.frame(
       another_bad_column = c("bad2")
      ),
      tags = data.frame(
       one_more_bad_column = c("bad3")
      )
    ),
    regexp = error_text_regexp
  )
})

test_that("mlflow_log_batch() throws for missing entries", {
  mlflow_clear_test_dir("mlruns")
  mlflow_start_run()
  error_text_regexp <- "*input dataframe contains a missing \\('NA'\\) entry*"

  expect_error(
    mlflow_log_batch(
      metrics = data.frame(key = c("mse", "rmse", "na_metric"),
                           value = c(21, 42, NA),
                           timestamp = c(100, 200, 300),
                           step = c(-4, 1, 3))
    ),
    regexp = error_text_regexp
  )

  expect_error(
    mlflow_log_batch(
      metrics = data.frame(key = c("mse", "rmse", "na_metric"),
                           value = c(21, 42, 9.2),
                           timestamp = c(NA, 200, 300),
                           step = c(-4, 1, NA))
    ),
    regexp = error_text_regexp
  )

  expect_error(
    mlflow_log_batch(
      params = data.frame(key = c(NA, "alpha"),
                          value = c("0.5", "2"))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      params = data.frame(key = c("n_layers", "alpha"),
                          value = c("4", NA))
    ),
    regexp = error_text_regexp
  )

  expect_error(
    mlflow_log_batch(
      tags = data.frame(key = c("first_tag", NA),
                        value = c("some tag info", "more tag info"))
    ),
    regexp = error_text_regexp
  )
  expect_error(
    mlflow_log_batch(
      tags = data.frame(key = c("first_tag", "second_tag"),
                        value = c(NA, NA))
    ),
    regexp = error_text_regexp
  )

  expect_error(
    mlflow_log_batch(
      metrics = data.frame(key = c("mse", "rmse", "na_metric"),
                           value = c(21, 42, 37),
                           timestamp = c(100, 200, 300),
                           step = c(-4, NA, 3)),
      params = data.frame(key = c("l1", "optimizer"), value = c(NA, "adam")),
      tags = data.frame(key = c(NA, "data_year"),
                        value = c("regression", NA))
    ),
    regexp = error_text_regexp
  )
})

test_that("mlflow observers receive tracking event callbacks", {
  num_observers <- 3L
  tracking_events <- rep(list(list()), num_observers)
  lapply(
    seq_along(tracking_events),
    function(idx) {
      observer <- structure(list(
        register_tracking_event = function(event_name, data) {
          tracking_events[[idx]][[event_name]] <<- append(
            tracking_events[[idx]][[event_name]], list(data)
          )
        }
      ))
      mlflow_register_external_observer(observer)
    }
  )
  client <- mlflow_client()
  experiment_id <- "0"
  run <- mlflow_start_run(client = client, experiment_id = experiment_id)
  mlflow_set_experiment(experiment_id = experiment_id)
  expect_equal(length(tracking_events), num_observers)
  for (idx in seq(num_observers)) {
    expect_equal(
      tracking_events[[idx]]$create_run[[1]]$run_id,
      run$run_id
    )
    expect_equal(
      tracking_events[[idx]]$create_run[[1]]$experiment_id,
      experiment_id
    )
    expect_equal(
      tracking_events[[idx]]$active_experiment_id[[1]]$experiment_id,
      experiment_id
    )
  }

  mlflow_end_run(client = client, run_id = run$run_uuid)
  expect_equal(length(tracking_events), num_observers)
  for (idx in seq(num_observers)) {
    expect_equal(
      tracking_events[[idx]]$set_terminated[[1]]$run_uuid, run$run_uuid
    )
  }
})

test_that("mlflow get metric history performs pagination", {
  mlflow_clear_test_dir("mlruns")
  run <- mlflow_start_run()

  batch_size <- 1000
  for (x in 0:25) {
    # 0th index entries for timestamp will not work due to pagination filter default logic
    # add a `+1` to the start and end values
    start <- (batch_size * x) + 1
    end <- (batch_size * (x + 1))
    metrics <- data.frame(key = rep(c("m1"), batch_size),
                        value = seq.int(from = start, to = end, by = 1),
                        timestamp = seq.int(from = start, to = end, by = 1),
                        step = seq.int(from = start, to = end, by = 1)
                       )
    mlflow_log_batch(metrics = metrics)
  }
  logged <- mlflow_get_metric_history(metric_key = "m1", run_id = run$run_id)

  expect_equal(nrow(logged), 26000)

  first_entry <- head(logged, n = 1)
  expect_equal(first_entry$key, "m1")
  expect_equal(first_entry$value, 1)
  expect_equal(first_entry$step, 1)
  expect_equal(first_entry$timestamp, purrr::map_vec(c(1), mlflow:::milliseconds_to_date)[[1]])

  last_entry <- tail(logged, n = 1)
  expect_equal(last_entry$key, "m1")
  expect_equal(last_entry$value, 26000)
  expect_equal(last_entry$step, 26000)
  expect_equal(last_entry$timestamp, purrr::map_vec(c(26000), mlflow:::milliseconds_to_date)[[1]])
  mlflow_end_run()
})

Try the mlflow package in your browser

Any scripts or data that you put into this service are public.

mlflow documentation built on Nov. 23, 2023, 9:13 a.m.