# R/getHighdimData.R

# Copyright 2014 Janssen Research & Development, LLC.
#
# This file is part of tranSMART R Client: R package allowing access to
# tranSMART's data via its RESTful API.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version, along with the following terms:
#
#   1. You may convey a work based on this program in accordance with
#      section 5, provided that you retain the above notices.
#   2. You may convey verbatim copies of this program code as you receive
#      it, in any medium, provided that you retain the above notices.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.


# Performance notes
#
# Downloading and parsing large high dimensional data sets can take a
# significant amount of time (minutes for several hundred MB). We have
# optimized the process to a reasonable extent.
#
# The current RCurl wrapper doesn't expose functionality to download a binary
# URL and process the chunks asynchronously as they come in (that is only
# supported for text URLs). Downloading and parsing at the same time should
# give a significant improvement, but that would require changes in RCurl or a
# different way of downloading the data.
#
# The parser has also been optimized to the point where the R code itself
# accounts for only a minority of the runtime. The most time consuming
# operations are the foreign function calls that retrieve the fields from
# messages and that construct objects to parse the varint32 preceding each
# message. Significant further optimization of the parser would probably
# require dropping down to C or another lower-level language.
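#
# A quick way to check where the time goes when experimenting with the parser
# is base R profiling. A minimal sketch (assuming `raw_payload` already holds a
# downloaded binary /highdim response; the variable name is hypothetical):
#
#   Rprof("parse.prof")
#   parsed <- .parseHighdimData(raw_payload)
#   Rprof(NULL)
#   summaryRprof("parse.prof")$by.total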


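# Retrieves high dimensional data for a single concept of a study.
#   apiUrl        base URL of the tranSMART RESTful API
#   auth.token    authentication token passed along with each server request
#   study.name    name of the study to query
#   concept.match pattern matched (with grep) against the study's concept names;
#                 the first match is used (only when concept.link is not given)
#   concept.link  api.link.self.href of the concept, as returned by getConcepts()
#   projection    name of the projection to retrieve; when omitted or not
#                 supported, the supported projections are returned instead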
getHighdimData <- function(apiUrl, auth.token, study.name, concept.match = NULL, concept.link = NULL, projection = NULL) {


  if (is.null(concept.link) && !is.null(concept.match)) {
    studyConcepts <- getConcepts(apiUrl, auth.token, study.name)
    conceptFound <- grep(concept.match, studyConcepts$name)[1]
    if (is.na(conceptFound)) {
      warning(paste("No match found for:", concept.match))
    } else { concept.link <- studyConcepts$api.link.self.href[conceptFound] }
  }

  if (length(concept.link) == 0L) {
    warning("No concepts selected or found to match your arguments.")
    return(NULL)
  }

  serverResult <- .transmartServerGetRequest(paste(apiUrl, concept.link, "/highdim", sep=""), auth.token, accept.type = "hal")
  if (length(serverResult$dataTypes) == 0) {
    warning("This high dimensional concept contains no data.")
    return(NULL)
  }
  listOfHighdimDataTypes <- serverResult$dataTypes[[1]]

  if (!is.null(projection)) {
    matchingProjectionIndex <- which(names(listOfHighdimDataTypes$api.link) == projection)
    if (length(matchingProjectionIndex) > 0) {
      projectionLink <- listOfHighdimDataTypes$api.link[[matchingProjectionIndex]]
    } else { projection <- NULL }
  } else { projection <- NULL }

  if (is.null(projection)) {
    warning("No valid projection selected.\nSet the projection argument to one of the following options:\n",
            paste(listOfHighdimDataTypes$supportedProjections, "\n"))
    return(listOfHighdimDataTypes$supportedProjections)
  }
  message("Retrieving data from server. This can take some time, depending on your network connection speed. ",
          as.character(Sys.time()))
  serverResult <- .transmartServerGetRequest(paste(apiUrl, projectionLink, sep=""), auth.token, accept.type = "binary")
  if (length(serverResult$content) == 0) {
    warning("No data could be found. The server yielded an empty dataset. Returning NULL.")
    return(NULL)
  }

  return(.parseHighdimData(serverResult$content))
}
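
# Example usage (a sketch: the server URL, token, study and concept below are
# placeholders; the projection name must be one of those reported by the first
# call):
#
#   # Without a projection, the supported projections for the concept are listed ...
#   getHighdimData("http://some.transmart.server/transmart", auth.token = token,
#                  study.name = "GSE8581", concept.match = "Lung")
#   # ... then the data itself can be requested with one of those projections.
#   result <- getHighdimData("http://some.transmart.server/transmart", auth.token = token,
#                            study.name = "GSE8581", concept.match = "Lung",
#                            projection = "default_real_projection")
#   dim(result$data)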

.parseHighdimData <-
function(rawVector, .to.data.frame.converter=.as.data.frame.fast) {
    dataChopper <- .messageChopper(rawVector)

    message <- dataChopper$getNextMessage()
    header <- read(highdim.HighDimHeader, message)
    columnSpec <- header$columnSpec
    assays <- header$assay
    DOUBLE <- highdim.ColumnSpec$ColumnType$DOUBLE
    STRING <- highdim.ColumnSpec$ColumnType$STRING

    columns <- .expandingList(1000)

    noAssays <- length(assays)

    # Column labels for the assay metadata: the field names of the first assay message.
    assayLabels <- .DollarNames(assays[[1]])[1:length(assays[[1]])]

    for (label in assayLabels) {

        # RProtoBuf does not support int64 on all platforms that we need to
        # support. Specifically, binaries from CRAN don't support it, which
        # means Windows platforms and the default OSX R distribution. Linux and
        # OSX with Homebrew download source packages from CRAN and compile
        # locally, so they don't have problems with int64.
        #
        # The underlying problem is in Rcpp. CRAN does not allow the use of
        # 'long long' types, as they are considered non-portable. Rcpp works
        # around this by conditionally including 64 bit int support when it is
        # compiled somewhere other than CRAN.
        #
        # The serialization format uses an int64 field for
        # highdim.Assay.assayId. Reading that field on platforms that do not
        # have int64 support results in an error. As a workaround we parse the
        # assayId field from the string representation of the assay. (This works
        # because the as.character() conversion calls a DebugString method in
        # the C++ protobuf library, so the RProtoBuf C++ code that CRAN sees
        # never touches the 64 bit integers directly.)

        if (label == "assayId") {
            columns$add(label, sapply(assays, function(a) sub("assayId: ", "", grep(label, strsplit(as.character(a),
                    split = "\n")[[1]], value = TRUE))))
        } else {
            columns$add(label, sapply(assays, function(a) a[[label]]))
        }
    }

    message("Received data for ", noAssays, " assays. Unpacking data. ", as.character(Sys.time()))

    labelToBioMarker <- hash() # biomarker info is optional, but when present it is part of the data and should not be dropped

    while (!is.null(message <- dataChopper$getNextMessage())) {
        row <- read(highdim.Row, message)
        rowlabel <- row$label

        labelToBioMarker[[rowlabel]] <- (if(is.null(row$bioMarker)) NA_character_ else row$bioMarker)
        rowValues <- row$value

        if(length(rowValues) == 1) {
            # if only one value, don't add the columnSpec name to the rowlabel.
            columns$add(rowlabel, rowValues[[1]]$doubleValue)
            next
        }

        # Multiple columns, add the columnSpec name to the labels to differentiate them.
        for(i in seq_along(rowValues)) {
            entryName <- paste(rowlabel, columnSpec[[i]]$name, sep=".")
            type <- columnSpec[[i]]$type
            if(type == STRING) {
                columns$add(entryName, rowValues[[i]]$stringValue)
            } else if(type == DOUBLE) {
                columns$add(entryName, rowValues[[i]]$doubleValue)
            } else {
                warning("Unknown row type: ", type)
            }
        }

    }

    message("Data unpacked. Converting to data.frame. ", as.character(Sys.time()))

    data <- .to.data.frame.converter(columns$as.list(), stringsAsFactors=FALSE)

    if(all(is.na(values(labelToBioMarker)))) {
        message("No biomarker information available.")
        return(list(data = data))
    } else {
        message("Additional biomarker information is available.\nThis function will return a list with a data.frame ",
                "holding the high dimensional data and a hash mapping the (column) labels to their bioMarker.")
        return(list(data = data, labelToBioMarkerMap = labelToBioMarker))
    }
}
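
# Sketch of consuming the parsed result (assuming `serverResult` holds a binary
# /highdim response; the row label used as a hash key below is hypothetical):
#
#   parsed <- .parseHighdimData(serverResult$content)
#   str(parsed$data)   # one data.frame row per assay; assay fields plus one
#                      # column per row label (suffixed with the column spec
#                      # name when a row carries several values)
#   if (!is.null(parsed$labelToBioMarkerMap)) {
#       parsed$labelToBioMarkerMap[["1007_s_at"]]   # bioMarker for that row label
#   }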

.messageChopper <- function(rawVector, endOfLastMessage = 0) {
    msbSetToOne <- as.raw(128)

    getNextMessage <- function() {
        # The protobuf messages are written using writeDelimited in the Java
        # protobuf library. Unfortunately the C++ and R versions don't support
        # that function natively. We manually read a varint32 from the
        # connection that indicates the size of the next message.

        if (endOfLastMessage >= length(rawVector)) { return(NULL) }
        # In a varint32, every byte except the last has its most significant bit
        # set to one, so the first byte with the msb cleared marks the end of the
        # length prefix (a varint32 is at most 5 bytes long).
        varint32Size <- min(which((msbSetToOne & rawVector[(endOfLastMessage+1):(endOfLastMessage+5)]) == as.raw(0)))
        varint32Connection <- rawConnection(rawVector[(endOfLastMessage+1):(endOfLastMessage+varint32Size)],
                open = "r+b")
        class(varint32Connection) <- "connection"
        connection <- ConnectionInputStream(varint32Connection)
        close(varint32Connection)
        messageSize <- tryCatch(ReadVarint32(connection), error= function(e) NULL)
        if (is.null(messageSize)) return(NULL)
        endOfThisMessage <- endOfLastMessage + varint32Size + messageSize
        message <- rawVector[(endOfLastMessage+1+varint32Size):endOfThisMessage]
        endOfLastMessage <<- endOfThisMessage
        return(message)
    }

    getRawVectorIndex <- function() { return(endOfLastMessage) }

    return(list(getNextMessage = getNextMessage, getRawVectorIndex = getRawVectorIndex))
}
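
# Minimal sketch of driving the chopper directly, mirroring what
# .parseHighdimData() does (raw_payload is assumed to be the binary body of a
# /highdim request):
#
#   chopper <- .messageChopper(raw_payload)
#   headerMsg <- chopper$getNextMessage()              # first message: HighDimHeader
#   header <- read(highdim.HighDimHeader, headerMsg)
#   while (!is.null(msg <- chopper$getNextMessage())) {
#       row <- read(highdim.Row, msg)                  # remaining messages: Row
#       # ... process row ...
#   }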


# We need to repeatedly add an element to a list. With normal list concatenation
# or element setting this would lead to a large number of memory copies and a
# quadratic runtime. To prevent that, this function implements a bare bones
# expanding array, in which list appends are (amortized) constant time.
.expandingList <- function(capacity = 10) {
    buffer <- vector('list', capacity)
    names <- character(capacity)
    length <- 0

    methods <- list()

    methods$double.size <- function() {
        buffer <<- c(buffer, vector('list', capacity))
        names <<- c(names, character(capacity))
        capacity <<- capacity * 2
    }

    methods$add <- function(name, val) {
        if(length == capacity) {
            methods$double.size()
        }

        length <<- length + 1
        buffer[[length]] <<- val
        names[length] <<- name
    }

    methods$as.list <- function() {
        b <- buffer[seq_len(length)]
        names(b) <- names[seq_len(length)]
        return(b)
    }

    methods
}


.as.data.frame.fast <- function(data, ...) {
    # Prefix an 'X' to column names that do not start with a letter (or a dot
    # not followed by a digit), as such names are not syntactically valid.
    colnames <- names(data)
    rowDataIndexes <- grep('^([a-zA-Z]|\\.[^0-9])', colnames, invert = TRUE)
    colnames[rowDataIndexes] <- paste('X', colnames[rowDataIndexes], sep='')

    names(data) <- colnames
    attr(data, 'row.names') <- 1:length(data[[1]])
    class(data) <- 'data.frame'

    data
}