# Nothing
# Copyright 2026 Observational Health Data Sciences and Informatics
#
# This file is part of CohortMethod
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# options(andromedaTempFolder = "s:/andromedaTemp")
# outputFolder <- "s:/temp/cohortMethodVignette2"
# maxCores <- 8
# library(dplyr)
#' Export cohort method results to CSV files
#'
#' @details
#' This requires that [runCmAnalyses()] has been executed first. It exports
#' all the results in the `outputFolder` to CSV files for sharing with other
#' sites. Once all tables have been written, every CSV file in `exportFolder`
#' is added to a zip file named `Results_<databaseId>.zip` in that same folder.
#'
#' @param outputFolder  The folder where runCmAnalyses() generated all results.
#' @param exportFolder  The folder where the CSV files will be written.
#' @param databaseId    A unique ID for the database. This will be appended to
#'                      most tables.
#' @param minCellCount  To preserve privacy: the minimum number of subjects contributing
#'                      to a count before it can be included in the results. If the
#'                      count is below this threshold, it will be set to `-minCellCount`.
#' @param maxCores      How many parallel cores should be used?
#'
#' @return
#' Does not return anything. Is called for the side-effect of populating the `exportFolder`
#' with CSV files.
#'
#' @export
exportToCsv <- function(outputFolder,
                        exportFolder = file.path(outputFolder, "export"),
                        databaseId,
                        minCellCount = 5,
                        maxCores = 1) {
  errorMessages <- checkmate::makeAssertCollection()
  checkmate::assertCharacter(outputFolder, len = 1, add = errorMessages)
  checkmate::assertDirectoryExists(outputFolder, add = errorMessages)
  checkmate::assertFileExists(file.path(outputFolder, "cmAnalysesSpecifications.rds"), add = errorMessages)
  checkmate::assertFileExists(file.path(outputFolder, "resultsSummary.rds"), add = errorMessages)
  checkmate::assertCharacter(exportFolder, len = 1, add = errorMessages)
  checkmate::assertAtomic(databaseId, len = 1, add = errorMessages)
  checkmate::assertInt(minCellCount, lower = 0, add = errorMessages)
  checkmate::assertInt(maxCores, lower = 1, add = errorMessages)
  checkmate::reportAssertions(collection = errorMessages)
  if (!file.exists(exportFolder)) {
    dir.create(exportFolder, recursive = TRUE)
  }
  start <- Sys.time()
  message("Exporting results to CSV")
  # The target-comparator table is written first because its generated IDs are
  # needed by most of the other exporters.
  targetComparator <- exportTargetComparator(
    outputFolder = outputFolder,
    exportFolder = exportFolder
  )
  exportCohortMethodAnalyses(
    outputFolder = outputFolder,
    exportFolder = exportFolder
  )
  exportFromCohortMethodData(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId
  )
  exportTargetComparatorOutcomes(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    targetComparator = targetComparator
  )
  exportAttrition(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator,
    minCellCount = minCellCount
  )
  exportCmFollowUpDist(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator,
    minCellCount = minCellCount
  )
  exportCohortMethodResults(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator,
    minCellCount = minCellCount
  )
  exportCmInteractionResults(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator,
    minCellCount = minCellCount
  )
  exportLikelihoodProfiles(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator
  )
  exportCovariateBalance(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator,
    minCellCount = minCellCount
  )
  exportSharedCovariateBalance(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator,
    minCellCount = minCellCount
  )
  exportPreferenceScoreDistribution(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    targetComparator = targetComparator,
    databaseId = databaseId
  )
  exportPropensityModel(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    targetComparator = targetComparator,
    databaseId = databaseId
  )
  exportKaplanMeier(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator,
    minCellCount = minCellCount,
    maxCores = maxCores
  )
  exportDiagnosticsSummary(
    outputFolder = outputFolder,
    exportFolder = exportFolder,
    databaseId = databaseId,
    targetComparator = targetComparator
  )
  # Add all to zip file -------------------------------------------------------------------------------
  message("Adding results to zip file")
  # Resolve the export folder to an absolute path BEFORE changing the working
  # directory. With a relative exportFolder the zip path would otherwise be
  # interpreted relative to the new working directory (i.e. nested inside
  # exportFolder a second time).
  absoluteExportFolder <- normalizePath(exportFolder, winslash = "/", mustWork = TRUE)
  zipName <- file.path(absoluteExportFolder, sprintf("Results_%s.zip", databaseId))
  files <- list.files(absoluteExportFolder, pattern = ".*\\.csv$")
  # The file names are relative, so zip from inside the export folder and
  # restore the caller's working directory on exit (also on error).
  oldWd <- setwd(absoluteExportFolder)
  on.exit(setwd(oldWd), add = TRUE)
  DatabaseConnector::createZipFile(zipFile = zipName, files = files)
  delta <- Sys.time() - start
  message("Exporting to CSV took ", signif(delta, 3), " ", attr(delta, "units"))
  message("Results are ready for sharing at: ", zipName)
}
# Look up the targetComparatorId belonging to a target / comparator / nesting
# cohort combination in the `targetComparator` table. A NULL or NA
# `nestingCohortId` matches the rows whose nestingCohortId is NA.
getTargetComparatorId <- function(targetId, comparatorId, nestingCohortId, targetComparator) {
  withoutNesting <- is.null(nestingCohortId) || is.na(nestingCohortId)
  candidates <- filter(
    targetComparator,
    .data$targetId == !!targetId,
    .data$comparatorId == !!comparatorId
  )
  if (withoutNesting) {
    candidates <- filter(candidates, is.na(.data$nestingCohortId))
  } else {
    candidates <- filter(candidates, .data$nestingCohortId == !!nestingCohortId)
  }
  pull(candidates, .data$targetComparatorId)
}
# Write a data frame to a CSV file, converting its camelCase column names to
# snake_case first. With `append = TRUE` the rows are added to an existing file.
writeToCsv <- function(data, fileName, append = FALSE) {
  snakeCaseNames <- SqlRender::camelCaseToSnakeCase(colnames(data))
  colnames(data) <- snakeCaseNames
  # Workaround for issue https://github.com/tidyverse/vroom/issues/519:
  readr::local_edition(1)
  readr::write_csv(
    x = data,
    file = fileName,
    append = append
  )
}
# Censor small counts in one column of a data frame.
#
# Values in `fieldName` that are not NA, not zero, and below `minValues` are
# replaced by the negated minimum value. `minValues` is either a single
# threshold or a vector with one threshold per row. If no threshold is
# available at all (all NA), the whole column is set to NA.
#
# data       A data frame (or tibble) containing the column to censor.
# fieldName  Name of the column to censor, as a string.
# minValues  The threshold(s) below which a value is censored.
# silent     If FALSE, report how many values were censored.
#
# Returns `data` with the censored column.
enforceMinCellValue <- function(data, fieldName, minValues, silent = FALSE) {
  if (!fieldName %in% colnames(data)) {
    stop("Column '", fieldName, "' not found in data")
  }
  # Look the column up by its string name; this cannot be confused with a
  # column that happens to be called "fieldName".
  values <- data[[fieldName]]
  toCensor <- !is.na(values) & (values < minValues) & (values != 0)
  if (!silent) {
    # Avoid reporting "NaN%" when there are no rows at all
    percent <- if (nrow(data) == 0) 0 else round(100 * sum(toCensor) / nrow(data), 1)
    message(
      "    censoring ",
      sum(toCensor),
      " values (",
      percent,
      "%) from ",
      fieldName,
      " because value below minimum"
    )
  }
  if (nrow(data) == 0) {
    # Nothing to censor. Returning early keeps the column type intact:
    # all(is.na(logical(0))) is TRUE, so the branch below would otherwise
    # overwrite the column with a logical NA.
    return(data)
  }
  if (all(is.na(toCensor)) || all(is.na(minValues))) {
    data[, fieldName] <- NA
  } else if (length(minValues) == 1) {
    data[toCensor, fieldName] <- -minValues
  } else {
    data[toCensor, fieldName] <- -minValues[toCensor]
  }
  return(data)
}
# Create a zero-row tibble that has the (camelCase) columns listed for
# `tableName` in the results data model specification shipped with the package.
createEmptyResult <- function(tableName) {
  # Workaround for issue https://github.com/tidyverse/vroom/issues/519:
  readr::local_edition(1)
  specificationFile <- system.file("csv", "resultsDataModelSpecification.csv", package = "CohortMethod")
  specification <- readr::read_csv(
    file = specificationFile,
    show_col_types = FALSE
  )
  specification <- SqlRender::snakeCaseToCamelCaseNames(specification)
  columns <- specification |>
    filter(.data$tableName == !!tableName) |>
    pull(.data$columnName) |>
    SqlRender::snakeCaseToCamelCase()
  # Build a single placeholder row carrying the column names, then drop it
  placeholder <- vector(length = length(columns))
  names(placeholder) <- columns
  emptyResult <- as_tibble(t(placeholder), name_repair = "check_unique")
  emptyResult[FALSE, ]
}
# Write cm_analysis.csv: one row per distinct analysis in the analysis
# specifications, holding its ID, description and JSON definition.
exportCohortMethodAnalyses <- function(outputFolder, exportFolder) {
  message("- cm__analysis table")
  specifications <- readRDS(file.path(outputFolder, "cmAnalysesSpecifications.rds"))
  analysisRows <- lapply(
    specifications$cmAnalysisList,
    function(cmAnalysis) {
      tibble(
        analysisId = cmAnalysis$analysisId,
        description = cmAnalysis$description,
        definition = as.character(cmAnalysis$toJson())
      )
    }
  )
  cohortMethodAnalysis <- distinct(bind_rows(analysisRows))
  writeToCsv(cohortMethodAnalysis, file.path(exportFolder, "cm_analysis.csv"))
}
# Write cm_target_comparator.csv and return the table.
#
# Every target-comparator(-nesting cohort) combination in the analysis
# specifications gets a targetComparatorId, computed as a 32-bit murmur hash
# of the three IDs shifted into the signed integer range.
exportTargetComparator <- function(outputFolder, exportFolder) {
  specifications <- readRDS(file.path(outputFolder, "cmAnalysesSpecifications.rds"))
  toHashedRow <- function(tcos) {
    nestingCohortId <- tcos$nestingCohortId
    if (is.null(nestingCohortId)) {
      nestingCohortId <- as.numeric(NA)
    }
    row <- tibble(
      targetId = tcos$targetId,
      comparatorId = tcos$comparatorId,
      nestingCohortId = nestingCohortId
    )
    hashString <- paste(row$targetId, row$comparatorId, row$nestingCohortId)
    hexDigest <- digest::digest(hashString, algo = "murmur32", serialize = FALSE)
    # Parse the hex digest and shift by 2^31 so it fits a signed integer
    row$targetComparatorId <- as.integer(as.numeric(paste0("0x", hexDigest)) - 2^31)
    row
  }
  targetComparator <- bind_rows(
    lapply(specifications$targetComparatorOutcomesList, toHashedRow)
  )
  writeToCsv(targetComparator, file.path(exportFolder, "cm_target_comparator.csv"))
  return(targetComparator)
}
# Write cm_covariate_analysis.csv and cm_covariate.csv.
#
# Both tables come from the analysisRef and covariateRef tables of the
# CohortMethodData objects listed in the file reference. Each reference row is
# repeated for every analysis ID that uses the data file. When there is nothing
# to export, an empty table with the expected columns is written instead.
exportFromCohortMethodData <- function(outputFolder, exportFolder, databaseId) {
  # Combining processing of tables so we only have to load cmData objects once
  message("- covariate_analysis and covariate tables")
  reference <- getFileReference(outputFolder)
  cmDataFiles <- reference |>
    distinct(.data$cohortMethodDataFile) |>
    filter(.data$cohortMethodDataFile != "") |>
    pull()
  covariateAnalysis <- vector("list", length(cmDataFiles))
  covariates <- vector("list", length(cmDataFiles))
  for (i in seq_along(cmDataFiles)) {
    cmDataFile <- cmDataFiles[i]
    cmData <- CohortMethod::loadCohortMethodData(file.path(outputFolder, cmDataFile))
    analysisIds <- reference |>
      filter(.data$cohortMethodDataFile == !!cmDataFile) |>
      distinct(.data$analysisId)
    covariateAnalysis[[i]] <- cmData$analysisRef |>
      select(
        covariateAnalysisId = "analysisId",
        covariateAnalysisName = "analysisName"
      ) |>
      collect() |>
      cross_join(analysisIds)
    covariates[[i]] <- cmData$covariateRef |>
      select(
        "covariateId",
        "covariateName",
        covariateAnalysisId = "analysisId"
      ) |>
      collect() |>
      cross_join(analysisIds) |>
      mutate(databaseId = !!databaseId)
  }
  # The emptiness check must come before distinct(): when there are no data
  # files, bind_rows() yields a tibble without columns and distinct() on the
  # missing columns would fail before the fallback could be used.
  covariateAnalysis <- bind_rows(covariateAnalysis)
  if (nrow(covariateAnalysis) == 0) {
    covariateAnalysis <- createEmptyResult("cm_covariate_analysis")
  } else {
    covariateAnalysis <- covariateAnalysis |>
      distinct(.data$covariateAnalysisId, .data$analysisId, .keep_all = TRUE)
  }
  fileName <- file.path(exportFolder, "cm_covariate_analysis.csv")
  writeToCsv(covariateAnalysis, fileName)
  covariates <- bind_rows(covariates)
  if (nrow(covariates) == 0) {
    covariates <- createEmptyResult("cm_covariate")
  } else {
    covariates <- covariates |>
      distinct(.data$covariateId, .data$analysisId, .keep_all = TRUE)
  }
  fileName <- file.path(exportFolder, "cm_covariate.csv")
  writeToCsv(covariates, fileName)
}
# Write cm_target_comparator_outcome.csv: one row per outcome of every
# target-comparator combination in the analysis specifications, keyed by the
# targetComparatorId found in `targetComparator`.
exportTargetComparatorOutcomes <- function(outputFolder, exportFolder, targetComparator) {
  message("- target_comparator_outcome table")
  specifications <- readRDS(file.path(outputFolder, "cmAnalysesSpecifications.rds"))
  outcomeToRow <- function(outcome) {
    trueEffectSize <- outcome$trueEffectSize
    if (is.null(trueEffectSize)) {
      trueEffectSize <- as.numeric(NA)
    }
    tibble(
      outcomeId = outcome$outcomeId,
      outcomeOfInterest = as.integer(outcome$outcomeOfInterest),
      trueEffectSize = trueEffectSize
    )
  }
  tcosToRows <- function(tcos) {
    tcId <- getTargetComparatorId(
      targetId = tcos$targetId,
      comparatorId = tcos$comparatorId,
      nestingCohortId = tcos$nestingCohortId,
      targetComparator = targetComparator
    )
    outcomeRows <- bind_rows(lapply(tcos$outcomes, outcomeToRow))
    mutate(outcomeRows, targetComparatorId = tcId)
  }
  table <- bind_rows(
    lapply(specifications$targetComparatorOutcomesList, tcosToRows)
  )
  writeToCsv(table, file.path(exportFolder, "cm_target_comparator_outcome.csv"))
}
# Write cm_attrition.csv.
#
# For every outcome of interest in the file reference, the attrition table
# stored with the outcome model is reshaped to one row per exposure (target and
# comparator) and attrition step. Subject counts below `minCellCount` are
# censored. Reference rows without an outcome model file are skipped, and an
# empty table is written when nothing was exported.
exportAttrition <- function(outputFolder,
                            exportFolder,
                            databaseId,
                            targetComparator,
                            minCellCount) {
  message("- attrition table")
  fileName <- file.path(exportFolder, "cm_attrition.csv")
  if (file.exists(fileName)) {
    unlink(fileName)
  }
  reference <- getFileReference(outputFolder) |>
    filter(.data$outcomeOfInterest) |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId"))
  # TRUE until the first chunk is written; later chunks are appended
  first <- TRUE
  pb <- txtProgressBar(style = 3)
  for (i in seq_len(nrow(reference))) {
    # An empty file name means no outcome model was created for this row (same
    # convention as in exportLikelihoodProfiles); readRDS() would otherwise be
    # pointed at the output folder itself.
    if (reference$outcomeModelFile[i] != "") {
      outcomeModel <- readRDS(file.path(outputFolder, reference$outcomeModelFile[i]))
      attrition <- outcomeModel$attrition |>
        select("description", "targetPersons", "comparatorPersons") |>
        mutate(sequenceNumber = row_number())
      attritionTarget <- attrition |>
        select("sequenceNumber", "description", subjects = "targetPersons") |>
        mutate(exposureId = reference$targetId[i])
      attritionComparator <- attrition |>
        select("sequenceNumber", "description", subjects = "comparatorPersons") |>
        mutate(exposureId = reference$comparatorId[i])
      attrition <- bind_rows(attritionTarget, attritionComparator) |>
        mutate(
          targetComparatorId = reference$targetComparatorId[i],
          analysisId = reference$analysisId[i],
          outcomeId = reference$outcomeId[i],
          databaseId = databaseId
        ) |>
        enforceMinCellValue("subjects", minCellCount, silent = TRUE)
      writeToCsv(attrition, fileName, append = !first)
      first <- FALSE
    }
    # Only refresh the progress bar once every 100 rows
    if (i %% 100 == 10) {
      setTxtProgressBar(pb, i / nrow(reference))
    }
  }
  if (first) {
    results <- createEmptyResult("cm_attrition")
    writeToCsv(results, fileName)
  }
  setTxtProgressBar(pb, 1)
  close(pb)
}
# Write cm_follow_up_dist.csv.
#
# For every outcome of interest in the file reference this reads the strata
# population (or the study population when there is no strata file) and
# reports, separately for target (treatment == 1) and comparator
# (treatment == 0), the min / p10 / p25 / median / p75 / p90 / max of
# survivalTime plus the earliest and latest cohortStartDate. An empty table is
# written when there are no rows to export.
#
# `minCellCount` is accepted for a uniform exporter signature but is not used.
exportCmFollowUpDist <- function(outputFolder,
                                 exportFolder,
                                 databaseId,
                                 targetComparator,
                                 minCellCount) {
  message("- cm_follow_up_dist table")
  # Order must match the min/p10/p25/median/p75/p90/max columns below
  probs <- c(0, 0.1, 0.25, 0.5, 0.75, 0.9, 1)

  # Earliest and latest cohort start date for one treatment arm; NA dates when
  # the arm is empty (min()/max() on an empty vector would warn).
  getMinMaxDates <- function(strataPop, treatmentValue) {
    if (sum(strataPop$treatment == treatmentValue) == 0) {
      return(tibble(
        minDate = as.Date(NA),
        maxDate = as.Date(NA)
      ))
    }
    strataPop |>
      filter(.data$treatment == !!treatmentValue) |>
      summarise(
        minDate = min(.data$cohortStartDate),
        maxDate = max(.data$cohortStartDate)
      )
  }

  # row = rows[1, ]
  getFollowUpDist <- function(row) {
    if (row$strataFile == "") {
      strataPop <- readRDS(file.path(outputFolder, row$studyPopFile))
    } else {
      strataPop <- readRDS(file.path(outputFolder, row$strataFile))
    }
    targetDist <- quantile(strataPop$survivalTime[strataPop$treatment == 1], probs)
    comparatorDist <- quantile(strataPop$survivalTime[strataPop$treatment == 0], probs)
    targetMinMaxDates <- getMinMaxDates(strataPop, 1)
    comparatorMinMaxDates <- getMinMaxDates(strataPop, 0)
    table <- tibble(
      target_comparator_id = row$targetComparatorId,
      outcome_id = row$outcomeId,
      analysis_id = row$analysisId,
      target_min_days = targetDist[1],
      target_p10_days = targetDist[2],
      target_p25_days = targetDist[3],
      target_median_days = targetDist[4],
      target_p75_days = targetDist[5],
      target_p90_days = targetDist[6],
      target_max_days = targetDist[7],
      comparator_min_days = comparatorDist[1],
      comparator_p10_days = comparatorDist[2],
      comparator_p25_days = comparatorDist[3],
      comparator_median_days = comparatorDist[4],
      comparator_p75_days = comparatorDist[5],
      comparator_p90_days = comparatorDist[6],
      comparator_max_days = comparatorDist[7],
      targetMinDate = targetMinMaxDates$minDate,
      targetMaxDate = targetMinMaxDates$maxDate,
      comparatorMinDate = comparatorMinMaxDates$minDate,
      comparatorMaxDate = comparatorMinMaxDates$maxDate
    )
    return(table)
  }
  rows <- getFileReference(outputFolder) |>
    filter(.data$outcomeOfInterest) |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId"))
  if (nrow(rows) == 0) {
    results <- createEmptyResult("cm_follow_up_dist")
  } else {
    # seq_len() rather than 1:nrow(): safe if this branch is ever reached
    # with a different guard
    results <- lapply(split(rows, seq_len(nrow(rows))), getFollowUpDist)
    results <- bind_rows(results)
    results$database_id <- databaseId
  }
  fileName <- file.path(exportFolder, "cm_follow_up_dist.csv")
  writeToCsv(results, fileName)
}
# Write cm_result.csv: the results summary joined to the target-comparator
# IDs, restricted to the exported columns, with subject and outcome counts
# below `minCellCount` censored.
exportCohortMethodResults <- function(outputFolder,
                                      exportFolder,
                                      databaseId,
                                      targetComparator,
                                      minCellCount) {
  message("- cm__result table")
  summaryWithIds <- inner_join(
    getResultsSummary(outputFolder),
    targetComparator,
    by = join_by("targetId", "comparatorId", "nestingCohortId")
  )
  results <- summaryWithIds |>
    select(
      "analysisId",
      "targetComparatorId",
      "outcomeId",
      "rr",
      "ci95Lb",
      "ci95Ub",
      "p",
      "oneSidedP",
      "targetSubjects",
      "comparatorSubjects",
      "targetDays",
      "comparatorDays",
      "targetOutcomes",
      "comparatorOutcomes",
      "logRr",
      "seLogRr",
      "llr",
      "calibratedRr",
      "calibratedCi95Lb",
      "calibratedCi95Ub",
      "calibratedP",
      "calibratedOneSidedP",
      "calibratedLogRr",
      "calibratedSeLogRr",
      "targetEstimator"
    ) |>
    mutate(databaseId = !!databaseId)
  # Censor the count columns one after the other
  countFields <- c("targetSubjects", "comparatorSubjects", "targetOutcomes", "comparatorOutcomes")
  for (countField in countFields) {
    results <- enforceMinCellValue(results, countField, minCellCount)
  }
  writeToCsv(results, file.path(exportFolder, "cm_result.csv"))
}
# Write cm_interaction_result.csv: the interaction results summary joined to
# the target-comparator IDs, with subject and outcome counts below
# `minCellCount` censored. An empty table is written when there are no
# interaction results.
exportCmInteractionResults <- function(outputFolder,
                                       exportFolder,
                                       databaseId,
                                       targetComparator,
                                       minCellCount) {
  message("- cm_interaction_result table")
  results <- getInteractionResultsSummary(outputFolder)
  if (nrow(results) > 0) {
    results <- results |>
      inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId")) |>
      select(
        "analysisId",
        "targetComparatorId",
        "outcomeId",
        "interactionCovariateId",
        "rr",
        "ci95Lb",
        "ci95Ub",
        "p",
        "targetSubjects",
        "comparatorSubjects",
        "targetDays",
        "comparatorDays",
        "targetOutcomes",
        "comparatorOutcomes",
        "logRr",
        "seLogRr",
        "calibratedRr",
        "calibratedCi95Lb",
        "calibratedCi95Ub",
        "calibratedP",
        "calibratedLogRr",
        "calibratedSeLogRr",
        "targetEstimator"
      ) |>
      mutate(databaseId = !!databaseId)
    # Censor the count columns one after the other
    countFields <- c("targetSubjects", "comparatorSubjects", "targetOutcomes", "comparatorOutcomes")
    for (countField in countFields) {
      results <- enforceMinCellValue(results, countField, minCellCount)
    }
  } else {
    results <- createEmptyResult("cm_interaction_result")
  }
  writeToCsv(results, file.path(exportFolder, "cm_interaction_result.csv"))
}
# Write cm_likelihood_profile.csv.
#
# For every reference row that has an outcome model with a log likelihood
# profile, the profile points are exported with the log likelihood expressed
# relative to its maximum. An empty table is written when no profile was found.
exportLikelihoodProfiles <- function(outputFolder,
                                     exportFolder,
                                     databaseId,
                                     targetComparator) {
  message("- likelihood_profile table")
  reference <- getFileReference(outputFolder)
  reference <- reference |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId"))
  fileName <- file.path(exportFolder, "cm_likelihood_profile.csv")
  if (file.exists(fileName)) {
    unlink(fileName)
  }
  # TRUE until the first profile is written; later profiles are appended
  first <- TRUE
  pb <- txtProgressBar(style = 3)
  for (i in seq_len(nrow(reference))) {
    if (reference$outcomeModelFile[i] != "") {
      outcomeModel <- readRDS(file.path(outputFolder, reference$outcomeModelFile[i]))
      profile <- outcomeModel$logLikelihoodProfile
      if (!is.null(profile)) {
        profile <- profile |>
          transmute(
            logRr = .data$point,
            logLikelihood = .data$value - max(.data$value),
            gradient = .data$derivative
          ) |>
          mutate(
            targetComparatorId = reference$targetComparatorId[i],
            outcomeId = reference$outcomeId[i],
            analysisId = reference$analysisId[i],
            databaseId = !!databaseId
          )
        writeToCsv(profile, fileName, append = !first)
        first <- FALSE
      }
    }
    setTxtProgressBar(pb, i / nrow(reference))
  }
  # Write the empty table once, after the loop. Inside the loop it would be
  # rewritten on every iteration until a profile shows up, and it would never
  # be written at all when the reference has no rows.
  if (first) {
    results <- createEmptyResult("cm_likelihood_profile")
    writeToCsv(results, fileName)
  }
  setTxtProgressBar(pb, 1)
  close(pb)
}
# Write cm_covariate_balance.csv.
#
# Each distinct balance file in the file reference is tidied with
# tidyBalance() and combined with the identifying keys of the reference rows
# that use it. An empty table is written when there are no balance files.
exportCovariateBalance <- function(outputFolder,
                                   exportFolder,
                                   databaseId,
                                   targetComparator,
                                   minCellCount) {
  message("- covariate_balance table")
  specifications <- readRDS(file.path(outputFolder, "cmAnalysesSpecifications.rds"))
  cmDiagnosticThresholds <- specifications$cmDiagnosticThresholds
  reference <- getFileReference(outputFolder) |>
    filter(.data$balanceFile != "") |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId"))
  balanceFiles <- pull(distinct(reference, .data$balanceFile))
  fileName <- file.path(exportFolder, "cm_covariate_balance.csv")
  if (file.exists(fileName)) {
    unlink(fileName)
  }
  nothingWritten <- TRUE
  pb <- txtProgressBar(style = 3)
  for (i in seq_along(balanceFiles)) {
    currentFile <- balanceFiles[i]
    rows <- filter(reference, .data$balanceFile == !!currentFile)
    rawBalance <- readRDS(file.path(outputFolder, currentFile))
    keys <- tibble(
      databaseId = !!databaseId,
      targetComparatorId = rows$targetComparatorId[1],
      outcomeId = rows$outcomeId,
      analysisId = unique(rows$analysisId)
    )
    balance <- cross_join(
      keys,
      tidyBalance(rawBalance, minCellCount, cmDiagnosticThresholds)
    )
    writeToCsv(balance, fileName, append = !nothingWritten)
    nothingWritten <- FALSE
    setTxtProgressBar(pb, i / length(balanceFiles))
  }
  if (nothingWritten) {
    writeToCsv(createEmptyResult("cm_covariate_balance"), fileName)
  }
  setTxtProgressBar(pb, 1)
  close(pb)
}
# Write cm_shared_covariate_balance.csv.
#
# Each distinct shared balance file in the file reference is tidied with
# tidyBalance() and combined with the identifying keys of the reference rows
# that use it. An empty table is written when there are no shared balance files.
exportSharedCovariateBalance <- function(outputFolder,
                                         exportFolder,
                                         databaseId,
                                         targetComparator,
                                         minCellCount) {
  message("- shared_covariate_balance table")
  specifications <- readRDS(file.path(outputFolder, "cmAnalysesSpecifications.rds"))
  cmDiagnosticThresholds <- specifications$cmDiagnosticThresholds
  reference <- getFileReference(outputFolder) |>
    filter(.data$sharedBalanceFile != "") |>
    distinct(.data$sharedBalanceFile, .data$analysisId, .data$targetId, .data$comparatorId, .data$nestingCohortId) |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId"))
  sharedBalanceFiles <- pull(distinct(reference, .data$sharedBalanceFile))
  fileName <- file.path(exportFolder, "cm_shared_covariate_balance.csv")
  if (file.exists(fileName)) {
    unlink(fileName)
  }
  nothingWritten <- TRUE
  pb <- txtProgressBar(style = 3)
  for (i in seq_along(sharedBalanceFiles)) {
    rows <- filter(reference, .data$sharedBalanceFile == sharedBalanceFiles[i])
    rawBalance <- readRDS(file.path(outputFolder, sharedBalanceFiles[i]))
    keys <- tibble(
      databaseId = !!databaseId,
      targetComparatorId = rows$targetComparatorId[1],
      analysisId = unique(rows$analysisId)
    )
    balance <- cross_join(
      keys,
      tidyBalance(rawBalance, minCellCount, cmDiagnosticThresholds)
    )
    writeToCsv(balance, fileName, append = !nothingWritten)
    nothingWritten <- FALSE
    setTxtProgressBar(pb, i / length(sharedBalanceFiles))
  }
  if (nothingWritten) {
    writeToCsv(createEmptyResult("cm_shared_covariate_balance"), fileName)
  }
  setTxtProgressBar(pb, 1)
  close(pb)
}
# Prepare a covariate balance table for export.
#
# Steps, in order:
#  1. Infer the target / comparator population sizes before and after
#     adjustment from the sum / mean columns.
#  2. Recompute the "balanced" indicators using `cmDiagnosticThresholds`.
#  3. Rename the columns to the export names and replace NA by 0 (by 1 for the
#     balanced indicators).
#  4. Drop covariates for which every exported statistic rounds to 0 at three
#     decimals.
#  5. Censor means that correspond to fewer than `minCellCount` subjects.
#  6. Round the statistics to three decimals.
#
# balance                 A covariate balance table as stored in a balance file.
# minCellCount            Minimum subject count below which a mean is censored.
# cmDiagnosticThresholds  Diagnostic thresholds; `sdmThreshold` and the
#                         optional `sdmAlpha` are used here.
#
# Returns the tidied balance table.
tidyBalance <- function(balance, minCellCount, cmDiagnosticThresholds) {
  # sum / mean gives the population size; na.rm also discards the NaN that
  # results from 0 / 0.
  inferredTargetBeforeSize <- mean(balance$beforeMatchingSumTarget / balance$beforeMatchingMeanTarget, na.rm = TRUE)
  inferredComparatorBeforeSize <- mean(balance$beforeMatchingSumComparator / balance$beforeMatchingMeanComparator, na.rm = TRUE)
  inferredTargetAfterSize <- mean(balance$afterMatchingSumTarget / balance$afterMatchingMeanTarget, na.rm = TRUE)
  inferredComparatorAfterSize <- mean(balance$afterMatchingSumComparator / balance$afterMatchingMeanComparator, na.rm = TRUE)

  # Need to recompute balanced indicator because cmDiagnosticsThreshold may have different values from
  # computeCovariateBalanceArgs:
  isBalanced <- function(sdm, sdmVariance) {
    if (is.null(cmDiagnosticThresholds$sdmAlpha)) {
      return(if_else(abs(sdm) < cmDiagnosticThresholds$sdmThreshold, 1, 0))
    } else {
      # Alpha is divided by the number of covariates that have a variance
      correctedAlpha <- cmDiagnosticThresholds$sdmAlpha / sum(!is.na(sdmVariance))
      p <- computeBalanceP(sdm, sdmVariance, cmDiagnosticThresholds$sdmThreshold)
      return(if_else(p > correctedAlpha, 1, 0))
    }
  }

  balance <- balance |>
    mutate(
      beforeMatchingBalanced = isBalanced(.data$beforeMatchingStdDiff, .data$beforeMatchingSdmVariance),
      afterMatchingBalanced = isBalanced(.data$afterMatchingStdDiff, .data$afterMatchingSdmVariance)
    ) |>
    select(
      "covariateId",
      targetMeanBefore = "beforeMatchingMeanTarget",
      comparatorMeanBefore = "beforeMatchingMeanComparator",
      stdDiffBefore = "beforeMatchingStdDiff",
      stdDiffVarBefore = "beforeMatchingSdmVariance",
      balancedBefore = "beforeMatchingBalanced",
      targetMeanAfter = "afterMatchingMeanTarget",
      comparatorMeanAfter = "afterMatchingMeanComparator",
      stdDiffAfter = "afterMatchingStdDiff",
      stdDiffVarAfter = "afterMatchingSdmVariance",
      balancedAfter = "afterMatchingBalanced",
      meanBefore = "beforeMatchingMean",
      meanAfter = "afterMatchingMean",
      "targetStdDiff",
      "comparatorStdDiff",
      "targetComparatorStdDiff"
    ) |>
    mutate(
      targetMeanBefore = ifelse(is.na(.data$targetMeanBefore), 0, .data$targetMeanBefore),
      comparatorMeanBefore = ifelse(is.na(.data$comparatorMeanBefore), 0, .data$comparatorMeanBefore),
      stdDiffBefore = ifelse(is.na(.data$stdDiffBefore), 0, .data$stdDiffBefore),
      stdDiffVarBefore = ifelse(is.na(.data$stdDiffVarBefore), 0, .data$stdDiffVarBefore),
      balancedBefore = ifelse(is.na(.data$balancedBefore), 1, .data$balancedBefore),
      targetMeanAfter = ifelse(is.na(.data$targetMeanAfter), 0, .data$targetMeanAfter),
      comparatorMeanAfter = ifelse(is.na(.data$comparatorMeanAfter), 0, .data$comparatorMeanAfter),
      stdDiffAfter = ifelse(is.na(.data$stdDiffAfter), 0, .data$stdDiffAfter),
      stdDiffVarAfter = ifelse(is.na(.data$stdDiffVarAfter), 0, .data$stdDiffVarAfter),
      balancedAfter = ifelse(is.na(.data$balancedAfter), 1, .data$balancedAfter),
      meanBefore = ifelse(is.na(.data$meanBefore), 0, .data$meanBefore),
      # Test meanAfter itself: stdDiffAfter has already been made NA-free
      # above, so testing it would leave missing meanAfter values in place.
      meanAfter = ifelse(is.na(.data$meanAfter), 0, .data$meanAfter),
      targetStdDiff = ifelse(is.na(.data$targetStdDiff), 0, .data$targetStdDiff),
      comparatorStdDiff = ifelse(is.na(.data$comparatorStdDiff), 0, .data$comparatorStdDiff),
      targetComparatorStdDiff = ifelse(is.na(.data$targetComparatorStdDiff), 0, .data$targetComparatorStdDiff)
    ) |>
    # Drop covariates that carry no information at the exported precision
    filter(!(round(.data$targetMeanBefore, 3) == 0 &
      round(.data$comparatorMeanBefore, 3) == 0 &
      round(.data$stdDiffBefore, 3) == 0 &
      round(.data$targetMeanAfter, 3) == 0 &
      round(.data$comparatorMeanAfter, 3) == 0 &
      round(.data$stdDiffAfter, 3) == 0 &
      round(.data$meanBefore, 3) == 0 &
      round(.data$meanAfter, 3) == 0 &
      round(.data$targetStdDiff, 3) == 0 &
      round(.data$comparatorStdDiff, 3) == 0 &
      round(.data$targetComparatorStdDiff, 3) == 0)) |>
    # A mean below minCellCount / size corresponds to fewer than minCellCount
    # subjects having the covariate.
    enforceMinCellValue("targetMeanBefore",
      minCellCount / inferredTargetBeforeSize,
      silent = TRUE
    ) |>
    enforceMinCellValue("comparatorMeanBefore",
      minCellCount / inferredComparatorBeforeSize,
      silent = TRUE
    ) |>
    enforceMinCellValue("targetMeanAfter",
      minCellCount / inferredTargetAfterSize,
      silent = TRUE
    ) |>
    enforceMinCellValue("comparatorMeanAfter",
      minCellCount / inferredComparatorAfterSize,
      silent = TRUE
    ) |>
    # NOTE(review): the overall means reuse the comparator-after size as
    # denominator. That is presumably stricter than a size based on the
    # combined population - confirm whether this is intended.
    enforceMinCellValue("meanBefore",
      minCellCount / inferredComparatorAfterSize,
      silent = TRUE
    ) |>
    enforceMinCellValue("meanAfter",
      minCellCount / inferredComparatorAfterSize,
      silent = TRUE
    ) |>
    mutate(
      targetMeanBefore = round(.data$targetMeanBefore, 3),
      comparatorMeanBefore = round(.data$comparatorMeanBefore, 3),
      stdDiffBefore = round(.data$stdDiffBefore, 3),
      targetMeanAfter = round(.data$targetMeanAfter, 3),
      comparatorMeanAfter = round(.data$comparatorMeanAfter, 3),
      stdDiffAfter = round(.data$stdDiffAfter, 3),
      meanBefore = round(.data$meanBefore, 3),
      meanAfter = round(.data$meanAfter, 3),
      targetStdDiff = round(.data$targetStdDiff, 3),
      comparatorStdDiff = round(.data$comparatorStdDiff, 3),
      targetComparatorStdDiff = round(.data$targetComparatorStdDiff, 3)
    )
  return(balance)
}
# Write cm_preference_score_dist.csv.
#
# For every shared propensity score file the preference score density of the
# target and the comparator group is evaluated at 100 points between 0 and 1.
# Files without rows, without variation in the propensity score, or with
# fewer than two subjects in either group are skipped. An empty table is
# written when nothing could be computed.
exportPreferenceScoreDistribution <- function(outputFolder,
                                              exportFolder,
                                              databaseId,
                                              targetComparator) {
  message("- preference_score_dist table")
  reference <- getFileReference(outputFolder) |>
    filter(.data$sharedPsFile != "") |>
    distinct(.data$sharedPsFile, .data$analysisId, .data$targetId, .data$comparatorId, .data$nestingCohortId) |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId"))
  # rows <- split(reference, reference$sharedPsFile)[[2]]
  preparePlot <- function(rows) {
    ps <- readRDS(file.path(outputFolder, rows$sharedPsFile[1]))
    canEstimateDensity <- nrow(ps) > 0 &&
      min(ps$propensityScore) < max(ps$propensityScore) &&
      sum(ps$treatment == 1) > 1 &&
      sum(ps$treatment == 0) > 1
    if (!canEstimateDensity) {
      return(NULL)
    }
    ps <- computePreferenceScore(ps)
    targetDensity <- density(ps$preferenceScore[ps$treatment == 1], from = 0, to = 1, n = 100)
    comparatorDensity <- density(ps$preferenceScore[ps$treatment == 0], from = 0, to = 1, n = 100)
    densities <- tibble(
      preferenceScore = targetDensity$x,
      targetDensity = targetDensity$y,
      comparatorDensity = comparatorDensity$y
    )
    keys <- rows |>
      select(
        "analysisId",
        "targetComparatorId"
      ) |>
      mutate(databaseId = !!databaseId)
    return(cross_join(keys, densities))
  }
  data <- bind_rows(lapply(split(reference, reference$sharedPsFile), preparePlot))
  if (nrow(data) == 0) {
    data <- createEmptyResult("cm_preference_score_dist")
  }
  writeToCsv(data, file.path(exportFolder, "cm_preference_score_dist.csv"))
}
# Write cm_propensity_model.csv.
#
# For every shared propensity score file the non-zero model coefficients are
# exported, with the intercept stored under covariate ID 0. When fitting failed
# because of high correlation, the correlated covariates are exported instead
# with their correlation multiplied by 1e6 in the coefficient column. Files
# with any other error are skipped. An empty table is written when there is
# nothing to export.
exportPropensityModel <- function(outputFolder,
                                  exportFolder,
                                  databaseId,
                                  targetComparator) {
  message("- propensity_model table")
  reference <- getFileReference(outputFolder) |>
    filter(.data$sharedPsFile != "") |>
    distinct(.data$sharedPsFile, .data$analysisId, .data$targetId, .data$comparatorId, .data$nestingCohortId) |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId"))
  # rows <- split(reference, reference$sharedPsFile)[[1]]
  prepareData <- function(rows) {
    ps <- readRDS(file.path(outputFolder, rows$sharedPsFile[1]))
    metaData <- attr(ps, "metaData")
    # Combine the identifying keys of the reference rows with the model rows
    attachKeys <- function(model) {
      rows |>
        select("targetComparatorId", "analysisId") |>
        mutate(databaseId = !!databaseId) |>
        cross_join(model)
    }
    if (is.null(metaData$psError) || metaData$psError == "OK") {
      model <- tibble(
        covariateId = names(metaData$psModelCoef),
        coefficient = as.vector(metaData$psModelCoef)
      ) |>
        filter(.data$coefficient != 0) |>
        mutate(covariateId = ifelse(.data$covariateId == "(Intercept)", 0, .data$covariateId)) |>
        mutate(covariateId = as.numeric(.data$covariateId))
      return(attachKeys(model))
    }
    if (grepl("High correlation", metaData$psError)) {
      model <- metaData$psHighCorrelation |>
        mutate(coefficient = .data$correlation * 1e6) |>
        select("covariateId", "coefficient")
      return(attachKeys(model))
    }
    return(NULL)
  }
  data <- bind_rows(lapply(split(reference, reference$sharedPsFile), prepareData))
  if (nrow(data) == 0) {
    data <- createEmptyResult("cm_propensity_model")
  }
  writeToCsv(data, file.path(exportFolder, "cm_propensity_model.csv"))
}
# Write cm_kaplan_meier_dist.csv.
#
# The Kaplan-Meier data of every outcome of interest is computed in parallel
# by prepareKm(), which stores one RDS file per task in a temporary folder
# inside `exportFolder`. These files are then concatenated into a single CSV
# file and the temporary folder is removed. An empty table is written when no
# Kaplan-Meier data was produced.
exportKaplanMeier <- function(outputFolder,
                              exportFolder,
                              databaseId,
                              targetComparator,
                              minCellCount,
                              maxCores) {
  message("- kaplan_meier_dist table")
  message("  Computing KM curves")
  reference <- getFileReference(outputFolder) |>
    filter(.data$outcomeOfInterest) |>
    inner_join(targetComparator, by = join_by("targetId", "comparatorId", "nestingCohortId")) |>
    select(
      "strataFile",
      "studyPopFile",
      "targetComparatorId",
      "outcomeId",
      "analysisId"
    )
  tempFolder <- file.path(exportFolder, "temp")
  if (!file.exists(tempFolder)) {
    dir.create(tempFolder)
  }
  if (nrow(reference) > 0) {
    tasks <- split(reference, seq_len(nrow(reference)))
    # Never use more than 4 worker processes, whatever maxCores is
    cluster <- ParallelLogger::makeCluster(min(length(tasks), 4, maxCores))
    # Make sure the worker processes are shut down even when a task fails
    tryCatch(
      ParallelLogger::clusterApply(cluster,
        tasks,
        prepareKm,
        outputFolder = outputFolder,
        tempFolder = tempFolder,
        databaseId = databaseId,
        minCellCount = minCellCount
      ),
      finally = ParallelLogger::stopCluster(cluster)
    )
  }
  message("  Writing to single csv file")
  outputFile <- file.path(exportFolder, "cm_kaplan_meier_dist.csv")
  # Anchored pattern with an escaped dot: only the km_*.rds files of prepareKm()
  files <- list.files(tempFolder, "^km_.*\\.rds$", full.names = TRUE)
  # TRUE until the first file is written; later files are appended
  first <- TRUE
  pb <- txtProgressBar(style = 3)
  for (i in seq_along(files)) {
    data <- readRDS(files[i])
    writeToCsv(data, outputFile, append = !first)
    first <- FALSE
    # Only refresh the progress bar once every 100 files
    if (i %% 100 == 10) {
      setTxtProgressBar(pb, i / length(files))
    }
  }
  if (first) {
    data <- createEmptyResult("cm_kaplan_meier_dist")
    writeToCsv(data, outputFile)
  }
  setTxtProgressBar(pb, 1)
  close(pb)
  unlink(tempFolder, recursive = TRUE)
}
# task = tasks[[1]]
# Compute the Kaplan-Meier export data for a single task (one row of the file
# reference) and save it as an RDS file in `tempFolder`.
#
# Nothing is done when the output file already exists, when the population is
# empty, or when no Kaplan-Meier data could be prepared. At-risk counts below
# `minCellCount` are censored before saving.
prepareKm <- function(task,
                      outputFolder,
                      tempFolder,
                      databaseId,
                      minCellCount) {
  ParallelLogger::logTrace(
    "Preparing KM plot for target_comparator ",
    task$targetComparatorId,
    ", outcome ",
    task$outcomeId,
    ", analysis ",
    task$analysisId
  )
  kmFileName <- sprintf(
    "km_tc%s_o%s_a%s.rds",
    task$targetComparatorId,
    task$outcomeId,
    task$analysisId
  )
  outputFileName <- file.path(tempFolder, kmFileName)
  if (file.exists(outputFileName)) {
    return(NULL)
  }
  # Use the strata population when there is one, else the study population
  popFile <- if (task$strataFile == "") task$studyPopFile else task$strataFile
  population <- readRDS(file.path(outputFolder, popFile))
  if (nrow(population) == 0) {
    # Can happen when matching and treatment is predictable
    return(NULL)
  }
  kmData <- prepareKaplanMeierForExport(population)
  if (is.null(kmData)) {
    # No shared strata
    return(NULL)
  }
  kmData$targetComparatorId <- task$targetComparatorId
  kmData$outcomeId <- task$outcomeId
  kmData$analysisId <- task$analysisId
  kmData$databaseId <- databaseId
  for (atRiskField in c("targetAtRisk", "comparatorAtRisk")) {
    kmData <- enforceMinCellValue(kmData, atRiskField, minCellCount)
  }
  saveRDS(kmData, outputFileName)
}
# Turn the Kaplan-Meier data of a population into the wide export format.
#
# The long data returned by prepareKaplanMeier() is split into target
# (treatment == 1) and comparator (treatment == 0) and joined on time, giving
# one row per time point with the survival estimate, its bounds and the number
# at risk of both groups. Time points that exist for only one group are filled
# with the previous value of the other group (1 before its first time point).
# Rows that differ from an earlier row only in their time are removed.
#
# Returns the wide table, or NULL when prepareKaplanMeier() returned NULL.
prepareKaplanMeierForExport <- function(population) {
  data <- prepareKaplanMeier(population, dataCutoff = 0.9)
  if (is.null(data)) {
    # The caller (prepareKm) tests the result for NULL; hand a NULL through
    # instead of failing in the filter() below.
    return(NULL)
  }
  dataWide <- full_join(
    data |>
      filter(.data$treatment == 1) |>
      select(
        timeDay = "time",
        targetSurvival = "survival",
        targetSurvivalLb = "lower",
        targetSurvivalUb = "upper",
        targetAtRisk = "nAtRisk"
      ),
    data |>
      filter(.data$treatment == 0) |>
      select(
        timeDay = "time",
        comparatorSurvival = "survival",
        comparatorSurvivalLb = "lower",
        comparatorSurvivalUb = "upper",
        comparatorAtRisk = "nAtRisk"
      ),
    by = join_by("timeDay")
  )
  if (nrow(dataWide) > 0) {
    # Each pass copies the value of the previous time point into a missing
    # one, so a run of k consecutive missing values needs k passes.
    while (sum(is.na(dataWide$targetSurvival)) > 0) {
      dataWide <- dataWide |>
        mutate(
          targetSurvival = if_else(is.na(.data$targetSurvival),
            lag(.data$targetSurvival, order_by = .data$timeDay, default = 1),
            .data$targetSurvival
          ),
          targetSurvivalLb = if_else(is.na(.data$targetSurvivalLb),
            lag(.data$targetSurvivalLb, order_by = .data$timeDay, default = 1),
            .data$targetSurvivalLb
          ),
          targetSurvivalUb = if_else(is.na(.data$targetSurvivalUb),
            lag(.data$targetSurvivalUb, order_by = .data$timeDay, default = 1),
            .data$targetSurvivalUb
          )
        ) |>
        ungroup()
    }
    while (sum(is.na(dataWide$comparatorSurvival)) > 0) {
      dataWide <- dataWide |>
        mutate(
          comparatorSurvival = if_else(is.na(.data$comparatorSurvival),
            lag(.data$comparatorSurvival, order_by = .data$timeDay, default = 1),
            .data$comparatorSurvival
          ),
          comparatorSurvivalLb = if_else(is.na(.data$comparatorSurvivalLb),
            lag(.data$comparatorSurvivalLb, order_by = .data$timeDay, default = 1),
            .data$comparatorSurvivalLb
          ),
          comparatorSurvivalUb = if_else(is.na(.data$comparatorSurvivalUb),
            lag(.data$comparatorSurvivalUb, order_by = .data$timeDay, default = 1),
            .data$comparatorSurvivalUb
          )
        ) |>
        ungroup()
    }
  }
  # Remove duplicate (except time) entries. timeDay is the first column, so
  # [, -1] compares everything but the time:
  dataWide <- dataWide[order(dataWide$timeDay), ]
  dataWide <- dataWide[!duplicated(dataWide[, -1]), ]
  return(dataWide)
}
# Write cm_diagnostics_summary.csv: the diagnostics summary keyed by
# targetComparatorId instead of the separate cohort IDs. Infinite MDRR values
# become NA, the unblind flags become integers, and the deprecated attrition
# fields are filled with placeholder values. An empty table is written when
# there are no diagnostics.
exportDiagnosticsSummary <- function(outputFolder,
                                     exportFolder,
                                     databaseId,
                                     targetComparator) {
  message("- diagnostics_summary table")
  diagnostics <- inner_join(
    getDiagnosticsSummary(outputFolder),
    targetComparator,
    by = join_by("targetId", "comparatorId", "nestingCohortId")
  )
  diagnostics <- select(diagnostics, -"nestingCohortId", -"targetId", -"comparatorId")
  results <- mutate(
    diagnostics,
    databaseId = !!databaseId,
    mdrr = if_else(is.infinite(.data$mdrr), as.numeric(NA), .data$mdrr),
    unblind = as.integer(.data$unblind),
    unblindForEvidenceSynthesis = as.integer(.data$unblindForEvidenceSynthesis),
    # Add deprecated fields:
    attritionFraction = as.numeric(NA),
    attritionDiagnostic = "NOT EVALUATED"
  )
  if (nrow(results) == 0) {
    results <- createEmptyResult("cm_diagnostics_summary")
  }
  writeToCsv(results, file.path(exportFolder, "cm_diagnostics_summary.csv"))
}
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.