R/Export.R

Defines functions prepareKaplanMeier prepareKm exportDiagnostics exportProfiles calibrateInteractions calibrate exportMainResults enforceMinCellValue exportMetadata exportOutcomes exportExposures exportAnalyses exportResults

Documented in exportResults

# Copyright 2019 Observational Health Data Sciences and Informatics
#
# This file is part of Covid19SusceptibilityAlphaBlockers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#' Export all results to tables
#'
#' @description
#' Outputs all results to a folder called 'export', and zips them.
#'
#' @param outputFolder          Name of local folder to place results; make sure to use forward slashes
#'                              (/). Do not use a folder on a network drive since this greatly impacts
#'                              performance.
#' @param databaseId            A short string for identifying the database (e.g. 'Synpuf').
#' @param databaseName          The full name of the database.
#' @param databaseDescription   A short description (several sentences) of the database.
#' @param minCellCount          The minimum cell count for fields contains person counts or fractions.
#' @param maxCores              How many parallel cores should be used? If more cores are made
#'                              available this can speed up the analyses.
#'
#' @export
exportResults <- function(outputFolder,
                          databaseId,
                          databaseName,
                          databaseDescription,
                          minCellCount = 5,
                          maxCores) {
  exportFolder <- file.path(outputFolder, "export")
  if (!file.exists(exportFolder)) {
    dir.create(exportFolder, recursive = TRUE)
  }

  exportAnalyses(outputFolder = outputFolder,
                 exportFolder = exportFolder)

  exportExposures(outputFolder = outputFolder,
                  exportFolder = exportFolder)

  exportOutcomes(outputFolder = outputFolder,
                 exportFolder = exportFolder)

  exportMetadata(outputFolder = outputFolder,
                 exportFolder = exportFolder,
                 databaseId = databaseId,
                 databaseName = databaseName,
                 databaseDescription = databaseDescription,
                 minCellCount = minCellCount)

  exportMainResults(outputFolder = outputFolder,
                    exportFolder = exportFolder,
                    databaseId = databaseId,
                    minCellCount = minCellCount,
                    maxCores = maxCores)

  exportDiagnostics(outputFolder = outputFolder,
                    exportFolder = exportFolder,
                    databaseId = databaseId,
                    minCellCount = minCellCount,
                    maxCores = maxCores)

  exportProfiles(outputFolder = outputFolder,
                 exportFolder = exportFolder,
                 databaseId = databaseId,
                 minCellCount = minCellCount,
                 maxCores = maxCores)



  # Add all to zip file -------------------------------------------------------------------------------
  ParallelLogger::logInfo("Adding results to zip file")
  zipName <- file.path(exportFolder, sprintf("Results_%s.zip", databaseId))
  files <- list.files(exportFolder, pattern = ".*\\.csv$")
  oldWd <- setwd(exportFolder)
  on.exit(setwd(oldWd))
  DatabaseConnector::createZipFile(zipFile = zipName, files = files)
  ParallelLogger::logInfo("Results are ready for sharing at:", zipName)
}

exportAnalyses <- function(outputFolder, exportFolder) {
  ParallelLogger::logInfo("Exporting analyses")
  ParallelLogger::logInfo("- cohort_method_analysis table")

  tempFileName <- tempfile()

  cmAnalysisListFile <- system.file("settings",
                                    "cmAnalysisList.json",
                                    package = "Covid19SusceptibilityAlphaBlockers")
  cmAnalysisList <- CohortMethod::loadCmAnalysisList(cmAnalysisListFile)
  cmAnalysisToRow <- function(cmAnalysis) {
    ParallelLogger::saveSettingsToJson(cmAnalysis, tempFileName)
    row <- tibble::tibble(analysisId = cmAnalysis$analysisId,
                      description = cmAnalysis$description,
                      definition = readChar(tempFileName, file.info(tempFileName)$size))
    return(row)
  }
  cohortMethodAnalysis <- lapply(cmAnalysisList, cmAnalysisToRow)
  cohortMethodAnalysis <- do.call("rbind", cohortMethodAnalysis)
  cohortMethodAnalysis <- unique(cohortMethodAnalysis)
  unlink(tempFileName)
  colnames(cohortMethodAnalysis) <- SqlRender::camelCaseToSnakeCase(colnames(cohortMethodAnalysis))
  fileName <- file.path(exportFolder, "cohort_method_analysis.csv")
  readr::write_csv(cohortMethodAnalysis, fileName)


  ParallelLogger::logInfo("- covariate_analysis table")
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  getCovariateAnalyses <- function(cmAnalysis) {
    cmDataFolder <- reference$cohortMethodDataFile[reference$analysisId == cmAnalysis$analysisId][1]
    cmData <- CohortMethod::loadCohortMethodData(file.path(outputFolder, "cmOutput", cmDataFolder))
    if (!is.null(cmData$analysisRef)) {
      covariateAnalysis <- collect(cmData$analysisRef)
      covariateAnalysis <- covariateAnalysis[, c("analysisId", "analysisName")]
      colnames(covariateAnalysis) <- c("covariate_analysis_id", "covariate_analysis_name")
      covariateAnalysis$analysis_id <- cmAnalysis$analysisId
      return(covariateAnalysis)
    } else {
      return(data.frame(covariate_analysis_id = 1, covariate_analysis_name = "")[-1,])
    }
  }
  covariateAnalysis <- lapply(cmAnalysisList, getCovariateAnalyses)
  covariateAnalysis <- do.call("rbind", covariateAnalysis)
  fileName <- file.path(exportFolder, "covariate_analysis.csv")
  readr::write_csv(covariateAnalysis, fileName)
}

exportExposures <- function(outputFolder, exportFolder) {
  ParallelLogger::logInfo("Exporting exposures")
  ParallelLogger::logInfo("- exposure_of_interest table")
  pathToCsv <- system.file("settings", "TcosOfInterest.csv", package = "Covid19SusceptibilityAlphaBlockers")
  tcosOfInterest <- read.csv(pathToCsv, stringsAsFactors = FALSE)
  pathToCsv <- system.file("settings", "CohortsToCreate.csv", package = "Covid19SusceptibilityAlphaBlockers")
  cohortsToCreate <- read.csv(pathToCsv)
  createExposureRow <- function(exposureId) {
    atlasName <- as.character(cohortsToCreate$atlasName[cohortsToCreate$cohortId == exposureId])
    name <- as.character(cohortsToCreate$name[cohortsToCreate$cohortId == exposureId])
    cohortFileName <- system.file("cohorts", paste0(name, ".json"), package = "Covid19SusceptibilityAlphaBlockers")
    definition <- readChar(cohortFileName, file.info(cohortFileName)$size)
    return(tibble::tibble(exposureId = exposureId,
                      exposureName = atlasName,
                      definition = definition))
  }
  exposuresOfInterest <- unique(c(tcosOfInterest$targetId, tcosOfInterest$comparatorId))
  exposureOfInterest <- lapply(exposuresOfInterest, createExposureRow)
  exposureOfInterest <- do.call("rbind", exposureOfInterest)
  colnames(exposureOfInterest) <- SqlRender::camelCaseToSnakeCase(colnames(exposureOfInterest))
  fileName <- file.path(exportFolder, "exposure_of_interest.csv")
  readr::write_csv(exposureOfInterest, fileName)
}

exportOutcomes <- function(outputFolder, exportFolder) {
  ParallelLogger::logInfo("Exporting outcomes")
  ParallelLogger::logInfo("- outcome_of_interest table")
  pathToCsv <- system.file("settings", "CohortsToCreate.csv", package = "Covid19SusceptibilityAlphaBlockers")
  cohortsToCreate <- read.csv(pathToCsv)
  createOutcomeRow <- function(outcomeId) {
    atlasName <- as.character(cohortsToCreate$atlasName[cohortsToCreate$cohortId == outcomeId])
    name <- as.character(cohortsToCreate$name[cohortsToCreate$cohortId == outcomeId])
    cohortFileName <- system.file("cohorts", paste0(name, ".json"), package = "Covid19SusceptibilityAlphaBlockers")
    definition <- readChar(cohortFileName, file.info(cohortFileName)$size)
    return(tibble::tibble(outcomeId = outcomeId,
                      outcomeName = atlasName,
                      definition = definition))
  }
  outcomesOfInterest <- getOutcomesOfInterest()
  outcomeOfInterest <- lapply(outcomesOfInterest, createOutcomeRow)
  outcomeOfInterest <- do.call("rbind", outcomeOfInterest)
  colnames(outcomeOfInterest) <- SqlRender::camelCaseToSnakeCase(colnames(outcomeOfInterest))
  fileName <- file.path(exportFolder, "outcome_of_interest.csv")
  readr::write_csv(outcomeOfInterest, fileName)


  ParallelLogger::logInfo("- negative_control_outcome table")
  pathToCsv <- system.file("settings", "NegativeControls.csv", package = "Covid19SusceptibilityAlphaBlockers")
  negativeControls <- read.csv(pathToCsv)
  negativeControls <- negativeControls[tolower(negativeControls$type) == "outcome", ]
  negativeControls <- negativeControls[, c("outcomeId", "outcomeName")]
  colnames(negativeControls) <- SqlRender::camelCaseToSnakeCase(colnames(negativeControls))
  fileName <- file.path(exportFolder, "negative_control_outcome.csv")
  readr::write_csv(negativeControls, fileName)


  synthesisSummaryFile <- file.path(outputFolder, "SynthesisSummary.csv")
  if (file.exists(synthesisSummaryFile)) {
    positiveControls <- read.csv(synthesisSummaryFile, stringsAsFactors = FALSE)
    pathToCsv <- system.file("settings", "NegativeControls.csv", package = "Covid19SusceptibilityAlphaBlockers")
    negativeControls <- read.csv(pathToCsv)
    positiveControls <- merge(positiveControls,
                              negativeControls[, c("outcomeId", "outcomeName")])
    positiveControls$outcomeName <- paste0(positiveControls$outcomeName,
                                           ", RR = ",
                                           positiveControls$targetEffectSize)
    positiveControls <- positiveControls[, c("newOutcomeId",
                                             "outcomeName",
                                             "exposureId",
                                             "outcomeId",
                                             "targetEffectSize")]
    colnames(positiveControls) <- c("outcomeId",
                                    "outcomeName",
                                    "exposureId",
                                    "negativeControlId",
                                    "effectSize")
    colnames(positiveControls) <- SqlRender::camelCaseToSnakeCase(colnames(positiveControls))
    fileName <- file.path(exportFolder, "positive_control_outcome.csv")
    readr::write_csv(positiveControls, fileName)
  }
}

exportMetadata <- function(outputFolder,
                           exportFolder,
                           databaseId,
                           databaseName,
                           databaseDescription,
                           minCellCount) {
  ParallelLogger::logInfo("Exporting metadata")

  getInfo <- function(row) {
    cmData <- CohortMethod::loadCohortMethodData(file.path(outputFolder, "cmOutput", row$cohortMethodDataFile))
    info <- cmData$cohorts %>%
      group_by(.data$treatment) %>%
      summarise(minDate = min(.data$cohortStartDate, na.rm = TRUE),
                maxDate = max(.data$cohortStartDate, na.rm = TRUE)) %>%
      ungroup() %>%
      collect()

    info <- tibble::tibble(targetId = row$targetId,
                       comparatorId = row$comparatorId,
                           targetMinDate = info$minDate[info$treatment == 1],
                           targetMaxDate = info$maxDate[info$treatment == 1],
                           comparatorMinDate = info$minDate[info$treatment == 0],
                           comparatorMaxDate = info$maxDate[info$treatment == 0])
    info$comparisonMinDate <- min(info$targetMinDate, info$comparatorMinDate)
    info$comparisonMaxDate <- min(info$targetMaxDate, info$comparatorMaxDate)
    return(info)
  }
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  reference <- unique(reference[, c("targetId", "comparatorId", "cohortMethodDataFile")])
  reference <- split(reference, reference$cohortMethodDataFile)
  info <- lapply(reference, getInfo)
  info <- bind_rows(info)

  ParallelLogger::logInfo("- database table")
  database <- tibble::tibble(database_id = databaseId,
                         database_name = databaseName,
                         description = databaseDescription,
                         is_meta_analysis = 0)
  fileName <- file.path(exportFolder, "database.csv")
  readr::write_csv(database, fileName)

  ParallelLogger::logInfo("- exposure_summary table")
  minDates <- rbind(tibble::tibble(exposureId = info$targetId,
                                   minDate = info$targetMinDate),
                    tibble::tibble(exposureId = info$comparatorId,
                                   minDate = info$comparatorMinDate))
  minDates <- aggregate(minDate ~ exposureId, minDates, min)
  maxDates <- rbind(tibble::tibble(exposureId = info$targetId,
                               maxDate = info$targetMaxDate),
                    tibble::tibble(exposureId = info$comparatorId,
                               maxDate = info$comparatorMaxDate))
  maxDates <- aggregate(maxDate ~ exposureId, maxDates, max)
  exposureSummary <- merge(minDates, maxDates)
  exposureSummary$databaseId <- databaseId
  colnames(exposureSummary) <- SqlRender::camelCaseToSnakeCase(colnames(exposureSummary))
  fileName <- file.path(exportFolder, "exposure_summary.csv")
  readr::write_csv(exposureSummary, fileName)

  ParallelLogger::logInfo("- comparison_summary table")
  minDates <- aggregate(comparisonMinDate ~ targetId + comparatorId, info, min)
  maxDates <- aggregate(comparisonMaxDate ~ targetId + comparatorId, info, max)
  comparisonSummary <- merge(minDates, maxDates)
  comparisonSummary$databaseId <- databaseId
  colnames(comparisonSummary)[colnames(comparisonSummary) == "comparisonMinDate"] <- "minDate"
  colnames(comparisonSummary)[colnames(comparisonSummary) == "comparisonMaxDate"] <- "maxDate"
  colnames(comparisonSummary) <- SqlRender::camelCaseToSnakeCase(colnames(comparisonSummary))
  fileName <- file.path(exportFolder, "comparison_summary.csv")
  readr::write_csv(comparisonSummary, fileName)

  ParallelLogger::logInfo("- attrition table")
  fileName <- file.path(exportFolder, "attrition.csv")
  if (file.exists(fileName)) {
    unlink(fileName)
  }
  outcomesOfInterest <- getOutcomesOfInterest()
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  reference <- reference[reference$outcomeId %in% outcomesOfInterest, ]
  first <- !file.exists(fileName)
  pb <- txtProgressBar(style = 3)
  for (i in 1:nrow(reference)) {
    outcomeModel <- readRDS(file.path(outputFolder,
                                      "cmOutput",
                                      reference$outcomeModelFile[i]))
    attrition <- outcomeModel$attrition[, c("description", "targetPersons", "comparatorPersons")]
    attrition$sequenceNumber <- 1:nrow(attrition)
    attrition1 <- attrition[, c("sequenceNumber", "description", "targetPersons")]
    colnames(attrition1)[3] <- "subjects"
    attrition1$exposureId <- reference$targetId[i]
    attrition2 <- attrition[, c("sequenceNumber", "description", "comparatorPersons")]
    colnames(attrition2)[3] <- "subjects"
    attrition2$exposureId <- reference$comparatorId[i]
    attrition <- rbind(attrition1, attrition2)
    attrition$targetId <- reference$targetId[i]
    attrition$comparatorId <- reference$comparatorId[i]
    attrition$analysisId <- reference$analysisId[i]
    attrition$outcomeId <- reference$outcomeId[i]
    attrition$databaseId <- databaseId
    attrition <- attrition[, c("databaseId",
                               "exposureId",
                               "targetId",
                               "comparatorId",
                               "outcomeId",
                               "analysisId",
                               "sequenceNumber",
                               "description",
                               "subjects")]
    attrition <- enforceMinCellValue(attrition, "subjects", minCellCount, silent = TRUE)

    colnames(attrition) <- SqlRender::camelCaseToSnakeCase(colnames(attrition))
    write.table(x = attrition,
                file = fileName,
                row.names = FALSE,
                col.names = first,
                sep = ",",
                dec = ".",
                qmethod = "double",
                append = !first)
    first <- FALSE
    if (i %% 100 == 10) {
      setTxtProgressBar(pb, i/nrow(reference))
    }
  }
  setTxtProgressBar(pb, 1)
  close(pb)

  ParallelLogger::logInfo("- covariate table")
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  getCovariates <- function(analysisId) {
    cmDataFolder <- reference$cohortMethodDataFile[analysisId][1]
    cmData <- CohortMethod::loadCohortMethodData(file.path(outputFolder, "cmOutput", cmDataFolder))
    covariateRef <- collect(cmData$covariateRef)
    if (nrow(covariateRef) > 0) {
      covariateRef <- covariateRef[, c("covariateId", "covariateName", "analysisId")]
      colnames(covariateRef) <- c("covariateId", "covariateName", "covariateAnalysisId")
      covariateRef$analysisId <- analysisId
      return(covariateRef)
    } else {
      return(data.frame(analysisId = analysisId, covariateId = 1, covariateName = "", covariateAnalysisId = 1)[-1])
    }
  }
  covariates <- lapply(unique(reference$analysisId), getCovariates)
  covariates <- do.call("rbind", covariates)
  covariates$databaseId <- databaseId
  colnames(covariates) <- SqlRender::camelCaseToSnakeCase(colnames(covariates))
  fileName <- file.path(exportFolder, "covariate.csv")
  readr::write_csv(covariates, fileName)
  rm(covariates)  # Free up memory


  ParallelLogger::logInfo("- cm_follow_up_dist table")
  getResult <- function(i) {
    if (reference$strataFile[i] == "") {
      strataPop <- readRDS(file.path(outputFolder,
                                     "cmOutput",
                                     reference$studyPopFile[i]))
    } else {
      strataPop <- readRDS(file.path(outputFolder,
                                     "cmOutput",
                                     reference$strataFile[i]))
    }

    targetDist <- quantile(strataPop$survivalTime[strataPop$treatment == 1],
                           c(0, 0.1, 0.25, 0.5, 0.85, 0.9, 1))
    comparatorDist <- quantile(strataPop$survivalTime[strataPop$treatment == 0],
                               c(0, 0.1, 0.25, 0.5, 0.85, 0.9, 1))
    row <- tibble::tibble(target_id = reference$targetId[i],
                      comparator_id = reference$comparatorId[i],
                      outcome_id = reference$outcomeId[i],
                      analysis_id = reference$analysisId[i],
                      target_min_days = targetDist[1],
                      target_p10_days = targetDist[2],
                      target_p25_days = targetDist[3],
                      target_median_days = targetDist[4],
                      target_p75_days = targetDist[5],
                      target_p90_days = targetDist[6],
                      target_max_days = targetDist[7],
                      comparator_min_days = comparatorDist[1],
                      comparator_p10_days = comparatorDist[2],
                      comparator_p25_days = comparatorDist[3],
                      comparator_median_days = comparatorDist[4],
                      comparator_p75_days = comparatorDist[5],
                      comparator_p90_days = comparatorDist[6],
                      comparator_max_days = comparatorDist[7])
    return(row)
  }
  outcomesOfInterest <- getOutcomesOfInterest()
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  reference <- reference[reference$outcomeId %in% outcomesOfInterest, ]
  results <- plyr::llply(1:nrow(reference), getResult, .progress = "text")
  results <- do.call("rbind", results)
  results$database_id <- databaseId
  fileName <- file.path(exportFolder, "cm_follow_up_dist.csv")
  readr::write_csv(results, fileName)
  rm(results)  # Free up memory
}

enforceMinCellValue <- function(data, fieldName, minValues, silent = FALSE) {
  toCensor <- !is.na(pull(data, fieldName)) & pull(data, fieldName) < minValues & pull(data, fieldName) != 0
  if (!silent) {
    percent <- round(100 * sum(toCensor)/nrow(data), 1)
    ParallelLogger::logInfo("   censoring ",
                            sum(toCensor),
                            " values (",
                            percent,
                            "%) from ",
                            fieldName,
                            " because value below minimum")
  }
  if (length(minValues) == 1) {
    data[toCensor, fieldName] <- -minValues
  } else {
    data[toCensor, fieldName] <- -minValues[toCensor]
  }
  return(data)
}


exportMainResults <- function(outputFolder,
                              exportFolder,
                              databaseId,
                              minCellCount,
                              maxCores) {
  ParallelLogger::logInfo("Exporting main results")


  ParallelLogger::logInfo("- cohort_method_result table")
  analysesSum <- readr::read_csv(file.path(outputFolder, "analysisSummary.csv"), col_types = readr::cols())
  allControls <- getAllControls(outputFolder)
  ParallelLogger::logInfo("  Performing empirical calibration on main effects")
  cluster <- ParallelLogger::makeCluster(min(4, maxCores))
  subsets <- split(analysesSum,
                   paste(analysesSum$targetId, analysesSum$comparatorId, analysesSum$analysisId))
  rm(analysesSum)  # Free up memory
  results <- ParallelLogger::clusterApply(cluster,
                                          subsets,
                                          calibrate,
                                          allControls = allControls)
  ParallelLogger::stopCluster(cluster)
  mainEffects <- do.call("rbind", subsets)[, -c(2,4,6,8,9:20)]
  rm(subsets)  # Free up memory

  results <- do.call("rbind", results)
  results$databaseId <- databaseId
  results <- enforceMinCellValue(results, "targetSubjects", minCellCount)
  results <- enforceMinCellValue(results, "comparatorSubjects", minCellCount)
  results <- enforceMinCellValue(results, "targetOutcomes", minCellCount)
  results <- enforceMinCellValue(results, "comparatorOutcomes", minCellCount)
  colnames(results) <- SqlRender::camelCaseToSnakeCase(colnames(results))
  fileName <- file.path(exportFolder, "cohort_method_result.csv")
  readr::write_csv(results, fileName)
  rm(results)  # Free up memory

  # Handle main / interaction effects
  if (ncol(mainEffects) > 4) {
    ParallelLogger::logInfo("- cm_main_effect_result table")
    keyCol <- "estimate"
    valueCol <- "value"
    gatherCols <- names(mainEffects)[5:length(names(mainEffects))]

    longTable <- tidyr::gather_(mainEffects, keyCol, valueCol, gatherCols)
    longTable$label <- as.numeric(sub(".*I", "", longTable$estimate))
    longTable$estimate <- sub("I.*", "", longTable$estimate)
    uniqueCovariates <- unique(longTable$label)
    mainEffects <- tidyr::spread(longTable, estimate, value)
    mainEffects <- mainEffects[!is.na(mainEffects$logRr),]
    mainEffects <- data.frame(
      databaseId = databaseId,
      analysisId = mainEffects$analysisId,
      targetId = mainEffects$targetId,
      comparatorId = mainEffects$comparatorId,
      outcomeId = mainEffects$outcomeId,
      covariateId = mainEffects$label,
      coefficient = mainEffects$logRr,
      ci95lb = log(mainEffects$ci95lb),
      ci95ub = log(mainEffects$ci95ub),
      se = mainEffects$seLogRr
    )
    colnames(mainEffects) <- SqlRender::camelCaseToSnakeCase(colnames(mainEffects))
    fileName <- file.path(exportFolder, "cm_main_effects_result.csv")
    write.csv(mainEffects, fileName, row.names = FALSE)
    rm(mainEffects)  # Free up memory
  }

  ParallelLogger::logInfo("- cm_interaction_result table")
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  loadInteractionsFromOutcomeModel <- function(i) {
    outcomeModel <- readRDS(file.path(outputFolder,
                                      "cmOutput",
                                      reference$outcomeModelFile[i]))
    if ("subgroupCounts" %in% names(outcomeModel)) {
      rows <- tibble::tibble(targetId = reference$targetId[i],
                         comparatorId = reference$comparatorId[i],
                         outcomeId = reference$outcomeId[i],
                         analysisId = reference$analysisId[i],
                         interactionCovariateId = outcomeModel$subgroupCounts$subgroupCovariateId,
                         rrr = NA,
                         ci95Lb = NA,
                         ci95Ub = NA,
                         p = NA,
                         i2 = NA,
                         logRrr = NA,
                         seLogRrr = NA,
                         targetSubjects = outcomeModel$subgroupCounts$targetPersons,
                         comparatorSubjects = outcomeModel$subgroupCounts$comparatorPersons,
                         targetDays = outcomeModel$subgroupCounts$targetDays,
                         comparatorDays = outcomeModel$subgroupCounts$comparatorDays,
                         targetOutcomes = outcomeModel$subgroupCounts$targetOutcomes,
                         comparatorOutcomes = outcomeModel$subgroupCounts$comparatorOutcomes)
      if ("outcomeModelInteractionEstimates" %in% names(outcomeModel)) {
        idx <- match(outcomeModel$outcomeModelInteractionEstimates$covariateId,
                     rows$interactionCovariateId)
        rows$rrr[idx] <- exp(outcomeModel$outcomeModelInteractionEstimates$logRr)
        rows$ci95Lb[idx] <- exp(outcomeModel$outcomeModelInteractionEstimates$logLb95)
        rows$ci95Ub[idx] <- exp(outcomeModel$outcomeModelInteractionEstimates$logUb95)
        rows$logRrr[idx] <- outcomeModel$outcomeModelInteractionEstimates$logRr
        rows$seLogRrr[idx] <- outcomeModel$outcomeModelInteractionEstimates$seLogRr
        z <- rows$logRrr[idx]/rows$seLogRrr[idx]
        rows$p[idx] <- 2 * pmin(pnorm(z), 1 - pnorm(z))
      }
      return(rows)
    } else {
      return(NULL)
    }

  }
  interactions <- plyr::llply(1:nrow(reference),
                              loadInteractionsFromOutcomeModel,
                              .progress = "text")
  interactions <- bind_rows(interactions)
  if (nrow(interactions) > 0) {
    ParallelLogger::logInfo("  Performing empirical calibration on interaction effects")
    allControls <- getAllControls(outputFolder)
    negativeControls <- allControls[allControls$targetEffectSize == 1, ]
    cluster <- ParallelLogger::makeCluster(min(4, maxCores))
    subsets <- split(interactions,
                     paste(interactions$targetId, interactions$comparatorId, interactions$analysisId))
    interactions <- ParallelLogger::clusterApply(cluster,
                                                 subsets,
                                                 calibrateInteractions,
                                                 negativeControls = negativeControls)
    ParallelLogger::stopCluster(cluster)
    rm(subsets)  # Free up memory
    interactions <- bind_rows(interactions)
    interactions$databaseId <- databaseId

    interactions <- enforceMinCellValue(interactions, "targetSubjects", minCellCount)
    interactions <- enforceMinCellValue(interactions, "comparatorSubjects", minCellCount)
    interactions <- enforceMinCellValue(interactions, "targetOutcomes", minCellCount)
    interactions <- enforceMinCellValue(interactions, "comparatorOutcomes", minCellCount)
    colnames(interactions) <- SqlRender::camelCaseToSnakeCase(colnames(interactions))
    fileName <- file.path(exportFolder, "cm_interaction_result.csv")
    readr::write_csv(interactions, fileName)
    rm(interactions)  # Free up memory
  }
}

calibrate <- function(subset, allControls) {
  ncs <- subset[subset$outcomeId %in% allControls$outcomeId[allControls$targetEffectSize == 1], ]
  ncs <- ncs[!is.na(ncs$seLogRr), ]
  if (nrow(ncs) > 5) {
    null <- EmpiricalCalibration::fitMcmcNull(ncs$logRr, ncs$seLogRr)
    calibratedP <- EmpiricalCalibration::calibrateP(null = null,
                                                    logRr = subset$logRr,
                                                    seLogRr = subset$seLogRr)
    subset$calibratedP <- calibratedP$p
  } else {
    subset$calibratedP <- rep(NA, nrow(subset))
  }
  pcs <- subset[subset$outcomeId %in% allControls$outcomeId[allControls$targetEffectSize != 1], ]
  pcs <- pcs[!is.na(pcs$seLogRr), ]
  if (nrow(pcs) > 5) {
    controls <- merge(subset, allControls[, c("targetId", "comparatorId", "outcomeId", "targetEffectSize")])
    model <- EmpiricalCalibration::fitSystematicErrorModel(logRr = controls$logRr,
                                                           seLogRr = controls$seLogRr,
                                                           trueLogRr = log(controls$targetEffectSize),
                                                           estimateCovarianceMatrix = FALSE)
    calibratedCi <- EmpiricalCalibration::calibrateConfidenceInterval(logRr = subset$logRr,
                                                                      seLogRr = subset$seLogRr,
                                                                      model = model)
    subset$calibratedRr <- exp(calibratedCi$logRr)
    subset$calibratedCi95Lb <- exp(calibratedCi$logLb95Rr)
    subset$calibratedCi95Ub <- exp(calibratedCi$logUb95Rr)
    subset$calibratedLogRr <- calibratedCi$logRr
    subset$calibratedSeLogRr <- calibratedCi$seLogRr
  } else {
    subset$calibratedRr <- rep(NA, nrow(subset))
    subset$calibratedCi95Lb <- rep(NA, nrow(subset))
    subset$calibratedCi95Ub <- rep(NA, nrow(subset))
    subset$calibratedLogRr <- rep(NA, nrow(subset))
    subset$calibratedSeLogRr <- rep(NA, nrow(subset))
  }
  subset$i2 <- rep(NA, nrow(subset))
  subset <- subset[, c("targetId",
                       "comparatorId",
                       "outcomeId",
                       "analysisId",
                       "rr",
                       "ci95lb",
                       "ci95ub",
                       "p",
                       "i2",
                       "logRr",
                       "seLogRr",
                       "target",
                       "comparator",
                       "targetDays",
                       "comparatorDays",
                       "eventsTarget",
                       "eventsComparator",
                       "calibratedP",
                       "calibratedRr",
                       "calibratedCi95Lb",
                       "calibratedCi95Ub",
                       "calibratedLogRr",
                       "calibratedSeLogRr")]
  colnames(subset) <- c("targetId",
                        "comparatorId",
                        "outcomeId",
                        "analysisId",
                        "rr",
                        "ci95Lb",
                        "ci95Ub",
                        "p",
                        "i2",
                        "logRr",
                        "seLogRr",
                        "targetSubjects",
                        "comparatorSubjects",
                        "targetDays",
                        "comparatorDays",
                        "targetOutcomes",
                        "comparatorOutcomes",
                        "calibratedP",
                        "calibratedRr",
                        "calibratedCi95Lb",
                        "calibratedCi95Ub",
                        "calibratedLogRr",
                        "calibratedSeLogRr")
  return(subset)
}

calibrateInteractions <- function(subset, negativeControls) {
  ncs <- subset[subset$outcomeId %in% negativeControls$outcomeId, ]
  ncs <- ncs[!is.na(pull(ncs, .data$seLogRrr)), ]
  if (nrow(ncs) > 5) {
    null <- EmpiricalCalibration::fitMcmcNull(ncs$logRrr, ncs$seLogRrr)
    calibratedP <- EmpiricalCalibration::calibrateP(null = null,
                                                    logRr = subset$logRrr,
                                                    seLogRr = subset$seLogRrr)
    subset$calibratedP <- calibratedP$p
  } else {
    subset$calibratedP <- rep(NA, nrow(subset))
  }
  return(subset)
}

exportProfiles <- function(outputFolder,
                              exportFolder,
                              databaseId,
                              minCellCount,
                              maxCores) {
  ParallelLogger::logInfo("Exporting profiles")
  fileName <- file.path(exportFolder, "outcome_profile.csv")
  if (file.exists(fileName)) {
    unlink(fileName)
  }
  first <- TRUE
  profileFolder <- file.path(outputFolder, "profile")
  files <- list.files(profileFolder, pattern = "prof_.*.rds", full.names = TRUE)
  pb <- txtProgressBar(style = 3)
  if (length(files) > 0) {
    for (i in 1:length(files)) {
      ids <- gsub("^.*prof_t", "", files[i])
      targetId <- as.numeric(gsub("_c.*", "", ids))
      ids <- gsub("^.*_c", "", ids)
      comparatorId <- as.numeric(gsub("_[aso].*$", "", ids))
      if (grepl("_s", ids)) {
        subgroupId <- as.numeric(gsub("^.*_s", "", gsub("_a[0-9]*.rds", "", ids)))
      } else {
        subgroupId <- NA
      }
      if (grepl("_o", ids)) {
        outcomeId <- as.numeric(gsub("^.*_o", "", gsub("_a[0-9]*.rds", "", ids)))
      } else {
        outcomeId <- NA
      }
      ids <- gsub("^.*_a", "", ids)
      analysisId <- as.numeric(gsub(".rds", "", ids))

      profile <- readRDS(files[i])
      profile$targetId <- targetId
      profile$comparatorId <- comparatorId
      profile$outcomeId <- outcomeId
      profile$analysisId <- analysisId
      profile$databaseId <- databaseId

      colnames(profile) <- SqlRender::camelCaseToSnakeCase(colnames(profile))
      write.table(x = profile,
                  file = fileName,
                  row.names = FALSE,
                  col.names = first,
                  sep = ",",
                  dec = ".",
                  qmethod = "double",
                  append = !first)
      first <- FALSE
      setTxtProgressBar(pb, i/length(files))
    }
  }
  close(pb)
}


exportDiagnostics <- function(outputFolder,
                              exportFolder,
                              databaseId,
                              minCellCount,
                              maxCores) {
  ParallelLogger::logInfo("Exporting diagnostics")
  ParallelLogger::logInfo("- covariate_balance table")
  fileName <- file.path(exportFolder, "covariate_balance.csv")
  if (file.exists(fileName)) {
    unlink(fileName)
  }
  first <- TRUE
  balanceFolder <- file.path(outputFolder, "balance")
  files <- list.files(balanceFolder, pattern = "bal_.*.rds", full.names = TRUE)
  pb <- txtProgressBar(style = 3)
  if (length(files) > 0) {
    for (i in 1:length(files)) {
      ids <- gsub("^.*bal_t", "", files[i])
      targetId <- as.numeric(gsub("_c.*", "", ids))
      ids <- gsub("^.*_c", "", ids)
      comparatorId <- as.numeric(gsub("_[aso].*$", "", ids))
      if (grepl("_s", ids)) {
        subgroupId <- as.numeric(gsub("^.*_s", "", gsub("_a[0-9]*.rds", "", ids)))
      } else {
        subgroupId <- NA
      }
      if (grepl("_o", ids)) {
        outcomeId <- as.numeric(gsub("^.*_o", "", gsub("_a[0-9]*.rds", "", ids)))
      } else {
        outcomeId <- NA
      }
      ids <- gsub("^.*_a", "", ids)
      analysisId <- as.numeric(gsub(".rds", "", ids))
      balance <- readRDS(files[i])
      inferredTargetBeforeSize <- mean(balance$beforeMatchingSumTarget/balance$beforeMatchingMeanTarget,
                                       na.rm = TRUE)
      inferredComparatorBeforeSize <- mean(balance$beforeMatchingSumComparator/balance$beforeMatchingMeanComparator,
                                           na.rm = TRUE)
      inferredTargetAfterSize <- mean(balance$afterMatchingSumTarget/balance$afterMatchingMeanTarget,
                                      na.rm = TRUE)
      inferredComparatorAfterSize <- mean(balance$afterMatchingSumComparator/balance$afterMatchingMeanComparator,
                                          na.rm = TRUE)

      balance$databaseId <- databaseId
      balance$targetId <- targetId
      balance$comparatorId <- comparatorId
      balance$outcomeId <- outcomeId
      balance$analysisId <- analysisId
      balance$interactionCovariateId <- subgroupId
      balance <- balance[, c("databaseId",
                             "targetId",
                             "comparatorId",
                             "outcomeId",
                             "analysisId",
                             "interactionCovariateId",
                             "covariateId",
                             "beforeMatchingMeanTarget",
                             "beforeMatchingMeanComparator",
                             "beforeMatchingStdDiff",
                             "afterMatchingMeanTarget",
                             "afterMatchingMeanComparator",
                             "afterMatchingStdDiff")]
      colnames(balance) <- c("databaseId",
                             "targetId",
                             "comparatorId",
                             "outcomeId",
                             "analysisId",
                             "interactionCovariateId",
                             "covariateId",
                             "targetMeanBefore",
                             "comparatorMeanBefore",
                             "stdDiffBefore",
                             "targetMeanAfter",
                             "comparatorMeanAfter",
                             "stdDiffAfter")
      balance$targetMeanBefore[is.na(balance$targetMeanBefore)] <- 0
      balance$comparatorMeanBefore[is.na(balance$comparatorMeanBefore)] <- 0
      balance$stdDiffBefore <- round(balance$stdDiffBefore, 3)
      balance$targetMeanAfter[is.na(balance$targetMeanAfter)] <- 0
      balance$comparatorMeanAfter[is.na(balance$comparatorMeanAfter)] <- 0

      balance$targetSizeBefore <- inferredTargetBeforeSize
      balance$targetSizeBefore[is.na(inferredTargetBeforeSize)] <- 0

      balance$comparatorSizeBefore <- inferredComparatorBeforeSize
      balance$comparatorSizeBefore[is.na(inferredComparatorBeforeSize)] <- 0

      balance$targetSizeAfter <- inferredTargetAfterSize
      balance$targetSizeAfter[is.na(inferredTargetAfterSize)] <- 0

      balance$comparatorSizeAfter <- inferredComparatorAfterSize
      balance$comparatorSizeAfter[is.na(inferredComparatorAfterSize)] <- 0

      balance$stdDiffAfter <- round(balance$stdDiffAfter, 3)
      balance <- enforceMinCellValue(balance,
                                     "targetMeanBefore",
                                     minCellCount/inferredTargetBeforeSize,
                                     TRUE)
      balance <- enforceMinCellValue(balance,
                                     "comparatorMeanBefore",
                                     minCellCount/inferredComparatorBeforeSize,
                                     TRUE)
      balance <- enforceMinCellValue(balance,
                                     "targetMeanAfter",
                                     minCellCount/inferredTargetAfterSize,
                                     TRUE)
      balance <- enforceMinCellValue(balance,
                                     "comparatorMeanAfter",
                                     minCellCount/inferredComparatorAfterSize,
                                     TRUE)

      balance <- enforceMinCellValue(balance,
                                     "targetSizeBefore",
                                     minCellCount,
                                     TRUE)
      balance <- enforceMinCellValue(balance,
                                     "targetSizeAfter",
                                     minCellCount,
                                     TRUE)
      balance <- enforceMinCellValue(balance,
                                     "comparatorSizeBefore",
                                     minCellCount,
                                     TRUE)
      balance <- enforceMinCellValue(balance,
                                     "comparatorSizeAfter",
                                     minCellCount,
                                     TRUE)

      balance$targetMeanBefore <- round(balance$targetMeanBefore, 3)
      balance$comparatorMeanBefore <- round(balance$comparatorMeanBefore, 3)
      balance$targetMeanAfter <- round(balance$targetMeanAfter, 3)
      balance$comparatorMeanAfter <- round(balance$comparatorMeanAfter, 3)

      balance$targetSizeBefore <- round(balance$targetSizeBefore, 0)
      balance$comparatorSizeBefore <- round(balance$comparatorSizeBefore, 0)
      balance$targetSizeAfter <- round(balance$targetSizeAfter, 0)
      balance$comparatorSizeAfter <- round(balance$comparatorSizeAfter, 0)

      # balance <- balance[balance$targetMeanBefore != 0 & balance$comparatorMeanBefore != 0 & balance$targetMeanAfter !=
      #                      0 & balance$comparatorMeanAfter != 0 & balance$stdDiffBefore != 0 & balance$stdDiffAfter !=
      #                      0, ]
      balance <- balance[!is.na(balance$targetId), ]
      colnames(balance) <- SqlRender::camelCaseToSnakeCase(colnames(balance))
      write.table(x = balance,
                  file = fileName,
                  row.names = FALSE,
                  col.names = first,
                  sep = ",",
                  dec = ".",
                  qmethod = "double",
                  append = !first)
      first <- FALSE
      setTxtProgressBar(pb, i/length(files))
    }
  }
  close(pb)

  ParallelLogger::logInfo("- preference_score_dist table")
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  preparePlot <- function(row, reference) {
    idx <- reference$analysisId == row$analysisId &
      reference$targetId == row$targetId &
      reference$comparatorId == row$comparatorId
    psFileName <- file.path(outputFolder,
                            "cmOutput",
                            reference$sharedPsFile[idx][1])
    if (file.exists(psFileName)) {
      ps <- readRDS(psFileName)
      if (length(unique(ps$treatment)) == 2 &&
          min(ps$propensityScore) < max(ps$propensityScore)) {
        ps <- CohortMethod:::computePreferenceScore(ps)

        pop1 <- ps$preferenceScore[ps$treatment == 1]
        pop0 <- ps$preferenceScore[ps$treatment == 0]

        bw1 <- ifelse(length(pop1) > 1, "nrd0", 0.1)
        bw0 <- ifelse(length(pop0) > 1, "nrd0", 0.1)

        d1 <- density(pop1, bw = bw1, from = 0, to = 1, n = 100)
        d0 <- density(pop0, bw = bw0, from = 0, to = 1, n = 100)

        result <- tibble::tibble(databaseId = databaseId,
                             targetId = row$targetId,
                             comparatorId = row$comparatorId,
                             analysisId = row$analysisId,
                             preferenceScore = d1$x,
                             targetDensity = d1$y,
                             comparatorDensity = d0$y)
        return(result)
      }
    }
    return(NULL)
  }
  subset <- unique(reference[reference$sharedPsFile != "",
                             c("targetId", "comparatorId", "analysisId")])
  data <- plyr::llply(split(subset, 1:nrow(subset)),
                      preparePlot,
                      reference = reference,
                      .progress = "text")
  data <- do.call("rbind", data)
  fileName <- file.path(exportFolder, "preference_score_dist.csv")
  if (!is.null(data)) {
    colnames(data) <- SqlRender::camelCaseToSnakeCase(colnames(data))
  }
  readr::write_csv(data, fileName)


  ParallelLogger::logInfo("- propensity_model table")
  getPsModel <- function(row, reference) {
    idx <- reference$analysisId == row$analysisId &
      reference$targetId == row$targetId &
      reference$comparatorId == row$comparatorId
    psFileName <- file.path(outputFolder,
                            "cmOutput",
                            reference$sharedPsFile[idx][1])
    if (file.exists(psFileName)) {
      ps <- readRDS(psFileName)
      metaData <- attr(ps, "metaData")
      if (is.null(metaData$psError)) {
        cmDataFile <- file.path(outputFolder,
                                "cmOutput",
                                reference$cohortMethodDataFile[idx][1])
        cmData <- CohortMethod::loadCohortMethodData(cmDataFile)
        model <- CohortMethod::getPsModel(ps, cmData)
        model$covariateId[is.na(model$covariateId)] <- 0
        Andromeda::close(cmData)
        model$databaseId <- databaseId
        model$targetId <- row$targetId
        model$comparatorId <- row$comparatorId
        model$analysisId <- row$analysisId
        model <- model[, c("databaseId", "targetId", "comparatorId", "analysisId", "covariateId", "coefficient")]
        return(model)
      }
    }
    return(NULL)
  }
  subset <- unique(reference[reference$sharedPsFile != "",
                             c("targetId", "comparatorId", "analysisId")])
  data <- plyr::llply(split(subset, 1:nrow(subset)),
                      getPsModel,
                      reference = reference,
                      .progress = "text")
  data <- do.call("rbind", data)
  fileName <- file.path(exportFolder, "propensity_model.csv")
  if (!is.null(data)) {
    colnames(data) <- SqlRender::camelCaseToSnakeCase(colnames(data))
  }
  readr::write_csv(data, fileName)


  ParallelLogger::logInfo("- kaplan_meier_dist table")
  ParallelLogger::logInfo("  Computing KM curves")
  reference <- readRDS(file.path(outputFolder, "cmOutput", "outcomeModelReference.rds"))
  outcomesOfInterest <- getOutcomesOfInterest()
  reference <- reference[reference$outcomeId %in% outcomesOfInterest, ]
  reference <- reference[, c("strataFile",
                             "studyPopFile",
                             "targetId",
                             "comparatorId",
                             "outcomeId",
                             "analysisId")]
  tempFolder <- file.path(exportFolder, "temp")
  if (!file.exists(tempFolder)) {
    dir.create(tempFolder)
  }
  cluster <- ParallelLogger::makeCluster(min(4, maxCores))
  tasks <- split(reference, seq(nrow(reference)))
  ParallelLogger::clusterApply(cluster,
                               tasks,
                               prepareKm,
                               outputFolder = outputFolder,
                               tempFolder = tempFolder,
                               databaseId = databaseId,
                               minCellCount = minCellCount)
  ParallelLogger::stopCluster(cluster)
  ParallelLogger::logInfo("  Writing to single csv file")
  saveKmToCsv <- function(file, first, outputFile) {
    data <- readRDS(file)
    if (!is.null(data)) {
      colnames(data) <- SqlRender::camelCaseToSnakeCase(colnames(data))
    }
    write.table(x = data,
                file = outputFile,
                row.names = FALSE,
                col.names = first,
                sep = ",",
                dec = ".",
                qmethod = "double",
                append = !first)
  }
  outputFile <- file.path(exportFolder, "kaplan_meier_dist.csv")
  files <- list.files(tempFolder, "km_.*.rds", full.names = TRUE)
  if (length(files) > 0) {
    saveKmToCsv(files[1], first = TRUE, outputFile = outputFile)
    if (length(files) > 1) {
      plyr::l_ply(files[2:length(files)], saveKmToCsv, first = FALSE, outputFile = outputFile, .progress = "text")
    }
  }
  unlink(tempFolder, recursive = TRUE)
}

prepareKm <- function(task,
                      outputFolder,
                      tempFolder,
                      databaseId,
                      minCellCount) {
  ParallelLogger::logTrace("Preparing KM plot for target ",
                           task$targetId,
                           ", comparator ",
                           task$comparatorId,
                           ", outcome ",
                           task$outcomeId,
                           ", analysis ",
                           task$analysisId)
  outputFileName <- file.path(tempFolder, sprintf("km_t%s_c%s_o%s_a%s.rds",
                                                  task$targetId,
                                                  task$comparatorId,
                                                  task$outcomeId,
                                                  task$analysisId))
  if (file.exists(outputFileName)) {
    return(NULL)
  }
  popFile <- task$strataFile
  if (popFile == "") {
    popFile <- task$studyPopFile
  }
  population <- readRDS(file.path(outputFolder,
                                  "cmOutput",
                                  popFile))
  if (nrow(population) == 0  || length(unique(population$treatment)) != 2) {
    # Can happen when matching and treatment is predictable
    return(NULL)
  }
  data <- prepareKaplanMeier(population)
  if (is.null(data)) {
    # No shared strata
    return(NULL)
  }
  data$targetId <- task$targetId
  data$comparatorId <- task$comparatorId
  data$outcomeId <- task$outcomeId
  data$analysisId <- task$analysisId
  data$databaseId <- databaseId
  data <- enforceMinCellValue(data, "targetAtRisk", minCellCount)
  data <- enforceMinCellValue(data, "comparatorAtRisk", minCellCount)
  saveRDS(data, outputFileName)
}

prepareKaplanMeier <- function(population) {
  dataCutoff <- 0.9
  population$y <- 0
  population$y[population$outcomeCount != 0] <- 1
  if (is.null(population$stratumId) || length(unique(population$stratumId)) == nrow(population)/2) {
    sv <- survival::survfit(survival::Surv(survivalTime, y) ~ treatment, population, conf.int = TRUE)
    idx <- summary(sv, censored = T)$strata == "treatment=1"
    survTarget <- tibble::tibble(time = sv$time[idx],
                             targetSurvival = sv$surv[idx],
                             targetSurvivalLb = sv$lower[idx],
                             targetSurvivalUb = sv$upper[idx])
    idx <- summary(sv, censored = T)$strata == "treatment=0"
    survComparator <- tibble::tibble(time = sv$time[idx],
                                 comparatorSurvival = sv$surv[idx],
                                 comparatorSurvivalLb = sv$lower[idx],
                                 comparatorSurvivalUb = sv$upper[idx])
    data <- merge(survTarget, survComparator, all = TRUE)
  } else {
    population$stratumSizeT <- 1
    strataSizesT <- aggregate(stratumSizeT ~ stratumId, population[population$treatment == 1, ], sum)
    if (max(strataSizesT$stratumSizeT) == 1) {
      # variable ratio matching: use propensity score to compute IPTW
      if (is.null(population$propensityScore)) {
        stop("Variable ratio matching detected, but no propensity score found")
      }
      weights <- aggregate(propensityScore ~ stratumId, population, mean)
      if (max(weights$propensityScore) > 0.99999) {
        return(NULL)
      }
      weights$weight <- weights$propensityScore / (1 - weights$propensityScore)
    } else {
      # stratification: infer probability of treatment from subject counts
      strataSizesC <- aggregate(stratumSizeT ~ stratumId, population[population$treatment == 0, ], sum)
      colnames(strataSizesC)[2] <- "stratumSizeC"
      weights <- merge(strataSizesT, strataSizesC)
      if (nrow(weights) == 0) {
        warning("No shared strata between target and comparator")
        return(NULL)
      }
      weights$weight <- weights$stratumSizeT/weights$stratumSizeC
    }
    population <- merge(population, weights[, c("stratumId", "weight")])
    population$weight[population$treatment == 1] <- 1
    idx <- population$treatment == 1
    survTarget <- CohortMethod:::adjustedKm(weight = population$weight[idx],
                                            time = population$survivalTime[idx],
                                            y = population$y[idx])
    survTarget$targetSurvivalUb <- survTarget$s^exp(qnorm(0.975)/log(survTarget$s) * sqrt(survTarget$var)/survTarget$s)
    survTarget$targetSurvivalLb <- survTarget$s^exp(qnorm(0.025)/log(survTarget$s) * sqrt(survTarget$var)/survTarget$s)
    survTarget$targetSurvivalLb[survTarget$s > 0.9999] <- survTarget$s[survTarget$s > 0.9999]
    survTarget$targetSurvival <- survTarget$s
    survTarget$s <- NULL
    survTarget$var <- NULL
    idx <- population$treatment == 0
    survComparator <- CohortMethod:::adjustedKm(weight = population$weight[idx],
                                                time = population$survivalTime[idx],
                                                y = population$y[idx])
    survComparator$comparatorSurvivalUb <- survComparator$s^exp(qnorm(0.975)/log(survComparator$s) *
                                                                  sqrt(survComparator$var)/survComparator$s)
    survComparator$comparatorSurvivalLb <- survComparator$s^exp(qnorm(0.025)/log(survComparator$s) *
                                                                  sqrt(survComparator$var)/survComparator$s)
    survComparator$comparatorSurvivalLb[survComparator$s > 0.9999] <- survComparator$s[survComparator$s >
                                                                                         0.9999]
    survComparator$comparatorSurvival <- survComparator$s
    survComparator$s <- NULL
    survComparator$var <- NULL
    data <- merge(survTarget, survComparator, all = TRUE)
  }
  data <- data[, c("time", "targetSurvival", "targetSurvivalLb", "targetSurvivalUb", "comparatorSurvival", "comparatorSurvivalLb", "comparatorSurvivalUb")]
  cutoff <- quantile(population$survivalTime, dataCutoff)
  data <- data[data$time <= cutoff, ]
  if (cutoff <= 300) {
    xBreaks <- seq(0, cutoff, by = 50)
  } else if (cutoff <= 600) {
    xBreaks <- seq(0, cutoff, by = 100)
  } else {
    xBreaks <- seq(0, cutoff, by = 250)
  }

  targetAtRisk <- c()
  comparatorAtRisk <- c()
  for (xBreak in xBreaks) {
    targetAtRisk <- c(targetAtRisk,
                      sum(population$treatment == 1 & population$survivalTime >= xBreak))
    comparatorAtRisk <- c(comparatorAtRisk,
                          sum(population$treatment == 0 & population$survivalTime >=
                                xBreak))
  }
  data <- merge(data, tibble::tibble(time = xBreaks,
                                 targetAtRisk = targetAtRisk,
                                 comparatorAtRisk = comparatorAtRisk), all = TRUE)
  if (is.na(data$targetSurvival[1])) {
    data$targetSurvival[1] <- 1
    data$targetSurvivalUb[1] <- 1
    data$targetSurvivalLb[1] <- 1
  }
  if (is.na(data$comparatorSurvival[1])) {
    data$comparatorSurvival[1] <- 1
    data$comparatorSurvivalUb[1] <- 1
    data$comparatorSurvivalLb[1] <- 1
  }
  idx <- which(is.na(data$targetSurvival))
  while (length(idx) > 0) {
    data$targetSurvival[idx] <- data$targetSurvival[idx - 1]
    data$targetSurvivalLb[idx] <- data$targetSurvivalLb[idx - 1]
    data$targetSurvivalUb[idx] <- data$targetSurvivalUb[idx - 1]
    idx <- which(is.na(data$targetSurvival))
  }
  idx <- which(is.na(data$comparatorSurvival))
  while (length(idx) > 0) {
    data$comparatorSurvival[idx] <- data$comparatorSurvival[idx - 1]
    data$comparatorSurvivalLb[idx] <- data$comparatorSurvivalLb[idx - 1]
    data$comparatorSurvivalUb[idx] <- data$comparatorSurvivalUb[idx - 1]
    idx <- which(is.na(data$comparatorSurvival))
  }
  data$targetSurvival <- round(data$targetSurvival, 4)
  data$targetSurvivalLb <- round(data$targetSurvivalLb, 4)
  data$targetSurvivalUb <- round(data$targetSurvivalUb, 4)
  data$comparatorSurvival <- round(data$comparatorSurvival, 4)
  data$comparatorSurvivalLb <- round(data$comparatorSurvivalLb, 4)
  data$comparatorSurvivalUb <- round(data$comparatorSurvivalUb, 4)

  # Remove duplicate (except time) entries:
  data <- data[order(data$time), ]
  data <- data[!duplicated(data[, -1]), ]
  return(data)
}
ohdsi-studies/Covid19SusceptibilityAlphaBlockers documentation built on Dec. 25, 2021, 4:45 p.m.