# tests/testthat/test-aggregateCovariate.R

# library(Characterization)
# library(testthat)

# Label applied to the tests in this file.
# NOTE(review): context() is deprecated in testthat 3rd edition — confirm
# which edition this package targets before removing it.
context("AggregateCovariate")

# Scratch output folders used by the compute* tests further down in this file.
# Each folder is registered for recursive removal via on.exit().
tempFolder1 <- tempfile("runAggregate1")
on.exit(unlink(tempFolder1, recursive = TRUE), add = TRUE)
tempFolder2 <- tempfile("runAggregate2")
# Fixed: this handler previously unlinked tempFolder1 a second time, so
# tempFolder2 was never cleaned up.
on.exit(unlink(tempFolder2, recursive = TRUE), add = TRUE)


# Checks that createAggregateCovariateSettings() keeps the supplied arguments
# on the settings object it returns.
test_that("createAggregateCovariateSettings", {
  # Randomly drawn id vectors of random length (1 to 10 ids out of 1:100).
  targetIds <- sample(x = 100, size = sample(10, 1))
  outcomeIds <- sample(x = 100, size = sample(10, 1))
  covariateSettings <- FeatureExtraction::createCovariateSettings(
    useDemographicsGender = TRUE,
    useDemographicsAge = TRUE,
    useCharlsonIndex = TRUE
  )

  caseCovariateSettings <- createDuringCovariateSettings(
    useConditionOccurrenceDuring = TRUE
  )

  res <- createAggregateCovariateSettings(
    targetIds = targetIds,
    outcomeIds = outcomeIds,
    minPriorObservation = 10,
    outcomeWashoutDays = 100,
    riskWindowStart = 2, startAnchor = "cohort end",
    riskWindowEnd = 363, endAnchor = "cohort end",
    covariateSettings = covariateSettings,
    caseCovariateSettings = caseCovariateSettings,
    casePreTargetDuration = 180,
    casePostOutcomeDuration = 120
  )

  testthat::expect_equal(res$targetIds, targetIds)

  # The single settings object passed in is compared against the first
  # element of the stored covariate settings.
  testthat::expect_equal(res$covariateSettings[[1]], covariateSettings)

  testthat::expect_equal(res$minPriorObservation, 10)
  testthat::expect_equal(res$outcomeWashoutDays, 100)

  # Risk window definition.
  testthat::expect_equal(res$riskWindowStart, 2)
  testthat::expect_equal(res$startAnchor, "cohort end")
  testthat::expect_equal(res$riskWindowEnd, 363)
  testthat::expect_equal(res$endAnchor, "cohort end")

  # Case-specific settings.
  testthat::expect_equal(res$caseCovariateSettings, caseCovariateSettings)
  testthat::expect_equal(res$casePreTargetDuration, 180)
  testthat::expect_equal(res$casePostOutcomeDuration, 120)
})

# createAggregateCovariateSettings() is expected to raise an error when
# temporal covariate settings are supplied, either alone or inside a list.
test_that("error when using temporal features", {
  targetIds <- sample(x = 100, size = sample(10, 1))
  outcomeIds <- sample(x = 100, size = sample(10, 1))

  # Both expectations use the same call; only the covariate settings differ.
  buildSettings <- function(candidateSettings) {
    createAggregateCovariateSettings(
      targetIds = targetIds,
      outcomeIds = outcomeIds,
      minPriorObservation = 10,
      outcomeWashoutDays = 100,
      riskWindowStart = 1, startAnchor = "cohort start",
      riskWindowEnd = 365, endAnchor = "cohort start",
      covariateSettings = candidateSettings,
      minCharacterizationMean = 0.01
    )
  }

  # Case 1: temporal settings passed on their own.
  temporalOnly <- FeatureExtraction::createDefaultTemporalCovariateSettings()
  testthat::expect_error(buildSettings(temporalOnly))

  # Case 2: temporal settings mixed into a list with default settings.
  defaultPlusTemporal <- list(
    FeatureExtraction::createDefaultCovariateSettings(),
    FeatureExtraction::createDefaultTemporalCovariateSettings()
  )
  testthat::expect_error(buildSettings(defaultPlusTemporal))
})

# Checks that a list of covariate settings passed to
# createAggregateCovariateSettings() is stored unchanged on the result.
test_that("createAggregateCovariateSettingsList", {
  # Randomly drawn id vectors of random length (1 to 10 ids out of 1:100).
  targetIds <- sample(x = 100, size = sample(10, 1))
  outcomeIds <- sample(x = 100, size = sample(10, 1))
  covariateSettings1 <- FeatureExtraction::createCovariateSettings(
    useDemographicsGender = TRUE,
    useDemographicsAge = TRUE,
    useCharlsonIndex = TRUE
  )
  covariateSettings2 <- FeatureExtraction::createCovariateSettings(
    useConditionOccurrenceAnyTimePrior = TRUE
  )
  covariateSettings <- list(covariateSettings1, covariateSettings2)

  res <- createAggregateCovariateSettings(
    targetIds = targetIds,
    outcomeIds = outcomeIds,
    riskWindowStart = 1, startAnchor = "cohort start",
    riskWindowEnd = 365, endAnchor = "cohort start",
    covariateSettings = covariateSettings
  )

  testthat::expect_equal(res$targetIds, targetIds)
  testthat::expect_equal(res$covariateSettings, covariateSettings)
})

# createExecutionIds() should return the requested number of ids, all distinct
# within one call, and two separate single-id calls should not return the
# same id.
test_that("createExecutionIds", {
  batchIds <- createExecutionIds(10)
  batchSize <- length(batchIds)
  distinctCount <- length(unique(batchIds))
  testthat::expect_true(batchSize == 10)
  testthat::expect_true(distinctCount == 10)

  firstId <- createExecutionIds(1)
  secondId <- createExecutionIds(1)
  idsDiffer <- firstId != secondId
  testthat::expect_true(idsDiffer)
})


# Checks the job table produced by getAggregateCovariatesJobs() for different
# thread counts and for one or several aggregate covariate settings.
test_that("getAggregateCovariatesJobs", {
  # Builds the job table for the given aggregate covariate settings
  # (a single settings object or a list of them) and thread count.
  makeJobs <- function(aggregateCovariateSettings, threads) {
    getAggregateCovariatesJobs(
      characterizationSettings = createCharacterizationSettings(
        aggregateCovariateSettings = aggregateCovariateSettings
      ),
      threads = threads
    )
  }

  # Number of distinct settingId values found in the JSON settings of the
  # rows of a job table. seq_len() keeps this safe for a zero-row table.
  countSettingIds <- function(jobs) {
    settingIds <- lapply(
      seq_len(nrow(jobs)),
      function(i) {
        ParallelLogger::convertJsonToSettings(jobs$settings[i])$settingId
      }
    )
    length(unique(unlist(settingIds)))
  }

  targetIds <- c(1, 2, 4)
  outcomeIds <- c(3)
  covariateSettings <- FeatureExtraction::createCovariateSettings(
    useDemographicsGender = TRUE,
    useDemographicsAge = TRUE,
    useCharlsonIndex = TRUE
  )
  caseCovariateSettings <- createDuringCovariateSettings(
    useConditionOccurrenceDuring = TRUE
  )

  # Random value so the folder-name expectations below are not tied to one
  # particular minPriorObservation.
  minPriorObservation <- sample(30, 1)

  res <- createAggregateCovariateSettings(
    targetIds = targetIds,
    outcomeIds = outcomeIds,
    minPriorObservation = minPriorObservation,
    outcomeWashoutDays = 1,
    riskWindowStart = 1, startAnchor = "cohort start",
    riskWindowEnd = 5 * 365, endAnchor = "cohort start",
    covariateSettings = covariateSettings,
    caseCovariateSettings = caseCovariateSettings
  )

  # threads = 1: one target job and one case job.
  jobDf <- makeJobs(res, threads = 1)

  testthat::expect_true(
    all(c(
      "computeTargetAggregateCovariateAnalyses",
      "computeCaseAggregateCovariateAnalyses"
    ) %in% jobDf$functionName)
  )
  testthat::expect_true(nrow(jobDf) == 2)

  testthat::expect_true(
    paste0("tac_1_", minPriorObservation) %in% jobDf$executionFolder
  )
  testthat::expect_true(
    paste0("cac_1_", minPriorObservation, "_1_365_365") %in%
      jobDf$executionFolder
  )

  # The covariate settings stored as JSON in the first job must still have
  # the three requested analyses switched on.
  settings <- ParallelLogger::convertJsonToSettings(jobDf$settings[1])
  covSettings <- ParallelLogger::convertJsonToSettings(
    settings$covariateSettingsJson
  )
  testthat::expect_true(covSettings[[1]]$DemographicsGender == TRUE)
  testthat::expect_true(covSettings[[1]]$CharlsonIndex == TRUE)
  testthat::expect_true(covSettings[[1]]$DemographicsAge == TRUE)

  # now check threads = 2
  jobDf <- makeJobs(res, threads = 2)
  testthat::expect_true(nrow(jobDf) == 4)

  testthat::expect_true(
    all(c(
      paste0("tac_1_", minPriorObservation),
      paste0("tac_2_", minPriorObservation)
    ) %in% jobDf$executionFolder)
  )
  testthat::expect_true(
    all(c(
      paste0("cac_1_", minPriorObservation, "_1_365_365"),
      paste0("cac_2_", minPriorObservation, "_1_365_365")
    ) %in% jobDf$executionFolder)
  )

  # now check threads = 3
  jobDf <- makeJobs(res, threads = 3)
  testthat::expect_true(nrow(jobDf) == 2 * 3)

  # now check threads = 4
  jobDf <- makeJobs(res, threads = 4)
  testthat::expect_true(nrow(jobDf) == 7)

  # now check threads = 5: same number of jobs as with 4 threads
  jobDf <- makeJobs(res, threads = 5)
  testthat::expect_true(nrow(jobDf) == 7)
  testthat::expect_true(countSettingIds(jobDf) == 2)

  # add more settings
  res2 <- createAggregateCovariateSettings(
    targetIds = targetIds,
    outcomeIds = outcomeIds,
    minPriorObservation = minPriorObservation + 1,
    outcomeWashoutDays = 100,
    riskWindowStart = 1, startAnchor = "cohort start",
    riskWindowEnd = 5 * 365, endAnchor = "cohort start",
    covariateSettings = covariateSettings,
    caseCovariateSettings = caseCovariateSettings
  )

  jobDf <- makeJobs(list(res, res2), threads = 1)
  testthat::expect_true(nrow(jobDf) == 4)
  testthat::expect_true(countSettingIds(jobDf) == 4)

  jobDf <- makeJobs(list(res, res2), threads = 3)
  testthat::expect_true(nrow(jobDf) == 12)
  testthat::expect_true(countSettingIds(jobDf) == 4)

  # test when extractNonCaseCovariates = FALSE
  res3 <- createAggregateCovariateSettings(
    targetIds = 1,
    outcomeIds = 3,
    extractNonCaseCovariates = FALSE
  )
  jobDf <- makeJobs(list(res3), threads = 3)
  testthat::expect_true(nrow(jobDf) == 1)

  res4 <- createAggregateCovariateSettings(
    targetIds = 2,
    outcomeIds = 3,
    extractNonCaseCovariates = TRUE
  )
  jobDf <- makeJobs(list(res3, res4), threads = 3)

  # TODO: add checks. No expectations are made yet on the job table built
  # from the mixed extractNonCaseCovariates settings (res3 + res4).
})

# Runs computeTargetAggregateCovariateAnalyses() for the first job of a
# single-thread job table and checks the csv files written to tempFolder1.
# Relies on `connectionDetails` and `tempFolder1` being defined outside this
# block.
test_that("computeTargetAggregateCovariateAnalyses", {
  targetIds <- c(1, 2, 4)
  outcomeIds <- c(3)
  covariateSettings <- FeatureExtraction::createCovariateSettings(
    useDemographicsGender = TRUE,
    useDemographicsAge = TRUE,
    useCharlsonIndex = TRUE
  )
  caseCovariateSettings <- createDuringCovariateSettings(
    useConditionOccurrenceDuring = TRUE
  )

  res <- createAggregateCovariateSettings(
    targetIds = targetIds,
    outcomeIds = outcomeIds,
    minPriorObservation = 30,
    outcomeWashoutDays = 1,
    riskWindowStart = 1, startAnchor = "cohort start",
    riskWindowEnd = 5 * 365, endAnchor = "cohort start",
    covariateSettings = covariateSettings,
    caseCovariateSettings = caseCovariateSettings
  )

  jobDf <- getAggregateCovariatesJobs(
    characterizationSettings = createCharacterizationSettings(
      aggregateCovariateSettings = res
    ),
    threads = 1
  )

  # The settings of the first job row are used for the target analysis.
  computeTargetAggregateCovariateAnalyses(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = "main",
    cdmVersion = 5,
    targetDatabaseSchema = "main",
    targetTable = "cohort",
    settings = ParallelLogger::convertJsonToSettings(jobDf$settings[1]),
    minCharacterizationMean = 0.01,
    databaseId = "madeup",
    outputFolder = tempFolder1
  )

  # check incremental does not run: every file in the output folder must be
  # one of the expected result files. This is the same condition as the
  # earlier sum(expected %in% files) == length(files) form.
  expectedFiles <- c(
    "cohort_details.csv",
    "settings.csv",
    "covariates.csv",
    "covariates_continuous.csv",
    "cohort_counts.csv",
    "covariate_ref.csv",
    "analysis_ref.csv"
  )
  testthat::expect_true(all(dir(tempFolder1) %in% expectedFiles))

  # check cohortCounts is done for all: no duplicated rows, 8 rows in total
  cohortDetails <- readr::read_csv(
    file.path(tempFolder1, "cohort_details.csv"),
    show_col_types = FALSE
  )
  testthat::expect_true(
    nrow(unique(cohortDetails)) == nrow(cohortDetails)
  )
  testthat::expect_true(
    nrow(cohortDetails) == 8
  )

  aggCovs <- readr::read_csv(
    file = file.path(tempFolder1, "covariates.csv"),
    show_col_types = FALSE
  )
  # check covariates is unique
  testthat::expect_true(
    nrow(aggCovs) == nrow(unique(aggCovs))
  )

  # check databaseId is added
  testthat::expect_true(
    aggCovs$database_id[1] == "madeup"
  )
})


# Runs computeCaseAggregateCovariateAnalyses() for the second job of a
# single-thread job table and checks the csv files written to tempFolder2.
# Relies on `connectionDetails` and `tempFolder2` being defined outside this
# block.
test_that("computeCaseAggregateCovariateAnalyses", {
  targetIds <- c(1, 2, 4)
  outcomeIds <- c(3)
  covariateSettings <- FeatureExtraction::createCovariateSettings(
    useDemographicsGender = TRUE,
    useDemographicsAge = TRUE,
    useCharlsonIndex = TRUE
  )
  caseCovariateSettings <- createDuringCovariateSettings(
    useConditionOccurrenceDuring = TRUE
  )

  res <- createAggregateCovariateSettings(
    targetIds = targetIds,
    outcomeIds = outcomeIds,
    minPriorObservation = 30,
    outcomeWashoutDays = 1,
    riskWindowStart = 1, startAnchor = "cohort start",
    riskWindowEnd = 5 * 365, endAnchor = "cohort start",
    covariateSettings = covariateSettings,
    caseCovariateSettings = caseCovariateSettings
  )

  jobDf <- getAggregateCovariatesJobs(
    characterizationSettings = createCharacterizationSettings(
      aggregateCovariateSettings = res
    ),
    threads = 1
  )

  # The settings of the second job row are used for the case analysis.
  computeCaseAggregateCovariateAnalyses(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = "main",
    cdmVersion = 5,
    targetDatabaseSchema = "main",
    targetTable = "cohort",
    settings = ParallelLogger::convertJsonToSettings(jobDf$settings[2]),
    minCharacterizationMean = 0.01,
    databaseId = "madeup",
    outputFolder = tempFolder2
  )

  # check incremental does not run: every file in the output folder must be
  # one of the expected result files. This is the same condition as the
  # earlier sum(expected %in% files) == length(files) form.
  expectedFiles <- c(
    "cohort_details.csv",
    "settings.csv",
    "covariates.csv",
    "covariates_continuous.csv",
    "cohort_counts.csv",
    "covariate_ref.csv",
    "analysis_ref.csv"
  )
  testthat::expect_true(all(dir(tempFolder2) %in% expectedFiles))

  # check cohortCounts is done for all: no duplicated rows, 3 * 5 rows in total
  cohortDetails <- readr::read_csv(
    file.path(tempFolder2, "cohort_details.csv"),
    show_col_types = FALSE
  )
  testthat::expect_true(
    nrow(unique(cohortDetails)) == nrow(cohortDetails)
  )
  testthat::expect_true(
    nrow(cohortDetails) == 3 * 5
  )

  aggCovs <- readr::read_csv(
    file = file.path(tempFolder2, "covariates.csv"),
    show_col_types = FALSE
  )
  # check covariates is unique
  testthat::expect_true(
    nrow(aggCovs) == nrow(unique(aggCovs))
  )

  # check databaseId is added
  testthat::expect_true(
    aggCovs$database_id[1] == "madeup"
  )
})

# Try the Characterization package in your browser
#
# Any scripts or data that you put into this service are public.
#
# Characterization documentation built on April 4, 2025, 2:02 a.m.