# Copyright 2023 Observational Health Data Sciences and Informatics
#
# This file is part of MethodEvaluation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#' Synthesize positive controls
#'
#' @details
#' This function will insert additional outcomes for a given set of drug-outcome pairs. It is assumed
#' that these drug-outcome pairs represent negative controls, so the true relative risk before
#' inserting any outcomes should be 1. There are two models for inserting the outcomes during the
#' specified risk window of the drug: a Poisson model assuming multiple outcomes could occurr during a
#' single exposure, and a survival model considering only one outcome per exposure.
#' It is possible to use bulk import to insert the generated outcomes in the database. This requires
#' the environmental variable 'USE_MPP_BULK_LOAD' to be set to 'TRUE'. See \code{
#' ?DatabaseConnector::insertTable} for details on how to configure the bulk upload.
#'
#' @param connectionDetails An R object of type \code{ConnectionDetails} created using
#' the function \code{createConnectionDetails} in the
#' \code{DatabaseConnector} package.
#' @param cdmDatabaseSchema Name of database schema that contains OMOP CDM and
#' vocabulary.
#' @param oracleTempSchema DEPRECATED: use `tempEmulationSchema` instead.
#' @param tempEmulationSchema Some database platforms like Oracle and Impala do not truly support temp tables. To
#' emulate temp tables, provide a schema with write privileges where temp tables
#' can be created.
#' @param exposureDatabaseSchema The name of the database schema that is the location where
#' the exposure data used to define the exposure cohorts is
#' available. If exposureTable = DRUG_ERA,
#' exposureDatabaseSchema is not used by assumed to be
#' cdmSchema. Requires read permissions to this database.
#' @param exposureTable The table name that contains the exposure cohorts. If
#' exposureTable <> DRUG_ERA, then expectation is exposureTable
#' has format of COHORT table: cohort_concept_id, SUBJECT_ID,
#' COHORT_START_DATE, COHORT_END_DATE.
#' @param outcomeDatabaseSchema The name of the database schema that is the location where
#' the data used to define the outcome cohorts is available. If
#' exposureTable = CONDITION_ERA, exposureDatabaseSchema is not
#' used by assumed to be cdmSchema. Requires read permissions
#' to this database.
#' @param outcomeTable The table name that contains the outcome cohorts. When the
#' table name is not CONDITION_ERA This table is expected to
#' have the same format as the COHORT table: SUBJECT_ID,
#' COHORT_START_DATE, COHORT_END_DATE, COHORT_CONCEPT_ID (CDM
#' v4) or COHORT_DEFINITION_ID (CDM v5 and higher).
#' @param createOutputTable Should the output table be created prior to inserting the
#' outcomes? If TRUE and the tables already exists, it will
#' first be deleted. If FALSE, the table is assumed to exist and
#' the outcomes will be inserted. Any existing outcomes with the
#' same IDs will first be deleted.
#' @param outputIdOffset What should be the first new outcome ID that is to be
#' created?
#' @param exposureOutcomePairs A data frame with at least two columns:
#' \itemize{
#' \item {"exposureId" containing the drug_concept_ID or
#' cohort_concept_id of the exposure variable}
#' \item {"outcomeId" containing the condition_concept_ID or
#' cohort_concept_id of the outcome variable}
#' }
#'
#'
#' @param modelType Can be either "poisson" or "survival"
#' @param minOutcomeCountForModel Minimum number of outcome events required to build a model.
#' @param minOutcomeCountForInjection Minimum number of outcome events required to inject a signal.
#' @param minModelCount Minimum number of negative controls having enough outcomes to
#' fit an outcome model.
#' @param covariateSettings An object of type \code{covariateSettings} as created using
#' the \code{createCovariateSettings} function in the
#' \code{FeatureExtraction} package.
#' @param prior The prior used to fit the outcome model. See
#' \code{\link[Cyclops]{createPrior}} for details.
#' @param control The control object used to control the cross-validation used
#' to determine the hyperparameters of the prior (if
#' applicable). See \code{\link[Cyclops]{createControl}} for
#' details.
#' @param precision The allowed ratio between target and injected signal size.
#' @param firstExposureOnly Should signals be injected only for the first exposure? (ie.
#' assuming an acute effect)
#' @param washoutPeriod Number of days at the start of observation for which no
#' signals will be injected, but will be used to determine
#' whether exposure or outcome is the first one, and for
#' extracting covariates to build the outcome model.
#' @param riskWindowStart The start of the risk window relative to the start of the
#' exposure (in days). When 0, risk is assumed to start on the
#' first day of exposure.
#' @param riskWindowEnd The end of the risk window (in days) relative to the endAnchor.
#' @param endAnchor The anchor point for the end of the risk window. Can be
#' "cohort start" or "cohort end".
#' @param addIntentToTreat If true, the signal will not only be injected in the primary
#' time at risk, but also after the time at risk (up until the
#' obseration period end). In both time periods, the target
#' effect size will be enforced. This allows the same positive
#' control synthesis to be used in both on treatment and
#' intent-to-treat analysis variants. However, this will
#' preclude the controls to be used in self-controlled designs
#' that consider the time after exposure. Requires
#' \code{firstExposureOnly = TRUE}.
#' @param firstOutcomeOnly Should only the first outcome per person be considered when
#' modeling the outcome?
#' @param removePeopleWithPriorOutcomes Remove people with prior outcomes?
#' @param maxSubjectsForModel Maximum number of people used to fit an outcome model.
#' @param effectSizes A numeric vector of effect sizes that should be inserted.
#' @param outputDatabaseSchema The name of the database schema that is the location of the
#' tables containing the new outcomesRequires write permissions
#' to this database.
#' @param outputTable The name of the table names that will contain the generated
#' outcome cohorts.
#' @param workFolder Path to a folder where intermediate data will be stored.
#' @param cdmVersion Define the OMOP CDM version used: currently support "4" and
#' "5".
#' @param modelThreads Number of parallel threads to use when fitting outcome
#' models.
#' @param generationThreads Number of parallel threads to use when generating outcomes.
#'
#' @references
#' Schuemie MJ, Hripcsak G, Ryan PB, Madigan D, Suchard MA. Empirical confidence interval calibration
#' for population-level effect estimation studies in observational healthcare data. Proc Natl Acad Sci
#' U S A. 2018 Mar 13;115(11):2571-2577.
#'
#' @return
#' A data.frame listing all the drug-pairs in combination with requested effect sizes and the real
#' inserted effect size (might be different from the requested effect size because of sampling error).
#'
#' @export
synthesizePositiveControls <- function(connectionDetails,
cdmDatabaseSchema,
oracleTempSchema = NULL,
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"),
exposureDatabaseSchema = cdmDatabaseSchema,
exposureTable = "drug_era",
outcomeDatabaseSchema = cdmDatabaseSchema,
outcomeTable = "cohort",
outputDatabaseSchema = outcomeDatabaseSchema,
outputTable = outcomeTable,
createOutputTable = FALSE,
exposureOutcomePairs,
modelType = "poisson",
minOutcomeCountForModel = 100,
minOutcomeCountForInjection = 25,
minModelCount = 5,
covariateSettings = FeatureExtraction::createCovariateSettings(
useDemographicsAgeGroup = TRUE,
useDemographicsGender = TRUE,
useDemographicsIndexYear = TRUE,
useDemographicsIndexMonth = TRUE,
useConditionGroupEraLongTerm = TRUE,
useDrugGroupEraLongTerm = TRUE,
useProcedureOccurrenceLongTerm = TRUE,
useMeasurementLongTerm = TRUE,
useObservationLongTerm = TRUE,
useCharlsonIndex = TRUE,
useDcsi = TRUE,
useChads2Vasc = TRUE,
longTermStartDays = 365,
endDays = 0
),
prior = Cyclops::createPrior("laplace", exclude = 0, useCrossValidation = TRUE),
control = Cyclops::createControl(
cvType = "auto",
startingVariance = 0.1,
seed = 1,
resetCoefficients = TRUE,
noiseLevel = "quiet",
threads = 10
),
firstExposureOnly = FALSE,
washoutPeriod = 183,
riskWindowStart = 0,
riskWindowEnd = 0,
endAnchor = "cohort end",
addIntentToTreat = FALSE,
firstOutcomeOnly = FALSE,
removePeopleWithPriorOutcomes = FALSE,
maxSubjectsForModel = 1e+05,
effectSizes = c(1, 1.25, 1.5, 2, 4),
precision = 0.01,
outputIdOffset = 1000,
workFolder = "./SignalInjectionTemp",
cdmVersion = "5",
modelThreads = 1,
generationThreads = 1) {
if (!is.null(oracleTempSchema) && oracleTempSchema != "") {
warning("The 'oracleTempSchema' argument is deprecated. Use 'tempEmulationSchema' instead.")
tempEmulationSchema <- oracleTempSchema
}
errorMessages <- checkmate::makeAssertCollection()
checkmate::assertClass(connectionDetails, "ConnectionDetails", add = errorMessages)
checkmate::assertCharacter(cdmDatabaseSchema, len = 1, add = errorMessages)
checkmate::assertCharacter(tempEmulationSchema, len = 1, null.ok = TRUE, add = errorMessages)
checkmate::assertCharacter(exposureDatabaseSchema, len = 1, add = errorMessages)
checkmate::assertCharacter(exposureTable, len = 1, add = errorMessages)
checkmate::assertCharacter(outcomeDatabaseSchema, len = 1, add = errorMessages)
checkmate::assertCharacter(outcomeTable, len = 1, add = errorMessages)
checkmate::assertLogical(createOutputTable, len = 1, add = errorMessages)
checkmate::assertDataFrame(exposureOutcomePairs, add = errorMessages)
checkmate::assertNames(colnames(exposureOutcomePairs), must.include = c("exposureId", "outcomeId"), add = errorMessages)
checkmate::assertChoice(modelType, c("poisson", "survival"), add = errorMessages)
checkmate::assertInt(minOutcomeCountForModel, lower = 0, add = errorMessages)
checkmate::assertInt(minOutcomeCountForInjection, lower = 0, add = errorMessages)
checkmate::assertInt(minModelCount, lower = 0, add = errorMessages)
checkmate::assertList(covariateSettings, add = errorMessages)
checkmate::assertClass(prior, "cyclopsPrior", add = errorMessages)
checkmate::assertClass(control, "cyclopsControl", add = errorMessages)
checkmate::assertLogical(firstExposureOnly, len = 1, add = errorMessages)
checkmate::assertInt(washoutPeriod, lower = 0, add = errorMessages)
checkmate::assertInt(riskWindowStart, add = errorMessages)
checkmate::assertInt(riskWindowEnd, add = errorMessages)
checkmate::assertLogical(removePeopleWithPriorOutcomes, len = 1, add = errorMessages)
checkmate::assertInt(maxSubjectsForModel, lower = 0, add = errorMessages)
checkmate::assertNumeric(effectSizes, lower = 1, min.len = 1, add = errorMessages)
checkmate::assertNumeric(precision, lower = 0, len = 1, add = errorMessages)
checkmate::assertInt(outputIdOffset, lower = 0, add = errorMessages)
checkmate::assertCharacter(workFolder, len = 1, add = errorMessages)
checkmate::assertCharacter(cdmVersion, len = 1, add = errorMessages)
checkmate::assertInt(modelThreads, lower = 1, add = errorMessages)
checkmate::assertInt(generationThreads, lower = 1, add = errorMessages)
checkmate::reportAssertions(collection = errorMessages)
if (!firstExposureOnly && addIntentToTreat) {
stop("Cannot have addIntentToTreat = TRUE and firstExposureOnly = FALSE at the same time")
}
if (modelType == "poisson" && addIntentToTreat) {
stop("Intent to treat injection not yet implemented for Poisson models")
}
if (cdmVersion == "4") {
stop("CDM version 4 is not supported")
}
if (!grepl("start$|end$", endAnchor, ignore.case = TRUE)) {
stop("endAnchor should have value 'cohort start' or 'cohort end'")
}
isEnd <- function(anchor) {
return(grepl("end$", anchor, ignore.case = TRUE))
}
if (!file.exists(workFolder)) {
dir.create(workFolder)
}
exposuresFile <- file.path(workFolder, "exposures.rds")
outcomesFile <- file.path(workFolder, "outcomes.rds")
priorOutcomesFile <- file.path(workFolder, "priorOutcomes.rds")
countsFile <- file.path(workFolder, "counts.rds")
result <- tibble(
exposureId = rep(exposureOutcomePairs$exposureId, each = length(effectSizes)),
outcomeId = rep(exposureOutcomePairs$outcomeId, each = length(effectSizes)),
targetEffectSize = rep(effectSizes, nrow(exposureOutcomePairs)),
newOutcomeId = outputIdOffset + (0:(nrow(exposureOutcomePairs) * length(effectSizes) - 1)),
trueEffectSize = 0,
trueEffectSizeFirstExposure = 0,
trueEffectSizeItt = 0,
injectedOutcomes = 0,
modelFolder = "",
outcomesToInjectFile = ""
)
exposureIds <- unique(exposureOutcomePairs$exposureId)
conn <- DatabaseConnector::connect(connectionDetails)
on.exit(DatabaseConnector::disconnect(conn))
# Create exposure cohorts ------------------------------------
ParallelLogger::logInfo("\nCreating risk windows")
renderedSql <- SqlRender::loadRenderTranslateSql("CreateExposedCohorts.sql",
packageName = "MethodEvaluation",
dbms = connectionDetails$dbms,
tempEmulationSchema = tempEmulationSchema,
cdm_database_schema = cdmDatabaseSchema,
exposure_ids = exposureIds,
washout_period = washoutPeriod,
exposure_database_schema = exposureDatabaseSchema,
exposure_table = exposureTable,
first_exposure_only = firstExposureOnly,
risk_window_start = riskWindowStart,
risk_window_end = riskWindowEnd,
add_exposure_days_to_end = isEnd(endAnchor)
)
DatabaseConnector::executeSql(conn, renderedSql)
# Get exposure cohorts from database -------------------------------
if (file.exists(exposuresFile)) {
exposures <- readRDS(exposuresFile)
} else {
ParallelLogger::logInfo("\nRetrieving exposure cohorts")
exposureSql <- SqlRender::loadRenderTranslateSql("GetExposedCohorts.sql",
packageName = "MethodEvaluation",
dbms = connectionDetails$dbms,
tempEmulationSchema = tempEmulationSchema
)
exposures <- DatabaseConnector::querySql(conn, exposureSql, snakeCaseToCamelCase = TRUE)
exposures <- exposures[order(exposures$rowId), ]
saveRDS(exposures, exposuresFile)
}
# Get prior outcomes (if needed) ------------------------------------
if (removePeopleWithPriorOutcomes) {
if (file.exists(priorOutcomesFile)) {
priorOutcomes <- readRDS(priorOutcomesFile)
} else {
ParallelLogger::logInfo("Finding people with prior outcomes")
table <- exposureOutcomePairs
colnames(table) <- SqlRender::camelCaseToSnakeCase(colnames(table))
DatabaseConnector::insertTable(
connection = conn,
tableName = "#exposure_outcome",
data = table,
dropTableIfExists = TRUE,
createTable = TRUE,
tempTable = TRUE,
tempEmulationSchema = tempEmulationSchema
)
outcomeSql <- SqlRender::loadRenderTranslateSql("GetPriorOutcomes.sql",
packageName = "MethodEvaluation",
dbms = connectionDetails$dbms,
tempEmulationSchema = tempEmulationSchema,
cdm_database_schema = cdmDatabaseSchema,
outcome_database_schema = outcomeDatabaseSchema,
outcome_table = outcomeTable,
first_outcome_only = firstOutcomeOnly
)
priorOutcomes <- DatabaseConnector::querySql(conn, outcomeSql, snakeCaseToCamelCase = TRUE)
saveRDS(priorOutcomes, priorOutcomesFile)
sql <- "TRUNCATE TABLE #exposure_outcome; DROP TABLE #exposure_outcome;"
sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema)
DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE)
}
}
# Get outcomes ---------------------------------------
if (file.exists(outcomesFile)) {
outcomeCounts <- readRDS(outcomesFile)
} else {
ParallelLogger::logInfo("Extracting outcome counts")
table <- exposureOutcomePairs
colnames(table) <- SqlRender::camelCaseToSnakeCase(colnames(table))
DatabaseConnector::insertTable(
connection = conn,
tableName = "#exposure_outcome",
data = table,
dropTableIfExists = TRUE,
createTable = TRUE,
tempTable = TRUE,
tempEmulationSchema = tempEmulationSchema
)
outcomeSql <- SqlRender::loadRenderTranslateSql("GetOutcomes.sql",
packageName = "MethodEvaluation",
dbms = connectionDetails$dbms,
tempEmulationSchema = tempEmulationSchema,
cdm_database_schema = cdmDatabaseSchema,
outcome_database_schema = outcomeDatabaseSchema,
outcome_table = outcomeTable,
first_outcome_only = firstOutcomeOnly
)
outcomeCounts <- DatabaseConnector::querySql(conn, outcomeSql, snakeCaseToCamelCase = TRUE)
saveRDS(outcomeCounts, outcomesFile)
sql <- "TRUNCATE TABLE #exposure_outcome; DROP TABLE #exposure_outcome;"
sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema)
DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE)
}
# Generate counts for result object -------------------------------
if (file.exists(countsFile)) {
result <- readRDS(countsFile)
} else {
ParallelLogger::logInfo("Computing counts per exposure - outcome pair")
temp <- select(exposures, .data$rowId, .data$exposureId, .data$eraNumber) %>%
inner_join(select(outcomeCounts, .data$rowId, .data$outcomeId, .data$y, .data$yItt),
by = "rowId"
)
if (modelType == "survival") {
temp <- temp %>%
mutate(
y = .data$y != 0,
yItt = .data$yItt != 0
)
}
generateCounts <- function(outcomeId) {
tempOutcomes <- temp %>%
filter(.data$outcomeId == !!outcomeId)
if (removePeopleWithPriorOutcomes) {
removeRowIds <- priorOutcomes %>%
filter(.data$outcomeId == !!outcomeId) %>%
pull(.data$rowId)
tempOutcomes <- tempOutcomes %>%
filter(!.data$rowId %in% removeRowIds)
tempExposures <- exposures %>%
filter(!.data$rowId %in% removeRowIds)
} else {
tempExposures <- exposures
}
exposureSummary <- tempExposures %>%
group_by(.data$exposureId) %>%
summarize(
exposures = n(),
firstExposures = sum(.data$eraNumber == 1),
.groups = "drop_last"
)
outcomeSummary <- tempOutcomes %>%
group_by(.data$exposureId) %>%
summarize(
observedOutcomes = sum(.data$y),
observedOutcomesFirstExposure = sum(.data$y & .data$eraNumber == 1),
observedOutcomesItt = sum(.data$yItt),
observedOutcomesFirstExposureItt = sum(.data$yItt & .data$eraNumber == 1),
.groups = "drop_last"
)
resultRows <- exposureSummary %>%
left_join(outcomeSummary, by = "exposureId") %>%
mutate(outcomeId = !!outcomeId)
return(resultRows)
}
resultRows <- purrr::map_dfr(unique(result$outcomeId), generateCounts)
result <- left_join(result, resultRows, by = c("exposureId", "outcomeId")) %>%
mutate(
exposures = case_when(
is.na(exposures) ~ as.integer(0),
TRUE ~ as.integer(.data$exposures)
),
firstExposures = case_when(
is.na(firstExposures) ~ as.integer(0),
TRUE ~ as.integer(.data$firstExposures)
),
observedOutcomes = case_when(
is.na(observedOutcomes) ~ as.integer(0),
TRUE ~ as.integer(.data$observedOutcomes)
),
observedOutcomesFirstExposure = case_when(
is.na(observedOutcomesFirstExposure) ~ as.integer(0),
TRUE ~ as.integer(.data$observedOutcomesFirstExposure)
),
observedOutcomesItt = case_when(
is.na(observedOutcomesItt) ~ as.integer(0),
TRUE ~ as.integer(.data$observedOutcomesItt)
),
observedOutcomesFirstExposureItt = case_when(
is.na(observedOutcomesFirstExposureItt) ~ as.integer(0),
TRUE ~ as.integer(.data$observedOutcomesFirstExposureItt)
)
)
saveRDS(result, countsFile)
}
# Build models ---------------------------- Find all exposures for each outcome, then identify unique
# groups of exposures
group <- function(outcomeId) {
exposureIds <- exposureOutcomePairs$exposureId[exposureOutcomePairs$outcomeId == outcomeId]
return(exposureIds[order(exposureIds)])
}
outcomeIds <- unique(exposureOutcomePairs$outcomeId)
groups <- purrr::map(unique(outcomeIds), group)
uniqueGroups <- unique(groups)
saveRDS(uniqueGroups, file.path(workFolder, "uniqueGroups.rds"))
outcomeIdToGroupId <- data.frame(outcomeIds = outcomeIds, groupIds = match(groups, uniqueGroups))
saveRDS(outcomeIdToGroupId, file.path(workFolder, "outcomeIdToGroupId.rds"))
# Fetch covariates for each group of exposures
for (i in 1:length(uniqueGroups)) {
covarFileName <- file.path(workFolder, sprintf("covarsForModel_g%s.zip", i))
if (!file.exists(covarFileName)) {
cohortIds <- uniqueGroups[[i]]
sql <- "SELECT COUNT(*) AS entries FROM (SELECT DISTINCT subject_id, cohort_start_date FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids)) tmp;"
count <- DatabaseConnector::renderTranslateQuerySql(
connection = conn,
sql = sql,
tempEmulationSchema = tempEmulationSchema,
cohort_ids = cohortIds,
snakeCaseToCamelCase = TRUE
)$entries
if (count > maxSubjectsForModel) {
ParallelLogger::logInfo("Sampling exposed cohorts for model(s)")
renderedSql <- SqlRender::loadRenderTranslateSql("SampleExposedCohorts.sql",
packageName = "MethodEvaluation",
dbms = connectionDetails$dbms,
tempEmulationSchema = tempEmulationSchema,
sample_size = maxSubjectsForModel,
cohort_ids = cohortIds
)
DatabaseConnector::executeSql(conn, renderedSql)
sql <- "SELECT row_id FROM #sampled_person"
sampledRowIds <- DatabaseConnector::renderTranslateQuerySql(
connection = conn,
sql = sql,
tempEmulationSchema = tempEmulationSchema,
snakeCaseToCamelCase = TRUE
)
sampledExposuresFile <- file.path(workFolder, sprintf("sampledRowIds_g%s.rds", i))
saveRDS(sampledRowIds, sampledExposuresFile)
} else {
# Don't sample, just copy:
sql <- "SELECT * INTO #sampled_person FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids);"
DatabaseConnector::renderTranslateExecuteSql(
connection = conn,
sql = sql,
tempEmulationSchema = tempEmulationSchema,
cohort_ids = cohortIds,
progressBar = FALSE,
reportOverallTime = FALSE
)
}
ParallelLogger::logInfo("Extracting covariates for fitting outcome model(s)")
covariateData <- FeatureExtraction::getDbCovariateData(
connection = conn,
oracleTempSchema = tempEmulationSchema,
cdmDatabaseSchema = cdmDatabaseSchema,
cohortTable = "#sampled_person",
cohortTableIsTemp = TRUE,
rowIdField = "row_id",
covariateSettings = covariateSettings,
cdmVersion = cdmVersion,
aggregated = FALSE
)
covariateData <- FeatureExtraction::tidyCovariateData(
covariateData = covariateData,
normalize = TRUE,
removeRedundancy = TRUE
)
FeatureExtraction::saveCovariateData(covariateData, covarFileName)
sql <- "TRUNCATE TABLE #sampled_person; DROP TABLE #sampled_person;"
DatabaseConnector::renderTranslateExecuteSql(
connection = conn,
tempEmulationSchema = tempEmulationSchema,
sql = sql,
progressBar = FALSE,
reportOverallTime = FALSE
)
}
}
ParallelLogger::logInfo("Fitting outcome models")
tasks <- list()
modelsWithEnoughOutcomes <- 0
outcomeIds <- unique(exposureOutcomePairs$outcomeId)
for (i in 1:length(outcomeIds)) {
outcomeId <- outcomeIds[i]
groupId <- outcomeIdToGroupId$groupId[outcomeIdToGroupId$outcomeId == outcomeId]
groupExposureIds <- uniqueGroups[[groupId]]
idx <- result$outcomeId == outcomeId &
result$exposureId %in% groupExposureIds &
result$targetEffectSize == effectSizes[1]
if (sum(result$observedOutcomes[idx]) >= minOutcomeCountForModel) {
modelsWithEnoughOutcomes <- modelsWithEnoughOutcomes + 1
modelFolder <- file.path(workFolder, paste0("model_o", outcomeId))
result$modelFolder[result$outcomeId == outcomeId] <- modelFolder
if (!file.exists(modelFolder)) {
task <- list(
outcomeId = outcomeId,
modelFolder = modelFolder,
covarFileName = file.path(workFolder, sprintf("covarsForModel_g%s.zip", groupId)),
sampledExposuresFile = file.path(workFolder, sprintf("sampledRowIds_g%s.rds", groupId)),
groupExposureIds = groupExposureIds
)
tasks[[length(tasks) + 1]] <- task
}
}
}
if (modelsWithEnoughOutcomes < minModelCount) {
stop(
"Not enough negative controls with sufficent outcome count to fit outcome model. Found ",
modelsWithEnoughOutcomes,
" controls with enough outcomes (> ",
minOutcomeCountForModel,
"), but require ",
minModelCount,
" controls."
)
}
if (length(tasks) > 0) {
cluster <- ParallelLogger::makeCluster(modelThreads)
ParallelLogger::clusterApply(
cluster,
tasks,
fitModel,
result,
exposuresFile,
outcomesFile,
priorOutcomesFile,
removePeopleWithPriorOutcomes,
modelType,
prior,
control
)
ParallelLogger::stopCluster(cluster)
}
# Generate new outcomes ----------------------------------------- Fetch covariates for all rows if
# covars for model were based on a sample:
for (i in 1:length(uniqueGroups)) {
sampledExposuresFile <- file.path(workFolder, sprintf("sampledRowIds_g%s.rds", i))
covarFileName <- file.path(workFolder, sprintf("covarsForPrediction_g%s.zip", i))
if (file.exists(sampledExposuresFile) && !file.exists(covarFileName)) {
ParallelLogger::logInfo("Extracting covariates for all rows predicting outcomes")
if (!is(covariateSettings, "covariateSettings")) {
stop("Composite covariate settings not supported")
}
outcomeIds <- outcomeIdToGroupId$outcomeId[outcomeIdToGroupId$groupId == i]
modelCovariateIds <- c()
for (modelFolder in unique(result$modelFolder[result$outcomeId %in% outcomeIds])) {
if (modelFolder != "" && file.exists(file.path(modelFolder, "betas.rds"))) {
betas <- readRDS(file.path(modelFolder, "betas.rds"))
if (is.null(modelCovariateIds)) {
modelCovariateIds <- betas$id
} else {
modelCovariateIds <- c(modelCovariateIds, betas$id)
}
}
}
modelCovariateIds <- unique(modelCovariateIds)
modelCovariateIds <- modelCovariateIds[modelCovariateIds != 0]
ParallelLogger::logInfo(paste(
"Number of unique covariates across outcome models:",
length(modelCovariateIds)
))
if (length(modelCovariateIds) == 0) {
dir.create(covarFileName)
} else {
covariateSettings$includedCovariateIds <- modelCovariateIds
sql <- "SELECT * INTO #selected_person FROM #cohort_person WHERE cohort_definition_id IN (@cohort_ids);"
sql <- SqlRender::render(sql, cohort_ids = uniqueGroups[[i]])
sql <- SqlRender::translate(sql,
targetDialect = connectionDetails$dbms,
tempEmulationSchema = tempEmulationSchema
)
DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE)
covariateData <- FeatureExtraction::getDbCovariateData(
connection = conn,
oracleTempSchema = tempEmulationSchema,
cdmDatabaseSchema = cdmDatabaseSchema,
cohortTable = "#selected_person",
cohortTableIsTemp = TRUE,
rowIdField = "row_id",
covariateSettings = covariateSettings,
cdmVersion = cdmVersion,
aggregated = FALSE
)
covariateData <- FeatureExtraction::tidyCovariateData(
covariateData = covariateData,
normalize = TRUE,
removeRedundancy = FALSE
)
FeatureExtraction::saveCovariateData(covariateData, covarFileName)
sql <- "TRUNCATE TABLE #selected_person; DROP TABLE #selected_person;"
sql <- SqlRender::translate(sql,
targetDialect = connectionDetails$dbms,
tempEmulationSchema = tempEmulationSchema
)
DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE)
}
}
}
ParallelLogger::logInfo("Generating outcomes")
temp <- result[result$observedOutcomes > minOutcomeCountForInjection, c(
"exposureId",
"outcomeId",
"modelFolder"
)]
temp <- temp[temp$modelFolder != "", ]
temp <- unique(temp)
createTask <- function(i) {
groupId <- outcomeIdToGroupId$groupId[outcomeIdToGroupId$outcomeId == temp$outcomeId[i]]
sampledExposuresFile <- file.path(workFolder, sprintf("sampledRowIds_g%s.rds", groupId))
if (file.exists(sampledExposuresFile)) {
covarFileName <- file.path(workFolder, sprintf("covarsForPrediction_g%s.zip", groupId))
} else {
covarFileName <- file.path(workFolder, sprintf("covarsForModel_g%s.zip", groupId))
}
task <- list(
exposureId = temp$exposureId[i],
outcomeId = temp$outcomeId[i],
modelFolder = temp$modelFolder[i],
covarFileName = covarFileName
)
return(task)
}
if (nrow(temp) > 0) {
tasks <- lapply(1:nrow(temp), createTask)
cluster <- ParallelLogger::makeCluster(generationThreads)
results <- ParallelLogger::clusterApply(
cluster,
tasks,
generateOutcomes,
result,
exposuresFile,
outcomesFile,
priorOutcomesFile,
removePeopleWithPriorOutcomes,
modelType,
effectSizes,
precision,
workFolder,
addIntentToTreat
)
ParallelLogger::stopCluster(cluster)
result <- do.call("rbind", results)
}
# Save summary -----------------------------------------------------------------
summaryFile <- .createSummaryFileName(workFolder)
saveRDS(result, summaryFile)
if (!is.null(getOption("skipPositiveControlUpload")) && getOption("skipPositiveControlUpload")) {
return(result)
}
# Insert outcomes into database ------------------------------------------------
ParallelLogger::logInfo("Inserting additional outcomes into database")
fileNames <- result$outcomesToInjectFile[result$outcomesToInjectFile != ""]
outcomesToInject <- lapply(fileNames, readRDS)
outcomesToInject <- do.call("rbind", outcomesToInject)
colnames(outcomesToInject) <- SqlRender::camelCaseToSnakeCase(colnames(outcomesToInject))
if (Sys.getenv("USE_MPP_BULK_LOAD") == "TRUE" ||
Sys.getenv("DATABASE_CONNECTOR_BULK_UPLOAD") == "TRUE") {
tableName <- paste0(
outputDatabaseSchema,
".temp_outcomes_",
paste(sample(letters, 5), collapse = "")
)
tempTable <- FALSE
} else {
tableName <- "#temp_outcomes"
tempTable <- TRUE
}
DatabaseConnector::insertTable(
connection = conn,
tableName = tableName,
data = outcomesToInject,
dropTableIfExists = TRUE,
createTable = TRUE,
tempTable = tempTable,
tempEmulationSchema = tempEmulationSchema,
progressBar = TRUE
)
toCopy <- result[result$modelFolder != "", c("outcomeId", "newOutcomeId")]
colnames(toCopy) <- SqlRender::camelCaseToSnakeCase(colnames(toCopy))
DatabaseConnector::insertTable(
connection = conn,
tableName = "#to_copy",
data = toCopy,
dropTableIfExists = TRUE,
createTable = TRUE,
tempTable = TRUE,
tempEmulationSchema = tempEmulationSchema
)
ParallelLogger::logInfo("Copying negative control outcomes into database")
copySql <- SqlRender::loadRenderTranslateSql("CopyOutcomes.sql",
packageName = "MethodEvaluation",
dbms = connectionDetails$dbms,
cdm_database_schema = cdmDatabaseSchema,
tempEmulationSchema = tempEmulationSchema,
outcome_database_schema = outcomeDatabaseSchema,
outcome_table = outcomeTable,
output_database_schema = outputDatabaseSchema,
output_table = outputTable,
create_output_table = createOutputTable,
temp_outcomes_table = tableName
)
DatabaseConnector::executeSql(conn, copySql)
sql <- "TRUNCATE TABLE #cohort_person; DROP TABLE #cohort_person;"
sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema)
DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE)
sql <- "TRUNCATE TABLE #to_copy; DROP TABLE #to_copy;"
sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema)
DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE)
sql <- "TRUNCATE TABLE @temp_outcomes_table; DROP TABLE @temp_outcomes_table;"
sql <- SqlRender::render(sql, temp_outcomes_table = tableName)
sql <- SqlRender::translate(sql, targetDialect = connectionDetails$dbms, tempEmulationSchema = tempEmulationSchema)
DatabaseConnector::executeSql(conn, sql, progressBar = FALSE, reportOverallTime = FALSE)
return(result)
}
fitModel <- function(task,
result,
exposuresFile,
outcomesFile,
priorOutcomesFile,
removePeopleWithPriorOutcomes,
modelType,
prior,
control) {
ParallelLogger::logInfo("Fitting model for outcome ", task$outcomeId)
exposures <- readRDS(exposuresFile)
outcomes <- readRDS(outcomesFile)
outcomes <- outcomes[outcomes$outcomeId == task$outcomeId, ]
if (file.exists(task$sampledExposuresFile)) {
sampledExposures <- readRDS(task$sampledExposuresFile)
outcomes <- outcomes[outcomes$rowId %in% sampledExposures$rowId, ]
exposures <- exposures[exposures$rowId %in% sampledExposures$rowId, ]
} else {
exposures <- exposures[exposures$exposureId %in% task$groupExposureIds, ]
outcomes <- outcomes[outcomes$rowId %in% exposures$rowId, ]
}
# Dedupe exposures for model fitting, so we don't overfit:
exposures <- exposures[order(exposures$personId, exposures$cohortStartDate), ]
exposures <- exposures[!duplicated(exposures[, c("personId", "cohortStartDate")]), ]
covariateData <- FeatureExtraction::loadCovariateData(task$covarFileName)
covariates <- covariateData$covariates %>%
filter(.data$rowId %in% local(exposures$rowId))
if (removePeopleWithPriorOutcomes) {
priorOutcomes <- readRDS(priorOutcomesFile)
removeRowIds <- priorOutcomes$rowId[priorOutcomes$outcomeId == task$outcomeId]
outcomes <- outcomes[!(outcomes$rowId %in% removeRowIds), ]
exposures <- exposures[!(exposures$rowId %in% removeRowIds), ]
covariates <- covariates %>%
filter(!.data$rowId %in% removeRowIds)
}
outcomes <- merge(exposures, outcomes[, c(
"rowId",
"y",
"timeToEvent"
)], by = c("rowId"), all.x = TRUE)
outcomes <- outcomes[order(outcomes$rowId), ]
outcomes$y[is.na(outcomes$y)] <- 0
names(outcomes)[names(outcomes) == "daysAtRisk"] <- "time"
if (modelType == "survival") {
# For survival, time is either the time to the end of the risk window, or the event
outcomes$y[outcomes$y != 0] <- 1
outcomes$time[outcomes$y != 0] <- outcomes$timeToEvent[outcomes$y != 0]
}
outcomes$time <- outcomes$time + 1
# Note: for survival, using Poisson regression with 1 outcome and censored time as equivalent of
# survival regression:
covariateData$outcomes <- outcomes
cyclopsData <- Cyclops::convertToCyclopsData(covariateData$outcomes,
covariates,
modelType = "pr",
quiet = TRUE
)
fit <- tryCatch(
{
Cyclops::fitCyclopsModel(cyclopsData, prior = prior, control = control)
},
error = function(e) {
e$message
}
)
if (fit$return_flag != "SUCCESS") {
fit <- fit$return_flag
}
if (is.character(fit)) {
ParallelLogger::logInfo(paste("Unable to fit model for outcome", task$outcomeId, ":", fit))
dir.create(task$modelFolder)
write.csv(fit, file.path(task$modelFolder, "Error.txt"))
} else {
betas <- coef(fit)
intercept <- tibble(
beta = betas[1],
id = bit64::as.integer64(0),
covariateName = "(Intercept)",
row.names = NULL
)
betas <- betas[betas != 0]
if (length(betas) > 1) {
betas <- betas[2:length(betas)]
betas <- tibble(beta = betas, covariateId = bit64::as.integer64(attr(betas, "names")))
betas <- betas %>%
inner_join(
covariateData$covariateRef %>%
collect() %>%
mutate(covariateId = bit64::as.integer64(.data$covariateId)),
by = "covariateId"
) %>%
select(.data$beta, id = .data$covariateId, .data$covariateName) %>%
arrange(desc(abs(.data$beta)))
betas <- bind_rows(intercept, betas)
} else {
betas <- intercept
}
dir.create(task$modelFolder)
saveRDS(betas, file.path(task$modelFolder, "betas.rds"))
}
}
generateOutcomes <- function(task,
result,
exposuresFile,
outcomesFile,
priorOutcomesFile,
removePeopleWithPriorOutcomes,
modelType,
effectSizes,
precision,
workFolder,
addIntentToTreat) {
ParallelLogger::logInfo(
"Generating outcomes for exposure ",
task$exposureId,
" and outcome ",
task$outcomeId
)
set.seed(1)
resultSubset <- result[result$exposureId == task$exposureId & result$outcomeId == task$outcomeId, ]
if (file.exists(file.path(task$modelFolder, "Error.txt"))) {
return(resultSubset)
} else {
exposures <- readRDS(exposuresFile)
exposures <- exposures[exposures$exposureId == task$exposureId, ]
predictionFile <- file.path(task$modelFolder, paste0("prediction_e", task$exposureId, ".rds"))
if (file.exists(predictionFile)) {
prediction <- readRDS(predictionFile)
} else {
betas <- readRDS(file.path(task$modelFolder, "betas.rds"))
if (nrow(betas) == 1) {
covariates <- NULL
} else {
covariateData <- FeatureExtraction::loadCovariateData(task$covarFileName)
covariates <- covariateData$covariates %>%
filter(.data$rowId %in% local(exposures$rowId))
}
prediction <- .predict(betas, exposures, covariates, modelType)
saveRDS(prediction, predictionFile)
}
outcomes <- readRDS(outcomesFile)
outcomes <- outcomes[outcomes$outcomeId == task$outcomeId, ]
if (removePeopleWithPriorOutcomes) {
priorOutcomes <- readRDS(priorOutcomesFile)
removeRowIds <- priorOutcomes$rowId[priorOutcomes$outcomeId == task$outcomeId]
exposures <- exposures[!(exposures$rowId %in% removeRowIds), ]
outcomes <- outcomes[!(outcomes$rowId %in% removeRowIds), ]
}
exposures <- merge(exposures, prediction)
exposures <- merge(exposures, outcomes, all.x = TRUE)
exposures$y[is.na(exposures$y)] <- 0
exposures$yItt[is.na(exposures$yItt)] <- 0
for (fxSizeIdx in 1:length(effectSizes)) {
effectSize <- effectSizes[fxSizeIdx]
if (effectSize == 1) {
newOutcomes <- data.frame()
injectedRr <- 1
injectedRrFirstExposure <- 1
injectedRrItt <- 1
} else {
# When sampling, the expected RR size is the target RR, but the actual RR could be different due to
# random error. this code is redoing the sampling until actual RR is equal to the target RR.
targetCount <- resultSubset$observedOutcomes[1] * (effectSize - 1)
time <- exposures$daysAtRisk + 1
newOutcomeCounts <- 0
if (modelType == "poisson") {
# Generate results under Poisson model -------------------------
multiplier <- 1
ratios <- c()
while (round(abs(sum(newOutcomeCounts) - targetCount)) > precision * targetCount) {
newOutcomeCounts <- rpois(
nrow(exposures),
multiplier * exposures$prediction * (effectSize - 1)
)
newOutcomeCounts[newOutcomeCounts > time] <- time[newOutcomeCounts > time]
ratios <- c(ratios, sum(newOutcomeCounts) / targetCount)
if (length(ratios) == 100) {
multiplier <- multiplier * 1 / mean(ratios)
ratios <- c()
ParallelLogger::logDebug(paste(
"Unable to achieve target RR using model as is. Adding multiplier of",
multiplier,
"to force target"
))
}
}
idx <- which(newOutcomeCounts != 0)
temp <- data.frame(
personId = exposures$personId[idx],
cohortStartDate = exposures$cohortStartDate[idx],
time = exposures$daysAtRisk[idx] + 1,
nOutcomes = newOutcomeCounts[idx]
)
outcomeRows <- sum(temp$nOutcomes)
newOutcomes <- data.frame(
personId = rep(0, outcomeRows),
cohortStartDate = rep(as.Date("1900-01-01"), outcomeRows),
timeToEvent = rep(0, outcomeRows)
)
cursor <- 1
for (i in 1:nrow(temp)) {
nOutcomes <- temp$nOutcomes[i]
if (nOutcomes != 0) {
newOutcomes$personId[cursor:(cursor + nOutcomes - 1)] <- temp$personId[i]
newOutcomes$cohortStartDate[cursor:(cursor + nOutcomes - 1)] <- temp$cohortStartDate[i]
newOutcomes$timeToEvent[cursor:(cursor + nOutcomes - 1)] <- sample.int(
size = nOutcomes,
temp$time[i]
) - 1
cursor <- cursor + nOutcomes
}
}
injectedRr <- 1 + (nrow(newOutcomes) / resultSubset$observedOutcomes[1])
# Count outcomes during first episodes:
newOutcomeCountsFirstExposure <- sum(newOutcomeCounts[exposures$eraNumber == 1])
injectedRrFirstExposure <- 1 +
(newOutcomeCountsFirstExposure / resultSubset$observedOutcomesFirstExposure[1])
} else {
# Survival model Generate outcomes under survival model --------------------------------------
result <- injectSurvival(exposures, effectSize, precision, addIntentToTreat)
injectedRr <- result$injectedRr
injectedRrFirstExposure <- result$injectedRrFirstExposure
injectedRrItt <- result$injectedRrItt
newOutcomes <- result$newOutcomes
}
}
ParallelLogger::logInfo(paste(
"Target RR =",
effectSize,
", injected RR =",
injectedRr,
", injected RR during first exposure only =",
injectedRrFirstExposure,
", injected RR during ITT window =",
injectedRrItt
))
newOutcomeId <- resultSubset$newOutcomeId[resultSubset$targetEffectSize == effectSize]
# Write new outcomes to file for later insertion into DB:
if (nrow(newOutcomes) != 0) {
newOutcomes$cohortStartDate <- newOutcomes$cohortStartDate + newOutcomes$timeToEvent
newOutcomes$timeToEvent <- NULL
newOutcomes$cohortDefinitionId <- newOutcomeId
names(newOutcomes)[names(newOutcomes) == "personId"] <- "subjectId"
outcomesToInjectFile <- file.path(workFolder, paste0(
"newOutcomes_e",
task$exposureId,
"_o",
task$outcomeId,
"_rr",
effectSize,
".rds"
))
saveRDS(newOutcomes, outcomesToInjectFile)
resultSubset$outcomesToInjectFile[resultSubset$targetEffectSize == effectSize] <- outcomesToInjectFile
}
idx <- resultSubset$targetEffectSize == effectSize
resultSubset$trueEffectSize[idx] <- injectedRr
resultSubset$trueEffectSizeFirstExposure[idx] <- injectedRrFirstExposure
resultSubset$trueEffectSizeItt[idx] <- injectedRrItt
resultSubset$injectedOutcomes[idx] <- nrow(newOutcomes)
}
return(resultSubset)
}
}
injectSurvival <- function(exposures, effectSize, precision, addIntentToTreat) {
hasOutcome <- exposures$y != 0
observedCount <- sum(hasOutcome)
correctedTargetCount <- observedCount * (effectSize - 1)
survivalTime <- as.numeric(exposures$daysAtRisk)
survivalTime[hasOutcome] <- exposures$timeToEvent[hasOutcome]
ratios <- c()
multiplier <- 1
hasNewOutcome <- c()
ParallelLogger::logTrace("Generating outcomes during time-at-risk")
while (round(abs(sum(hasNewOutcome) - correctedTargetCount)) > precision * correctedTargetCount) {
timeToNewOutcome <- round(rexp(
nrow(exposures),
multiplier * exposures$prediction * (effectSize - 1)
))
hasNewOutcome <- timeToNewOutcome < survivalTime
# Correct for censored time and outcomes:
observedRate <- (observedCount / sum(survivalTime))
timeCensoringAtNewEvents <- (sum(survivalTime[!hasNewOutcome]) + sum(timeToNewOutcome[hasNewOutcome] + 1))
requiredTotalEvents <- observedRate * timeCensoringAtNewEvents * effectSize
correctedTargetCount <- requiredTotalEvents - sum(hasOutcome[!hasNewOutcome])
if (correctedTargetCount < 0) {
warning("Correct target count became negative. ")
} else {
ratios <- c(ratios, sum(hasNewOutcome) / correctedTargetCount)
}
if (length(ratios) == 100) {
ParallelLogger::logDebug(paste(
"Unable to achieve target RR using model as is. Correcting multiplier by",
1 / mean(ratios),
"to force target"
))
multiplier <- mean(c(multiplier, multiplier * 1 / mean(ratios)))
ratios <- c()
}
if (is.na(round(abs(sum(hasNewOutcome) - correctedTargetCount)))) {
warning("NAs produced when generating outcomes")
}
}
rateBefore <- observedCount / sum(survivalTime)
rateAfter <- (sum(hasOutcome[!hasNewOutcome]) + sum(hasNewOutcome)) / (sum(survivalTime[!hasNewOutcome]) + sum(timeToNewOutcome[hasNewOutcome] + 1))
injectedRr <- rateAfter / rateBefore
newOutcomes <- data.frame(
personId = exposures$personId[hasNewOutcome],
cohortStartDate = exposures$cohortStartDate[hasNewOutcome],
timeToEvent = timeToNewOutcome[hasNewOutcome]
)
# Count outcomes during first episodes:
if (max(exposures$eraNumber) == 1) {
injectedRrFirstExposure <- injectedRr
} else {
firstHasNewOutcome <- hasNewOutcome[exposures$eraNumber == 1]
firstTimeToEvent <- timeToNewOutcome[exposures$eraNumber == 1]
firstHasOutcome <- hasOutcome[exposures$eraNumber == 1]
firstTime <- survivalTime[exposures$eraNumber == 1]
firstObservedCount <- sum(firstHasOutcome)
rateBeforeFirstExposure <- firstObservedCount / sum(firstTime)
rateAfterFirstExposure <- (sum(firstHasOutcome[!firstHasNewOutcome]) + sum(firstHasNewOutcome)) / (sum(firstTime[!firstHasNewOutcome]) + sum(firstTimeToEvent[firstHasNewOutcome] + 1))
injectedRrFirstExposure <- rateAfterFirstExposure / rateBeforeFirstExposure
}
if (addIntentToTreat) {
hasOutcomeItt <- exposures$yItt > 0
observedCountItt <- sum(hasOutcomeItt)
correctedTargetCount <- observedCountItt * (effectSize - 1)
survivalTimeItt <- exposures$daysObserved
survivalTimeItt[hasOutcomeItt] <- exposures$timeToEvent[hasOutcomeItt]
survivalTimeIttNew <- survivalTimeItt
survivalTimeIttNew[hasNewOutcome] <- timeToNewOutcome[hasNewOutcome]
rateBeforeItt <- observedCountItt / sum(survivalTimeItt)
ratios <- c()
multiplier <- rateBeforeItt / rateBefore # Adjust for fact that background rate may differ in ITT
hasNewOutcomeItt <- c()
ParallelLogger::logTrace("Generating outcomes for intent-to-treat")
while (round(abs(sum(hasNewOutcomeItt) + sum(hasNewOutcome) - correctedTargetCount)) > precision *
correctedTargetCount) {
timeToNewOutcomeItt <- round(rexp(nrow(exposures), multiplier * exposures$prediction * (effectSize - 1)))
hasNewOutcomeItt <- timeToNewOutcomeItt > exposures$daysAtRisk & timeToNewOutcomeItt < survivalTimeIttNew
# Correct for censored time and outcomes:
correctedTargetCount <- (observedCountItt / sum(survivalTimeItt)) *
(sum(survivalTimeIttNew[!hasNewOutcomeItt]) + sum(timeToNewOutcomeItt[hasNewOutcomeItt] + 1)) *
effectSize - sum(hasOutcomeItt[!hasNewOutcomeItt])
if (correctedTargetCount < 0) {
warning("Correct target count became negative. ")
} else {
ratios <- c(ratios, (sum(hasNewOutcomeItt) + sum(hasNewOutcome)) / correctedTargetCount)
}
if (length(ratios) == 100) {
ParallelLogger::logDebug(paste(
"Unable to achieve target RR using model as is. Correcting multiplier by",
1 / mean(ratios),
"to force target"
))
multiplier <- mean(c(multiplier, multiplier * 1 / mean(ratios)))
ratios <- c()
}
if (is.na(round(abs(sum(hasNewOutcome) - correctedTargetCount)))) {
ParallelLogger::logDebug("Problem")
}
}
rateAfterItt <- (sum(hasOutcomeItt[!hasNewOutcome & !hasNewOutcomeItt]) + sum(hasNewOutcome) + sum(hasNewOutcomeItt)) /
(sum(survivalTimeIttNew[!hasNewOutcomeItt]) + sum(timeToNewOutcomeItt[hasNewOutcomeItt] + 1))
injectedRrItt <- rateAfterItt / rateBeforeItt
newOutcomes <- rbind(newOutcomes, data.frame(
personId = exposures$personId[hasNewOutcomeItt],
cohortStartDate = exposures$cohortStartDate[hasNewOutcomeItt],
timeToEvent = timeToNewOutcomeItt[hasNewOutcomeItt]
))
} else {
injectedRrItt <- NA
}
return(list(
injectedRr = injectedRr,
injectedRrFirstExposure = injectedRrFirstExposure,
injectedRrItt = injectedRrItt,
newOutcomes = newOutcomes
))
}
.createSummaryFileName <- function(folder) {
name <- "summary"
name <- paste(name, ".rds", sep = "")
return(file.path(folder, name))
}
.predict <- function(betas, exposures, covariates, modelType) {
intercept <- betas$beta[1]
if (nrow(betas) == 1) {
betas <- data.frame()
} else {
betas <- betas[2:nrow(betas), ]
colnames(betas)[colnames(betas) == "id"] <- "covariateId"
}
if (nrow(betas) == 0) {
prediction <- data.frame(
rowId = exposures$rowId,
daysAtRisk = exposures$daysAtRisk,
value = exp(intercept)
)
} else {
covariateIdIsInteger64 <- covariates %>%
head(10000) %>%
pull(.data$covariateId) %>%
is("integer64")
if (!covariateIdIsInteger64) {
betas$covariateId <- as.numeric(betas$covariateId)
}
prediction <- covariates %>%
inner_join(select(betas, .data$beta, .data$covariateId), by = "covariateId", copy = TRUE) %>%
mutate(value = .data$beta * .data$covariateValue) %>%
group_by(.data$rowId) %>%
summarise(value = sum(.data$value, na.rm = TRUE)) %>%
select(.data$rowId, .data$value) %>%
ungroup() %>%
collect()
prediction <- prediction %>%
right_join(select(exposures, .data$rowId, .data$daysAtRisk), by = "rowId") %>%
mutate(value = coalesce(.data$value, 0) + intercept) %>%
mutate(value = exp(.data$value))
}
if (modelType == "poisson") {
prediction$value <- prediction$value * (prediction$daysAtRisk + 1)
}
prediction$daysAtRisk <- NULL
colnames(prediction)[colnames(prediction) == "value"] <- "prediction"
return(prediction)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.