R/read-data-ukb-functions.R

Defines functions description_to_name default_ukb_fields

Documented in default_ukb_fields

#' Default_ukb_fields
#'
#' Output the default data fields in a character vector. These data fields are used in converting data to episode data functions and include visit dates,birth ,self reported illness, death.
#' @return  character vector of data fields
#' @export
#' @examples
#'default_ukb_fields()
default_ukb_fields <- function(){

  SRfieldnames<-c("20001","20002","20004","20003")
  SRdatefieldnames <- c("20006","20008","20010")
  Deathfieldnames <- c("40000","40001","40002")
  CancerRegfieldnames<-c("40006","40013","40008","40005")
  Visitfieldnames <- c("53")
  Birthfieldnames <- c("34","52")
  c(SRfieldnames,SRdatefieldnames,Deathfieldnames,CancerRegfieldnames,Visitfieldnames,Birthfieldnames)

}

# Creates a variable name from the field description.
#
# @param data Field-to-description table from html file
#
description_to_name <-  function(Vct) {

  #https://github.com/kenhanscombe/ukbtools/blob/master/R/dataset.R
  name <- tolower(Vct) %>%
    gsub(" - ", "_", x = .) %>%
    gsub(" ", "_", x = .) %>%
    # remove the sentences describing data-coding
    gsub("uses.*data.*coding.*simple_list.$", "", x = .) %>%
    gsub("uses.*data.*coding.*hierarchical_tree.", "", x = .) %>%
    gsub("uses.*data.*coding_[0-9]*", "", x = .) %>%
    # remove leading weird character?
    gsub("[^[:alnum:][:space:]_]", "", x = .) %>%
    gsub("__*", "_", x = .)

  return(name)
}


#' Replace NA With Nearest Non-NA On The Left
#'
#' Used in parsing meta data file ukbxxxxx.html.
#' @param Vct
#' @return  imputed character vector
ReplaceNAWithNearestNonNAOnTheLeft <- function(Vct) {
  N <- length(Vct)
  na.pos <- which(is.na(Vct))
  if (length(na.pos) %in% c(0, N)) {
    print("0 or N na's")
    return(Vct)
  }
  non.na.pos <- which(!is.na(Vct))
  intervals  <- findInterval(na.pos, non.na.pos,
                             all.inside = TRUE)
  left.pos   <- non.na.pos[pmax(1, intervals)]
  right.pos  <- non.na.pos[pmin(N, intervals+1)]
  left.dist  <- na.pos - left.pos
  right.dist <- right.pos - na.pos

  # Vct[na.pos] <- ifelse(left.dist <= right.dist,
  #                       Vct[left.pos], Vct[right.pos])
  Vct[na.pos] <- Vct[left.pos]
  return(Vct)
}

parse_html_tables <- function(x){
  df<-data.frame(x,stringsAsFactors = F)
  names(df) <- gsub(pattern = "NULL.","",names(df))
  return(df)
}



#' Read the data dictionary for main dataset
#'
#' The metadata file *ukbxxxxx.html* can be generated using ukb utility. This function reads the html formatted file and return metadata in a dataframe.
#' @param fhtml Path to file
#' @return   metadata for main dataset as dataframe
#' @export
#' @examples
#' read_ukb_metadata ("ukb12345.html")
read_ukb_metadata <- function(fhtml) {
  tictoc::tic(paste("read ukb html",fhtml))
  # this (redundant?) message is perhaps clearer than the one thrown by readHTMLTable()
  if (!file.exists(fhtml)){
    message(paste0("ABORT reading metadata of the main dataset: File does not exist \n",fhtml))
    return()
  }
  # Column types as described by UKB
  # http://biobank.ctsu.ox.ac.uk/crystal/help.cgi?cd=value_type
  col_type <- c(
    "Sequence" = "double",
    "Integer" = "integer",
    "Categorical (single)" = "character",
    "Categorical (multiple)" = "character",
    "Continuous" = "double",
    "Text" = "character",
    "Date" = "character",
    "Time" = "character",
    "Compound" = "character",
    "Binary object" = "character",
    "Records" = "character",
    "Curve" = "character"
  )
  print("reading .html")
  # html file is the data dictionary generated by "ukbconv ukb23456.enc_ukb docs"

  lst_html_tables <- XML::readHTMLTable(fhtml)
  # lst_html_table is a large list with hundreds of parsed elements
  #lst_html_tables <- lapply(lst_html_tables,function(x) parse_html_tables(x) )

  # target table with metadata [n x 5] on the current basket
  # the first is header , others are notes and codings
  df_meta <- lst_html_tables[[2]]
  # additional instances of any field (same description) are parsed as NA, nearest left non NA is the first instance f.12345.0.0
  df_meta$Type <- ReplaceNAWithNearestNonNAOnTheLeft(df_meta$Type)
  df_meta$Description <- ReplaceNAWithNearestNonNAOnTheLeft(df_meta$Description)
  # clean the description column to keep only key description , replace space by "_"
  df_meta$Description <- description_to_name(df_meta$Description)

  df_meta<-ConvertFactorsToStringReplaceNAInDf(df_meta)


  df_meta <- tibble::tibble(
    # mutate df_meta with cols below
    field.number = df_meta$Column,
    field.count = as.numeric(df_meta$Count),

    # remove the visit and instance mark i.e field code in showcase
    field.showcase = gsub("-.*$", "", df_meta[, "UDI"]),
    field.html = df_meta[, "UDI"],
    # colname when dataset is generated in R format i.e. ukb23456.tab
    field.tab = paste("f.", gsub("-", ".", df_meta$UDI), sep = ""),
    field.description = df_meta[, "Description"],
    col.type = df_meta[, "Type"],
    # exmple col.name: verbal_interview_duration_f3_0_0
    col.name = ifelse(
      field.showcase == "eid",
      "eid",
      stringr::str_c(
        df_meta$Description, "_f",
        stringr::str_replace_all(field.html, c("-" = "_", "\\." = "_"))
      )
    ),
    # convert UKB column types to R types based on col_type
    fread_column_type = col_type[as.character(df_meta$Type)]

  )
  df_meta[which(df_meta$field.tab=="f.eid"),]$field.tab<-"identifier"

  tictoc::toc()
  return(df_meta)
}



#' Read the main dataset
#'
#' This function reads the dataset *ukbxxxxx.tab* generated using ukb utility and keep only data fields specified. Depending on the size of the dataset the process may take some time.
#' @param fukb Path to the main dataset (.tab) file
#' @param dfhtml dataframe containing the metadata, can be obtained by `read_ukb_metadata()`.
#' @param fields_to_keep the datafields to keep as a character vector, default: default_ukb_fields()
#' @return   main dataset as dataframe with only selected data fields
#' @export
#' @examples
#' fukbtab <- paste(pheno_dir,"ukb12345.tab",sep="")
#' dfhtml<- read_ukb_metadata ("ukb12345.html")
#' dfukb <- read_ukb_tabdata(fukbtab,dfhtml,fields_to_keep = default_ukb_fields())
read_ukb_tabdata <- function(fukb,
                          dfhtml,
                          fields_to_keep = default_ukb_fields()) {
  tictoc::tic(paste("read ukb data",fukb))
  if (!file.exists(fukb)){
    message(paste0("ABORT reading main dataset: File does not exist \n",fukb))
    return()
  }

  if (!exists("n_threads")){n_threads=1}

  if(!any(fields_to_keep %in% "eid" )){
    # ensure the identifier always present
    fields_to_keep <- c("eid", fields_to_keep)
  }
  # vector containing fields in .tab format
  fields_to_keep.tab <- dfhtml[dfhtml$field.showcase %in% fields_to_keep & dfhtml$field.count!=0,]$field.tab
  # R col type for each field
  fields_to_keep.classes <- dfhtml[dfhtml$field.showcase %in% fields_to_keep  & dfhtml$field.count!=0,]$fread_column_type

  if(length(fields_to_keep.classes) != length(fields_to_keep.tab)){
    print("error, fields_to_keep.classes doesnt match fields_to_keep")
    break
  }
  # subset the metadata
  dfhtml.sub <- dfhtml[dfhtml$field.tab %in% fields_to_keep.tab,]

  freadcolclasses <- rep("NULL",nrow(dfhtml))
  # ####
  freadcolclasses[which(dfhtml$field.tab %in% fields_to_keep.tab)] <- dfhtml[which(dfhtml$field.tab %in% fields_to_keep.tab),]$fread_column_type

  tictoc::tic("fread data")
  df <- data.table::fread( paste0(fukb), header=T,
               colClasses = freadcolclasses,
               sep = "\t",
               showProgress = TRUE)
  print(format(object.size(df), units = "Gb"))
  # rename f.eid to identifier
  names(df)[names(df) == names(df)[grepl("eid", names(df))]]<-"identifier"
  #  change from integer to double; numeric sorting is ~150x faster than character sort
  df$identifier<-as.numeric(df$identifier)
  tictoc::toc()
  return(df)
}
#col.classes[col.classes %in% "integer64"] <- "character" #integer64 not supported, unsupported in disk.frame? but not in data.table



#' Read Hospital Inpatient Data
#'
#' This function reads the record-level HospitalEpisodeStatistics (HES) data organized in several data tables
#' @param fhesin Path to HESIN (master file)
#' @param fhesin_diag Path to HESIN_DIAG file containing diagnosis codes
#' @param fhesin_oper Path to HESIN_OPER file containing Operations and procedural codes
#' @param add_extra_hesin_columns  if True, adds extra columns "ins_index","source"
#' @return  list of data.table objects with all episodes
#' @export
#' @examples
#' read_hesin_data("hesin.txt" ,"hesin_diag.txt" ,"hesin_oper.txt" )
read_hesin_data <- function(fhesin, fhesin_diag,fhesin_oper,add_extra_hesin_columns=F){
  # check if file paths are valid
  if (!file.exists(fhesin)| !file.exists(fhesin_diag) | !file.exists(fhesin_oper)){
    message("ABORT reading HESIN data: Required file(s) not found:")
    message(paste("fhesin exists -",file.exists(fhesin),fhesin),sep=" ")
    message(paste("fhesin_diag exists -",file.exists(fhesin_diag),fhesin_diag,sep=" "))
    message(paste("fhesin_oper exists -",file.exists(fhesin_oper),fhesin_oper))
    return()
  }

  #  refer HES Data Dictionary Document ID:141140
  ## TODO; use library(fasttime); fastPOSIXct(DT$start_date)
  # read hesin, extract event date
  message(paste0("read hesin: "),fhesin)
  dfhesin <- (data.table::fread(fhesin,header=T,sep="\t", stringsAsFactors=FALSE, na.strings=""))
  message("converting dates")
  dfhesin$epistart <- as.Date(as.character(dfhesin$epistart),format="%d/%m/%Y") #"%Y%m%d")
  dfhesin$admidate <- as.Date(as.character(dfhesin$admidate),format="%d/%m/%Y")
  dfhesin$epiend <- as.Date(as.character(dfhesin$epiend),format="%d/%m/%Y")
  dfhesin$disdate <- as.Date(as.character(dfhesin$disdate),format="%d/%m/%Y")
  # if start and end date not available for episode, replace with admission and discharge date respectively
  dfhesin[is.na(dfhesin$epistart),"epistart"] <- dfhesin[is.na(dfhesin$epistart),"admidate"]
  dfhesin[is.na(dfhesin$epiend),"epiend"] <- dfhesin[is.na(dfhesin$epiend),"disdate"]
  # recompute epidur explicitly, a very small proportion (0.003%) of episodes crossing HES data-year have erroneous entry
  dfhesin$epidur <- as.numeric(dfhesin$epiend - dfhesin$epistart )
  # sanity check
  dfhesin[dfhesin$epidur <0,]$epidur <- NA
  # colnames(dfhesin)
  #keep only cols that maybe of use
  dfhesin <- dfhesin %>% dplyr::select(eid,ins_index,source,epistart,admidate,epiend,disdate,epidur)



  # read diag
  message(paste0("read diag: ",fhesin_diag))
  dfdiag <- (data.table::fread(fhesin_diag,header=T,sep="\t", stringsAsFactors=FALSE, na.strings=""))
  message("merging hesin + diagnosis")
  dfhesin_diag <- merge(dfhesin,dfdiag,by = c("eid","ins_index"),all=T)
  dfhesin_diag$eventdate <- dfhesin_diag$epistart
  dfhesin_diag$event <- 1
  # mark those without epistart date , ~0.001%
  dfhesin_diag[is.na(dfhesin_diag$eventdate)]$event <- 0
  dfhesin_diag <- dfhesin_diag[, event:=as.integer(event)]

  # read oper
  message(paste0("read oper: ",fhesin_oper))
  dfoper <- (data.table::fread(fhesin_oper,header=T,sep="\t", stringsAsFactors=FALSE, na.strings=""))
  # note date format differ from main hesin table
  dfoper$opdate <- as.Date(as.character(dfoper$opdate),format="%d/%m/%Y")
  message("merging hesin + operation") # for duration, take episode duration.
  dfhesin_oper <- merge(dfhesin,dfoper,by = c("eid","ins_index"),all=T)
  dfhesin_oper$eventdate <- dfhesin_oper$opdate
  # take epistart date from main HESIN table if opdate not available
  dfhesin_oper[is.na(dfhesin_oper$eventdate),"eventdate"] <- dfhesin_oper[is.na(dfhesin_oper$eventdate),"epistart"]

  dfhesin_oper$event <- 1
  # mark those without a date
  dfhesin_oper[is.na(dfhesin_oper$eventdate)]$event <- 0
  dfhesin_oper <- dfhesin_oper[, event:=as.integer(event)]

  # filter + rename
  if(add_extra_hesin_columns==T){extra_hesin_columns=c("ins_index","source")} else{extra_hesin_columns=NULL}
  tte.hesin.oper3.primary <- dfhesin_oper %>% dplyr::filter(level==1 & !is.na(oper3))  %>% dplyr::select(eid,eventdate,epidur,oper3,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = oper3,event=event)  %>% data.table::as.data.table()
  tte.hesin.oper4.primary <- dfhesin_oper %>% dplyr::filter(level==1 & !is.na(oper4))  %>% dplyr::select(eid,eventdate,epidur,oper4,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = oper4,event=event)  %>% data.table::as.data.table()
  tte.hesin.icd10.primary <- dfhesin_diag %>% dplyr::filter( level==1 & !is.na(diag_icd10))  %>% dplyr::select(eid,eventdate,epidur,diag_icd10,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = diag_icd10,event=event)  %>% data.table::as.data.table()
  tte.hesin.icd9.primary <- dfhesin_diag %>% dplyr::filter( level==1 & !is.na(diag_icd9))  %>% dplyr::select(eid,eventdate,epidur,diag_icd9,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = diag_icd9,event=event)  %>% data.table::as.data.table()

  tte.hesin.oper3.secondary <- dfhesin_oper %>% dplyr::filter(level==2 & !is.na(oper3))  %>% dplyr::select(eid,eventdate,epidur,oper3,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = oper3,event=event)  %>% data.table::as.data.table()
  tte.hesin.oper4.secondary <- dfhesin_oper %>% dplyr::filter(level==2 & !is.na(oper4))  %>% dplyr::select(eid,eventdate,epidur,oper4,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = oper4,event=event)  %>% data.table::as.data.table()
  tte.hesin.icd10.secondary <- dfhesin_diag %>% dplyr::filter( level==2 & !is.na(diag_icd10))  %>% dplyr::select(eid,eventdate,epidur,diag_icd10,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = diag_icd10,event=event)  %>% data.table::as.data.table()
  tte.hesin.icd9.secondary <- dfhesin_diag %>% dplyr::filter( level==2 & !is.na(diag_icd9))  %>% dplyr::select(eid,eventdate,epidur,diag_icd9,event,eval(extra_hesin_columns))  %>% dplyr::rename(identifier=eid,eventdate = eventdate,epidur=epidur,code = diag_icd9,event=event)  %>% data.table::as.data.table()


  lst <- list(tte.hesin.oper3.primary = tte.hesin.oper3.primary,
              tte.hesin.oper4.primary = tte.hesin.oper4.primary,
              tte.hesin.icd10.primary = tte.hesin.icd10.primary,
              tte.hesin.icd9.primary = tte.hesin.icd9.primary,

              tte.hesin.oper3.secondary = tte.hesin.oper3.secondary,
              tte.hesin.oper4.secondary = tte.hesin.oper4.secondary,
              tte.hesin.icd10.secondary = tte.hesin.icd10.secondary,
              tte.hesin.icd9.secondary = tte.hesin.icd9.secondary)
  #  change from integer to double; numeric sorting is ~150x faster than character sort
  lst<-lapply(lst,function(x) {x[, identifier:=as.numeric(identifier)] })
  # set key
  lst <- lapply(lst,function(x) {setkey(x,code) })
  lst <- lapply(lst,function(x) {x[, c('code') := lapply(.SD, as.character), .SDcols = c('code')] })
  # lst <- lapply(lst,function(x) {x[, ('f.eid') := lapply(.SD, as.character), .SDcols = 'f.eid'] })

  return(lst)

}





#' Read Primary Care clinical event records
#'
#' This function reads the record-level clinical events from General Practitioner (GP) data. Refer to official UKB documetation for more information regarding this data.
#' @param fgp Path to GP clinical event records
#' @return   list of data.table objects with all episodes
#' @export
#' @examples
#' read_gp_clinical_data("gpclinical.txt" )
read_gp_clinical_data <- function(fgp){
  # stop if there is no file
  if (!file.exists(fgp)){
    message(paste0("ABORT reading GP clinical data: File does not exist - ",fgp))
    return()
  }
  tictoc::tic(paste("read gp data",fgp))
  # message(paste("read gp data",fgp))
  # records potentially erroneous have date changed to 01/01/1901 (occured before birth), 02/02/1902 (occured on DOB), 03/03/1903 (same year as DOB), 07/07/2037 (occured after the time of extraction)
  mindate = as.Date("1930-01-01")
  maxdate = format(Sys.time(),"%Y-%m-%d") ## change to today?.
  ####################################################
  # in current structure of gp_clinical.txt
  ###################################################
  # $1 eid $3 event_dt $4 read_2 $5 read3
  # $2 data_provider  $6 - $8 are free-text fields "value" entries differ depending on the data source
  # eid	data_provider	event_dt	read_2	read_3	value1	value2	value3

    # dfgp <- data.table::fread(cmd = paste("awk -F'\t'  '$6==\"\" && $7==\"\" && $8==\"\" {print $1,$3,$4,$5}' OFS='\t'",fgp) ,sep = "\t",
                # colClasses = c("integer","character","character","character")) #,select=cols_tokeep)
  dfgp <- data.table::fread(fgp ,sep = "\t",select=c(eid="integer", event_dt="character",read_2="character",read_3="character")) # ," | head -10000 "
# temp<-fread(paste0(fgp,".head"),sep = "\t",select=c(eid="integer", event_dt="character",read_2="character",read_3="character"))

  #names(dfgp) <-  c("eid",	"data_provider",	"event_dt",	"read_2",	"read_3",	"value1","value2","value3")

  names(dfgp) <-  c("eid",	"event_dt",	"read_2",	"read_3")
  dfgp$event_dt <- as.Date(as.character(dfgp$event_dt),format="%d/%m/%Y")
  #dfgp$event_dt <- format(fasttime::fastPOSIXct(dfgp$event_dt),format="%Y-%m-%d") # needs specific input format, making that format takes more time...

  message(paste("missing dates for ", sum(is.na(dfgp$event_dt)),"of",nrow(dfgp),"entries = ",100*sum(is.na(dfgp$event_dt))/nrow(dfgp),"%, excluding these." ))
  dfgp <- dfgp[!is.na(dfgp$event_dt),]

  message(paste("QC dates.. "))
  dfgp <- subset(dfgp, event_dt > mindate ) # removing before 1930-ish.. some 1900, 1902, 1903 observations..
  dfgp <- subset(dfgp, event_dt < maxdate ) # removing after today-ish.. some 2037 observations.
  message(paste("#individuals in gp clinical:",length(unique(dfgp$eid))))

  dfgp$event <- 1


  #  parse versions
  tte.gpclinical.read3 <-  dfgp %>% dplyr::filter(read_3 !="")  %>% dplyr::select(eid,event_dt,read_3,event)  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = read_3,event=event)  %>% data.table::as.data.table()
  tte.gpclinical.read2 <-  dfgp %>% dplyr::filter(read_2 !="")  %>% dplyr::select(eid,event_dt,read_2,event)  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = read_2,event=event)  %>% data.table::as.data.table()


  lst <- list(tte.gpclinical.read2=tte.gpclinical.read2,tte.gpclinical.read3=tte.gpclinical.read3)
  lst <- lapply(lst,function(x) {data.table::setkey(x,code) })
  lst <- lapply(lst,function(x) {x[, ('identifier') := lapply(.SD, as.numeric), .SDcols = 'identifier'] })

  # ############################################################################################
  # ##instance filter
  # message(glue::glue("Retain only records which occur {min_instance} times."))
  # lst <- lapply(lst,function(x){x[, if(.N>min_instance) .SD, by = c('f.eid','code')]})
  # message(glue::glue("#indvidual remain {length(unique(c(lst$tte.gpclinical.read2$f.eid,lst$tte.gpclinical.read3$f.eid)))}"))
  # #############################################################################################
  #
  # #with more thoughts, I am not sure what to do with the window now?
  # dfout_extrastats <- suppressWarnings(test[, .(eventdate=eventdate,datediff=(eventdate-shift(eventdate,fill=NA,type="lag"))), keyby=list(f.eid,code)])
  tictoc::toc() #423.762 sec elapsed  #822.694 sec with the instance filter!
  return(lst)

}

#' Parse the value columns of the Primary Care clinical records
#'
#' This function parse the free text fields of the GP clinical records. Information contains in the columns include test results and remarks
#' @param fgp Path to GP clinical event records
#' @return   list of data.table objects with all episodes
#' @export
#' @examples
#' parse_gp_clinical_values("gpclinical.txt" )
parse_gp_clinical_values <- function(fgp){
  # stop if there is no file
  if (!file.exists(fgp)){
    message(paste0("ABORT reading GP clinical data: File does not exist - ",fgp))
    return()
  }
  tictoc::tic(paste("read gp data",fgp))
  # message(paste("read gp data",fgp))
  # records potentially erroneous have date changed to 01/01/1901 (occured before birth), 02/02/1902 (occured on DOB), 03/03/1903 (same year as DOB), 07/07/2037 (occured after the time of extraction)
  mindate = as.Date("1930-01-01")
  maxdate = format(Sys.time(),"%Y-%m-%d") ## change to today?.
  ####################################################
  # in current structure of gp_clinical.txt
  ###################################################
  # $1 eid $3 event_dt $4 read_2 $5 read3
  # $2 data_provider  $6 - $8 are free-text fields "value" entries differ depending on the data source
  # eid	data_provider	event_dt	read_2	read_3	value1	value2	value3

  # dfgp <- data.table::fread(cmd = paste("awk -F'\t'  '$6==\"\" && $7==\"\" && $8==\"\" {print $1,$3,$4,$5}' OFS='\t'",fgp) ,sep = "\t",
  # colClasses = c("integer","character","character","character")) #,select=cols_tokeep)
  dfgp <- data.table::fread(fgp ,sep = "\t",select=c(eid="integer",data_provider="integer" ,event_dt="character",read_2="character",read_3="character",value1="character",value2="character",value3="character")) # ," | head -10000 "
  # temp<-fread(paste0(fgp,".head"),sep = "\t",select=c(eid="integer", event_dt="character",read_2="character",read_3="character"))

  #names(dfgp) <-  c("eid",	"data_provider",	"event_dt",	"read_2",	"read_3",	"value1","value2","value3")

  names(dfgp) <-  c("eid","data_provider",	"event_dt",	"read_2",	"read_3","value1","value2","value3")
  dfgp$event_dt <- as.Date(as.character(dfgp$event_dt),format="%d/%m/%Y")
  #dfgp$event_dt <- format(fasttime::fastPOSIXct(dfgp$event_dt),format="%Y-%m-%d") # needs specific input format, making that format takes more time...

  message(paste("missing dates for ", sum(is.na(dfgp$event_dt)),"of",nrow(dfgp),"entries = ",100*sum(is.na(dfgp$event_dt))/nrow(dfgp),"%, excluding these." ))
  dfgp <- dfgp[!is.na(dfgp$event_dt),]

  message(paste("QC dates.. "))
  dfgp <- subset(dfgp, event_dt > mindate ) # removing before 1930-ish.. some 1900, 1902, 1903 observations..
  dfgp <- subset(dfgp, event_dt < maxdate ) # removing after today-ish.. some 2037 observations.
  message(paste("#individuals in gp clinical:",length(unique(dfgp$eid))))

  dfgp$event <- 1
  # retain only records with entry in the first free-text field which is used across all providers
  dfgp<-dfgp[value1!=""]
  #  parse versions & supplier , see official UKB documentation for details
  # TODO make a second function to process the value column (taking the event code into account)
  # !!NOTE!!: TPP stores Blood pressures as 2 separate records 246.. for SBP & 246A. for DBP ; others store this as one record with code 246.. mostly (in 2 value columns) but also use 246A. for DBP
  # TPP England uses CTV3 and only value 1 column is taken
  tte.gpclinical.read3.tpp <-  dfgp %>% dplyr::filter(read_3 !="")  %>% dplyr::select(eid,event_dt,read_3,event,value1)  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = read_3,event=event,value=value1)  %>% data.table::as.data.table()
  # Vision England READ2 codes and uses value1 & value2
  tte.gpclinical.read2.vision <-  dfgp %>% dplyr::filter(data_provider==1)  %>% dplyr::mutate(dplyr::across(value1:value3, na_if,"")) %>%  tidyr::unite(value, value1:value3, na.rm = TRUE, sep = ",", remove = TRUE)%>%dplyr::select(eid,event_dt,read_2,event,value)  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = read_2,event=event,value=value)  %>% data.table::as.data.table()
  # Wales also READ2 codes and uses value1 &value2 but values not guaranteed to be NUMERIC
  tte.gpclinical.read2.wales <-  dfgp %>% dplyr::filter(data_provider==4)  %>%dplyr::mutate(dplyr::across(value1:value3, na_if,"")) %>%  tidyr::unite(value, value1:value3, na.rm = TRUE, sep = ",", remove = TRUE)%>%dplyr::select(eid,event_dt,read_2,event,value)  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = read_2,event=event,value=value)  %>% data.table::as.data.table()
  # Scotland free text data with the most variety READ2 codes and uses all 3 columns not guaranteed to be NUMERIC
  # As the information depends heavily on the READ2 codes associated, only paste everything together here like for other providers
  # tidyr::unite() vs  dplyr::mutate(value=paste(...) https://stackoverflow.com/questions/70900172/concatenate-2-columns-into-a-new-column-skip-blanks-in-r
  tte.gpclinical.read2.scotland <-  dfgp %>% dplyr::filter(data_provider==2)  %>%dplyr::mutate(dplyr::across(value1:value3, na_if,"")) %>%  tidyr::unite(value, value1:value3, na.rm = TRUE, sep = ",", remove = TRUE)%>%dplyr::select(eid,event_dt,read_2,event,value)  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = read_2,event=event,value=value)  %>% data.table::as.data.table()

  lst <- list(tte.gpclinical.read2.vision=tte.gpclinical.read2.vision,tte.gpclinical.read2.wales=tte.gpclinical.read2.wales,tte.gpclinical.read2.scotland=tte.gpclinical.read2.scotland,tte.gpclinical.read3.tpp=tte.gpclinical.read3.tpp)
  lst <- lapply(lst,function(x) {data.table::setkey(x,code) })
  lst <- lapply(lst,function(x) {x[, ('identifier') := lapply(.SD, as.numeric), .SDcols = 'identifier'] })

  # ############################################################################################
  # ##instance filter
  # message(glue::glue("Retain only records which occur {min_instance} times."))
  # lst <- lapply(lst,function(x){x[, if(.N>min_instance) .SD, by = c('f.eid','code')]})
  # message(glue::glue("#indvidual remain {length(unique(c(lst$tte.gpclinical.read2$f.eid,lst$tte.gpclinical.read3$f.eid)))}"))
  # #############################################################################################
  #
  # #with more thoughts, I am not sure what to do with the window now?
  # dfout_extrastats <- suppressWarnings(test[, .(eventdate=eventdate,datediff=(eventdate-shift(eventdate,fill=NA,type="lag"))), keyby=list(f.eid,code)])
  tictoc::toc() #423.762 sec elapsed  #822.694 sec with the instance filter!
  return(lst)

}


#' Read Primary Care prescription records
#'
#' This function reads the prescription records from General Practitioner (GP) data. Refer to official UKB documetation for more information regarding this data.
#' @param fgp Path to GP prescription records
#' @param add_extra_script_columns whether or not to keep drug_name and quantity columns, default: False
#' @return   list of data.table objects with all episodes
#' @export
#' @examples
#' read_gp_clinical_data("gpscripts.txt" )
read_gp_script_data <- function(fgp,add_extra_script_columns=F){
  # stop if there is no file
  if (!file.exists(fgp)){
    message(paste0("ABORT reading GP prescription data: File does not exist - ",fgp))
    return()
  }
  tictoc::tic("read gp prescription data")
  mindate = as.Date("1930-01-01")
  maxdate = format(Sys.time(),"%Y-%m-%d") ## change to today?.

  # TODO: some way to preprocess such huge file ?
  # fgp<-fgp_scripts
  # 1.eid	data_provider	issue_date	read_2	bnf_code	dmd_code	drug_name	8.quantity
  dfgp <- data.table::fread(fgp ,sep = "\t",colClasses = c("numeric","integer","character","character","character","character","character","character")) #,select=cols_tokeep) ," | head -10000 "

  names(dfgp) <-  c("eid","data_provider","event_dt",	"read_2",	"bnf_code",	"dmd_code",	"drug_name",	"quantity")
  dfgp$event_dt <- as.Date(as.character(dfgp$event_dt),format="%d/%m/%Y")

  print(paste("missing dates for ", sum(is.na(dfgp$event_dt)),"of",nrow(dfgp),"entries = ",100*sum(is.na(dfgp$event_dt))/nrow(dfgp),"%, excluding these." ))
  dfgp <- dfgp[!is.na(dfgp$event_dt),]

  print(paste("QC dates.. "))
  dfgp <- subset(dfgp, event_dt > mindate ) # removing before 1930-ish.. some 1900, 1902, 1903 observations..
  dfgp <- subset(dfgp, event_dt < maxdate ) # removing after today-ish.. some 2037 observations.
  print(paste("#individuals in gp script:",length(unique(dfgp$eid))))

  dfgp$event <- 1

  if(add_extra_script_columns==T){extra_script_columns=c("drug_name","quantity")} else{extra_script_columns=NULL}


  # Parse by data sources
  # note BNF codings are used differently between data sources i.e. Scotland / England TPP, refer to official doc
  # DMD used in England Vision, note one prescription may be described differently and coded differently
  # TODO standardize med codes:  varies in length (how to deal with this?), some with "." in between (strip all dots?)
  ########################################################################################
  # England Vision, only provider using dmd codes
  # Sep2020: all records have dmd codes, ~2% has no read2 -> take dmd for these records
  tte.gpscript.dmd.england <-  dfgp %>% dplyr::filter(dmd_code !="")  %>% dplyr::select(eid,event_dt,dmd_code,event,eval(extra_script_columns))  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = dmd_code,event=event)  %>% data.table::as.data.table()
  # England TPP , bnf
  # data_provider 1= England(Vision), 2= Scotland, 3 = England (TPP), 4 = Wales bnf_code !="" &
  tte.gpscript.bnf.england <-  dfgp %>% dplyr::filter( data_provider == 3 & bnf_code !="")   %>%  dplyr::select(eid,event_dt,bnf_code,event,eval(extra_script_columns))  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = bnf_code,event=event)  %>% data.table::as.data.table()
 #######################################################################################
  #Scotland
  # majority of records have bnf codes , while some have BOTH bnf codes and read 2!
  # Sep2020: 2 records have only read2 but not bnf! -> take bnf
  # somehow if filter with && then no result is returned??????????????? but filter twice works....
  tte.gpscript.bnf.scotland <-  dfgp %>% dplyr::filter((data_provider == 2 )&(bnf_code !="")) %>% dplyr::select(eid,event_dt,bnf_code,event,eval(extra_script_columns) ) %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = bnf_code,event=event)  %>% data.table::as.data.table()
# to avoid double counting the record
  # tte.gpscript.read2.scotland <-  dfgp %>% dplyr::filter(read_2 !="" && data_provider == 2 )  %>% dplyr::select(eid,event_dt,read_2,event)  %>% dplyr::rename(f.eid=eid,eventdate = event_dt,code = read_2,event=event)  %>% data.table::as.data.table()
  ########################################################################################
  # Wales , read_2
  tte.gpscript.read2.wales <-  dfgp %>% dplyr::filter(data_provider == 4 & read_2 !="")  %>% dplyr::select(eid,event_dt,read_2,event,eval(extra_script_columns))  %>% dplyr::rename(identifier=eid,eventdate = event_dt,code = read_2,event=event)  %>% data.table::as.data.table()


  # TODO count by tables
  # counts <- dfgp[, .N, by=.(read_2, dmd_code,bnf_code,drug_name)] # add unique per individual.

  # dfgp[ dfgp$read_2!="" & dfgp$dmd_code !="",]
  # names(dfgp) <-  c("eid",	"event_dt",	"read_2",	"bnf")

  #dfgp$event_dt <- format(fasttime::fastPOSIXct(dfgp$event_dt),format="%Y-%m-%d") # needs specific input format, making that format takes more time...

  # lst <- list(tte.gpscript.dmd.england=tte.gpscript.dmd.england,tte.gpscript.bnf.england=tte.gpscript.bnf.england,tte.gpscript.bnf.scotland=tte.gpscript.bnf.scotland,tte.gpscript.read2.scotland=tte.gpscript.read2.scotland,tte.gpscript.read2.wales=tte.gpscript.read2.wales)
  lst <- list(tte.gpscript.dmd.england=tte.gpscript.dmd.england,tte.gpscript.bnf.england=tte.gpscript.bnf.england,tte.gpscript.bnf.scotland=tte.gpscript.bnf.scotland,tte.gpscript.read2.wales=tte.gpscript.read2.wales)
  lst <- lapply(lst,function(x) {data.table::setkey(x,code) })
  lst <- lapply(lst,function(x) {x[, ('identifier') := lapply(.SD, as.numeric), .SDcols = 'identifier'] })


  # ############################################################################################
  # ##instance filter
  # message(glue::glue("Retain only records which occur {min_instance} times."))
  # lst <- lapply(lst,function(x){x[, if(.N>min_instance) .SD, by = c('f.eid','code')]})
  # message(glue::glue("#indvidual remain {length(unique(c(lst$tte.gpscript.dmd.england$f.eid,lst$tte.gpscript.bnf.england$f.eid,lst$tte.gpscript.bnf.scotland$f.eid,lst$tte.gpscript.read2.wales$f.eid)))}"))
  # #############################################################################################

  tictoc::toc() #423.762 sec elapsed
  return(lst)

}



sumcounts <- function(dfs){

  # df <- suppressWarnings(Reduce(function(...) merge(..., all = TRUE, by = "code"), dfs))
  df<-suppressWarnings(Reduce((function() {counter = 0
  function(x, y) {
    counter <<- counter + 1
    d = merge(x, y, all = T, by = 'code')
    setnames(d, c(head(names(d), -1), paste0('N.', counter)))
  }})(), dfs))


  names(df)<-c("code",names(dfs))
  df <- data.table::as.data.table(cbind(df,N=df[ ,rowSums(.SD,na.rm = T), .SDcols =names(df)[!names(df) %in% "code"] ]))
  return(df)
}




#' Get summary counts on a list of data tables
#'
#' This function reads list of episode data in data tables and return summary counts by classification systems according to lst.data.settings.
#' @param lst.data list of data tables
#' @param lst.data.settings data.setting as dataframe
#' @return   a list of data tables of summary counts organized in classifications
#' @export
#' @examples
#' lst.data.settings <-fread(data.settings.tsv)
#' lst.data <- list()
#' lst.data$ts <- convert_touchscreen_to_episodedata(dfukb,ts_conditions = dfDefinitions_processed$TS)
#' lst.data$tte.sr.20002 <- convert_nurseinterview_to_episodedata(dfukb,field_sr_diagnosis = "20002",field_sr_date = "20008",qc_treshold_year = 10)
#' lst.data <- append(lst.data,read_hesin_data(fhesin ,fhesin_diag ,fhesin_oper ))
#' get_lst_counts(lst.data,lst.data.settings)
get_lst_counts <- function(lst.data,lst.data.settings) {
  if(nrow(lst.data.settings[!lst.data.settings$datasource %in% names(lst.data),])>0)  {
    message(paste("NOTE not in lst.data:", paste( lst.data.settings[!lst.data.settings$datasource %in% names(lst.data),'datasource'],sep=",",collapse=",") ))
    }
  lst.data.settings <- lst.data.settings[lst.data.settings$datasource %in% names(lst.data),]
  print("counting")
  lst.counts <- lapply(lst.data, function(x) x[, .N, by=.(code)] )
  # print(lst.counts)
  lst.counts.aggregate <- list()
  for (cls in unique(lst.data.settings$classification)){

    # print(cls)
    dfs <- lst.data.settings[lst.data.settings$classification %in% cls,'datasource']

    if(any(names(lst.counts) %in% dfs$datasource)){
      # lst.counts[[dfs]]
      # class(lst.counts)
      dfs_<-as.vector(unlist(dfs$datasource))
      i.na <- which(is.na(names(lst.counts[dfs_])))
      # print(i.na)
      if(length(i.na)>0) {message(paste("WARNING unavailable: ", dfs[i.na])); dfs <- dfs[-i.na] }

      lst.counts.aggregate[[cls]] <- sumcounts(lst.counts[dfs_])
    } else{
      # print(i.na)

      # print(length(i.na))
      # print(is.na(names(lst.counts[dfs_])))
      message(glue::glue("{cls} not found in lst.counts"))
    }

  }
  return(lst.counts.aggregate)
}

#' Read Mortality data
#'
#' This function reads death data on the Data Portal.
#' @param fdeath_portal Path to file with DEATH table
#' @param fdeath_cause_portal Path to file with DEATH_CAUSE table
#' @return   a data.table object with all episodes
#' @export
#' @examples
#' read_death_data("death.txt","death_cause.txt" )
read_death_data <- function(fdeath_portal, fdeath_cause_portal){
  # check if file paths are valid
  if (!file.exists(fdeath_portal)| !file.exists(fdeath_cause_portal) ){
    message("ABORT reading death records: Required file(s) not found:")
    message(paste("fdeath_portal exists -",file.exists(fdeath_portal),fdeath_portal),sep=" ")
    message(paste("fdeath_cause_portal exists -",file.exists(fdeath_cause_portal),fdeath_cause_portal,sep=" "))
    return()
  }

  tictoc::tic(paste("read death data",fdeath_portal,"&",fdeath_cause_portal))
  mindate = as.Date("1930-01-01")
  maxdate = format(Sys.time(),"%Y-%m-%d") ## change to today?.

  # read file
  death.portal=fread(fdeath_portal)
  death.cause.portal=fread(fdeath_cause_portal)
  #  each record uniquely identified by the eid (encoded identifier) of the participant and the instance index (ins_index) of the record

  dfdeath<-merge(death.portal,death.cause.portal,by=c("eid","ins_index"))
  # level indicate primary or secondary cause
  dfdeath<-dplyr::select(dfdeath,eid , cause_icd10, date_of_death,level)
  names(dfdeath) <- c("identifier","code","eventdate","level")
  # class change for consistency
  dfdeath <- dfdeath[, identifier:=as.numeric(identifier)]
  # note the difference in format of the date between tables
  dfdeath <- dfdeath[, eventdate:=as.Date(eventdate,format="%d/%m/%Y")]
  dfdeath <- subset(dfdeath, eventdate < maxdate ) # remove deaths that occurs after today

  # parse primary and secondary cause; add event flag with all considered valid; drop col level
  dfdeath.primary<-dfdeath %>% dplyr::filter(level ==1 ) %>% dplyr::mutate(event=1) %>% dplyr::select(identifier , code, eventdate,event)
  dfdeath.primary <- dfdeath.primary[, event:=as.integer(event)]

  dfdeath.secondary<-dfdeath %>% dplyr::filter(level ==2 ) %>% dplyr::mutate(event=1) %>% dplyr::select(identifier , code, eventdate,event)
  dfdeath.secondary <- dfdeath.secondary[, event:=as.integer(event)]

  tictoc::toc()

  lst_death <- list("primary" = dfdeath.primary, "secondary" = dfdeath.secondary)
  message("setkey(code)")
  lst_death <- lapply(lst_death,function(x) {data.table::setkey(x,code) })
  # lst_death <- lapply(lst_death,function(x) {x[, ('identifier') := lapply(.SD, as.character), .SDcols = 'identifier'] })
  return(lst_death)

}


get_all_exsiting_codes <- function(fportaldata,code.colname,fout){
  if (length(code.colname)!=length(fout)){
    message("Please check the number of code columns should equal to the number of out file paths, \n Abort!")
    return()
    }
  tictoc::tic()
  message(glue::glue("read files for column(s):{glue::glue_collapse(code.colname,sep=',')}"))
  dt <- data.table::fread(fportaldata, sep="\t", select = code.colname)
  message(glue::glue("number of lines in input file: {nrow(dt)}"))
  if (length(code.colname)>0){
    for (i in 1:length(code.colname)){
      message(glue::glue("{code.colname[i]}"))
      dt_out<-unique(dt[,code.colname[i],with=FALSE])
      # use double brackets for data tables when selecting column(s) by string
      dt_out<-dt_out[order(dt_out[[code.colname[i]]])]
      dt_out<-dt_out[!apply(dt_out == "", 1, all),]
      print(head(dt_out))
      message(glue::glue("number of lines in data table:{nrow(dt_out)}"))
      message(glue::glue("write all available {code.colname[i]} codes to file {fout[i]}."))

      # code.colnames and fout should have identical length
      data.table::fwrite(dt_out,file=fout[i])
    }

  }
  tictoc::toc()
}




#library(disk.frame)
# tic("read gp data")
# mindate = as.Date("1930-01-01")
# maxdate = format(Sys.time(),"%Y-%m-%d") ## change to today?.
# dfgp <- fread(cmd = paste("awk -F'\t'  '$6==\"\" && $7==\"\" && $7==\"\" {split($3, a, \"/\"); $3 = a[3]\"/\"a[2]\"/\"a[1];print $1,$3,$4,$5}' OFS='\t'",fgp) ,sep = "\t",
#               colClasses = c("integer","string","string","string")) #,select=cols_tokeep) ," | head -10000 "
# #names(dfgp) <-  c("eid",	"data_provider",	"event_dt",	"read_2",	"read_3",	"value1","value2","value3")
# names(dfgp) <-  c("eid",	"event_dt",	"read_2",	"read_3")
# dfgp$event_dt <- format(fasttime::fastPOSIXct(dfgp$event_dt),format="%Y-%m-%d")
# toc() #636.492 sec elapsed
#

# ####
# #
# library(readxl)
# fgpcoding.xls="/Users/niek/repos/ukbpheno/all_lkps_maps.xlsx"
# dfgpcoding.xls.read_ctv3_lkp <- as.data.table(read_xlsx(fgpcoding.xls,sheet="read_ctv3_lkp",col_types = rep("text",4) ))
#
# counts <- dfgp[, .N, by=.(read_2, read_3)]
# counts <- merge (counts,dfgpcoding.xls.read_ctv3_lkp, by.x="read_3",by.y="read_code",all=TRUE)
# dfgp <- fread(cmd = paste("awk -F'\t'  '$6==\"\" && $7==\"\" && $7==\"\" {print $1,$3,$4,$5}' OFS='\t'",fgp) ,sep = "\t") #,select=cols_tokeep) ," | head -10000 "
# names(dfgp) <-  c("eid",	"event_dt",	"read_2",	"read_3")
#
#
# ram_size=8
# outdir = paste0(fgp,"_diskframe/")
# # if !diskframe
# dfgp <- csv_to_disk.frame(fgp,
#                            nchunks = recommend_nchunks(sum(file.size(fgp)),ram_size=ram_size),
#                            #in_chunk_size = rows_to_read,
#                            outdir = outdir,
#                            colClasses = c("integer","integer","string","string","string","string","string","string"),
#                            col.names = c("eid",	"data_provider",	"event_dt",	"read_2",	"read_3",	"value1","value2","value3"), sep="\t")
#
# print(format(object.size(dfgp), units = "Mb"))
#
#
# ## doesntwork with diskframe??
# dfgp.dt <- as.data.table(dfgp) #[dfgp$value1=="" & dfgp$value2 =="" & dfgp$value3 =="",]
# dfgp.dt[dfgp.dt$value1=="" & dfgp.dt$value2 =="" & dfgp.dt$value3 =="",]
# dfgp.dt <- dfgp[dfgp$value1=="" & dfgp$value2 =="" & dfgp$value3 =="",]
#
#; fastPOSIXct(DT$start_date)

# hist( unique(dfgp$event_dt), "years", freq = TRUE)
# hist( dfgp$event_dt, "years", freq = TRUE,bars=100)
niekverw/ukbpheno documentation built on Oct. 30, 2023, 9:17 p.m.