# @file CdmAtlasCutover
#
# Copyright 2019 Observational Health Data Sciences and Informatics
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @author Observational Health Data Sciences and Informatics
# @author Ajit Londhe
#' @title buildCdmSource
#' @author Ajit Londhe
#' @details
#' Bundles the settings of one CDM data source into a single object that the other functions
#' of this package consume. \code{\link{insertCdmSources}} requires connectionString to be set;
#' \code{\link{removeCdmSources}} only needs sourceKey.
#' @examples
#' ## When building a CDM source to insert, and to make it top priority:
#'
#' buildCdmSource(sourceKey = "TestDb_V123", sourceName = "Test Database v123", dbms = "redshift", cdmDatabaseSchema = "cdm",
#' resultsDatabaseSchema = "ohdsi_results", vocabDatabaseSchema = "cdm", priority = 1,
#' connectionString = "jdbc:redshift://some-redshift-cluster:5439/testdb_v123?user=some-user&password=some-password")
#' @return
#' A named list with exactly these twelve elements, in this order: sourceId, sourceKey, sourceName,
#' dbms, connectionString, cdmDatabaseSchema, resultsDatabaseSchema, vocabDatabaseSchema, priority,
#' user, password, connectionDetails. Arguments left at NULL are kept as NULL elements. The values
#' are what is needed to fill SOURCE and SOURCE_DAIMON.
#'
#' @param sourceKey A unique string identifier for the data source
#' @param sourceName The name of the CDM source
#' @param dbms The dbms of the CDM source
#' @param cdmDatabaseSchema The name of the schema where CDM data sits
#' @param resultsDatabaseSchema The name of the schema where results data sits
#' @param vocabDatabaseSchema The name of the schema where the vocabulary sits; defaults to cdmDatabaseSchema
#' @param sourceId An optional value to pre-specify the database index identifier for the data source.
#' If left NULL, this is either auto-generated by the Repo database, or the next available
#' source_id is obtained.
#' @param connectionString The JDBC connection string to the CDM source; REQUIRED if trying to insert a CDM source
#' (\code{\link{insertCdmSources}}), unnecessary when trying to remove a CDM source
#' (\code{\link{removeCdmSources}})
#' @param priority 1 = the source becomes the one Atlas uses by default (e.g. to show record counts);
#' 0 = the source does not become the one Atlas uses by default
#' @param user (OPTIONAL) The user name for the connection to the CDM. Only used if encryption of source credentials is enabled.
#' @param password (OPTIONAL) The password for the connection to the CDM. Only used if encryption of source credentials is enabled.
#'
#' @param connectionDetails A connectionDetails object created using DatabaseConnector of this source
#' @export
buildCdmSource <- function(sourceKey,
                           sourceName = NULL,
                           dbms = NULL,
                           cdmDatabaseSchema = NULL,
                           resultsDatabaseSchema = NULL,
                           vocabDatabaseSchema = cdmDatabaseSchema,
                           connectionString = NULL,
                           sourceId = NULL,
                           priority = 0,
                           user = NULL,
                           password = NULL,
                           connectionDetails = NULL) {
  # The order of this vector fixes the element order of the returned object.
  fieldNames <- c("sourceId", "sourceKey", "sourceName", "dbms", "connectionString",
                  "cdmDatabaseSchema", "resultsDatabaseSchema", "vocabDatabaseSchema",
                  "priority", "user", "password", "connectionDetails")
  # mget() keeps NULL-valued arguments as NULL list elements (`$<-` would drop them). It also
  # forces the lazy vocabDatabaseSchema default, which is taken from cdmDatabaseSchema.
  mget(fieldNames, envir = environment())
}
#' @title removeCdmSources
#' @author Ajit Londhe
#'
#' @details
#' Removes CDM sources and their cohort results from OHDSI Repository. Sources are removed
#' from the SOURCE and SOURCE_DAIMON table, and cohort results for those sources are cleared.
#' The repository is always queried to resolve the source keys into source_ids, even when
#' \code{sqlOnly = TRUE}. The function stops with an error if none of the keys match.
#'
#' @param repoConnectionDetails A ConnectionDetails object for the OHDSI Repository
#' @param cdmSources A list of CDM sources, built using CDM source object from \code{\link{buildCdmSource}}
#' @param softRemove If TRUE, don't drop the rows, but soft remove the source. Otherwise, drop the source rows
#' @param sqlOnly Generate SQL only, don't execute. The SQL is written to output/remove_cdm_sources.sql
#'
#' @export
removeCdmSources <- function(repoConnectionDetails,
                             cdmSources,
                             softRemove = FALSE,
                             sqlOnly = FALSE) {
  if (length(cdmSources) == 0) {
    stop("cdmSources must contain at least one CDM source object")
  }
  sourceKeys <- vapply(cdmSources, function(cdmSource) as.character(cdmSource$sourceKey), character(1))
  # Quote each key as a SQL string literal, doubling any embedded single quote.
  quotedKeys <- paste0("'", gsub("'", "''", sourceKeys, fixed = TRUE), "'")
  sql <- SqlRender::render(sql = "select source_id from @ohdsiRepositorySchema.source where source_key in (@sourceKeys);",
                           ohdsiRepositorySchema = repoConnectionDetails$schema,
                           sourceKeys = paste(quotedKeys, collapse = ","))

  # One connection serves both the lookup and (unless sqlOnly) the removal. It is released
  # however the function exits.
  connection <- DatabaseConnector::connect(repoConnectionDetails)
  on.exit(DatabaseConnector::disconnect(connection), add = TRUE)

  sourceIds <- DatabaseConnector::querySql(connection = connection, sql = sql)
  if (nrow(sourceIds) == 0) {
    stop("No matching CDM sources found")
  }

  sqlFiles <- if (softRemove) {
    "softRemoveSource.sql"
  } else {
    c("ohdsi_repo_deletes.sql", "remove_sources.sql")
  }
  sourceIdList <- paste(sourceIds$SOURCE_ID, collapse = ",")
  sqls <- vapply(sqlFiles, function(sqlFile) {
    as.character(SqlRender::loadRenderTranslateSql(sqlFilename = sqlFile,
                                                   packageName = "CdmAtlasCutover",
                                                   dbms = repoConnectionDetails$dbms,
                                                   ohdsiRepositorySchema = repoConnectionDetails$schema,
                                                   sourceIds = sourceIdList))
  }, character(1), USE.NAMES = FALSE)
  sql <- paste(sqls, collapse = "\n")

  if (sqlOnly) {
    if (!dir.exists("output")) {
      dir.create("output")
    }
    SqlRender::writeSql(sql = sql, targetFile = file.path("output", "remove_cdm_sources.sql"))
  } else {
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
}
#' @title insertCdmSources
#' @author Ajit Londhe
#'
#' @details
#' Inserts CDM sources into OHDSI Repository. Creates new records in SOURCE and associated records in SOURCE_DAIMON.
#' As these tables may have identity indexes, boolean settings can be used to tell the function to skip instantiating new IDs.
#' IDs generated here continue from the current maximum source_id / source_daimon_id in the repository
#' (starting at 1 for an empty table). Each source reserves three consecutive daimon IDs.
#'
#' A SOURCE_DAIMON row is created for each of cdmDatabaseSchema (daimon_type 0), vocabDatabaseSchema (1)
#' and resultsDatabaseSchema (2) that is not NULL on a CDM source. For a CDM source with priority >= 1,
#' the package's update_daimon_priority.sql is emitted once, just before that source's daimon rows.
#' The repository is queried for the current maximum IDs even when \code{sqlOnly = TRUE}.
#'
#' @param repoConnectionDetails A ConnectionDetails object for the OHDSI Repository
#' @param cdmSources A list of CDM sources, built using CDM source object from \code{\link{buildCdmSource}}
#' @param sourceIdx Boolean to indicate if the SOURCE table uses an identity index for source_id
#' @param daimonIdx Boolean to indicate if the SOURCE_DAIMON table uses an identity index for source_daimon_id
#' @param sqlOnly Generate SQL only, don't execute. The SQL is written to output/insert_cdm_sources.sql
#'
#' @export
insertCdmSources <- function(repoConnectionDetails,
                             cdmSources,
                             sourceIdx = FALSE,
                             daimonIdx = FALSE,
                             sqlOnly = FALSE) {
  if (length(cdmSources) == 0) {
    stop("cdmSources must contain at least one CDM source object")
  }
  missingConnection <- vapply(cdmSources, function(cdmSource) is.null(cdmSource$connectionString), logical(1))
  if (any(missingConnection)) {
    stop("Connection Strings must be defined for each CDM source object in cdmSources list")
  }
  schema <- repoConnectionDetails$schema

  # Render a value as an ANSI SQL string literal (embedded single quotes doubled); NULL becomes SQL NULL.
  sqlLiteral <- function(x) {
    if (is.null(x)) {
      return("NULL")
    }
    paste0("'", gsub("'", "''", x, fixed = TRUE), "'")
  }
  # Render an id without scientific notation (100000, not 1e+05).
  formatId <- function(x) {
    format(x, scientific = FALSE, trim = TRUE)
  }

  # One connection for the max-id lookups and (unless sqlOnly) the inserts, released on any exit.
  connection <- DatabaseConnector::connect(connectionDetails = repoConnectionDetails)
  on.exit(DatabaseConnector::disconnect(connection), add = TRUE)

  # Current maximum of <prefix>_id in <prefix>; 0 when the table is empty (max() returns NULL).
  getMaxId <- function(prefix) {
    sql <- SqlRender::render(sql = "select max(@prefix_id) from @ohdsiRepositorySchema.@prefix;",
                             ohdsiRepositorySchema = schema, prefix = prefix)
    maxId <- as.numeric(DatabaseConnector::querySql(connection = connection, sql = sql)[1, 1])
    if (is.na(maxId)) 0 else maxId
  }
  buildPrioritySql <- function() {
    SqlRender::loadRenderTranslateSql(sqlFilename = "update_daimon_priority.sql",
                                      packageName = "CdmAtlasCutover",
                                      dbms = repoConnectionDetails$dbms,
                                      ohdsiRepositorySchema = schema)
  }

  # Both maxima are read once: none of the generated SQL runs before the end, so the
  # values would not change between sources anyway.
  maxSourceId <- getMaxId("source")
  maxDaimonId <- getMaxId("source_daimon")

  daimonTypes <- list(
    list(name = "cdmDatabaseSchema", id = 0),
    list(name = "vocabDatabaseSchema", id = 1),
    list(name = "resultsDatabaseSchema", id = 2)
  )

  sqls <- c()
  for (i in seq_along(cdmSources)) {
    cdmSource <- cdmSources[[i]]

    # SOURCE row -------------------------------------------------------------------------
    sourceValues <- character(0)
    if (!sourceIdx) {
      sourceId <- cdmSource$sourceId
      if (is.null(sourceId)) {
        sourceId <- maxSourceId + i
      }
      sourceValues["source_id"] <- formatId(sourceId)
    }
    sourceValues["source_name"] <- sqlLiteral(cdmSource$sourceName)
    sourceValues["source_key"] <- sqlLiteral(cdmSource$sourceKey)
    sourceValues["source_connection"] <- sqlLiteral(cdmSource$connectionString)
    sourceValues["source_dialect"] <- sqlLiteral(cdmSource$dbms)
    if (!is.null(cdmSource$user) && !is.null(cdmSource$password)) {
      sourceValues["username"] <- sqlLiteral(cdmSource$user)
      sourceValues["password"] <- sqlLiteral(cdmSource$password)
    }
    sqls <- c(sqls, SqlRender::render(
      sql = "INSERT INTO @ohdsiRepositorySchema.source (@columns) values (@values);",
      ohdsiRepositorySchema = schema,
      columns = paste(names(sourceValues), collapse = ","),
      values = paste(sourceValues, collapse = ",")
    ))

    # SOURCE_DAIMON rows -----------------------------------------------------------------
    priority <- cdmSource$priority
    isDefault <- isTRUE(priority >= 1)
    daimonPriority <- if (isDefault) priority else 0
    firstDaimonId <- maxDaimonId + (i - 1) * 3 + 1
    presentTypes <- Filter(function(daimonType) !is.null(cdmSource[[daimonType$name]]), daimonTypes)
    if (isDefault && length(presentTypes) > 0) {
      # NOTE(review): assumes update_daimon_priority.sql resets existing daimon priorities.
      # It is emitted once, before this source's daimons are inserted, so it does not
      # override them. Confirm against the SQL file.
      sqls <- c(sqls, buildPrioritySql())
    }
    for (daimonType in presentTypes) {
      daimonValues <- character(0)
      if (!daimonIdx) {
        daimonValues["source_daimon_id"] <- paste0(formatId(firstDaimonId + daimonType$id), " as source_daimon_id")
      }
      daimonValues["priority"] <- paste0(daimonPriority, " as priority")
      daimonValues["daimon_type"] <- paste0(daimonType$id, " as daimon_type")
      daimonValues["table_qualifier"] <- paste0("cast(", sqlLiteral(cdmSource[[daimonType$name]]),
                                                " as varchar(255)) as table_qualifier")
      sqls <- c(sqls, SqlRender::render(
        sql = "INSERT INTO @ohdsiRepositorySchema.source_daimon (@columns)
  select source_id, @values from @ohdsiRepositorySchema.source where source_key = @sourceKey;",
        ohdsiRepositorySchema = schema,
        columns = paste("source_id", paste(names(daimonValues), collapse = ","), sep = ","),
        values = paste(daimonValues, collapse = ","),
        sourceKey = sqlLiteral(cdmSource$sourceKey)
      ))
    }
  }

  sql <- paste(sqls, collapse = "\n\n")
  if (sqlOnly) {
    if (!dir.exists("output")) {
      dir.create("output")
    }
    SqlRender::writeSql(sql = sql, targetFile = file.path("output", "insert_cdm_sources.sql"))
  } else {
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
}
#' @title createOhdsiResultsTables
#' @author Ajit Londhe
#'
#' @details
#' Creates all the OHDSI Results tables needed by Atlas by pulling results DDL from WebAPI.
#' The DDL is fetched once from \code{<baseUrl>/ddl/results} and rendered for each source's
#' results and vocabulary schemas. WebAPI's own CREATE INDEX statements are commented out. The
#' indexes listed in the package's csv/resultsTableIndexes.csv are run separately instead.
#' A failure in any one of those index statements is ignored.
#' @return
#' none
#'
#' @param cdmSources The list of databases to cut over
#' @param baseUrl The base URL for the WebApi instance, for example:
#' "http://api.ohdsi.org:80/WebAPI".
#' @param initConceptHierarchy Should the concept hierarchy be initialized? If TRUE, an existing
#' concept_hierarchy table is truncated before the DDL runs.
#' @param sqlOnly Generate SQL only, don't execute. One file per source is written to
#' output/<sourceKey>~create_ohdsi_results_tables.sql
#'
#' @export
createOhdsiResultsTables <- function(cdmSources,
                                     baseUrl,
                                     initConceptHierarchy = FALSE,
                                     sqlOnly = FALSE) {
  if (!.checkBaseUrl(baseUrl)) {
    stop("Base URL not valid, should be like http://api.ohdsi.org:80/WebAPI")
  }
  # The DDL template and the index list do not depend on the source, so fetch/read them once.
  url <- sprintf("%s/ddl/results?initConceptHierarchy=%s", baseUrl,
                 if (initConceptHierarchy) "true" else "false")
  req <- httr::GET(url)
  httr::stop_for_status(req)
  ddlTemplate <- httr::content(x = req, encoding = "UTF-8")
  indexes <- utils::read.csv(file = system.file("csv", "resultsTableIndexes.csv", package = "CdmAtlasCutover"),
                             header = TRUE, stringsAsFactors = FALSE)

  for (cdmSource in cdmSources) {
    sql <- SqlRender::render(sql = ddlTemplate, warnOnMissingParameters = FALSE,
                             results_schema = cdmSource$resultsDatabaseSchema,
                             vocab_schema = cdmSource$vocabDatabaseSchema,
                             results_database_schema = cdmSource$resultsDatabaseSchema)
    if (initConceptHierarchy) {
      truncateSql <- SqlRender::render("IF OBJECT_ID('@resultsDatabaseSchema.concept_hierarchy', 'U') IS NOT NULL truncate table @resultsDatabaseSchema.concept_hierarchy;",
                                       resultsDatabaseSchema = cdmSource$resultsDatabaseSchema)
      sql <- paste(c(truncateSql, sql), collapse = "\n\n")
    }
    # Short-circuit WebAPI's index creation; the indexes are created separately below.
    sql <- gsub(pattern = "CREATE INDEX ", replacement = "--CREATE INDEX ", x = sql, ignore.case = TRUE)
    sql <- SqlRender::translate(sql = sql, targetDialect = cdmSource$dbms)
    indexSqls <- vapply(indexes$SQL, function(s) {
      indexSql <- SqlRender::render(sql = s, resultsDatabaseSchema = cdmSource$resultsDatabaseSchema)
      as.character(SqlRender::translate(sql = indexSql, targetDialect = cdmSource$dbms))
    }, character(1), USE.NAMES = FALSE)

    if (sqlOnly) {
      if (!dir.exists("output")) {
        dir.create("output")
      }
      fullSql <- paste(c(sql, paste(indexSqls, collapse = "\n\n")), collapse = "\n\n")
      SqlRender::writeSql(sql = fullSql,
                          targetFile = file.path("output",
                                                 sprintf("%s~create_ohdsi_results_tables.sql",
                                                         cdmSource$sourceKey)))
    } else {
      # Disconnect each source's connection before moving on to the next one. A bare on.exit()
      # here would be overwritten on every iteration and leak all but the last connection.
      connection <- DatabaseConnector::connect(connectionDetails = cdmSource$connectionDetails)
      tryCatch({
        DatabaseConnector::executeSql(connection = connection, sql = sql)
        for (indexSql in indexSqls) {
          # Best effort: a failing index statement (presumably e.g. one that already exists)
          # must not abort the remaining indexes.
          try(DatabaseConnector::executeSql(connection = connection, sql = indexSql), silent = TRUE)
        }
      }, finally = DatabaseConnector::disconnect(connection = connection))
    }
  }
  invisible(NULL)
}
#' Refresh CDM sources in Atlas
#' @author Ajit Londhe
#' @details
#' Refreshes the source cache that feeds into Atlas.
#' This step is necessary when making changes to the SOURCE and SOURCE_DAIMON tables,
#' so that the changes propagate to Atlas.
#' A request failure is not raised as an error. Instead, a message naming the cause is
#' printed. On success, a warning recommends restarting Tomcat so that the WebAPI vocabulary
#' caches are refreshed too.
#'
#' @param baseUrl The base URL for the WebApi instance, for example:
#' "http://api.ohdsi.org:80/WebAPI".
#'
#'
#' @export
refreshAtlasSources <- function(baseUrl) {
  if (!.checkBaseUrl(baseUrl)) {
    stop("Base URL not valid, should be like http://api.ohdsi.org:80/WebAPI")
  }
  tryCatch({
    # Plain concatenation: a gsub() replacement would interpret backslashes in baseUrl.
    url <- paste0(baseUrl, "/source/refresh")
    req <- httr::GET(url)
    httr::stop_for_status(req)
    writeLines("Atlas sources refreshed")
    warning("In order to refresh the vocabulary caches in WebAPI, it is recommended you restart Tomcat")
  }, error = function(err) {
    # Best effort: report the failure together with its cause instead of raising.
    writeLines(paste0("Unable to refresh Atlas sources: ", conditionMessage(err)))
  })
}
# Internal: returns a single TRUE/FALSE telling whether baseUrl looks like a WebAPI base URL.
# The match is case-sensitive and is not anchored at the start. It needs "http://" or
# "https://", then either a lowercase host name ending in a 2-5 letter domain suffix or a
# dotted IPv4 address (each octet 0-255). One or more ":port" parts must follow, then an
# optional path, and the URL must end in "/WebAPI".
.checkBaseUrl <- function(baseUrl) {
  hostNamePattern <- "https?:\\/\\/[a-z0-9]+([\\-\\.]{1}[a-z0-9]+)*\\.[a-z]{2,5}(:[0-9]{1,5})+(\\/.*)?\\/WebAPI$"
  ipv4Pattern <- "https?:\\/\\/(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(:[0-9]{1,5})+(\\/.*)?\\/WebAPI$"
  matchedPatterns <- vapply(c(hostNamePattern, ipv4Pattern), grepl, logical(1),
                            x = baseUrl, ignore.case = FALSE, USE.NAMES = FALSE)
  any(matchedPatterns)
}
#' Creates network level CDM source for Atlas
#'
#' @details
#' Builds one network-level table for each of achilles_results and achilles_results_dist in
#' networkDatabaseSchema. Each table combines the rows of the matching table from every
#' source's resultsDatabaseSchema, via the package's network SQL templates. If
#' networkDatabaseSchema has no achilles_analysis table, it is copied from the first source's
#' results schema.
#'
#' @param cdmSources The list of databases to cut over
#' @param networkConnectionDetails A connectionDetails object for the network source
#' @param networkDatabaseSchema The fully qualified name of the schema that will serve as the Network Source in Atlas
#' @return
#' none
#'
#' @export
createNetworkSource <- function(cdmSources,
                                networkConnectionDetails,
                                networkDatabaseSchema) {
  if (length(cdmSources) == 0) {
    stop("cdmSources must contain at least one CDM source object")
  }
  dbms <- networkConnectionDetails$dbms
  achillesTypes <- data.frame(
    tableName = c("achilles_results", "achilles_results_dist"),
    distribution = c(0, 1),
    stringsAsFactors = FALSE
  )
  analysisDetails <- Achilles::getAnalysisDetails()

  # Render the network SQL for one Achilles table. `distribution` selects which analyses
  # store their results in that table.
  buildNetworkSql <- function(tableName, distribution) {
    ids <- analysisDetails$ANALYSIS_ID[analysisDetails$DISTRIBUTION == distribution]
    analysisIds <- paste(sprintf("select %d as analysis_id", ids), collapse = " \nunion all\n ")
    sqlUnions <- vapply(cdmSources, function(cdmSource) {
      sql <- SqlRender::render("select * from @resultsDatabaseSchema.@achillesTable
        where analysis_id in (select analysis_id from count_analyses)",
                               resultsDatabaseSchema = cdmSource$resultsDatabaseSchema,
                               achillesTable = tableName)
      as.character(SqlRender::translate(sql = sql, targetDialect = dbms))
    }, character(1), USE.NAMES = FALSE)
    sqlFileName <- SqlRender::snakeCaseToCamelCase(sprintf("network%s.sql", tableName))
    SqlRender::loadRenderTranslateSql(sqlFilename = sqlFileName,
                                      packageName = "CdmAtlasCutover",
                                      dbms = dbms,
                                      networkDatabaseSchema = networkDatabaseSchema,
                                      analysisIds = analysisIds,
                                      sqlUnions = paste(sqlUnions, collapse = " \nunion all\n "))
  }
  # Map() over the columns keeps their types (apply() would coerce the data.frame to a character matrix).
  sqls <- Map(buildNetworkSql, achillesTypes$tableName, achillesTypes$distribution)

  connection <- DatabaseConnector::connect(connectionDetails = networkConnectionDetails)
  # Registered right away so the connection is released even if a later statement fails.
  on.exit(DatabaseConnector::disconnect(connection = connection), add = TRUE)

  tableNames <- DatabaseConnector::getTableNames(connection = connection, databaseSchema = networkDatabaseSchema)
  if (!"achilles_analysis" %in% tolower(tableNames)) {
    sql <- SqlRender::render("select * into @networkDatabaseSchema.achilles_analysis
      from @firstDatabaseSchema.achilles_analysis;",
                             networkDatabaseSchema = networkDatabaseSchema,
                             firstDatabaseSchema = cdmSources[[1]]$resultsDatabaseSchema)
    sql <- SqlRender::translate(sql = sql, targetDialect = dbms)
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
  for (sql in sqls) {
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
  invisible(NULL)
}
#' Rename the source key of an existing source
#'
#' @details
#' Renders the package's renameSourceKey.sql for the repository's dbms and schema. The SQL is
#' either run against the OHDSI Repository or, with \code{sqlOnly = TRUE}, written to
#' output/<oldSourceKey>~renameSourceKey.sql.
#'
#' @param oldSourceKey The name of the old source key
#' @param newSourceKey The new name of the source key
#' @param repoConnectionDetails A ConnectionDetails object for the OHDSI Repository
#' @param sqlOnly Generate SQL only, don't execute. Defaults to FALSE, as in the other functions of this package
#' @return
#' none
#'
#' @export
renameSourceKey <- function(oldSourceKey,
                            newSourceKey,
                            repoConnectionDetails,
                            sqlOnly = FALSE) {
  sql <- SqlRender::loadRenderTranslateSql(sqlFilename = "renameSourceKey.sql",
                                           packageName = "CdmAtlasCutover",
                                           dbms = repoConnectionDetails$dbms,
                                           ohdsiRepositorySchema = repoConnectionDetails$schema,
                                           oldSourceKey = oldSourceKey,
                                           newSourceKey = newSourceKey)
  if (sqlOnly) {
    if (!dir.exists("output")) {
      dir.create("output")
    }
    SqlRender::writeSql(sql = sql,
                        targetFile = file.path("output",
                                               sprintf("%s~renameSourceKey.sql",
                                                       oldSourceKey)))
  } else {
    connection <- DatabaseConnector::connect(connectionDetails = repoConnectionDetails)
    on.exit(DatabaseConnector::disconnect(connection = connection), add = TRUE)
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
}
# getActiveSourceDf <- function(baseUrl) {
# url <- sprintf("%s/source/sources", baseUrl)
# req <- httr::GET(url)
# httr::stop_for_status(req)
# response <- httr::content(x = req, encoding = "UTF-8")
#
# # sources <- lapply(response, function(r) {
# # list(sourceKey = r$sourceKey, r$daimons[[3]]$tableQualifier)
# # })
#
# pivot <- lapply(response, function(x) {
# x[sapply(x, is.null)] <- NA
# unlist(x)
# })
#
# df <- do.call("rbind.fill", lapply(pivot, as.data.frame))
# }
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.