R/CdmAtlasCutover.R

Defines functions buildCdmSource removeCdmSources insertCdmSources createOhdsiResultsTables refreshAtlasSources .checkBaseUrl createNetworkSource renameSourceKey

Documented in buildCdmSource createNetworkSource createOhdsiResultsTables insertCdmSources refreshAtlasSources removeCdmSources renameSourceKey

# @file CdmAtlasCutover
#
# Copyright 2019 Observational Health Data Sciences and Informatics
#
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# @author Observational Health Data Sciences and Informatics
# @author Ajit Londhe


#' @title buildCdmSource
#' @author Ajit Londhe
#' @details
#' Creates a CDM source object (a named list) holding the connection and schema information
#' used by the other functions in this package.
#' If the object will be passed to \code{\link{insertCdmSources}}, connectionString must be defined.
#' @examples
#' ## When building a CDM source to insert, and to make it top priority:
#' 
#' buildCdmSource(sourceKey = "TestDb_V123", sourceName = "Test Database v123", dbms = "redshift", cdmDatabaseSchema = "cdm",
#' resultsDatabaseSchema = "ohdsi_results", vocabDatabaseSchema = "cdm", priority = 1,
#' connectionString = "jdbc:redshift://some-redshift-cluster:5439/testdb_v123?user=some-user&password=some-password")
#' @return
#' A named list with the elements sourceId, sourceKey, sourceName, dbms, connectionString,
#' cdmDatabaseSchema, resultsDatabaseSchema, vocabDatabaseSchema, priority, user, password and
#' connectionDetails (in that order). Arguments left as NULL are kept as NULL elements, so every
#' element is always present. These values fill the SOURCE and SOURCE_DAIMON records.
#'
#' @param sourceKey                    A unique string identifier for the data source
#' @param sourceName                   The name of the CDM source
#' @param dbms                         The dbms of the CDM source
#' @param cdmDatabaseSchema            The name of the schema where CDM data sits
#' @param resultsDatabaseSchema        The name of the schema where results data sits
#' @param vocabDatabaseSchema          The name of the schema where the vocabulary sits; defaults to cdmDatabaseSchema
#' @param sourceId                     An optional value to pre-specify the database index identifier for the data source. 
#'                                     If left NULL, this is either auto-generated by the Repo database, or the next available
#'                                     source_id is obtained.
#' @param connectionString             The JDBC connection string to the CDM source; REQUIRED if trying to insert a CDM source 
#'                                     (\code{\link{insertCdmSources}}), unnecessary when trying to remove a CDM source 
#'                                     (\code{\link{removeCdmSources}})
#' @param priority                     1 = the source becomes the one Atlas uses by default (e.g. to show record counts)
#'                                     0 = the source does not become the one Atlas uses by default
#' @param user                         (OPTIONAL) The user name for the connection to the CDM. Only used if encryption of source credentials is enabled.
#' @param password                     (OPTIONAL) The password for the connection to the CDM. Only used if encryption of source credentials is enabled.
#' @param connectionDetails            A connectionDetails object created using DatabaseConnector of this source
#' @export
buildCdmSource <- function(sourceKey, 
                           sourceName = NULL, 
                           dbms = NULL, 
                           cdmDatabaseSchema = NULL, 
                           resultsDatabaseSchema = NULL, 
                           vocabDatabaseSchema = cdmDatabaseSchema,
                           connectionString = NULL, 
                           sourceId = NULL, 
                           priority = 0,
                           user = NULL,
                           password = NULL,
                           connectionDetails = NULL) {
  # Identity and connection fields. These are evaluated first, so a missing sourceKey errors here.
  identityFields <- list(sourceId = sourceId,
                         sourceKey = sourceKey,
                         sourceName = sourceName,
                         dbms = dbms,
                         connectionString = connectionString)
  # Table qualifiers for the CDM, results and vocabulary daimons.
  schemaFields <- list(cdmDatabaseSchema = cdmDatabaseSchema,
                       resultsDatabaseSchema = resultsDatabaseSchema,
                       vocabDatabaseSchema = vocabDatabaseSchema)
  # Default-source flag and optional credentials.
  extraFields <- list(priority = priority,
                      user = user,
                      password = password,
                      connectionDetails = connectionDetails)
  # c() on lists keeps NULL elements, so all twelve fields are present in the result.
  c(identityFields, schemaFields, extraFields)
}

#' @title removeCdmSources
#' @author Ajit Londhe
#'
#' @details
#' Removes CDM sources and their cohort results from OHDSI Repository. Sources are removed
#' from the SOURCE and SOURCE_DAIMON table, and cohort results for those sources are cleared.
#' The source_ids are looked up in the repository by source key, so a connection to the
#' repository is opened even when \code{sqlOnly = TRUE}. The connection is always closed
#' before the function returns, including when no matching source is found.
#'
#' @param repoConnectionDetails     A ConnectionDetails object for the OHDSI Repository
#' @param cdmSources                A list of CDM sources, built using CDM source object from \code{\link{buildCdmSource}}
#' @param softRemove                If TRUE, don't drop the rows, but soft remove the source. Otherwise, drop the source rows
#' @param sqlOnly                   Generate SQL only (written to output/remove_cdm_sources.sql), don't execute
#' @return Invisibly, the generated SQL as a single string.
#' 
#' @export
removeCdmSources <- function(repoConnectionDetails, 
                             cdmSources, 
                             softRemove = FALSE,
                             sqlOnly = FALSE) {
  if (length(cdmSources) == 0) {
    # An empty key list would render the invalid SQL "source_key in ()".
    stop("cdmSources must contain at least one CDM source")
  }
  
  # Quote a value as a SQL string literal, doubling any embedded single quotes.
  toSqlLiteral <- function(x) paste0("'", gsub("'", "''", x, fixed = TRUE), "'")
  
  sourceKeys <- vapply(cdmSources, function(cdmSource) as.character(cdmSource$sourceKey), character(1))
  
  # One connection serves both the lookup and (unless sqlOnly) the deletes. on.exit()
  # guarantees it is closed on every exit path.
  connection <- DatabaseConnector::connect(repoConnectionDetails)
  on.exit(DatabaseConnector::disconnect(connection), add = TRUE)
  
  lookupSql <- SqlRender::render(sql = "select source_id from @ohdsiRepositorySchema.source where source_key in (@sourceKeys);", 
                                 ohdsiRepositorySchema = repoConnectionDetails$schema, 
                                 sourceKeys = paste(toSqlLiteral(sourceKeys), collapse = ","))
  sourceIds <- DatabaseConnector::querySql(connection = connection, sql = lookupSql)
  
  if (nrow(sourceIds) == 0) {
    stop("No matching CDM sources found")
  }
  
  sourceIdList <- paste(sourceIds$SOURCE_ID, collapse = ",")
  # A soft remove only flags the sources. A hard remove clears their results, then drops the source rows.
  sqlFiles <- if (softRemove) {
    "softRemoveSource.sql"
  } else {
    c("ohdsi_repo_deletes.sql", "remove_sources.sql")
  }
  sqls <- lapply(sqlFiles, function(sqlFile) {
    SqlRender::loadRenderTranslateSql(sqlFilename = sqlFile, 
                                      packageName = "CdmAtlasCutover", 
                                      dbms = repoConnectionDetails$dbms,
                                      ohdsiRepositorySchema = repoConnectionDetails$schema,
                                      sourceIds = sourceIdList)
  })
  sql <- paste(unlist(sqls), collapse = "\n")
  
  if (sqlOnly) {
    if (!dir.exists("output")) { dir.create("output") }
    SqlRender::writeSql(sql = sql, targetFile = file.path("output", "remove_cdm_sources.sql"))
  } else {
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
  invisible(sql)
}


#' @title insertCdmSources
#' @author Ajit Londhe
#'
#' @details
#' Inserts CDM sources into OHDSI Repository. Creates new records in SOURCE and associated records in SOURCE_DAIMON
#' (one daimon for each of cdmDatabaseSchema, vocabDatabaseSchema and resultsDatabaseSchema that is not NULL).
#' As these tables may have identity indexes, boolean settings can be used to tell the function to skip instantiating new IDs.
#' When IDs are generated, they continue from the current maximum ID in the repository (0 if the table is empty).
#' For a source with priority >= 1, the statements from update_daimon_priority.sql are emitted before that
#' source's daimon inserts, and its daimons are inserted with that priority. Other sources' daimons get priority 0.
#'
#' @param repoConnectionDetails     A ConnectionDetails object for the OHDSI Repository
#' @param cdmSources                A list of CDM sources, built using CDM source object from \code{\link{buildCdmSource}}
#' @param sourceIdx                 Boolean to indicate if the SOURCE table uses an identity index for source_id
#'                                  (if TRUE, source_id is not written)
#' @param daimonIdx                 Boolean to indicate if the SOURCE_DAIMON table uses an identity index for source_daimon_id
#'                                  (if TRUE, source_daimon_id is not written)
#' @param sqlOnly                   Generate SQL only (written to output/insert_cdm_sources.sql), don't execute
#' @return Invisibly, the generated SQL as a single string.
#' 
#' @export
insertCdmSources <- function(repoConnectionDetails, 
                             cdmSources, 
                             sourceIdx = FALSE, 
                             daimonIdx = FALSE,
                             sqlOnly = FALSE) {
  if (length(cdmSources) == 0) {
    stop("cdmSources must contain at least one CDM source")
  }
  lacksConnectionString <- vapply(cdmSources,
                                  function(cdmSource) is.null(cdmSource$connectionString),
                                  logical(1))
  if (any(lacksConnectionString)) {
    stop("Connection Strings must be defined for each CDM source object in cdmSources list")   
  }
  
  # Quote a value as a SQL string literal, doubling embedded single quotes. NULL becomes SQL NULL.
  # (shQuote(type = "csh") switches to shell double-quote escaping when a value contains a
  # single quote, which is not valid SQL.)
  toSqlLiteral <- function(x) {
    if (is.null(x)) {
      return("NULL")
    }
    paste0("'", gsub("'", "''", x, fixed = TRUE), "'")
  }
  
  # Render an id without scientific notation (paste0(1e5) would give "1e+05").
  formatId <- function(x) format(x, scientific = FALSE, trim = TRUE)
  
  # Highest existing <table>_id in the repository, or 0 when the table is empty.
  # The connection is closed before returning.
  getMaxId <- function(table) {
    sql <- SqlRender::render(sql = "select max(@idColumn) from @ohdsiRepositorySchema.@table;", 
                             ohdsiRepositorySchema = repoConnectionDetails$schema,
                             idColumn = paste0(table, "_id"),
                             table = table)
    connection <- DatabaseConnector::connect(repoConnectionDetails)
    on.exit(DatabaseConnector::disconnect(connection), add = TRUE)
    maxId <- as.numeric(DatabaseConnector::querySql(connection = connection, sql = sql)[[1]])
    if (length(maxId) == 0 || is.na(maxId[1])) 0 else maxId[1]
  }
  
  # Query the current maxima once, and only when ids must be generated here. Nothing is
  # inserted until the end, so the maxima do not change during the loop.
  maxSourceId <- if (sourceIdx) NA else getMaxId("source")
  maxDaimonId <- if (daimonIdx) NA else getMaxId("source_daimon")
  
  # SOURCE_DAIMON.daimon_type values, keyed by the CDM source field holding the table qualifier.
  daimonTypes <- c(cdmDatabaseSchema = 0, vocabDatabaseSchema = 1, resultsDatabaseSchema = 2)
  
  sqls <- character(0)
  for (i in seq_along(cdmSources)) {
    cdmSource <- cdmSources[[i]]
    
    sourceValues <- list()
    if (!sourceIdx) {
      sourceId <- if (is.null(cdmSource$sourceId)) maxSourceId + i else cdmSource$sourceId
      sourceValues$source_id <- formatId(sourceId)
    }
    sourceValues$source_name <- toSqlLiteral(cdmSource$sourceName)
    sourceValues$source_key <- toSqlLiteral(cdmSource$sourceKey)
    sourceValues$source_connection <- toSqlLiteral(cdmSource$connectionString)
    sourceValues$source_dialect <- toSqlLiteral(cdmSource$dbms)
    if (!is.null(cdmSource$user) && !is.null(cdmSource$password)) {
      # Credentials are only stored when both are supplied.
      sourceValues$username <- toSqlLiteral(cdmSource$user)
      sourceValues$password <- toSqlLiteral(cdmSource$password)
    }
    
    sqls <- c(sqls, SqlRender::render(sql = "INSERT INTO @ohdsiRepositorySchema.source (@columns) values (@values);",
                                      ohdsiRepositorySchema = repoConnectionDetails$schema,
                                      columns = paste(names(sourceValues), collapse = ","),
                                      values = paste(unlist(sourceValues), collapse = ",")))
    
    sourcePriority <- cdmSource$priority
    isDefaultSource <- isTRUE(sourcePriority >= 1)
    if (isDefaultSource) {
      # Emitted once per default source, ahead of its daimon inserts. The script takes no
      # source-specific parameters. Presumably it demotes existing daimons so this source becomes
      # Atlas's default; verify against inst/sql. Previously this SQL was built but discarded.
      sqls <- c(sqls, SqlRender::loadRenderTranslateSql(sqlFilename = "update_daimon_priority.sql", 
                                                        packageName = "CdmAtlasCutover", 
                                                        dbms = repoConnectionDetails$dbms, 
                                                        ohdsiRepositorySchema = repoConnectionDetails$schema))
    }
    daimonPriority <- if (isDefaultSource) sourcePriority else 0
    
    # Each source reserves three consecutive daimon ids (one per daimon type) after the
    # repository maximum. This keeps ids unique across the sources inserted in this call.
    daimonIdBase <- maxDaimonId + (i - 1) * 3 + 1
    
    for (daimonField in names(daimonTypes)) {
      tableQualifier <- cdmSource[[daimonField]]
      if (is.null(tableQualifier)) {
        next
      }
      daimonType <- daimonTypes[[daimonField]]
      
      daimonValues <- list()
      if (!daimonIdx) {
        daimonValues$source_daimon_id <- paste0(formatId(daimonIdBase + daimonType), " as source_daimon_id")
      }
      daimonValues$priority <- paste0(daimonPriority, " as priority")
      daimonValues$daimon_type <- paste0(daimonType, " as daimon_type")
      daimonValues$table_qualifier <- paste0("cast(", toSqlLiteral(tableQualifier), " as varchar(255)) as table_qualifier")
      
      sqls <- c(sqls, SqlRender::render(sql = "INSERT INTO @ohdsiRepositorySchema.source_daimon (@columns) 
                                select source_id, @values from @ohdsiRepositorySchema.source where source_key = @sourceKey;",
                                        ohdsiRepositorySchema = repoConnectionDetails$schema,
                                        columns = paste(c("source_id", names(daimonValues)), collapse = ","),
                                        values = paste(unlist(daimonValues), collapse = ","),
                                        sourceKey = toSqlLiteral(cdmSource$sourceKey)))
    }
  }
  
  sql <- paste(sqls, collapse = "\n\n")
  if (sqlOnly) {
    if (!dir.exists("output")) { dir.create("output") }
    SqlRender::writeSql(sql = sql, targetFile = file.path("output", "insert_cdm_sources.sql"))
  } else {
    connection <- DatabaseConnector::connect(connectionDetails = repoConnectionDetails)
    on.exit(DatabaseConnector::disconnect(connection), add = TRUE)
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
  invisible(sql)
}

#' @title createOhdsiResultsTables
#' @author Ajit Londhe
#'
#' @details
#' Creates all the OHDSI Results tables needed by Atlas by pulling results DDL from WebAPI.
#' The DDL is fetched once and then rendered and translated for each CDM source's results and
#' vocabulary schemas and dbms. Index statements are taken from the package's resultsTableIndexes.csv
#' instead of the DDL: they are commented out of the DDL and run separately, with failures ignored.
#' @return
#' none
#'
#' @param cdmSources             The list of databases to cut over
#' @param baseUrl                The base URL for the WebApi instance, for example:
#'                               "http://api.ohdsi.org:80/WebAPI".
#' @param initConceptHierarchy   Should the concept hierarchy be initialized?
#' @param sqlOnly                Generate SQL only (one output/<sourceKey>~create_ohdsi_results_tables.sql
#'                               file per source), don't execute
#' 
#' @export
createOhdsiResultsTables <- function (cdmSources,
                                      baseUrl, 
                                      initConceptHierarchy = FALSE, 
                                      sqlOnly = FALSE) {
  
  if (!.checkBaseUrl(baseUrl)) {
    stop("Base URL not valid, should be like http://api.ohdsi.org:80/WebAPI")
  }
  
  # The DDL template and index definitions do not depend on the CDM source, so load them once.
  url <- sprintf("%s/ddl/results?initConceptHierarchy=%s", baseUrl,
                 if (initConceptHierarchy) "true" else "false")
  req <- httr::GET(url)
  httr::stop_for_status(req)
  ddlTemplate <- httr::content(x = req, encoding = "UTF-8")
  
  indexes <- read.csv(file = system.file("csv", "resultsTableIndexes.csv", package = "CdmAtlasCutover"),
                      header = TRUE, stringsAsFactors = FALSE)
  
  # Run the table DDL, then each index statement, on one connection that is always closed.
  # (A bare on.exit() in the loop would replace the previous handler, leaking earlier connections.)
  executeOnSource <- function(connectionDetails, tableSql, indexSqls) {
    connection <- DatabaseConnector::connect(connectionDetails = connectionDetails)
    on.exit(DatabaseConnector::disconnect(connection = connection), add = TRUE)
    DatabaseConnector::executeSql(connection = connection, sql = tableSql)
    for (indexSql in indexSqls) {
      # Index creation is best effort; errors are deliberately suppressed.
      try(DatabaseConnector::executeSql(connection = connection, sql = indexSql), silent = TRUE)
    }
  }
  
  for (cdmSource in cdmSources) {
    sql <- SqlRender::render(sql = ddlTemplate, warnOnMissingParameters = FALSE,
                             results_schema = cdmSource$resultsDatabaseSchema,
                             vocab_schema = cdmSource$vocabDatabaseSchema,
                             results_database_schema = cdmSource$resultsDatabaseSchema)
    
    if (initConceptHierarchy) {
      sql <- c(SqlRender::render("IF OBJECT_ID('@resultsDatabaseSchema.concept_hierarchy', 'U') IS NOT NULL truncate table @resultsDatabaseSchema.concept_hierarchy;",
                                 resultsDatabaseSchema = cdmSource$resultsDatabaseSchema),
               sql)
      sql <- paste(sql, collapse = "\n\n")
    }
    
    # short-circuit the index creation to wrap in try statements -----------------------------------
    indexSqls <- lapply(indexes$SQL, function(s) {
      indexSql <- SqlRender::render(sql = s, resultsDatabaseSchema = cdmSource$resultsDatabaseSchema)
      SqlRender::translate(sql = indexSql, targetDialect = cdmSource$dbms)
    })
    
    sql <- gsub(pattern = "CREATE INDEX ", replacement = "--CREATE INDEX ", x = sql, ignore.case = TRUE)
    sql <- SqlRender::translate(sql = sql, targetDialect = cdmSource$dbms)
    
    if (sqlOnly) {
      if (!dir.exists("output")) { dir.create("output") }
      sql <- paste(c(sql, paste(indexSqls, collapse = "\n\n")), collapse = "\n\n")
      SqlRender::writeSql(sql = sql, 
                          targetFile = file.path("output", 
                                                 sprintf("%s~create_ohdsi_results_tables.sql", 
                                                         cdmSource$sourceKey)))
    } else {
      executeOnSource(cdmSource$connectionDetails, sql, indexSqls)
    }
  }
  invisible(NULL)
}


#' Refresh CDM sources in Atlas
#' @author Ajit Londhe
#' @details
#' Refreshes the source cache that feeds into Atlas by calling \code{<baseUrl>/source/refresh}.
#' This step is necessary when making changes to the SOURCE and SOURCE_DAIMON tables,
#' so that the changes propagate to Atlas. A failed request is reported (with its cause)
#' rather than raised. On success, a warning recommends restarting Tomcat.
#' 
#' @param baseUrl        The base URL for the WebApi instance, for example:
#'                       "http://api.ohdsi.org:80/WebAPI".
#' @return Invisibly, TRUE if the refresh request succeeded, FALSE otherwise.
#' 
#' @export
refreshAtlasSources <- function (baseUrl) {

  if (!.checkBaseUrl(baseUrl)) {
    stop("Base URL not valid, should be like http://api.ohdsi.org:80/WebAPI")
  }
  
  url <- paste0(baseUrl, "/source/refresh")
  refreshed <- tryCatch({
    req <- httr::GET(url)
    httr::stop_for_status(req)
    TRUE
  }, error = function(err) {
    writeLines(sprintf("Unable to refresh Atlas sources: %s", conditionMessage(err)))
    FALSE
  })
  
  # The advisory warning sits outside tryCatch, so it cannot be mistaken for a failed refresh
  # (e.g. under options(warn = 2)).
  if (refreshed) {
    writeLines("Atlas sources refreshed")
    warning("In order to refresh the vocabulary caches in WebAPI, it is recommended you restart Tomcat")
  }
  invisible(refreshed)
}

# Validate a WebAPI base URL. It must use the http(s) scheme, a lowercase DNS host name,
# "localhost" or an IPv4 address, an optional port, an optional path prefix, and a final
# "/WebAPI" segment with no trailing slash.
# Returns TRUE if baseUrl (a single string) matches, FALSE otherwise (including NULL/NA/vectors).
.checkBaseUrl <- function(baseUrl) {
  if (!is.character(baseUrl) || length(baseUrl) != 1L || is.na(baseUrl)) {
    return(FALSE)
  }
  octet <- "([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])"
  # Port is optional; anchoring at "^" rejects junk before the scheme.
  suffix <- "(:[0-9]{1,5})?(/.*)?/WebAPI$"
  patterns <- c(
    hostName = paste0("^https?://[a-z0-9]+([-.][a-z0-9]+)*\\.[a-z]{2,5}", suffix),
    ipv4 = paste0("^https?://(", octet, "\\.){3}", octet, suffix),
    localhost = paste0("^https?://localhost", suffix)
  )
  any(vapply(patterns, grepl, logical(1), x = baseUrl, ignore.case = FALSE))
}


#' Creates network level CDM source for Atlas
#' 
#' Builds one statement per Achilles table (achilles_results and achilles_results_dist) from the
#' package SQL templates. Each statement combines the matching table from every CDM source's results schema.
#' If the network schema has no achilles_analysis table, one is copied from the first CDM source's
#' results schema. The statements are then executed on the network connection, which is always closed.
#' 
#' @param cdmSources                 The list of databases to cut over (must not be empty)
#' @param networkConnectionDetails   A connectionDetails object for the network source
#' @param networkDatabaseSchema      The fully qualified name of the schema that will serve as the Network Source in Atlas
#' 
#' @export
createNetworkSource <- function(cdmSources,
                                networkConnectionDetails,
                                networkDatabaseSchema) {
  if (length(cdmSources) == 0) {
    stop("cdmSources must contain at least one CDM source")
  }
  
  achillesTypes <- data.frame(
    tableName = c("achilles_results", "achilles_results_dist"),
    distribution = c(0, 1),
    stringsAsFactors = FALSE
  )
  
  analysisDetails <- Achilles::getAnalysisDetails()
  
  # Build the network SQL for one Achilles table, keeping only analyses whose DISTRIBUTION
  # flag matches the table.
  buildNetworkSql <- function(tableName, distribution) {
    # %in% (not ==) so analyses with an NA DISTRIBUTION are excluded instead of yielding NA ids.
    ids <- analysisDetails$ANALYSIS_ID[analysisDetails$DISTRIBUTION %in% distribution]
    analysisIdSql <- paste(sprintf("select %d as analysis_id", as.integer(ids)),
                           collapse = " \nunion all\n ")
    
    # count_analyses is not defined here; presumably it is a CTE in the SQL template.
    sourceSqls <- lapply(cdmSources, function(cdmSource) {
      sql <- SqlRender::render("select * from @resultsDatabaseSchema.@achillesTable 
                               where analysis_id in (select analysis_id from count_analyses)",
                               resultsDatabaseSchema = cdmSource$resultsDatabaseSchema,
                               achillesTable = tableName)
      SqlRender::translate(sql = sql, targetDialect = networkConnectionDetails$dbms)
    })
    sqlUnions <- paste(unlist(sourceSqls), collapse = " \nunion all\n ")
    
    sqlFileName <- SqlRender::snakeCaseToCamelCase(sprintf("network%s.sql", tableName))
    SqlRender::loadRenderTranslateSql(sqlFilename = sqlFileName,
                                      packageName = "CdmAtlasCutover", 
                                      dbms = networkConnectionDetails$dbms,
                                      networkDatabaseSchema = networkDatabaseSchema,
                                      analysisIds = analysisIdSql,
                                      sqlUnions = sqlUnions)
  }
  
  sqls <- Map(buildNetworkSql, achillesTypes$tableName, achillesTypes$distribution)
  
  connection <- DatabaseConnector::connect(connectionDetails = networkConnectionDetails)
  # Registered immediately, so the connection is closed even if a statement below fails.
  on.exit(DatabaseConnector::disconnect(connection = connection), add = TRUE)
  
  tableNames <- DatabaseConnector::getTableNames(connection = connection, databaseSchema = networkDatabaseSchema)
  if (!"achilles_analysis" %in% tolower(tableNames)) {
    sql <- SqlRender::render("select * into @networkDatabaseSchema.achilles_analysis 
                             from @firstDatabaseSchema.achilles_analysis;",
                             networkDatabaseSchema = networkDatabaseSchema,
                             firstDatabaseSchema = cdmSources[[1]]$resultsDatabaseSchema)
    sql <- SqlRender::translate(sql = sql, targetDialect = networkConnectionDetails$dbms)
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
  
  for (sql in sqls) {
    DatabaseConnector::executeSql(connection = connection, sql = sql)
  }
  invisible(NULL)
}

#' Rename the source key of an existing source
#' 
#' Renders renameSourceKey.sql for the repository's dbms and either writes it to
#' output/<oldSourceKey>~renameSourceKey.sql or executes it against the repository.
#' 
#' @param oldSourceKey              The name of the old source key
#' @param newSourceKey              The new name of the source key
#' @param repoConnectionDetails     A ConnectionDetails object for the OHDSI Repository
#' @param sqlOnly                   Generate SQL only, don't execute (default FALSE, as in the other functions)
#' @return Invisibly, the generated SQL.
#' 
#' @export
renameSourceKey <- function(oldSourceKey, 
                            newSourceKey,
                            repoConnectionDetails,
                            sqlOnly = FALSE) {
  sql <- SqlRender::loadRenderTranslateSql(sqlFilename = "renameSourceKey.sql",
                                           packageName = "CdmAtlasCutover",
                                           dbms = repoConnectionDetails$dbms,
                                           ohdsiRepositorySchema = repoConnectionDetails$schema,
                                           oldSourceKey = oldSourceKey,
                                           newSourceKey = newSourceKey)
  
  if (sqlOnly) {
    if (!dir.exists("output")) { dir.create("output") }
    SqlRender::writeSql(sql = sql, 
                        targetFile = file.path("output", 
                                               sprintf("%s~renameSourceKey.sql", 
                                                       oldSourceKey)))
  } else {
    connection <- DatabaseConnector::connect(connectionDetails = repoConnectionDetails)
    on.exit(DatabaseConnector::disconnect(connection = connection), add = TRUE)
    DatabaseConnector::executeSql(connection = connection, sql = sql)  
  }
  invisible(sql)
}


# getActiveSourceDf <- function(baseUrl) {
#   url <- sprintf("%s/source/sources", baseUrl)
#   req <- httr::GET(url)
#   httr::stop_for_status(req)
#   response <- httr::content(x = req, encoding = "UTF-8")
#   
#   # sources <- lapply(response, function(r) {
#   #   list(sourceKey = r$sourceKey, r$daimons[[3]]$tableQualifier)
#   # })
#   
#   pivot <- lapply(response, function(x) {
#     x[sapply(x, is.null)] <- NA
#     unlist(x)
#   })
#   
#   df <- do.call("rbind.fill", lapply(pivot, as.data.frame))
# }
OHDSI/CdmAtlasCutover documentation built on Sept. 28, 2019, 11:14 a.m.