R/ctrLoadQueryIntoDbCtgov2.R

Defines functions ctrLoadQueryIntoDbCtgov2

### ctrdata package

#' ctrLoadQueryIntoDbCtgov2
#'
#' @inheritParams ctrLoadQueryIntoDb
#'
#' @keywords internal
#' @noRd
#'
#' @importFrom jqr jq jq_flags
#' @importFrom utils URLencode
#' @importFrom jsonlite stream_in
#' @importFrom httr GET status_code content
#' @importFrom stringi stri_replace_all_regex
#'
ctrLoadQueryIntoDbCtgov2 <- function(
    queryterm = queryterm,
    register,
    euctrresults,
    euctrresultshistory,
    ctgov2history,
    documents.path,
    documents.regexp,
    annotation.text,
    annotation.mode,
    only.count,
    con, verbose,
    queryupdateterm) {

  ## create empty temporary directory
  tempDir <- ctrTempDir(verbose)

  ## ctgov api ---------------------------------------------------------

  ctgovEndpoints <- c(
    # pageSize 0 delivers default 10
    "https://www.clinicaltrials.gov/api/v2/studies?format=json&countTotal=true&pageSize=1&%s",
    "https://www.clinicaltrials.gov/api/v2/studies?format=json&countTotal=true&pageSize=1000&%s",
    "https://storage.googleapis.com/ctgov2-large-docs/%s/%s/%s",
    "https://www.clinicaltrials.gov/api/int/studies/%s/history/%s",
    "https://www.clinicaltrials.gov/api/int/studies/%s?history=true"
  )

  ## process parameters ------------------------------------------------

  ## check
  if (!is.character(ctgov2history)) {
    ctgov2history <- deparse(ctgov2history)
  }
  if (!length(ctgov2history) ||
      !grepl("^(FALSE|TRUE|-1L?|1L?|[0-9]+L?:[0-9]+L?|[1-9]+[0-9]+L?|[1-9]+L?)$", ctgov2history)) {
    message("Parameter 'ctgov2history' invalid, ignored: ", ctgov2history)
    ctgov2history <- "FALSE"
  }

  # append if to update
  queryterm <- paste0(queryterm, "&", queryupdateterm)

  # mangle
  queryterm <- gsub("&$", "", queryterm)
  queryterm <- gsub("%20", " ", queryterm) # for URLencode

  # translation to ClinicalTrials.gov REST API 2
  # https://clinicaltrials.gov/data-about-studies/learn-about-api

  # distance=50 seems to be added in webinterface
  # even though not requested by user, removing it
  queryterm <- sub("([&]?)distance=50(&|$)", "&", queryterm)

  # slice by "&"
  queryterm <- strsplit(queryterm, split = "&")[[1]]
  queryterm <- queryterm[!grepl("^https://", queryterm)]
  queryterm <- queryterm[queryterm != ""]

  # url to api
  apiParams <- list(
    #
    "filter.geo" = list(
      "extract" = "distance=(.+?)(&|$)",
      "replace" = "filter.geo=distance(\\1)",
      "collapse" = "",
      "out" = character()
    ),
    #
    "filter.advanced" = list(
      "extract" = list(
        "resFirstPost=([0-9-]*)_?([0-9-]*)(&.+|$)",
        "primComp=([0-9-]*)_?([0-9-]*)(&.+|$)",
        "studyComp=([0-9-]*)_?([0-9-]*)(&.+|$)",
        "lastUpdPost=([0-9-]*)_?([0-9-]*)(&.+|$)",
        "firstPost=([0-9-]*)_?([0-9-]*)(&.+|$)",
        "start=([0-9-]*)_?([0-9-]*)(&.+|$)",
        "ageRange=([0-9a-z]*)_?([0-9a-z]*)(&.+|$)"
      ),
      "replace" = list(
        "AREA[ResultsFirstPostDate]RANGE[\\1,\\2]",
        "AREA[PrimaryCompletionDate]RANGE[\\1,\\2]",
        "AREA[CompletionDate]RANGE[\\1,\\2]",
        "AREA[LastUpdatePostDate]RANGE[\\1,\\2]",
        "AREA[StudyFirstPostDate]RANGE[\\1,\\2]",
        "AREA[StartDate]RANGE[\\1,\\2]",
        "AREA[MinimumAge]RANGE[\\1, MAX] AND AREA[MaximumAge]RANGE[MIN, \\2]"
      ),
      "collapse" = " AND ",
      "out" = character()
    ),
    #
    "query.locn" = list(
      "extract" = list(
        "country=(.+)(&|$)",
        "locStr=(.+)(&.+|$)",
        "locn=(.+)(&|$)"
      ),
      "replace" = list(
        "AREA[LocationCountry]\\1",
        "AREA[LocationCity]\\1",
        "AREA[LocationFacility]\\1"
      ),
      "collapse" = ",",
      "out" = character()
    ),
    #
    # hand through aggFilters
    list(
      "extract" = "(aggFilters=.+)(&|$)",
      "replace" = "&\\1",
      "collapse" = "",
      "out" = character()
    ),
    #
    # other "query." terms
    # the values apparently can include AREA expressions
    #
    # https://clinicaltrials.gov/find-studies/constructing-complex-search-queries#areaOp
    # Declares which search area should be searched. Search areas are defined on the
    # ClinicalTrials.gov Search Area page. In addition to specifying search areas,
    # it is possible to specify a field from the study structure. Any field from the study
    # structure is searchable.
    #
    # https://clinicaltrials.gov/data-api/api#extapi
    # see /studies/ endpoint for relation of search areas to query parameters
    #
    list(
      "extract" = "(cond|term|intr|titles|outc|spons|lead|id)=(.+)(&|$)",
      "replace" = "&query.\\1=\\2",
      "collapse" = "",
      "out" = character()
    )
  )

  # iterate over API terms
  for (t in seq_along(queryterm)) {
    for (a in seq_along(apiParams)) {
      for (i in seq_along(apiParams[[a]][["extract"]])) {
        if (grepl(apiParams[[a]][["extract"]][[i]], queryterm[t])) {
          item <-
            sub(apiParams[[a]][["extract"]][[i]],
                apiParams[[a]][["replace"]][[i]],
                queryterm[t]
            )
          apiParams[[a]][["out"]] <-
            paste0(
              c(apiParams[[a]][["out"]], item),
              collapse = apiParams[[a]][["collapse"]]
            )
        } # if extract
      } # extract
    } # apiParams
  } # queryterm

  # concatenate
  queryterm <- sapply(apiParams, "[[", "out")
  queryterm <- queryterm[seq_along(queryterm)[sapply(queryterm, length) > 0L]]
  for (i in seq_along(queryterm)) { # i = 4
    nm <- names(queryterm)[i]
    if (nchar(nm)) queryterm[i] <- paste0(nm, "=", queryterm[i])
  }
  queryterm <- paste0(queryterm, collapse = "&")

  # adjust remaining quirks
  queryterm <- sub("^&", "", queryterm)
  queryterm <- gsub("&&+", "&", queryterm)
  queryterm <- gsub("RANGE\\[,", "RANGE[MIN,", queryterm)
  queryterm <- stringi::stri_replace_all_regex(queryterm, "(RANGE\\[.+?),\\]", "$1,MAX]")

  ## process query -----------------------------------------------------

  # corresponds to count
  url <- sprintf(ctgovEndpoints[1], queryterm)
  if (verbose) message("API call: ", url)
  message("* Checking trials using CTGOV REST API 2.0...", appendLF = FALSE)
  url <- utils::URLencode(url)
  counts <- httr::GET(url)

  # early exit
  if (httr::status_code(counts) != 200L) {
    warning("Could not be retrieved, check 'queryterm' and / or 'register'. ",
            "\nAPI returned: ", httr::content(counts), call. = FALSE)
    message("API call: ", url)
    return(emptyReturn)
  }

  # extract total number of trial records
  counts <- suppressMessages(httr::content(counts, as = "text"))
  resultsEuNumTrials <- as.numeric(jqr::jq(counts, '.totalCount'))
  message("\b\b\b, found ", resultsEuNumTrials, " trials")

  # early exit
  if (!resultsEuNumTrials) {
    warning("No trials found, check 'queryterm' and 'register'")
    return(emptyReturn)
  }

  # only count?
  if (only.count) {

    # return
    return(list(n = resultsEuNumTrials,
                success = NULL,
                failed = NULL))
  }

  # exit if too many records
  if (resultsEuNumTrials > 10000L) {
    stop("These are ", resultsEuNumTrials, " (more than 10,000) trials, this may be ",
         "unintended. Downloading more than 10,000 trials may not be supported ",
         "by the register; consider correcting or splitting queries")
  }

  ## download json -----------------------------------------------------

  # corresponds to trials

  url <- sprintf(ctgovEndpoints[2], queryterm)
  url <- utils::URLencode(url)

  pageNextToken <- ""
  pageNumber <- 1L
  importDateTime <- strftime(Sys.time(), "%Y-%m-%d %H:%M:%S")

  message("(1/3) Downloading in ",
          ceiling(resultsEuNumTrials / 1000L),
          " batch(es) (max. 1000 trials each; estimate: ",
          format(resultsEuNumTrials * 0.1, digits = 2), " Mb total)")

  # register
  on.exit(unlink(dir(tempDir, "ctgov_trials_[0-9]+.ndjson", full.names = TRUE)), add = TRUE)

  # download and compile into ndjson
  while (TRUE) {

    # page url
    urlToDownload <- ifelse(
      pageNextToken != "",
      paste0(url, "&pageToken=", pageNextToken),
      url)

    # for download
    fTrialJson <- file.path(
      tempDir, paste0(
        "ctgov_trials_",
        # include query in file name for potential re-download
        sapply(url, digest::digest, algo = "crc32"),
        "_", pageNumber, ".json"))

    # do download
    tmp <- ctrMultiDownload(
      urlToDownload,
      fTrialJson,
      progress = TRUE,
      verbose = verbose)

    # inform user
    if (tmp[1, "status_code", drop = TRUE] != 200L) message(
      "Download not successful for ", urlToDownload)

    # convert to ndjson
    message("(2/3) Converting to NDJSON...\r", appendLF = FALSE)
    fTrialsNdjson <- file.path(tempDir, paste0("ctgov_trials_", pageNumber,".ndjson"))
    jqr::jq(
      file(fTrialJson),
      paste0(
        # extract trial records. studies seems always to be an array,
        # even for a single trial, thus no handling needed if array or not
        ' .studies | .[] ',
        # add elements
        '| .["_id"] = .protocolSection.identificationModule.nctId
         | .["ctrname"] = "CTGOV2"
         | .["record_last_import"] = "', importDateTime, '"'
      ),
      flags = jqr::jq_flags(pretty = FALSE),
      out = fTrialsNdjson
    )

    # continue or exit
    pageNumber <- pageNumber + 1L
    # "nextPageToken":"NF0g5JGBlPMuwQY"} at end of json
    fTrialJsonCon <- file(fTrialJson, open = "rb")
    seek(fTrialJsonCon, where = file.size(fTrialJson) - 40L)
    pageNextTokenTest <- readChar(fTrialJsonCon, 1000L)
    close(fTrialJsonCon)
    pageNextToken <- sub('.*"nextPageToken":"(.+?)".*', "\\1", pageNextTokenTest)
    if (pageNextToken == pageNextTokenTest) break

  }

  ## database import -----------------------------------------------------

  message("\n(3/3) Importing records into database...")

  # dbCTRLoadJSONFiles operates on pattern = ".+_trials_.*.ndjson"
  imported <- dbCTRLoadJSONFiles(dir = tempDir, con = con, verbose = verbose)
  message("")

  ## download history---------------------------------------------------

  if (ctgov2history != "FALSE") {

    message("* Checking and processing historic versions... ")

    ## 1 - get history overview for every trial
    urls <- as.vector(vapply(
      X = imported$success,
      FUN = function(i) sprintf(ctgovEndpoints[5], i),
      FUN.VALUE = character(1L),
      USE.NAMES = FALSE
    ))

    files <- as.vector(vapply(
      X = urls,
      FUN = function(i) file.path(
        tempDir, paste0(
          "h_ov_",
          sub(".+/(NCT[0-9]+)[?].+", "\\1", i),
          ".json")),
      FUN.VALUE = character(1L),
      USE.NAMES = FALSE
    ))

    tmp <- ctrMultiDownload(
      urls = urls,
      destfiles = files,
      resume = FALSE,
      verbose = verbose
    )

    # process
    historyDf <- lapply(
      X = files,
      FUN = function(i) {
        jsonlite::stream_in(
          textConnection(
            jqr::jq(file(i), paste0(
              " .history.changes[] | { ",
              '"_id": "', sub(".+_(NCT[0-9]+)[.]json", "\\1", i), '", ',
              "version_number: .version, version_date: .date }"
            ))), pagesize = 5000L, verbose = FALSE)})
    #
    historyDf <- do.call(rbind, historyDf)
    if (verbose) {
      message("Trial _id - number of versions")
      msgTmp <- as.data.frame(table(historyDf[["_id"]]))
      for (i in seq_len(nrow(msgTmp))) {
        message(
          msgTmp[i, 1, drop = TRUE], " - ",
          msgTmp[i, 2, drop = TRUE])
      }
    }
    #
    # shift version number from API 0... to ctrdata 1...
    historyDf[["version_number"]] <- historyDf[["version_number"]] + 1L

    ## 2 - for specific ctgov2history
    ##     values, adjust historyDf
    #
    # n versions per trial
    if (grepl("^([1-9]+[0-9]+L?|[1-9]+L?)$", ctgov2history)) {
      ctgov2history <- sub("L$", "", ctgov2history)
      countVersions <- as.integer(ctgov2history)
      historyDf <- array2DF(tapply(
        historyDf, historyDf[["_id"]],
        FUN = function(i) {
          ntr <- sort(i[["version_number"]])
          ntl <- seq(1L, length(ntr), length.out = countVersions)
          i[i[["version_number"]] %in% ntr[ntl], ]
        }))[, -1]
    }
    # last-but-one version
    if (grepl("^-1L?$", ctgov2history)) {
      ctgov2history <- sub("L$", "", ctgov2history)
      historyDf <- array2DF(tapply(
        historyDf, historyDf[["_id"]],
        FUN = function(i) {
          lbo <- max(i[["version_number"]])
          i[i[["version_number"]] == max(lbo - 1L, 1L), ]
        }))[, -1]
    }
    # only initial version
    if (grepl("^1L?$", ctgov2history)) {
      historyDf <- historyDf[historyDf[["version_number"]] == 1L, ]
    }
    # selected versions
    if (grepl(":", ctgov2history)) {
      minVersion <- as.numeric(sub("([0-9]+)L?:([0-9]+)L?", "\\1", ctgov2history))
      maxVersion <- as.numeric(sub("([0-9]+)L?:([0-9]+)L?", "\\2", ctgov2history))
      soughtVersion <- historyDf[["version_number"]] >= minVersion &
        historyDf[["version_number"]] <= maxVersion
      historyDf <- historyDf[soughtVersion, ]
    }
    # construct urls
    urls <- sprintf(
      ctgovEndpoints[4], historyDf[["_id"]], historyDf[["version_number"]] - 1L)

    ## 3 - handle historic versions

    # calculate file paths
    files <- as.vector(vapply(
      X = urls,
      FUN = function(i) file.path(
        tempDir, paste0(
          "h_v_",
          sub(".+/(NCT[0-9]+)/.+", "\\1", i), "_",
          sub(".+/([0-9]+)$", "\\1", i),
          ".json")),
      FUN.VALUE = character(1L),
      USE.NAMES = FALSE
    ))

    # download
    message(
      "- Downloading ", length(files), " historic versions (estimate: ",
      format(length(files) * 2.7 / 71, digits = 2), " MB total)...")

    tmp <- ctrMultiDownload(
      urls = urls,
      destfiles = files,
      resume = FALSE,
      verbose = verbose
    )

    ## 4 - merge versions by trial
    message("- Merging trial versions ", appendLF = FALSE)

    # register deletion
    on.exit(unlink(file.path(tempDir, paste0(
      "h_m_", unique(historyDf[["_id"]]), ".json")
    )), add = TRUE)

    # do version merge
    res <- sapply(
      X = unique(historyDf[["_id"]]),
      FUN = function(i) {

        out <- file.path(tempDir, paste0("h_m_", i, ".json"))
        unlink(out)

        # put historic versions into top level array
        cat(paste0('{"_id": "', i, '", "history": ['), file = out, append = TRUE)

        fToMerge <- tmp[["destfile"]][grepl(i, tmp[["destfile"]])]

        # write history study versions into array
        for (ii in seq_along(fToMerge)) {

          if (!file.exists(fToMerge[ii]) || !file.size(fToMerge[ii]) > 10L) next
          if (ii > 1L) cat(",", file = out, append = TRUE)

          vn <- as.numeric(jqr::jq(file(fToMerge[ii]), ' .studyVersion')) + 1L

          # add information about version
          cat(
            jqr::jq(file(fToMerge[ii]), paste0(
              ' .study | .history_version = { "version_number": ', vn, ",",
              ' "version_date": "', historyDf[["version_date"]][
                historyDf[["_id"]] == i & historyDf[["version_number"]] == vn], '"} '
            ),
            flags = jqr::jq_flags(pretty = FALSE)
            ),
            file = out,
            append = TRUE
          )

        }

        cat(']}\n', file = out, append = TRUE)
        message(". ", appendLF = FALSE)

      },
      USE.NAMES = FALSE
    )

    ## 5 - import
    message("\n- Updating trial records ", appendLF = FALSE)
    resAll <- NULL
    for (f in dir(path = tempDir, pattern = "^h_m_.*.json$", full.names = TRUE)) {

      message(". ", appendLF = FALSE)
      res <- nodbi::docdb_update(src = con, key = con$collection, query = "{}", value = f)
      resAll <- c(resAll, res)

    }
    message("\nUpdated ", sum(resAll), " trial(s) with historic versions")

  }

  ## download files-----------------------------------------------------

  if (!is.null(documents.path)) {

    # user info
    message(
      "* Checking for documents...\n",
      "- Getting links to documents")

    # temporary file for trial ids and file names
    downloadsNdjson <- file.path(tempDir, "ctgov2_downloads.ndjson")
    unlink(downloadsNdjson)
    downloadsNdjsonCon <- file(downloadsNdjson, open = "at")
    on.exit(try(close(downloadsNdjsonCon), silent = TRUE), add = TRUE)
    on.exit(unlink(downloadsNdjson), add = TRUE)

    # extract trial ids and file name and save in temporary file
    for (ndjsonFile in dir(
      path = tempDir, pattern = "^.+_trials_.*.ndjson$", full.names = TRUE)) {
      jqr::jq(
        file(ndjsonFile),
        ' { _id: ._id,
            filename: .documentSection.largeDocumentModule.largeDocs[].filename }',
        flags = jqr::jq_flags(pretty = FALSE),
        out = downloadsNdjsonCon)
      message(". ", appendLF = FALSE)
    }
    close(downloadsNdjsonCon)
    message("\r", appendLF = FALSE)

    # get document trial id and file name
    dlFiles <- jsonlite::stream_in(
      file(downloadsNdjson), pagesize = 5000L, verbose = FALSE)

    # check if any documents
    if (!nrow(dlFiles)) {
      message("= No documents identified for downloading.")
    } else {

      # calculate urls
      dlFiles$url <- sprintf(
        ctgovEndpoints[3],
        sub(".*([0-9]{2})$", "\\1", dlFiles$`_id`),
        dlFiles$`_id`,
        dlFiles$filename)

      # do download
      resFiles <- ctrDocsDownload(
        dlFiles[, c("_id", "filename", "url"), drop = FALSE],
        documents.path, documents.regexp, verbose)

    } # if (!nrow(dlFiles))

  } # !is.null(documents.path)

  ## delete for any re-downloads
  unlink(dir(
    path = tempDir, pattern = "ctgov_trials_[0-9]+.ndjson",
    full.names = TRUE))

  ## inform user -----------------------------------------------------

  # find out number of trials imported into database
  message("= Imported or updated ", imported$n, " trial(s)")

  # return
  return(imported)

}
# end ctrLoadQueryIntoDbCtogv2023

Try the ctrdata package in your browser

Any scripts or data that you put into this service are public.

ctrdata documentation built on June 30, 2024, 5:07 p.m.