#' Function to insert results into a database from csvs
#' @description
#' This function converts a folder with csv results into plp objects and loads them into a plp result database
#'
#' @details
#' The user needs to have plp csv results in a single folder and an existing plp result database
#'
#' @param csvFolder The location to the csv folder with the plp results
#' @param connectionDetails A connection details for the plp results database that the csv results will be inserted into
#' @param databaseSchemaSettings A object created by \code{createDatabaseSchemaSettings} with all the settings specifying the result tables to insert the csv results into
#' @param modelSaveLocation The location to save any models from the csv folder - this should be the same location you picked when inserting other models into the database
#' @param csvTableAppend A string that appends the csv file names
#'
#' @return
#' Returns a data.frame indicating whether the results were inported into the database
#'
#' @export
insertCsvToDatabase <- function(
csvFolder,
connectionDetails,
databaseSchemaSettings,
modelSaveLocation,
csvTableAppend = ''
){
ensure_installed('readr')
ParallelLogger::logInfo('Starting input checks')
csvFileNames <- tryCatch({
dir(csvFolder, pattern = 'csv')
},
error = function(e){ParallelLogger::logInfo(e); return(NULL)}
)
if(is.null(csvFileNames)){
return(invisible(NULL))
}
if(!missing(csvTableAppend)){
csvFileNamesNoAppend <- sub(csvTableAppend, '', csvFileNames)
} else{
csvFileNamesNoAppend <- csvFileNames
}
# check all tables are in folder
# settings/resultsDataModelSpecification.csv table_name
resultNames <- paste0(unique(
readr::read_csv(
system.file(
'settings',
'resultsDataModelSpecification.csv',
package = "PatientLevelPrediction"
)
)$table_name
), '.csv')
if(sum(csvFileNamesNoAppend %in% resultNames) != length(resultNames)){
missingTables <- paste(resultNames[!resultNames %in% csvFileNamesNoAppend], collapse = ',')
ParallelLogger::logInfo(paste0('CSV folder missing these tables: ', missingTables))
return(invisible(NULL))
}
alltables <- getTableNamesPlp(
connectionDetails = connectionDetails,
databaseSchema = databaseSchemaSettings$resultSchema
)
if(!tolower(paste0(databaseSchemaSettings$tablePrefix,'PERFORMANCES')) %in% alltables){
ParallelLogger::logInfo(
paste0(
'performance table: ',paste0(toupper(databaseSchemaSettings$tablePrefix),'PERFORMANCES'),' not found, result database only contains ',
paste(alltables, collapse = ',')
)
)
return(invisible(NULL))
}
ParallelLogger::logInfo('Input checks passed')
ParallelLogger::logInfo('Extracting cohort definitions')
# create cohortDefinitions:
cohortDefinitions <- extractCohortDefinitionsCSV(
csvFolder = csvFolder
)
ParallelLogger::logInfo('Extracting database details')
# create databaseList
databaseList <- extractDatabaseListCSV(
csvFolder = csvFolder
)
ParallelLogger::logInfo('Extracting performance ids')
performanceIds <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('performances', csvFileNames)]))$performance_id
if(length(performanceIds) > 0 ){
for(performanceId in performanceIds){
ParallelLogger::logInfo(
paste0(
'Converting and inserting performance id',
performanceId
)
)
# convert to runPlp
runPlp <- extractObjectFromCsv(
performanceId = performanceId,
csvFolder = csvFolder
)
# load into database
addRunPlpToDatabase(
runPlp = runPlp,
connectionDetails = connectionDetails,
databaseSchemaSettings = databaseSchemaSettings,
cohortDefinitions = cohortDefinitions,
modelSaveLocation = modelSaveLocation,
databaseList = databaseList
)
}
}
diagnosticIds <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('diagnostics', csvFileNames)]))$diagnostic_id
if(length(diagnosticIds) > 0){
for(diagnosticId in diagnosticIds){
ParallelLogger::logInfo(
paste0(
'Converting and inserting diagnostic id',
diagnosticId
)
)
diagnosePlp <- extractDiagnosticFromCsv(
diagnosticId = diagnosticId,
csvFolder = csvFolder
)
if(!is.null(diagnosePlp)){
tryCatch(
{
addDiagnosePlpToDatabase(
diagnosePlp = diagnosePlp,
connectionDetails = connectionDetails,
databaseSchemaSettings = databaseSchemaSettings,
cohortDefinitions = cohortDefinitions,
databaseList = databaseList
)
}, error = function(e){ParallelLogger::logError(e)}
)
}
}
}
return(TRUE)
}
extractCohortDefinitionsCSV <- function(
csvFolder
){
# cohorts: cohort_id, cohort_definition_id, cohort_name
# cohort_definition: cohort_definition_id cohort_name description json sql_command
cohortDefinitionName <- dir(csvFolder, pattern = 'cohort_definition.csv')
cohort_definition <- readr::read_csv(file.path(csvFolder, cohortDefinitionName))
result <- data.frame(
cohortId = cohort_definition$cohort_definition_id,
cohortName = cohort_definition$cohort_name,
json = cohort_definition$json,
sql = cohort_definition$sql_command
)
return(result)
}
extractDatabaseListCSV <- function(
csvFolder
){
# database_meta_data: database_id cdm_source_name cdm_source_abbreviation
# database_details: database_id database_meta_data_id
databaseMetaDataName <- dir(csvFolder, pattern = 'database_meta_data.csv')
databaseMetaData <- readr::read_csv(file.path(csvFolder, databaseMetaDataName))
databaseList <- createDatabaseList(
cdmDatabaseSchemas = databaseMetaData$cdm_source_name,
cdmDatabaseNames = databaseMetaData$cdm_source_abbreviation,
databaseRefIds = databaseMetaData$database_id
)
return(databaseList)
}
getModelDesignSettingTable <- function(modeldesignsRow){
result <- data.frame(
tableName = c('cohorts', 'cohorts',
'population_settings', 'plp_data_settings',
'model_settings', 'covariate_settings', 'sample_settings',
'split_settings', 'feature_engineering_settings',
'tidy_covariates_settings'),
idColumn = c('cohort_id', 'cohort_id',
'population_setting_id', 'plp_data_setting_id',
'model_setting_id', 'covariate_setting_id', 'sample_setting_id',
'split_setting_id', 'feature_engineering_setting_id',
'tidy_covariates_setting_id'),
jsonColumn = c('cohort_definition_id', 'cohort_definition_id',
'population_settings_json', 'plp_data_settings_json',
'model_settings_json', 'covariate_settings_json', 'sample_settings_json',
'split_settings_json', 'feature_engineering_settings_json',
'tidy_covariates_settings_json'),
convertJson = c(rep(F,2), rep(T, 8)),
value = c(modeldesignsRow$target_id, modeldesignsRow$outcome_id,
modeldesignsRow$population_setting_id, modeldesignsRow$plp_data_setting_id,
modeldesignsRow$model_setting_id, modeldesignsRow$covariate_setting_id, modeldesignsRow$sample_setting_id,
modeldesignsRow$split_setting_id, modeldesignsRow$feature_engineering_setting_id ,
modeldesignsRow$tidy_covariates_setting_id),
modelDesignInput = c('targetId', 'outcomeId',
'populationSettings', 'restrictPlpDataSettings',
'modelSettings', 'covariateSettings', 'sampleSettings',
'splitSettings', 'featureEngineeringSettings',
'preprocessSettings')
)
return(result)
}
getModelDesignCsv <- function(
modelDesignSettingTable,
csvFolder = csvFolder
) {
csvFileNames <- dir(csvFolder, pattern = '.csv')
result <- list()
for(i in 1:nrow(modelDesignSettingTable)){
table <- readr::read_csv(file.path(csvFolder, csvFileNames[grep(modelDesignSettingTable$tableName[i], csvFileNames)]))
ind <- table[modelDesignSettingTable$idColumn[i]] == modelDesignSettingTable$value[i]
result[[i]] <- table[ind,][modelDesignSettingTable$jsonColumn[i]]
if(modelDesignSettingTable$convertJson[i]){
result[[i]] <- ParallelLogger::convertJsonToSettings(as.character(result[[i]]))
} else{
# ids need to be integer
result[[i]] <- as.double(result[[i]])
}
}
names(result) <- modelDesignSettingTable$modelDesignInput
modelDesign <- do.call(what = PatientLevelPrediction::createModelDesign, args = result)
return(modelDesign)
}
getPerformanceEvaluationCsv <- function(
performanceId,
csvFolder
){
csvFileNames <- dir(csvFolder, pattern = '.csv')
result <- list(
evaluationStatistics = tryCatch(
{
res <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('evaluation_statistics', csvFileNames)])) %>%
dplyr::filter(.data$performance_id == !!performanceId) %>%
dplyr::select(-"performance_id");
colnames(res) <- SqlRender::snakeCaseToCamelCase( colnames(res));
res
},
error = function(e){ParallelLogger::logInfo(e); return(NULL)}
),
thresholdSummary = tryCatch({
res <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('threshold_summary', csvFileNames)])) %>%
dplyr::filter(.data$performance_id == !!performanceId) %>%
dplyr::select(-"performance_id");
colnames(res) <- SqlRender::snakeCaseToCamelCase( colnames(res));
res
},
error = function(e){ParallelLogger::logInfo(e); return(NULL)}
),
calibrationSummary = tryCatch({
res <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('calibration_summary', csvFileNames)])) %>%
dplyr::filter(.data$performance_id == !!performanceId) %>%
dplyr::select(-"performance_id");
colnames(res) <- SqlRender::snakeCaseToCamelCase( colnames(res));
res
},
error = function(e){ParallelLogger::logInfo(e); return(NULL)}
),
demographicSummary = tryCatch({
res <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('demographic_summary', csvFileNames)])) %>%
dplyr::filter(.data$performance_id == !!performanceId) %>%
dplyr::select(-"performance_id");
colnames(res) <- SqlRender::snakeCaseToCamelCase( colnames(res));
res
},
error = function(e){ParallelLogger::logInfo(e); return(NULL)}
),
predictionDistribution = tryCatch({
res <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('prediction_distribution', csvFileNames)])) %>%
dplyr::filter(.data$performance_id == !!performanceId) %>%
dplyr::select(-"performance_id");
colnames(res) <- SqlRender::snakeCaseToCamelCase( colnames(res));
res
},
error = function(e){ParallelLogger::logInfo(e); return(NULL)}
)
)
return(result)
}
extractObjectFromCsv <- function(
performanceId,
csvFolder
){
csvFileNames <- dir(csvFolder, pattern = '.csv')
# get the model design
# performance_id model_design_id development_database_id validation_database_id target_id outcome_id tar_id plp_data_setting_id population_setting_id model_development execution_date_time plp_version
performances <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('performances', csvFileNames)]))
poi <- performances[performances$performance_id == performanceId,,]
modelDesignId <- poi$model_design_id
modeldesigns <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('model_designs', csvFileNames)]))
# model_design_id target_id outcome_id tar_id plp_data_setting_id population_setting_id model_setting_id covariate_setting_id sample_setting_id split_setting_id feature_engineering_setting_id tidy_covariates_setting_id
modeldesigns <- modeldesigns[modeldesigns$model_design_id == modelDesignId,,]
modelDesignSettingTable <- getModelDesignSettingTable(
modeldesignsRow = modeldesigns
)
modelDesign <- getModelDesignCsv(
modelDesignSettingTable = modelDesignSettingTable,
csvFolder = csvFolder
)
covariateSummary <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('covariate_summary', csvFileNames)])) %>%
dplyr::filter(.data$performance_id == !!poi$performance_id) %>%
dplyr::select(-"performance_id")
colnames(covariateSummary) <- SqlRender::snakeCaseToCamelCase(colnames(covariateSummary))
performanceEvaluation <- getPerformanceEvaluationCsv(
performanceId = poi$performance_id,
csvFolder = csvFolder
)
modelMissing <- F
if(poi$model_development == 1){
modelsName <- dir(csvFolder, pattern = 'models.csv')
models <- readr::read_csv(file.path(csvFolder, modelsName))
models <- models %>%
dplyr::filter(.data$model_design_id == !!poi$model_design_id ) %>%
dplyr::filter(.data$database_id == !!poi$development_database_id)
modelLoc <- strsplit(x = models$plp_model_file, split = '/')[[1]][length(strsplit(x = models$plp_model_file, split = '/')[[1]])]
plpModel <- tryCatch({
PatientLevelPrediction::loadPlpModel(file.path(csvFolder, 'models', modelLoc))
},
error = function(e){ParallelLogger::logInfo(e); return(NULL)}
)
resultClass <- 'runPlp'
if(is.null(modelLoc)){
ParallelLogger::logInfo('Models missing from csv folder - just adding performance')
modelMissing <- T
}
}
if(poi$model_development == 0 | modelMissing){
# database_details: database_id database_meta_data_id
databaseMetaDataName <- dir(csvFolder, pattern = 'database_meta_data.csv')
databaseMetaData <- readr::read_csv(file.path(csvFolder, databaseMetaDataName))
databaseDetailsName <- dir(csvFolder, pattern = 'database_details.csv')
databaseDetails <- readr::read_csv(file.path(csvFolder, databaseDetailsName))
databases <- merge(databaseDetails, databaseMetaData, by.x = 'database_meta_data_id', by.y = 'database_id')
dev <- databases[databases$database_id == poi$development_database_id,,]
val <- databases[databases$database_id == poi$validation_database_id,,]
developmentDatabase <- dev$cdm_source_name
developmentDatabaseId <- dev$database_meta_data_id
validationDatabase <- val$cdm_source_name
validationDatabaseId <- val$database_meta_data_id
attritionName <- dir(csvFolder, pattern = 'attrition.csv')
attrition <- readr::read_csv(file.path(csvFolder, attritionName)) %>%
dplyr::filter(.data$performance_id == !!poi$performance_id) %>%
dplyr::select(-"performance_id")
colnames(attrition) <- SqlRender::snakeCaseToCamelCase(colnames(attrition))
cohortsName <- dir(csvFolder, pattern = 'cohorts.csv')
cohorts <- readr::read_csv(file.path(csvFolder, cohortsName))
plpDataSetName <- dir(csvFolder, pattern = 'plp_data_settings.csv')
plpDataSet <- readr::read_csv(file.path(csvFolder, plpDataSetName))
popSetName <- dir(csvFolder, pattern = 'population_settings.csv')
popSet <- readr::read_csv(file.path(csvFolder, popSetName))
# get the model
plpModel <- list(
model = 'external validation of model',
modelDesign = modelDesign,
validationDetails = list(
analysisId = '',
analysisSource = '',
developmentDatabase = developmentDatabase,
developmentDatabaseId = developmentDatabaseId,
validationDatabase = validationDatabase,
validationDatabaseId = validationDatabaseId,
populationSettings = ParallelLogger::convertJsonToSettings(
as.character(
popSet %>%
dplyr::filter(.data$population_setting_id == !!poi$population_setting_id) %>%
dplyr::select("population_settings_json")
)
),
restrictPlpDataSettings = ParallelLogger::convertJsonToSettings(
as.character(
plpDataSet %>%
dplyr::filter(.data$plp_data_setting_id == !!poi$plp_data_setting_id) %>%
dplyr::select("plp_data_settings_json")
)
),
outcomeId = as.double(
cohorts %>%
dplyr::filter(.data$cohort_id == !!poi$outcome_id) %>%
dplyr::select("cohort_definition_id")
),
targetId = as.double(
cohorts %>%
dplyr::filter(.data$cohort_id == !!poi$target_id) %>%
dplyr::select("cohort_definition_id")
),
attrition = attrition
)
)
attr(plpModel, "predictionFunction") <- 'none'
attr(plpModel, "saveType") <- 'RtoJson'
class(plpModel) <- 'plpModel'
resultClass <- 'externalValidatePlp'
}
result <- list(
executionSummary = list(
PackageVersion = list(
packageVersion = poi$plp_version
),
#TotalExecutionElapsedTime = ,
ExecutionDateTime = poi$execution_date_time
),
model = plpModel,
performanceEvaluation = performanceEvaluation,
covariateSummary = covariateSummary,
analysisRef = list(
analysisId = ''
)
)
class(result) <- resultClass
# return the object
return(result)
}
extractDiagnosticFromCsv <- function(
diagnosticId,
csvFolder
){
# diagnostic_id model_design_id database_id execution_date_time
csvFileNames <- dir(csvFolder, pattern = '.csv')
# get the model design
# performance_id model_design_id development_database_id validation_database_id target_id outcome_id tar_id plp_data_setting_id population_setting_id model_development execution_date_time plp_version
diagnostics <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('diagnostics', csvFileNames)]))
if(length(diagnostics) == 0){
ParallelLogger::logInfo('No diagnostics in csv results')
return(NULL)
}
doi <- diagnostics[diagnostics$diagnostic_id == diagnosticId,,]
if(nrow(doi) == 0){
ParallelLogger::logInfo('No diagnostics in csv results with specified diagnosticId')
return(NULL)
}
modelDesignId <- doi$model_design_id
modeldesigns <- readr::read_csv(file.path(csvFolder, csvFileNames[grep('model_designs', csvFileNames)]))
# model_design_id target_id outcome_id tar_id plp_data_setting_id population_setting_id model_setting_id covariate_setting_id sample_setting_id split_setting_id feature_engineering_setting_id tidy_covariates_setting_id
modeldesigns <- modeldesigns[modeldesigns$model_design_id == modelDesignId,,]
modelDesignSettingTable <- getModelDesignSettingTable(
modeldesignsRow = modeldesigns
)
modelDesign <- getModelDesignCsv(
modelDesignSettingTable = modelDesignSettingTable,
csvFolder = csvFolder
)
databaseMetaDataName <- dir(csvFolder, pattern = 'database_meta_data.csv')
databaseMetaData <- readr::read_csv(file.path(csvFolder, databaseMetaDataName))
databaseDetailsName <- dir(csvFolder, pattern = 'database_details.csv')
databaseDetails <- readr::read_csv(file.path(csvFolder, databaseDetailsName))
databases <- merge(databaseDetails, databaseMetaData, by.x = 'database_meta_data_id', by.y = 'database_id')
db <- databases[databases$database_id == doi$database_id]
databaseSchema <- db$cdm_source_name
databaseId <- db$database_meta_data_id
outcomesName <- dir(csvFolder, pattern = 'diagnostic_outcomes.csv')
outcomes <- readr::read_csv(file.path(csvFolder, outcomesName)) %>%
dplyr::filter(.data$diagnostic_id == !! diagnosticId) %>%
dplyr::select(-"diagnostic_id")
colnames(outcomes) <- SqlRender::snakeCaseToCamelCase(colnames(outcomes))
predictorsName <- dir(csvFolder, pattern = 'diagnostic_predictors.csv')
predictors <- readr::read_csv(file.path(csvFolder, predictorsName)) %>%
dplyr::filter(.data$diagnostic_id == !! diagnosticId) %>%
dplyr::select(-"diagnostic_id")
colnames(predictors) <- SqlRender::snakeCaseToCamelCase(colnames(predictors))
participantsName <- dir(csvFolder, pattern = 'diagnostic_participants.csv')
participants <- readr::read_csv(file.path(csvFolder, participantsName)) %>%
dplyr::filter(.data$diagnostic_id == !! diagnosticId) %>%
dplyr::select(-"diagnostic_id")
colnames(participants) <- SqlRender::snakeCaseToCamelCase(colnames(participants))
summaryName <- dir(csvFolder, pattern = 'diagnostic_summary.csv')
summary <- readr::read_csv(file.path(csvFolder, summaryName)) %>%
dplyr::filter(.data$diagnostic_id == !! diagnosticId) %>%
dplyr::select(-"diagnostic_id")
colnames(summary) <- SqlRender::snakeCaseToCamelCase(colnames(summary))
result <- list(
summary = summary,
participants = participants,
predictors = predictors,
outcomes = outcomes,
designs = NULL,
modelDesign = modelDesign,
databaseSchema = databaseSchema,
databaseId = databaseId
)
class(result) <- 'diagnosePlp'
return(result)
}
getTableNamesPlp <- function(
connectionDetails,
databaseSchema
){
# check some plp tables exists in databaseSchemaSettings
conn <- DatabaseConnector::connect(connectionDetails)
on.exit(DatabaseConnector::disconnect(conn))
result <- DatabaseConnector::getTableNames(
connection = conn,
databaseSchema = databaseSchema
)
return(tolower(result))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.