# @file RunAnalyses.R
#
# Copyright 2022 Observational Health Data Sciences and Informatics
#
# This file is part of IcTemporalPatternDiscovery
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#' Run a list of analyses
#'
#' @details
#' Run a list of analyses for the exposure-outcomes of interest. This function will run all specified
#' analyses against all hypotheses of interest, meaning that the reference table that is produced
#' contains `length(ictpdAnalysisList) * length(exposureOutcomeList)` rows. When you provide several
#' analyses it will determine whether any of the analyses have anything in common, and will take
#' advantage of this fact. For example, if several analyses share the same data loading settings
#' (`getDbIctpdDataArgs`, `exposureType` and `outcomeType`), the data is extracted from the database
#' only once and re-used in all of these analyses.
#'
#' Data and results files that already exist in `outputFolder` are not recreated.
#'
#' @param connectionDetails              An R object of type \code{connectionDetails} created using the
#'                                       function \code{createConnectionDetails} in the
#'                                       \code{DatabaseConnector} package.
#' @param cdmDatabaseSchema              The name of the database schema that contains the OMOP CDM
#'                                       instance. Requires read permissions to this database. On SQL
#'                                       Server, this should specify both the database and the schema,
#'                                       so for example 'cdm_instance.dbo'.
#' @param oracleTempSchema               For Oracle only: the name of the database schema where you
#'                                       want all temporary tables to be managed. Requires
#'                                       create/insert permissions to this database.
#' @param exposureDatabaseSchema         The name of the database schema that is the location where the
#'                                       exposure data used to define the exposure cohorts is
#'                                       available. If exposureTable = DRUG_ERA, exposureDatabaseSchema
#'                                       is not used but assumed to be cdmSchema. Requires read
#'                                       permissions to this database.
#' @param exposureTable                  The tablename that contains the exposure cohorts. If
#'                                       exposureTable <> DRUG_ERA, then expectation is exposureTable
#'                                       has format of COHORT table: COHORT_DEFINITION_ID, SUBJECT_ID,
#'                                       COHORT_START_DATE, COHORT_END_DATE.
#' @param outcomeDatabaseSchema          The name of the database schema that is the location where the
#'                                       data used to define the outcome cohorts is available. If
#'                                       outcomeTable = CONDITION_ERA, outcomeDatabaseSchema is not
#'                                       used but assumed to be cdmSchema. Requires read permissions to
#'                                       this database.
#' @param outcomeTable                   The tablename that contains the outcome cohorts. If
#'                                       outcomeTable <> CONDITION_OCCURRENCE, then expectation is
#'                                       outcomeTable has format of COHORT table: COHORT_DEFINITION_ID,
#'                                       SUBJECT_ID, COHORT_START_DATE, COHORT_END_DATE.
#' @param outputFolder                   Name of the folder where all the outputs will be written to.
#'                                       The folder is created when it does not exist yet.
#' @param ictpdAnalysisList              A list of objects of type \code{ictpdAnalysis} as created
#'                                       using the \code{\link{createIctpdAnalysis}} function.
#' @param exposureOutcomeList            A list of objects of type \code{exposureOutcome} as created
#'                                       using the \code{\link{createExposureOutcome}} function.
#' @param cdmVersion                     Define the OMOP CDM version used: currently supports "4" and
#'                                       "5".
#' @param getDbIctpdDataThreads          The number of parallel threads to use to load the data from
#'                                       the database.
#' @param calculateStatisticsIcThreads   The number of threads used to perform the IC statistics
#'                                       computations.
#'
#' @return
#' Invisibly, a data frame with one row per analysis and exposure-outcome pair and the columns
#' \code{analysisId}, \code{exposureId}, \code{outcomeId}, \code{ictpdDataFile} and
#' \code{ictpdResultsFile}. The same data frame is saved as \code{resultsReference.rds} in
#' \code{outputFolder}.
#'
#' @export
runIctpdAnalyses <- function(connectionDetails,
                             cdmDatabaseSchema,
                             oracleTempSchema = cdmDatabaseSchema,
                             exposureDatabaseSchema = cdmDatabaseSchema,
                             exposureTable = "drug_era",
                             outcomeDatabaseSchema = cdmDatabaseSchema,
                             outcomeTable = "condition_era",
                             cdmVersion = 4,
                             outputFolder = "./IctpdOutput",
                             ictpdAnalysisList,
                             exposureOutcomeList,
                             getDbIctpdDataThreads = 1,
                             calculateStatisticsIcThreads = 1) {
  # inherits() also accepts objects that carry additional classes.
  for (exposureOutcome in exposureOutcomeList) {
    stopifnot(inherits(exposureOutcome, "exposureOutcome"))
  }
  for (ictpdAnalysis in ictpdAnalysisList) {
    stopifnot(inherits(ictpdAnalysis, "ictpdAnalysis"))
  }
  uniqueAnalysisIds <- unlist(unique(ParallelLogger::selectFromList(ictpdAnalysisList, "analysisId")))
  if (length(uniqueAnalysisIds) != length(ictpdAnalysisList)) {
    stop("Duplicate analysis IDs are not allowed")
  }
  if (!file.exists(outputFolder)) {
    # recursive = TRUE so that a nested output folder can be created in one go.
    dir.create(outputFolder, recursive = TRUE)
  }

  # Runs `fun` on every element of `tasks` using a cluster of `threads` workers. The cluster is
  # always stopped, also when one of the tasks throws an error.
  runOnCluster <- function(tasks, fun, threads) {
    if (length(tasks) == 0) {
      return(invisible(NULL))
    }
    cluster <- ParallelLogger::makeCluster(threads)
    on.exit(ParallelLogger::stopCluster(cluster), add = TRUE)
    ParallelLogger::clusterRequire(cluster, "IcTemporalPatternDiscovery")
    ParallelLogger::clusterApply(cluster, tasks, fun)
    invisible(NULL)
  }

  ### Create reference table ###
  # Rows are collected in a list and bound once, instead of growing a data frame row by row.
  referenceRows <- list()
  loadingArgsList <- unique(ParallelLogger::selectFromList(ictpdAnalysisList, c(
    "getDbIctpdDataArgs",
    "exposureType",
    "outcomeType"
  )))
  # seq_along() instead of 1:length(): an empty list must result in zero iterations.
  for (i in seq_along(loadingArgsList)) {
    loadingArgs <- loadingArgsList[[i]]
    ictpdDataFile <- .createIctpdDataFileName(outputFolder, i)
    # The analyses sharing these loading settings do not depend on the exposure-outcome pair.
    ictpdAnalysisSubset <- ParallelLogger::matchInList(ictpdAnalysisList, loadingArgs)
    for (exposureOutcome in exposureOutcomeList) {
      exposureId <- .selectByType(loadingArgs$exposureType, exposureOutcome$exposureId, "exposure")
      outcomeId <- .selectByType(loadingArgs$outcomeType, exposureOutcome$outcomeId, "outcome")
      for (ictpdAnalysis in ictpdAnalysisSubset) {
        ictpdResultsFile <- .createIctpdResultsFileName(outputFolder, ictpdAnalysis$analysisId)
        referenceRows[[length(referenceRows) + 1]] <- data.frame(
          analysisId = ictpdAnalysis$analysisId,
          exposureId = exposureId,
          outcomeId = outcomeId,
          ictpdDataFile = ictpdDataFile,
          ictpdResultsFile = ictpdResultsFile,
          stringsAsFactors = FALSE
        )
      }
    }
  }
  if (length(referenceRows) == 0) {
    resultsReference <- data.frame()
  } else {
    resultsReference <- do.call(rbind, referenceRows)
  }
  saveRDS(resultsReference, file.path(outputFolder, "resultsReference.rds"))

  writeLines("*** Creating ictpdData objects ***")
  objectsToCreate <- list()
  for (ictpdDataFile in unique(resultsReference$ictpdDataFile)) {
    # Only data files that are not on disk yet are (re)created.
    if (ictpdDataFile != "" && !file.exists(ictpdDataFile)) {
      refSubset <- resultsReference[resultsReference$ictpdDataFile == ictpdDataFile, ]
      refRow <- refSubset[1, ]
      analysisRow <- ParallelLogger::matchInList(
        ictpdAnalysisList,
        list(analysisId = refRow$analysisId)
      )[[1]]
      getDbIctpdDataArgs <- analysisRow$getDbIctpdDataArgs
      # Cheap trick to find all unique combinations of exposureId and outcomeId:
      refSubset$dummy <- 1
      exposureOutcomePairs <- aggregate(dummy ~ exposureId + outcomeId, data = refSubset, mean)
      args <- list(
        connectionDetails = connectionDetails,
        cdmDatabaseSchema = cdmDatabaseSchema,
        oracleTempSchema = oracleTempSchema,
        exposureDatabaseSchema = exposureDatabaseSchema,
        exposureTable = exposureTable,
        outcomeDatabaseSchema = outcomeDatabaseSchema,
        outcomeTable = outcomeTable,
        cdmVersion = cdmVersion,
        exposureOutcomePairs = exposureOutcomePairs
      )
      args <- append(args, getDbIctpdDataArgs)
      objectsToCreate[[length(objectsToCreate) + 1]] <- list(
        args = args,
        ictpdDataFile = ictpdDataFile
      )
    }
  }
  # Loads one data object from the database and stores it in its file.
  createIctpdDataObject <- function(params) {
    ictpdData <- do.call("getDbIctpdData", params$args)
    saveRDS(ictpdData, params$ictpdDataFile)
  }
  runOnCluster(objectsToCreate, createIctpdDataObject, getDbIctpdDataThreads)

  writeLines("*** Computing IC statistics ***")
  objectsToCreate <- list()
  for (ictpdResultsFile in unique(resultsReference$ictpdResultsFile)) {
    # Only results files that are not on disk yet are (re)created.
    if (ictpdResultsFile != "" && !file.exists(ictpdResultsFile)) {
      refRow <- resultsReference[resultsReference$ictpdResultsFile == ictpdResultsFile, ][1, ]
      analysisRow <- ParallelLogger::matchInList(
        ictpdAnalysisList,
        list(analysisId = refRow$analysisId)
      )[[1]]
      objectsToCreate[[length(objectsToCreate) + 1]] <- list(
        args = analysisRow$calculateStatisticsIcArgs,
        ictpdDataFile = refRow$ictpdDataFile,
        ictpdResultsFile = ictpdResultsFile
      )
    }
  }
  # Reads a stored data object, computes the IC statistics on it and stores the results.
  createIctpdResults <- function(params) {
    ictpdData <- readRDS(params$ictpdDataFile)
    args <- list(ictpdData = ictpdData)
    args <- append(args, params$args)
    ictpdResults <- do.call("calculateStatisticsIc", args)
    saveRDS(ictpdResults, params$ictpdResultsFile)
  }
  runOnCluster(objectsToCreate, createIctpdResults, calculateStatisticsIcThreads)

  invisible(resultsReference)
}
# Builds the path of the data file that belongs to the loading settings with the given ID:
# <folder>/IctpdData_l<loadingArgsId>.rds
.createIctpdDataFileName <- function(folder, loadingArgsId) {
  file.path(folder, paste0("IctpdData_l", loadingArgsId, ".rds"))
}
# Builds the path of the results file that belongs to the analysis with the given ID:
# <folder>/IctpdResults_a<analysisId>.rds
.createIctpdResultsFileName <- function(folder, analysisId) {
  file.path(folder, paste0("IctpdResults_a", analysisId, ".rds"))
}
# Picks the ID to use for an exposure or outcome.
#
# type:  NULL when the analysis does not select a type, otherwise the name of the element of
#        `value` to use.
# value: either a single ID (when no types are used) or a named list of IDs, one per type.
# label: "exposure" or "outcome"; only used to build the error messages.
#
# Returns the selected ID. Stops when `value` holds several typed IDs but no type was selected,
# or when the requested type is not present in `value`.
.selectByType <- function(type, value, label) {
  if (is.null(type)) {
    if (is.list(value)) {
      stop(paste("Multiple ", label, "s specified, but none selected in analyses.", sep = ""))
    }
    return(value)
  }
  # `[[` instead of `[`: single-bracket indexing of a list always returns a list, so the
  # is.null() test could never detect a missing type, and callers received a one-element list
  # instead of the ID itself.
  if (!is.list(value) || is.null(value[[type]])) {
    stop(paste(label, "type not found:", type))
  }
  value[[type]]
}
#' Create a summary report of the analyses
#'
#' @details
#' Reads every results file listed in \code{resultsReference}, takes its \code{results} element,
#' labels it with the ID of the analysis that produced it, and stacks everything into one data
#' frame.
#'
#' @param resultsReference   A data.frame as created by the \code{\link{runIctpdAnalyses}} function.
#'
#' @return
#' A data frame holding the rows of the \code{results} element of all results files, with an added
#' \code{analysisId} column. An empty data frame when \code{resultsReference} lists no results
#' files.
#'
#' @export
summarizeAnalyses <- function(resultsReference) {
  resultsFiles <- as.character(unique(resultsReference$ictpdResultsFile))
  perFile <- lapply(resultsFiles, function(resultsFile) {
    ictpdResults <- readRDS(resultsFile)$results
    # A results file belongs to a single analysis, so the first matching reference row suffices.
    firstRow <- match(resultsFile, resultsReference$ictpdResultsFile)
    ictpdResults$analysisId <- resultsReference$analysisId[firstRow]
    ictpdResults
  })
  if (length(perFile) == 0) {
    return(data.frame())
  }
  # Bind once instead of growing the result with rbind() inside a loop.
  do.call(rbind, perFile)
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.