# @file formatting.R
#
# Copyright 2020 Observational Health Data Sciences and Informatics
#
# This file is part of PatientLevelPrediction
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#' Convert the plpData in COO format into a sparse R matrix
#'
#' @description
#' Converts the standard plpData to a sparse matrix
#'
#' @details
#' This function converts the covariate data (stored by Andromeda in COO format) into a sparse
#' matrix from the Matrix package
#' @param plpData                       An object of type \code{plpData} with covariates in COO format - the patient level prediction
#'                                      data extracted from the CDM.
#' @param population                    The population to include in the matrix
#' @param map                           A covariate map (telling us the column number for covariates)
#' @param temporal                      Whether you want to convert temporal data
#' @examples
#' #TODO
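#' # A minimal, hypothetical sketch (assumes plpData and population were created
#' # earlier, e.g. with getPlpData() and createStudyPopulation()):
#' \dontrun{
#' sparse <- toSparseM(plpData, population, map = NULL, temporal = FALSE)
#' dim(sparse$data)    # persons x covariates
#' head(sparse$map)    # column index assigned to each covariateId
#' }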
#'
#' @return
#' Returns a list containing the data as a sparse matrix, the plpData covariateRef,
#' and a data.frame named map that tells us which covariate corresponds to each column.
#' The list has the following components: \describe{
#' \item{data}{A sparse matrix with the rows corresponding to each person in the plpData and the columns corresponding to the covariates.}
#' \item{covariateRef}{The plpData covariateRef.}
#' \item{map}{A data.frame containing the data column ids and the corresponding covariateId from covariateRef.}
#' }
#'
#' @export
toSparseM <- function(plpData,population, map=NULL, temporal=F){
  # check logger
  if(length(ParallelLogger::getLoggers())==0){
    logger <- ParallelLogger::createLogger(name = "SIMPLE",
                                        threshold = "INFO",
                                        appenders = list(ParallelLogger::createConsoleAppender(layout = ParallelLogger::layoutTimestamp)))
    ParallelLogger::registerLogger(logger)
  }
  
  ParallelLogger::logDebug(paste0('covariates nrow: ', nrow(plpData$covariateData$covariates)))
  ParallelLogger::logDebug(paste0('covariateRef nrow: ', nrow(plpData$covariateData$covariateRef)))
  
   
  #assign newIds to covariateRef
  newcovariateData <- MapCovariates(plpData$covariateData,
                                 population, 
                                 mapping=map)
  
  ParallelLogger::logDebug(paste0('Max covariateId in covariates: ',as.data.frame(newcovariateData$covariates %>% dplyr::summarise(max = max(covariateId, na.rm=T)))))
  ParallelLogger::logDebug(paste0('# covariates in covariateRef: ', nrow(newcovariateData$covariateRef)))
  ParallelLogger::logDebug(paste0('Max rowId in covariates: ', as.data.frame(newcovariateData$covariates %>% dplyr::summarise(max = max(rowId, na.rm=T)))))

  maxY <- as.data.frame(newcovariateData$mapping %>% dplyr::summarise(max=max(newCovariateId, na.rm = TRUE)))$max
  ParallelLogger::logDebug(paste0('Max newCovariateId in mapping: ',maxY))
  maxX <- max(population$rowId)
  ParallelLogger::logDebug(paste0('Max rowId in population: ',maxX))
  
  # chunk then add
  if(!temporal){
  data <- Matrix::sparseMatrix(i=1,
                               j=1,
                               x=0,
                               dims=c(maxX,maxY))
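  # convertData is applied to each Andromeda batch: every batch of (rowId, covariateId,
  # covariateValue) triples is turned into a sparse matrix and added into the accumulator
  # 'data' above (written back to the enclosing environment via <<-)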
  convertData <- function(batch) {
    data <<- data + Matrix::sparseMatrix(i=as.data.frame(batch %>% dplyr::select(rowId))$rowId,
                                         j=as.data.frame(batch %>% dplyr::select(covariateId))$covariateId,
                                         x=as.data.frame(batch %>% dplyr::select(covariateValue))$covariateValue,
                                         dims=c(maxX,maxY))
    return(NULL)
  }
  Andromeda::batchApply(newcovariateData$covariates, convertData, batchSize = 100000)
  
  } else {
    ParallelLogger::logTrace(paste0('Min time:', min(plpData$timeRef$timeId)))
    ParallelLogger::logTrace(paste0('Max time:', max(plpData$timeRef$timeId)))
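    # temporal case: build one person x covariate sparse matrix per timeId, convert each to a
    # slam simple_sparse_array with an extra (time) dimension, and bind them along that
    # dimension to obtain a person x time x covariate array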
    
    # do we want to use for(i in sort(plpData$timeRef$timeId)){ ?
    for(i in min(plpData$timeRef$timeId):max(plpData$timeRef$timeId)){
      
      if(nrow(newcovariateData$covariates %>% dplyr::filter(timeId==i))>0){
        ParallelLogger::logTrace(paste0('Found covariates for timeId ', i))
        
 
        # initiate the sparse matrix
        dataPlp <- Matrix::sparseMatrix(i=1,
                                     j=1,
                                     x=0,
                                     dims=c(maxX, maxY)) 
        dataEnv <- environment()
        ParallelLogger::logTrace(paste0('Initiated Mapping covariates for timeId ', i))
        
        
        # add function to batch creating matrix from Andromeda data
        convertData <- function(batch, dataEnv) {
          dataPlp <- get("dataPlp", envir = dataEnv)
          dataPlp <- dataPlp + Matrix::sparseMatrix(i=as.double(as.character(batch$rowId)),
                                               j=as.double(as.character(batch$covariateId)),
                                               x=batch$covariateValue,
                                               dims=c(maxX,maxY))
          
          assign("dataPlp", dataPlp, envir = dataEnv)
          return(NULL)
        }
        
        # add age for each time
        tempCovs <- addAgeTemp(timeId = i, newcovariateData, plpData$timeRef)
        if(!is.null(tempCovs)){
          Andromeda::batchApply(tempCovs, convertData, batchSize = 100000, dataEnv=dataEnv)
          ParallelLogger::logTrace(paste0('Added any age covariates for timeId ', i))
        }
        
        # add non age temporal covs for each time
        tempCovs <- addNonAgeTemp(timeId = i,newcovariateData)
        if(!is.null(tempCovs)){
          Andromeda::batchApply(tempCovs, convertData, batchSize = 100000, dataEnv=dataEnv)
          ParallelLogger::logTrace(paste0('Added non-age non-temporal covariates for timeId ', i))
        }
        
        # add the temporal covariates recorded for this timeId
        tempCovs <- newcovariateData$covariates %>% 
          dplyr::filter(!is.na(timeId)) %>% 
          dplyr::filter(timeId == i) 
        Andromeda::batchApply(tempCovs, convertData, batchSize = 100000, dataEnv=dataEnv)
        
        data_array<-slam::as.simple_sparse_array(dataPlp)
        # remove dataPlp
        #dataPlp <<- NULL
        ParallelLogger::logTrace(paste0('Dim of data_array: ', paste0(dim(data_array), collapse='-')))
        
        # extend the array with one more (time) dimension
        data_array<-slam::extend_simple_sparse_array(data_array, MARGIN =c(1L))
        ParallelLogger::logTrace(paste0('Finished Mapping covariates for timeId ', i))
      } else {
        data_array <- tryCatch(slam::simple_sparse_array(i=matrix(c(1,1,1), ncol = 3), 
                                                   v=0,
                                                   dim=c(maxX,1, maxY))
        )
        
      }
      # bind the arrays along the time dimension
      if(i==min(plpData$timeRef$timeId)) {
        result_array<-data_array
      }else{
        result_array<-slam::abind_simple_sparse_array(result_array,data_array,MARGIN=2L)
      }
    }
    data <- result_array
  }
  
  ParallelLogger::logDebug(paste0('Sparse matrix with dimensionality: ', paste(dim(data), collapse=',')  ))

  result <- list(data=data,
                 covariateRef=as.data.frame(newcovariateData$covariateRef),
                 map=as.data.frame(newcovariateData$mapping))
  return(result)

}

# restricts to pop and saves/creates mapping
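# returns an Andromeda object holding the covariates restricted to the population, with
# covariateIds remapped to consecutive column indices (newCovariateId), together with the
# covariateRef, analysisRef and the mapping table itself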
MapCovariates <- function(covariateData,population, mapping){
  
  newCovariateData <- Andromeda::andromeda(covariateRef = covariateData$covariateRef,
                                           analysisRef = covariateData$analysisRef)
  
  # restrict to population for speed
  ParallelLogger::logTrace('restricting to population for speed and mapping')
  if(is.null(mapping)){
    mapping <- data.frame(oldCovariateId = as.data.frame(covariateData$covariateRef %>% dplyr::select(covariateId)),
                          newCovariateId = 1:nrow(covariateData$covariateRef))
  }
  if(sum(colnames(mapping)%in%c('oldCovariateId','newCovariateId'))!=2){
    colnames(mapping) <- c('oldCovariateId','newCovariateId')
  }
  covariateData$mapping <- mapping
  covariateData$population <- data.frame(rowId = population[,'rowId'])
  # assign new ids :
  newCovariateData$covariates <- covariateData$covariates %>%
    dplyr::inner_join(covariateData$population) %>% 
    dplyr::rename(oldCovariateId = covariateId) %>% 
    dplyr::inner_join(covariateData$mapping) %>% 
    dplyr::select(-oldCovariateId)  %>%
    dplyr::rename(covariateId = newCovariateId)
  covariateData$population <- NULL
  covariateData$mapping <- NULL
  
  newCovariateData$mapping <- mapping
  
  return(newCovariateData)
}



#' Convert the plpData in COO format into a sparse python matrix using torch.sparse
#'
#' @description
#' Converts the standard plpData directly into a sparse matrix in Python
#'
#' @details
#' This function converts the covariate data (stored by Andromeda in COO format) into a sparse
#' torch tensor in Python
#' @param plpData                       An object of type \code{plpData} with covariates in COO format - the patient level prediction
#'                                      data extracted from the CDM.
#' @param population                    The population to include in the matrix
#' @param map                           A covariate map (telling us the column number for covariates)
#' @param temporal                      Whether to include timeId into tensor
#' @param pythonExePath                 The location of the Python executable to use
#' @examples
#' #TODO
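#' # A minimal, hypothetical sketch (assumes a reticulate-configured Python environment
#' # with torch installed and plpData/population created earlier):
#' \dontrun{
#' sparseTensor <- toSparseTorchPython(plpData, population, map = NULL, temporal = TRUE)
#' sparseTensor$map    # column index assigned to each covariateId
#' }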
#'
#' @return
#' Returns a list containing the Python object name of the sparse matrix, the plpData covariateRef,
#' and a data.frame named map that tells us which covariate corresponds to each column.
#' The list has the following components: \describe{
#' \item{data}{The python object name containing a sparse matrix with the rows corresponding to each person in the plpData and the columns corresponding to the covariates.}
#' \item{covariateRef}{The plpData covariateRef.}
#' \item{map}{A data.frame containing the data column ids and the corresponding covariateId from covariateRef.}
#' }
#'
#' @export
toSparseTorchPython <- function(plpData,population, map=NULL, temporal=F, pythonExePath=NULL){
  # check logger
  if(length(ParallelLogger::getLoggers())==0){
    logger <- ParallelLogger::createLogger(name = "SIMPLE",
                                           threshold = "INFO",
                                           appenders = list(ParallelLogger::createConsoleAppender(layout = ParallelLogger::layoutTimestamp)))
    ParallelLogger::registerLogger(logger)
  }
  
  newcovariateData <- MapCovariates(plpData$covariateData,
                                         population, 
                                         mapping=map)
  
  ParallelLogger::logDebug(paste0('Max covariateId: ',as.data.frame(newcovariateData$covariates %>% dplyr::summarise(max = max(covariateId, na.rm=T)))))
  ParallelLogger::logDebug(paste0('# cols: ', nrow(newcovariateData$covariateRef)))
  ParallelLogger::logDebug(paste0('Max rowId: ', as.data.frame(newcovariateData$covariates %>% dplyr::summarise(max = max(rowId, na.rm=T)))))
  
  ParallelLogger::logTrace(paste0('Converting data into python sparse matrix...'))
  
  maxT <- NULL
  if(temporal){
    maxT <- as.data.frame(newcovariateData$covariates %>% dplyr::summarise(max = max(timeId, na.rm=T)))$max
    ParallelLogger::logDebug(paste0('Max timeId: ', maxT))
  }
  
  maxCol <- as.data.frame(newcovariateData$mapping %>% dplyr::summarise(max=max(newCovariateId,na.rm = TRUE)))$max
  maxRow <- max(population$rowId)
  
  # source the python function
  e <- environment()
  reticulate::source_python(system.file(package='PatientLevelPrediction','python','TorchMap.py'), envir = e)
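  # map_python_initiate() and map_python() come from the package's TorchMap.py: the first
  # creates an empty sparse torch tensor of the requested size, the second adds a batch of
  # (rowId, covariateId[, timeId], covariateValue) entries to it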
  dataPlp <<- map_python_initiate(maxCol = as.integer(maxCol), 
                                  maxRow = as.integer(maxRow), 
                                  maxT= as.integer(maxT))
  
  convertData <- function(batch, temporal=T) {
    if(temporal){
      dataPlp <<- map_python(matrix = dataPlp ,
                             datas = as.matrix(as.data.frame(batch %>% dplyr::select(rowId,covariateId,timeId,covariateValue))),
                             maxCol = as.integer(maxCol),
                             maxRow = as.integer(maxRow),
                             maxT = as.integer(maxT))
    }else{
      dataPlp <<- map_python(matrix = dataPlp ,
                             datas = as.matrix(as.data.frame(batch %>% dplyr::select(rowId,covariateId,covariateValue))),
                             maxCol = as.integer(maxCol),
                             maxRow = as.integer(maxRow),
                             maxT = NULL) 
    }
    return(NULL)
  }
  
  if(temporal==T){
    # add the age and non-temporal data
    timeIds <- unique(plpData$timeRef$timeId)
    for(timeId in timeIds){
      tempData <- addAgeTemp(timeId, newcovariateData, plpData$timeRef)
      if(!is.null(tempData)){
        Andromeda::batchApply(tempData, convertData, temporal = T, batchSize = 100000)
      }
      tempData <- addNonAgeTemp(timeId, newcovariateData)
      if(!is.null(tempData)){
        Andromeda::batchApply(tempData, convertData, temporal = T, batchSize = 100000)
      }
      rm(tempData)
    }
    
    # add the rest
    tempData <- newcovariateData$covariates %>%
      dplyr::filter(timeId!=0) %>%
      dplyr::filter(!is.na(timeId))
    Andromeda::batchApply(tempData, convertData,temporal =T, batchSize = 100000)
  } else {
    Andromeda::batchApply(newcovariateData$covariates, convertData,
                          temporal =F, batchSize = 100000)
  }
  result <- dataPlp
  dataPlp <<- NULL
  ParallelLogger::logTrace(paste0('Sparse python tensor converted'))                            
  
  result <- list(data=result,
                 covariateRef=as.data.frame(newcovariateData$covariateRef),
                 map=as.data.frame(newcovariateData$mapping))
  return(result)
}


# reformat the evaluation
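# combines the train and test evaluation objects into long-format tables (evaluation
# statistics, threshold/demographic/calibration summaries and prediction distributions),
# each row tagged with the analysisId and an Eval column ('train' or 'test')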
reformatPerformance <- function(train, test, analysisId){

  nr1 <- length(unlist(train$evaluationStatistics[-1]))
  nr2 <- length(unlist(test$evaluationStatistics[-1]))
  evaluationStatistics <- cbind(analysisId=rep(analysisId,nr1+nr2),
                                Eval=c(rep('train', nr1),rep('test', nr2)),
                                Metric = names(c(unlist(train$evaluationStatistics[-1]),
                                                 unlist(test$evaluationStatistics[-1]))),
                                Value = c(unlist(train$evaluationStatistics[-1]),
                                      unlist(test$evaluationStatistics[-1]))
                                )

  nr1 <- nrow(train$thresholdSummary)
  nr2 <- nrow(test$thresholdSummary)
  thresholdSummary <- rbind(cbind(analysisId=rep(analysisId,nr1),Eval=rep('train', nr1),
                                      train$thresholdSummary),
                                cbind(analysisId=rep(analysisId,nr2),Eval=rep('test', nr2),
                                      test$thresholdSummary))

  if(!is.null(train$demographicSummary)){
    nr1 <- nrow(train$demographicSummary)
    nr2 <- nrow(test$demographicSummary)
    demographicSummary <- rbind(cbind(analysisId=rep(analysisId,nr1),Eval=rep('train', nr1),
                                      train$demographicSummary),
                                cbind(analysisId=rep(analysisId,nr2),Eval=rep('test', nr2),
                                      test$demographicSummary))
  } else{
    demographicSummary <- NULL
  }

  nr1 <- nrow(train$calibrationSummary)
  nr2 <- nrow(test$calibrationSummary)
  calibrationSummary <- rbind(cbind(analysisId=rep(analysisId,nr1),Eval=rep('train', nr1),
                                    train$calibrationSummary),
                              cbind(analysisId=rep(analysisId,nr2),Eval=rep('test', nr2),
                                    test$calibrationSummary))

  nr1 <- nrow(train$predictionDistribution)
  nr2 <- nrow(test$predictionDistribution)
  predictionDistribution <- rbind(cbind(analysisId=rep(analysisId,nr1),Eval=rep('train', nr1),
                                    train$predictionDistribution),
                              cbind(analysisId=rep(analysisId,nr2),Eval=rep('test', nr2),
                                    test$predictionDistribution))


  result <- list(evaluationStatistics=evaluationStatistics,
                 thresholdSummary=thresholdSummary,
                 demographicSummary =demographicSummary,
                 calibrationSummary=calibrationSummary,
                 predictionDistribution=predictionDistribution)

  return(result)
}


# helpers for converting temporal PLP data to matrix/tensor
addAgeTemp <- function(timeId, newcovariateData, timeRef){
  
  startDay <- as.data.frame(timeRef[timeRef$timeId==timeId,])$startDay
  
  ageId <- as.data.frame(newcovariateData$mapping %>% 
    dplyr::filter(oldCovariateId == 1002) %>%
    dplyr::select(newCovariateId))$newCovariateId
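  # age (covariateId 1002) is stored in years; convert it to an approximate age in days at
  # this time point (years * 365 + the startDay of the timeId)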
  
  ageData <- newcovariateData$covariates %>%
    dplyr::filter(covariateId == ageId) %>%
    dplyr::mutate(covariateValueNew = covariateValue*365 + startDay,
                  timeId = timeId) %>%
    dplyr::select(-covariateValue) %>%
    dplyr::rename(covariateValue = covariateValueNew) %>%
    dplyr::select(rowId,covariateId,covariateValue, timeId)
  
  if(nrow(ageData)==0){
    return(NULL)
  }
  return(ageData)
}


addNonAgeTemp <- function(timeId, newcovariateData){

  ageId <- as.data.frame(newcovariateData$mapping %>% 
                           dplyr::filter(oldCovariateId == 1002) %>%
                           dplyr::select(newCovariateId))$newCovariateId
  
  otherTempCovs <- newcovariateData$covariates%>% 
    dplyr::filter(is.na(timeId)) %>%
    dplyr::filter(covariateId != ageId) %>%
    dplyr::mutate(timeId = timeId) %>%
    dplyr::select(rowId,covariateId,covariateValue,timeId)
  
  if(nrow(otherTempCovs)==0){
    return(NULL)
  }
  return(otherTempCovs)
}