# @file uploadToDatabase.R
#
# Copyright 2025 Observational Health Data Sciences and Informatics
#
# This file is part of PatientLevelPrediction
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
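# insertRunPlpToSqlite: create a sqlite database in tempdir() with the PLP
# result schema, insert a single runPlp result (plus optional external
# validation and diagnostic results) and return the path to the sqlite file.
#
# A minimal usage sketch (assuming `result` is an object returned by runPlp()):
#   sqliteFile <- insertRunPlpToSqlite(runPlp = result)
#   # the file can then be opened with e.g.
#   # DatabaseConnector::createConnectionDetails(dbms = "sqlite", server = sqliteFile)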
insertRunPlpToSqlite <- function(
runPlp,
externalValidatePlp = NULL,
diagnosePlp = NULL) {
sqliteLocation <- tempdir()
rlang::check_installed("RSQLite")
# create sqlite database
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = "sqlite",
server = file.path(sqliteLocation, "databaseFile.sqlite")
)
createPlpResultTables(
connectionDetails = connectionDetails,
targetDialect = "sqlite",
resultSchema = "main",
deleteTables = TRUE,
createTables = TRUE,
tablePrefix = ""
)
addRunPlpToDatabase(
runPlp = runPlp,
connectionDetails = connectionDetails,
databaseSchemaSettings = createDatabaseSchemaSettings(resultSchema = "main"),
cohortDefinitions = NULL, # cohortDefinitions,
databaseList = NULL,
modelSaveLocation = sqliteLocation
)
# add validation results if entered
if (!is.null(externalValidatePlp)) {
if (inherits(x = externalValidatePlp, what = "list")) {
      for (i in seq_along(externalValidatePlp)) {
tryCatch(
{
addRunPlpToDatabase(
runPlp = externalValidatePlp[[i]],
connectionDetails = connectionDetails,
databaseSchemaSettings = createDatabaseSchemaSettings(resultSchema = "main"),
cohortDefinitions = NULL, # cohortDefinitions,
databaseList = NULL,
modelSaveLocation = sqliteLocation
)
},
error = function(e) {
ParallelLogger::logError(e)
}
)
}
}
}
# add diagnosis results if entered
if (!is.null(diagnosePlp)) {
tryCatch(
{
addDiagnosePlpToDatabase(
diagnosePlp = diagnosePlp,
connectionDetails = connectionDetails,
databaseSchemaSettings = createDatabaseSchemaSettings(resultSchema = "main"),
cohortDefinitions = NULL, # cohortDefinitions,
databaseList = NULL
)
},
error = function(e) {
ParallelLogger::logError(e)
}
)
}
return(file.path(sqliteLocation, "databaseFile.sqlite"))
}
#' Create sqlite database with the results
#' @description
#' This function creates an sqlite database with the PLP result schema and inserts all results
#'
#' @details
#' This function can be used to upload PatientLevelPrediction results into an sqlite database
#'
#' @param resultLocation (string) location of directory where the main package results were saved
#' @param cohortDefinitions A set of one or more cohorts extracted using ROhdsiWebApi::exportCohortDefinitionSet()
#' @param databaseList A list created by \code{createDatabaseList} to specify the databases
#' @param sqliteLocation (string) location of directory where the sqlite database will be saved
#'
#' @return
#' Returns the location of the sqlite database file
#' @examplesIf rlang::is_installed("RSQLite") && rlang::is_installed("Eunomia") && rlang::is_installed("curl") && curl::has_internet()
#' \donttest{ \dontshow{ # takes too long }
#' plpData <- getEunomiaPlpData()
#' saveLoc <- file.path(tempdir(), "insertResultsToSqlite")
#' results <- runPlp(plpData, outcomeId = 3, analysisId = 1, saveDirectory = saveLoc)
#' databaseFile <- insertResultsToSqlite(saveLoc, cohortDefinitions = NULL,
#' sqliteLocation = file.path(saveLoc, "sqlite"))
#' # check there is some data in the database
#' library(DatabaseConnector)
#' connectionDetails <- createConnectionDetails(
#' dbms = "sqlite",
#' server = databaseFile)
#' conn <- connect(connectionDetails)
#' # All tables should be created
#' getTableNames(conn, databaseSchema = "main")
#' # There is data in the tables
#' querySql(conn, "SELECT * FROM main.model_designs limit 10")
#' # clean up
#' unlink(saveLoc, recursive = TRUE)
#' }
#' @export
insertResultsToSqlite <- function(
resultLocation,
cohortDefinitions = NULL,
databaseList = NULL,
sqliteLocation = file.path(resultLocation, "sqlite")) {
if (!dir.exists(sqliteLocation)) {
dir.create(sqliteLocation, recursive = TRUE)
}
rlang::check_installed("RSQLite")
# create sqlite database
connectionDetails <- DatabaseConnector::createConnectionDetails(
dbms = "sqlite",
server = file.path(sqliteLocation, "databaseFile.sqlite")
)
# create tables if they dont exist
createPlpResultTables(
connectionDetails = connectionDetails,
targetDialect = "sqlite",
resultSchema = "main",
deleteTables = TRUE,
createTables = TRUE,
tablePrefix = ""
)
# run insert models
addMultipleRunPlpToDatabase(
connectionDetails = connectionDetails,
databaseSchemaSettings = createDatabaseSchemaSettings(resultSchema = "main"),
cohortDefinitions = cohortDefinitions,
databaseList = databaseList,
resultLocation = resultLocation,
modelSaveLocation = sqliteLocation
)
# run insert diagnosis
addMultipleDiagnosePlpToDatabase(
connectionDetails = connectionDetails,
databaseSchemaSettings = createDatabaseSchemaSettings(resultSchema = "main"),
cohortDefinitions = cohortDefinitions,
databaseList = databaseList,
resultLocation = resultLocation
)
return(file.path(sqliteLocation, "databaseFile.sqlite"))
}
#' Create the results tables to store PatientLevelPrediction models and results into a database
#' @description
#' This function executes a large set of SQL statements to create tables that can store models and results
#'
#' @details
#' This function can be used to create (or delete) PatientLevelPrediction result tables
#'
#' @param connectionDetails The database connection details
#' @param targetDialect The database management system being used
#' @param resultSchema The name of the database schema where the result tables will be created.
#' @param deleteTables If true, any existing tables matching the PatientLevelPrediction result table names will be deleted
#' @param createTables If true, the PatientLevelPrediction result tables will be created
#' @param tablePrefix A string prepended to the PatientLevelPrediction result table names
#' @param tempEmulationSchema The temp schema used when the database management system is Oracle
#'
#' @param testFile (used for testing) The location of an sql file with the table creation code
#'
#' @return
#' Returns NULL but creates or deletes the required tables in the specified database schema(s).
#' @examplesIf rlang::is_installed("RSQLite")
#' # create a sqlite database with the PatientLevelPrediction result tables
#' connectionDetails <- DatabaseConnector::createConnectionDetails(
#' dbms = "sqlite",
#' server = file.path(tempdir(), "test.sqlite"))
#' createPlpResultTables(connectionDetails = connectionDetails,
#' targetDialect = "sqlite",
#' resultSchema = "main",
#' tablePrefix = "plp_")
#' # delete the tables
#' createPlpResultTables(connectionDetails = connectionDetails,
#' targetDialect = "sqlite",
#' resultSchema = "main",
#' deleteTables = TRUE,
#' createTables = FALSE,
#' tablePrefix = "plp_")
#' # clean up the database file
#' unlink(file.path(tempdir(), "test.sqlite"))
#' @export
createPlpResultTables <- function(
connectionDetails,
targetDialect = "postgresql",
resultSchema,
deleteTables = TRUE,
createTables = TRUE,
tablePrefix = "",
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"),
testFile = NULL) {
conn <- DatabaseConnector::connect(connectionDetails = connectionDetails)
on.exit(DatabaseConnector::disconnect(conn))
  existingTables <- tolower(DatabaseConnector::getTableNames(
    connection = conn,
    databaseSchema = resultSchema
  ))
  tablesExists <- all(tolower(getPlpResultTables()) %in% existingTables)
if (!tablesExists) {
ParallelLogger::logInfo("All or some PLP result tables do not exist, tables being recreated")
if (deleteTables) {
ParallelLogger::logInfo("Deleting existing tables")
tableNames <- getPlpResultTables()
deleteTables(
conn = conn,
databaseSchema = resultSchema,
targetDialect = targetDialect,
tempEmulationSchema = tempEmulationSchema,
tableNames = tableNames,
tablePrefix = tablePrefix
)
}
if (createTables) {
ParallelLogger::logInfo("Creating PLP results tables")
if (tablePrefix != "") {
tablePrefix <- paste0(toupper(gsub("_", "", gsub(" ", "", tablePrefix))), "_")
}
      pathToSql <- system.file(
        paste0("sql/", targetDialect),
        "PlpResultTables.sql",
        package = "PatientLevelPrediction"
      )
      if (!file.exists(pathToSql)) {
        # if there is no dbms-specific file, fall back to the sql_server version
        pathToSql <- system.file(
          "sql/sql_server",
          "PlpResultTables.sql",
          package = "PatientLevelPrediction"
        )
      }
sql <- readChar(pathToSql, file.info(pathToSql)$size)
renderedSql <- SqlRender::render(
sql = sql[1],
my_schema = resultSchema,
string_to_append = tablePrefix
)
renderedSql <- SqlRender::translate(
sql = renderedSql,
targetDialect = targetDialect,
tempEmulationSchema = tempEmulationSchema
)
DatabaseConnector::executeSql(conn, renderedSql)
}
} else {
ParallelLogger::logInfo("PLP result tables already exist")
}
  if (createTables || !deleteTables) {
    # apply migrations unless tables are only being deleted
ParallelLogger::logInfo("PLP result migration being applied")
migrateDataModel(
connectionDetails = connectionDetails, # input is connection
databaseSchema = resultSchema,
tablePrefix = tablePrefix
)
} else {
ParallelLogger::logInfo("Deleting PLP migration tables")
migrationTableNames <- c("MIGRATION", "PACKAGE_VERSION")
deleteTables(
conn = conn,
databaseSchema = resultSchema,
targetDialect = targetDialect,
tempEmulationSchema = tempEmulationSchema,
tableNames = migrationTableNames,
tablePrefix = tablePrefix
)
}
  return(invisible(NULL))
}
#' Populate the PatientLevelPrediction results tables
#' @description
#' This function formats and uploads results that have been generated via an ATLAS prediction package into a database
#'
#' @details
#' This function can be used to upload PatientLevelPrediction results into a database
#'
#' @param connectionDetails A connection details created by using the
#' function \code{createConnectionDetails} in the
#' \code{DatabaseConnector} package.
#' @param databaseSchemaSettings An object created by \code{createDatabaseSchemaSettings} with all the settings specifying the result tables
#' @param cohortDefinitions A set of one or more cohorts extracted using ROhdsiWebApi::exportCohortDefinitionSet()
#' @param databaseList (Optional) A list created by \code{createDatabaseList} to specify the databases
#' @param resultLocation (string) location of directory where the main package results were saved
#' @param resultLocationVector (only used when resultLocation is missing) a vector of locations with development or validation results
#' @param modelSaveLocation The location of the file system for saving the models in a subdirectory
#'
#' @return
#' Returns NULL but uploads all the results in resultLocation to the PatientLevelPrediction result tables in resultSchema
#'
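#' @examples
#' \dontrun{
#' # a sketch, assuming results were already saved under resultLocation and
#' # connectionDetails points at a database containing the PLP result tables
#' addMultipleRunPlpToDatabase(
#'   connectionDetails = connectionDetails,
#'   cohortDefinitions = NULL,
#'   resultLocation = resultLocation,
#'   modelSaveLocation = file.path(resultLocation, "models")
#' )
#' }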
#' @keywords internal
#' @noRd
addMultipleRunPlpToDatabase <- function(
connectionDetails,
databaseSchemaSettings = createDatabaseSchemaSettings(resultSchema = "main"),
cohortDefinitions,
databaseList = NULL,
resultLocation = NULL,
resultLocationVector,
modelSaveLocation) {
conn <- DatabaseConnector::connect(connectionDetails = connectionDetails)
on.exit(DatabaseConnector::disconnect(conn))
# for each development result add it to the database:
if (missing(resultLocationVector)) {
resultLocationVector <- getResultLocations(resultLocation)
}
if (length(resultLocationVector) == 0) {
ParallelLogger::logInfo("No results found")
return(NULL)
}
for (runPlpLocation in resultLocationVector) {
ParallelLogger::logInfo(paste0("Inserting result @ ", runPlpLocation, " into database"))
# TODO edit csv here
runPlp <- tryCatch(
{
PatientLevelPrediction::loadPlpResult(runPlpLocation)
},
error = function(e) {
ParallelLogger::logInfo(e)
return(NULL)
}
)
if (!is.null(runPlp)) {
ParallelLogger::logInfo("result loaded")
# Add runPlp to the database
tryCatch(
{
addRunPlpToDatabase(
runPlp = runPlp,
connectionDetails = connectionDetails,
databaseSchemaSettings = databaseSchemaSettings,
cohortDefinitions = cohortDefinitions,
databaseList = databaseList,
modelSaveLocation = modelSaveLocation
)
},
error = function(e) {
ParallelLogger::logInfo("result upload failed: ")
ParallelLogger::logInfo(e)
}
)
} # model not null
} # per model
} # end funct
#' Create the PatientLevelPrediction database result schema settings
#' @description
#' This function specifies where the results schema is and lets you pick a different schema for the cohorts and databases
#'
#' @details
#' This function can be used to specify the database settings used to upload PatientLevelPrediction results into a database
#'
#' @param resultSchema (string) The name of the database schema with the result tables.
#' @param tablePrefix (string) A string prepended to the PatientLevelPrediction result table names
#' @param targetDialect (string) The database management system being used
#' @param tempEmulationSchema (string) The temp schema used when the database management system is Oracle
#' @param cohortDefinitionSchema (string) The name of the database schema with the cohort definition tables (defaults to resultSchema).
#' @param tablePrefixCohortDefinitionTables (string) A string prepended to the cohort definition table names
#' @param databaseDefinitionSchema (string) The name of the database schema with the database definition tables (defaults to resultSchema).
#' @param tablePrefixDatabaseDefinitionTables (string) A string prepended to the database definition table names
#'
#' @return
#' Returns a list of class 'plpDatabaseResultSchema' with all the database settings
#' @examples
#' createDatabaseSchemaSettings(resultSchema = "cdm",
#' tablePrefix = "plp_")
#'
#' @export
createDatabaseSchemaSettings <- function(
resultSchema = "main",
tablePrefix = "",
targetDialect = "sqlite",
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"),
cohortDefinitionSchema = resultSchema,
tablePrefixCohortDefinitionTables = tablePrefix,
databaseDefinitionSchema = resultSchema,
tablePrefixDatabaseDefinitionTables = tablePrefix) {
if (!inherits(x = resultSchema, what = "character")) {
stop("resultSchema must be a string")
}
if (tablePrefix != "") {
tablePrefix <- paste0(toupper(gsub("_", "", gsub(" ", "", tablePrefix))), "_")
}
if (tablePrefixCohortDefinitionTables != "") {
tablePrefixCohortDefinitionTables <- paste0(toupper(gsub("_", "", gsub(" ", "", tablePrefixCohortDefinitionTables))), "_")
}
if (tablePrefixDatabaseDefinitionTables != "") {
tablePrefixDatabaseDefinitionTables <- paste0(toupper(gsub("_", "", gsub(" ", "", tablePrefixDatabaseDefinitionTables))), "_")
}
result <- list(
resultSchema = resultSchema,
tablePrefix = tablePrefix,
targetDialect = targetDialect,
tempEmulationSchema = tempEmulationSchema,
cohortDefinitionSchema = cohortDefinitionSchema, # could be removed
tablePrefixCohortDefinitionTables = tablePrefixCohortDefinitionTables, # could be removed
databaseDefinitionSchema = databaseDefinitionSchema, # could be removed
tablePrefixDatabaseDefinitionTables = tablePrefixDatabaseDefinitionTables # could be removed
)
class(result) <- "plpDatabaseResultSchema"
return(result)
}
#' Create a list with the database details and database meta data entries
#' @description
#' This function creates a list with the database details and database meta data entries used in the study
#'
#' @details
#' This function is used when inserting database details into the PatientLevelPrediction database results schema
#'
#' @param cdmDatabaseSchemas (string vector) A vector of the cdmDatabaseSchemas used in the study - if the schemas are not unique per database please also specify databaseRefId
#' @param cdmDatabaseNames Sharable names for the databases
#' @param databaseRefIds (string vector) Unique database identifiers - what you specified as cdmDatabaseId in \code{PatientLevelPrediction::createDatabaseDetails()} when developing the models
#'
#' @return
#' Returns a named list with the database details and database meta data entries
#'
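#' @examples
#' \dontrun{
#' # a sketch: two databases with shareable names
#' databaseList <- createDatabaseList(
#'   cdmDatabaseSchemas = c("cdm_schema_a", "cdm_schema_b"),
#'   cdmDatabaseNames = c("Database A", "Database B")
#' )
#' }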
#' @keywords internal
#' @noRd
createDatabaseList <- function(
cdmDatabaseSchemas,
cdmDatabaseNames,
databaseRefIds = NULL) {
if (missing(cdmDatabaseSchemas)) {
stop("Need to specify cdmDatabaseSchemas")
}
if (is.null(databaseRefIds)) {
ParallelLogger::logInfo("No databaseRefId specified so using schema as unique database identifier")
databaseRefIds <- removeInvalidString(cdmDatabaseSchemas)
}
if (missing(cdmDatabaseNames)) {
cdmDatabaseNames <- removeInvalidString(cdmDatabaseSchemas)
}
result <- lapply(
    seq_along(cdmDatabaseSchemas),
function(i) {
list(
databaseDetails = list(
databaseMetaDataId = databaseRefIds[i]
),
databaseMetaData = list(
databaseId = databaseRefIds[i],
cdmSourceName = cdmDatabaseSchemas[i],
cdmSourceAbbreviation = cdmDatabaseNames[i],
cdmHolder = "", # could get this from CDM_source inside runPlp in future
        sourceDescription = "",
sourceDocumentReference = "",
cdmEtlReference = "",
sourceReleaseDate = "",
cdmReleaseDate = "",
cdmVersion = "",
vocabularyVersion = "",
maxObsPeriodEndDate = ""
)
)
}
)
names(result) <- databaseRefIds # cdmDatabaseSchemas
# using id as schema may not be unique
# id uses schema if it is not set
return(result)
}
#' Function to add the run plp (development or validation) to database
#' @description
#' This function adds a runPlp or external validation result into a database
#'
#' @details
#' This function is used when inserting results into the PatientLevelPrediction database results schema
#'
#' @param runPlp An object of class \code{runPlp} or class \code{externalValidatePlp}
#' @param connectionDetails A connection details created by using the
#' function \code{createConnectionDetails} in the
#' \code{DatabaseConnector} package.
#' @param databaseSchemaSettings An object created by \code{createDatabaseSchemaSettings} with all the settings specifying the result tables
#' @param cohortDefinitions A set of one or more cohorts extracted using ROhdsiWebApi::exportCohortDefinitionSet()
#' @param modelSaveLocation The location of the directory that models will be saved to
#' @param databaseList (Optional) If you want to change the database name then use \code{createDatabaseList} to specify the database settings, but use the same cdmDatabaseId as was used for model development/validation
#'
#' @return
#' Returns NULL invisibly after inserting the result into the database
#'
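#' @examples
#' \dontrun{
#' # a sketch, assuming `runPlpResult` was returned by runPlp() and the result
#' # tables already exist in the "main" schema of the connected database
#' addRunPlpToDatabase(
#'   runPlp = runPlpResult,
#'   connectionDetails = connectionDetails,
#'   databaseSchemaSettings = createDatabaseSchemaSettings(resultSchema = "main"),
#'   cohortDefinitions = NULL,
#'   modelSaveLocation = tempdir()
#' )
#' }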
#' @keywords internal
#' @noRd
addRunPlpToDatabase <- function(
runPlp,
connectionDetails,
databaseSchemaSettings,
cohortDefinitions,
modelSaveLocation,
databaseList = NULL) {
conn <- DatabaseConnector::connect(connectionDetails = connectionDetails)
on.exit(DatabaseConnector::disconnect(conn))
modelDesignId <- insertModelDesignInDatabase(
object = runPlp$model$modelDesign,
conn = conn,
databaseSchemaSettings = databaseSchemaSettings,
cohortDefinitions = cohortDefinitions
)
# Add model if runPlp
if (inherits(runPlp, "runPlp")) {
includesModel <- TRUE
    developmentDatabase <- runPlp$model$trainDetails$developmentDatabase
    # for a development result the internal validation was performed on the
    # development database, so the validation database is the same
    validationDatabase <- runPlp$model$trainDetails$developmentDatabase
    developmentDatabaseRefId <- runPlp$model$trainDetails$developmentDatabaseId
    validationDatabaseRefId <- runPlp$model$trainDetails$developmentDatabaseId
populationSettings <- runPlp$model$modelDesign$populationSettings
targetId <- runPlp$model$modelDesign$targetId
outcomeId <- runPlp$model$modelDesign$outcomeId
restrictPlpDataSettings <- runPlp$model$modelDesign$restrictPlpDataSettings
modelDevelopment <- 1 # added
attrition <- runPlp$model$trainDetails$attrition
} else {
includesModel <- FALSE
developmentDatabase <- runPlp$model$validationDetails$developmentDatabase
validationDatabase <- runPlp$model$validationDetails$validationDatabase
developmentDatabaseRefId <- runPlp$model$validationDetails$developmentDatabaseId
validationDatabaseRefId <- runPlp$model$validationDetails$validationDatabaseId
populationSettings <- runPlp$model$validationDetails$populationSettings
targetId <- runPlp$model$validationDetails$targetId
outcomeId <- runPlp$model$validationDetails$outcomeId
restrictPlpDataSettings <- runPlp$model$validationDetails$restrictPlpDataSettings
modelDevelopment <- 0 # added
attrition <- runPlp$model$validationDetails$attrition
}
# Add databases
developmentDatabaseId <- addDatabase(
conn = conn,
databaseSchemaSettings = databaseSchemaSettings,
databaseList = databaseList,
databaseSchema = developmentDatabase,
databaseId = developmentDatabaseRefId
)
validationDatabaseId <- addDatabase(
conn = conn,
databaseSchemaSettings = databaseSchemaSettings,
databaseList = databaseList,
databaseSchema = validationDatabase,
databaseId = validationDatabaseRefId
)
  # add model if the result contains it
if (includesModel) {
insertModelInDatabase(
model = runPlp$model,
conn = conn,
databaseSchemaSettings = databaseSchemaSettings,
databaseId = developmentDatabaseId,
modelDesignId = modelDesignId,
modelSaveLocation = modelSaveLocation
)
}
# get validation settings
validationTarId <- addTar(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
targetDialect = databaseSchemaSettings$targetDialect,
startDay = populationSettings$riskWindowStart,
startAnchor = populationSettings$startAnchor,
endDay = populationSettings$riskWindowEnd,
endAnchor = populationSettings$endAnchor,
tablePrefix = databaseSchemaSettings$tablePrefix,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
validationTargetId <- addCohort(
conn = conn,
resultSchema = databaseSchemaSettings$cohortDefinitionSchema,
targetDialect = databaseSchemaSettings$targetDialect,
cohortDefinition = getCohortDef(cohortDefinitions, targetId),
tablePrefix = databaseSchemaSettings$tablePrefixCohortDefinitionTables,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
validationOutcomeId <- addCohort(
conn = conn,
resultSchema = databaseSchemaSettings$cohortDefinitionSchema,
targetDialect = databaseSchemaSettings$targetDialect,
cohortDefinition = getCohortDef(cohortDefinitions, outcomeId),
tablePrefix = databaseSchemaSettings$tablePrefixCohortDefinitionTables,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
validationPopulationId <- addPopulationSetting(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
targetDialect = databaseSchemaSettings$targetDialect,
json = populationSettings,
tablePrefix = databaseSchemaSettings$tablePrefix,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
validationPlpDataId <- addPlpDataSetting(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
targetDialect = databaseSchemaSettings$targetDialect,
json = restrictPlpDataSettings,
tablePrefix = databaseSchemaSettings$tablePrefix,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
# Add performance
insertPerformanceInDatabase(
performanceEvaluation = runPlp$performanceEvaluation,
covariateSummary = runPlp$covariateSummary,
attrition = attrition,
executionDateTime = format(runPlp$executionSummary$ExecutionDateTime, format = "%Y-%m-%d"),
plpVersion = runPlp$executionSummary$PackageVersion$packageVersion,
conn = conn,
databaseSchemaSettings = databaseSchemaSettings,
modelDesignId = modelDesignId,
developmentDatabaseId = developmentDatabaseId,
validationDatabaseId = validationDatabaseId,
validationTarId = validationTarId,
validationPopulationId = validationPopulationId,
validationPlpDataId = validationPlpDataId,
validationTargetId = validationTargetId,
validationOutcomeId = validationOutcomeId,
modelDevelopment = modelDevelopment
)
return(invisible(NULL))
}
###################
# INSERT MODEL
####################
insertModelInDatabase <- function(
model,
conn,
databaseSchemaSettings,
databaseId,
modelDesignId,
modelSaveLocation) {
# save the model to the file system
modelLocation <- file.path(
modelSaveLocation,
"models",
paste0("folder-", modelDesignId, "-", databaseId)
)
if (!dir.exists(modelLocation)) {
dir.create(modelLocation, recursive = TRUE)
}
  # saving the whole model to the file system because the preprocessing
  # information was too large to store in the database
savePlpModel(
plpModel = model,
dirPath = modelLocation
)
# need hyperParamSearch for shiny app but the other parts
# are too large to store into the database
trainDetails <- list(hyperParamSearch = model$trainDetails$hyperParamSearch)
  # insert the model record into the models table
modelId <- addModel(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
targetDialect = databaseSchemaSettings$targetDialect,
analysisId = ifelse(
is.null(model$trainDetails$analysisId),
"missing",
model$trainDetails$analysisId
),
modelDesignId = modelDesignId,
databaseId = databaseId,
modelType = model$trainDetails$modelName,
plpModelFile = modelLocation, # save the model to a location and add location here
trainDetails = as.character(ParallelLogger::convertSettingsToJson(trainDetails)),
preprocessing = "",
executionDateTime = format(model$trainDetails$trainingDate, format = "%Y-%m-%d"),
trainingTime = model$trainDetails$trainingTime,
    intercept = ifelse(is.list(model$model) && attr(model, "saveType") != "xgboost", model$model$coefficients$betas[1], 0), # using the param useIntercept?
tablePrefix = databaseSchemaSettings$tablePrefix,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
  ParallelLogger::logInfo(
    paste0(
      "modelId: ", modelId,
      " inserted for modelDesignId ", modelDesignId,
      " and databaseId ", databaseId
    )
  )
return(invisible(modelId))
}
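# addModel: insert a row into the models table for the given model design and
# database if one does not already exist, and return the model id.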
addModel <- function(
conn,
resultSchema,
targetDialect,
tablePrefix,
analysisId,
modelDesignId,
databaseId,
modelType,
plpModelFile,
trainDetails,
preprocessing,
executionDateTime,
trainingTime,
intercept,
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) {
if (is.null(analysisId)) {
stop("analysisId is null")
}
if (is.null(modelDesignId)) {
stop("modelDesignId is null")
}
if (is.null(databaseId)) {
stop("databaseId is null")
}
if (is.null(plpModelFile)) {
stop("plpModelFile is null")
}
if (is.null(executionDateTime)) {
stop("executionDateTime is null")
}
if (is.null(intercept)) {
stop("intercept is null")
}
# process json to make it ordered...
# TODO
result <- checkTable(
conn = conn,
resultSchema = resultSchema,
tablePrefix = tablePrefix,
targetDialect = targetDialect,
tableName = "models",
columnNames = c(
"model_design_id",
"database_id"
),
values = c(
modelDesignId,
databaseId
),
tempEmulationSchema = tempEmulationSchema
)
if (nrow(result) == 0) {
# model
sql <- "INSERT INTO @my_schema.@string_to_appendmodels(
analysis_id,
model_design_id,
database_id,
model_type,
plp_model_file,
train_details,
preprocessing,
execution_date_time,
training_time,
intercept
) VALUES
('@analysis_id',
@model_design_id,
@database_id,
'@model_type',
'@plp_model_file',
'@train_details',
'@preprocessing',
'@execution_date_time',
'@training_time',
@intercept
)"
sql <- SqlRender::render(
sql,
my_schema = resultSchema,
analysis_id = analysisId,
model_design_id = modelDesignId,
database_id = databaseId,
model_type = modelType,
plp_model_file = plpModelFile,
train_details = trainDetails,
preprocessing = preprocessing,
execution_date_time = executionDateTime,
training_time = trainingTime,
intercept = intercept,
string_to_append = tablePrefix
)
sql <- SqlRender::translate(
sql,
targetDialect = targetDialect,
tempEmulationSchema = tempEmulationSchema
)
DatabaseConnector::executeSql(conn, sql)
# getId of new
result <- checkTable(
conn = conn,
resultSchema = resultSchema,
tablePrefix = tablePrefix,
targetDialect = targetDialect,
tableName = "models",
columnNames = c(
"model_design_id",
"database_id"
),
values = c(
modelDesignId,
databaseId
),
tempEmulationSchema = tempEmulationSchema
)
}
return(result$modelId[1])
}
# ======================
# Helpers
# ======================
# get a vector with all the result table names
getPlpResultTables <- function() {
return(
c(
"CALIBRATION_SUMMARY",
"COVARIATE_SUMMARY",
"DEMOGRAPHIC_SUMMARY",
"EVALUATION_STATISTICS",
"PREDICTION_DISTRIBUTION",
"THRESHOLD_SUMMARY",
"ATTRITION", # new
"DIAGNOSTIC_SUMMARY",
"DIAGNOSTIC_PARTICIPANTS",
"DIAGNOSTIC_PREDICTORS",
"DIAGNOSTIC_OUTCOMES",
"DIAGNOSTIC_DESIGNS",
"DIAGNOSTICS", # new
"RECALIBRATIONS", # new
"PERFORMANCES",
"MODELS",
"MODEL_DESIGNS",
"MODEL_SETTINGS",
"COVARIATE_SETTINGS",
"POPULATION_SETTINGS",
"FEATURE_ENGINEERING_SETTINGS",
"SPLIT_SETTINGS",
"PLP_DATA_SETTINGS", # new
"SAMPLE_SETTINGS",
"TIDY_COVARIATES_SETTINGS", # new
"TARS",
"DATABASE_DETAILS",
"DATABASE_META_DATA",
"COHORT_DEFINITION",
"COHORTS"
)
)
}
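# getResultLocations: collect the result folders under resultLocation,
# assuming the standard layout written by the package, e.g.
#   <resultLocation>/Analysis_1/plpResult
#   <resultLocation>/Validation/<database>/Analysis_1/...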
getResultLocations <- function(resultLocation) {
  # find the folders containing plpResult/ or validationResult/ results
  analysisFolders <- list.files(resultLocation, full.names = TRUE)
  resultLocs <- unlist(lapply(analysisFolders, function(folder) {
    candidate <- list.files(folder, pattern = "^(plpResult|validationResult)$", full.names = TRUE)
    if (length(candidate) > 0) candidate else NULL
  }))
if (dir.exists(file.path(resultLocation, "Validation"))) {
validationDatabases <- dir(file.path(resultLocation, "Validation"))
valLocs <- dir(
unlist(
lapply(
validationDatabases,
function(x) {
dir(file.path(resultLocation, "Validation", x),
pattern = "Analysis_",
full.names = TRUE
)
}
)
),
full.names = TRUE
)
resultLocs <- c(resultLocs, valLocs)
}
return(resultLocs)
}
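# deleteTables: empty (TRUNCATE, or DELETE on sqlite) and then DROP each of
# the named tables in databaseSchema, skipping tables that do not exist.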
deleteTables <- function(
conn,
databaseSchema,
targetDialect,
tempEmulationSchema,
tableNames,
tablePrefix) {
if (tablePrefix != "") {
tableNames <- tolower(paste0(gsub("_", "", gsub(" ", "", tablePrefix)), "_", tableNames))
}
alltables <- tolower(DatabaseConnector::getTableNames(
connection = conn,
databaseSchema = databaseSchema
))
  for (tb in tableNames) {
    if (tb %in% alltables) {
      emptySql <- if (targetDialect != "sqlite") {
        "TRUNCATE TABLE @my_schema.@table"
      } else {
        # sqlite has no TRUNCATE, so empty the table with DELETE instead
        "DELETE FROM @my_schema.@table"
      }
      for (sqlTemplate in c(emptySql, "DROP TABLE @my_schema.@table")) {
        sql <- SqlRender::render(
          sqlTemplate,
          my_schema = databaseSchema,
          table = tb
        )
        sql <- SqlRender::translate(
          sql,
          targetDialect = targetDialect,
          tempEmulationSchema = tempEmulationSchema
        )
        DatabaseConnector::executeSql(conn, sql)
      }
    }
  }
}
## Template Helpers
enc <- function(x) {
return(paste0("'", x, "'"))
}
cleanNum <- function(x) {
  types <- unlist(lapply(seq_len(ncol(x)), function(i) class(x[, i])))
ids <- which(types %in% c("numeric", "integer"))
for (id in ids) {
okVals <- is.finite(x[, id])
if (sum(okVals) != length(okVals)) {
x[!okVals, id] <- NA
}
}
return(x)
}
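# checkTable: return the rows of a result table that match the given
# column/value pairs, with snake_case columns converted to camelCase;
# used to look up existing ids before inserting new rows.
# A usage sketch, assuming `conn` is an open DatabaseConnector connection to
# a database holding the PLP result tables:
#   existing <- checkTable(
#     conn = conn, resultSchema = "main", targetDialect = "sqlite",
#     tableName = "models",
#     columnNames = c("model_design_id", "database_id"),
#     values = c(1, 1)
#   )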
checkTable <- function(conn,
resultSchema,
tablePrefix = "",
targetDialect,
tableName,
columnNames,
values,
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) {
vals <- paste0(paste0(columnNames, " = ", values), collapse = " and ")
sql <- "SELECT * from @my_schema.@string_to_append@table where @input_vals;"
sql <- SqlRender::render(sql,
my_schema = resultSchema,
table = tableName,
input_vals = vals,
string_to_append = tablePrefix
)
sql <- SqlRender::translate(sql,
targetDialect = targetDialect,
tempEmulationSchema = tempEmulationSchema
)
result <- DatabaseConnector::querySql(conn, sql, snakeCaseToCamelCase = TRUE)
return(result)
}
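# checkJson: scan a table and return the id of the row whose json column
# exactly matches the given json string, or NULL if there is no match.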
checkJson <- function(conn,
resultSchema,
tablePrefix = "",
targetDialect,
tableName,
jsonColumnName,
id,
json,
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) {
sql <- "SELECT * from @my_schema.@string_to_append@table;"
sql <- SqlRender::render(sql,
my_schema = resultSchema,
table = tableName,
string_to_append = tablePrefix
)
sql <- SqlRender::translate(sql,
targetDialect = targetDialect,
tempEmulationSchema = tempEmulationSchema
)
result <- DatabaseConnector::querySql(conn, sql, snakeCaseToCamelCase = TRUE)
resultId <- NULL
if (nrow(result) > 0) {
colId <- result[, jsonColumnName] == json
if (sum(colId) > 0) {
resultId <- result[colId, id][1]
}
}
return(resultId)
}
getCohortDef <- function(cohortDefinitions, cohortId) {
if (!is.null(cohortDefinitions)) {
if (sum(cohortDefinitions$cohortId == cohortId) > 0) {
return(cohortDefinitions[cohortDefinitions$cohortId == cohortId, ])
}
}
return(
data.frame(
cohortId = cohortId,
cohortName = paste0("Cohort: ", cohortId),
json = "{}"
)
)
}
# add a cohort definition (name + json) to the cohort_definition and cohorts
# tables and return the cohort id
addCohort <- function(
conn,
resultSchema,
targetDialect,
tablePrefix = "",
cohortDefinition, # this is the R data.frame of the cohortDefinition
tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) {
# make sure the json has been converted
json <- cohortDefinition$json
if (!inherits(x = json, what = "character")) {
ParallelLogger::logInfo("converting json to character")
json <- as.character(json) # now convert to character
}
# reduce the size to save
if (!targetDialect %in% c("sqlite", "postgres")) {
json <- substr(json, 1, 4000) # TESTING - FIX THIS [TODO]
}
# check whether cohort already in COHORT_DEFINITION table:
result <- checkTable(
conn = conn,
resultSchema = resultSchema,
tablePrefix = tablePrefix,
targetDialect = targetDialect,
tableName = "cohort_definition",
columnNames = c("cohort_name"),
values = c(paste0("'", gsub("'", "", cohortDefinition$cohortName), "'")),
tempEmulationSchema = tempEmulationSchema
)
  jsonExists <- FALSE
  if (nrow(result) > 0) {
    jsonExists <- json %in% result$json
    ParallelLogger::logInfo(paste0("json already in cohort_definition: ", jsonExists))
  }
  if (jsonExists) {
    cohortDefinitionId <- result$cohortDefinitionId[result$json %in% json]
    ParallelLogger::logInfo(paste0("Cohort ", gsub("'", "", cohortDefinition$cohortName), " exists in cohort_definition with cohort id ", cohortDefinitionId))
  } else {
ParallelLogger::logInfo(paste0("Adding cohort ", gsub("'", "", cohortDefinition$cohortName)))
data <- data.frame(
cohortName = gsub("'", "", cohortDefinition$cohortName),
cohortDefinitionId = cohortDefinition$cohortId,
json = json
)
DatabaseConnector::insertTable(
connection = conn,
databaseSchema = resultSchema,
tableName = paste0(tablePrefix, "cohort_definition"),
data = data,
dropTableIfExists = FALSE,
createTable = FALSE,
tempTable = FALSE,
progressBar = TRUE,
camelCaseToSnakeCase = TRUE,
tempEmulationSchema = tempEmulationSchema
)
# now check and get id
result <- checkTable(
conn = conn,
resultSchema = resultSchema,
tablePrefix = tablePrefix,
targetDialect = targetDialect,
tableName = "cohort_definition",
columnNames = c("cohort_name", "cohort_definition_id"),
values = c(paste0("'", gsub("'", "", cohortDefinition$cohortName), "'"), cohortDefinition$cohortId),
tempEmulationSchema = tempEmulationSchema
)
jsonInd <- result$json %in% json
cohortDefinitionId <- result$cohortDefinitionId[jsonInd]
}
# now add to cohorts table
result <- checkTable(
conn = conn,
resultSchema = resultSchema,
tablePrefix = tablePrefix,
targetDialect = targetDialect,
tableName = "cohorts",
columnNames = c("cohort_definition_id", "cohort_name"),
values = c(cohortDefinitionId, paste0("'", gsub("'", "", cohortDefinition$cohortName), "'")),
tempEmulationSchema = tempEmulationSchema
)
if (nrow(result) > 0) {
ParallelLogger::logInfo(paste0("Cohort ", gsub("'", "", cohortDefinition$cohortName), " exists in cohorts with cohort id", result$cohortId))
} else {
ParallelLogger::logInfo(paste0("Adding cohort ", gsub("'", "", cohortDefinition$cohortName)))
data <- data.frame(
cohortDefinitionId = cohortDefinitionId,
cohortName = gsub("'", "", cohortDefinition$cohortName)
)
DatabaseConnector::insertTable(
connection = conn,
databaseSchema = resultSchema,
tableName = paste0(tablePrefix, "cohorts"),
data = data,
dropTableIfExists = FALSE,
createTable = FALSE,
tempTable = FALSE,
progressBar = TRUE,
camelCaseToSnakeCase = TRUE,
tempEmulationSchema = tempEmulationSchema
)
# now check and get id
result <- checkTable(
conn = conn,
resultSchema = resultSchema,
tablePrefix = tablePrefix,
targetDialect = targetDialect,
tableName = "cohorts",
columnNames = c("cohort_definition_id", "cohort_name"),
values = c(cohortDefinitionId, paste0("'", gsub("'", "", cohortDefinition$cohortName), "'")),
tempEmulationSchema = tempEmulationSchema
)
}
return(result$cohortId[1])
}
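# addDatabase: ensure database_meta_data and database_details rows exist for
# the given database (inserting them if needed) and return the database id.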
addDatabase <- function(
conn,
databaseSchemaSettings,
databaseList = NULL, # list with the database details
databaseId = NULL, # the database id
databaseSchema # the database schema
) {
if (is.null(databaseId)) {
databaseId <- removeInvalidString(databaseSchema)
}
# get the database tables for the databaseId
if (is.null(databaseList)) {
databaseDataFrames <- createDatabaseList(cdmDatabaseSchemas = databaseSchema, databaseRefIds = databaseId)[[1]]
} else {
if (databaseId %in% names(databaseList)) {
databaseDataFrames <- databaseList[[as.character(databaseId)]]
} else {
ParallelLogger::logInfo("database ID not found in databaseList so added new entry")
databaseDataFrames <- createDatabaseList(cdmDatabaseSchemas = databaseSchema, databaseRefIds = databaseId)[[1]]
}
}
# check the database_meta_data
result <- checkTable(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
tablePrefix = databaseSchemaSettings$tablePrefix,
targetDialect = databaseSchemaSettings$targetDialect,
tableName = "database_meta_data",
columnNames = c("database_id"),
values = c(paste0("'", databaseDataFrames$databaseMetaData$databaseId, "'")),
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
if (nrow(result) > 0) {
ParallelLogger::logInfo(paste0("Database meta data ", databaseDataFrames$database_meta_data$databaseId, " already exists"))
} else {
sql <- "INSERT INTO @my_schema.@string_to_appenddatabase_meta_data(
database_id,
cdm_source_name,
cdm_source_abbreviation
)
VALUES ('@database_id','@cdm_source_name', '@cdm_source_abbreviation');"
sql <- SqlRender::render(
sql,
my_schema = databaseSchemaSettings$resultSchema,
database_id = databaseDataFrames$databaseMetaData$databaseId,
cdm_source_name = databaseDataFrames$databaseMetaData$cdmSourceName,
cdm_source_abbreviation = databaseDataFrames$databaseMetaData$cdmSourceAbbreviation,
string_to_append = databaseSchemaSettings$tablePrefix
)
sql <- SqlRender::translate(sql,
targetDialect = databaseSchemaSettings$targetDialect,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
DatabaseConnector::executeSql(conn, sql)
result <- checkTable(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
tablePrefix = databaseSchemaSettings$tablePrefix,
targetDialect = databaseSchemaSettings$targetDialect,
tableName = "database_meta_data",
columnNames = c(
"database_id", "cdm_source_name",
"cdm_source_abbreviation"
),
values = c(
paste0("'", databaseDataFrames$databaseMetaData$databaseId, "'"),
paste0("'", databaseDataFrames$databaseMetaData$cdmSourceName, "'"),
paste0("'", databaseDataFrames$databaseMetaData$cdmSourceAbbreviation, "'")
),
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
}
result <- checkTable(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
tablePrefix = databaseSchemaSettings$tablePrefix,
targetDialect = databaseSchemaSettings$targetDialect,
tableName = "database_details",
columnNames = c("database_meta_data_id"),
values = c(paste0("'", databaseDataFrames$databaseDetails$databaseMetaDataId, "'")),
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
if (nrow(result) > 0) {
ParallelLogger::logInfo(paste0("Database", result$databaseId, " already exists"))
} else {
sql <- "INSERT INTO @my_schema.@string_to_appenddatabase_details(database_meta_data_id)
VALUES ('@database_meta_data_id');"
sql <- SqlRender::render(sql,
my_schema = databaseSchemaSettings$resultSchema,
database_meta_data_id = databaseDataFrames$databaseDetails$databaseMetaDataId,
string_to_append = databaseSchemaSettings$tablePrefix
)
sql <- SqlRender::translate(sql,
targetDialect = databaseSchemaSettings$targetDialect,
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
DatabaseConnector::executeSql(conn, sql)
result <- checkTable(
conn = conn,
resultSchema = databaseSchemaSettings$resultSchema,
tablePrefix = databaseSchemaSettings$tablePrefix,
targetDialect = databaseSchemaSettings$targetDialect,
tableName = "database_details",
columnNames = c("database_meta_data_id"),
values = c(paste0("'", databaseDataFrames$databaseDetails$databaseMetaDataId, "'")),
tempEmulationSchema = databaseSchemaSettings$tempEmulationSchema
)
}
return(result$databaseId[1])
}