R/nhs_pg.R

Defines functions nhs_pg

Documented in nhs_pg

#' Query data from PostgreSQL database
#'
#' Reads one table per tsv file from the database, optionally recodes values
#' with the matching `.codebook` file and attaches variable labels from the
#' matching `.varLabel` file, joins the files of one year by `seqn`, and
#' finally combines the years.
#'
#' @param ... files and variables. Values matching the configured data path
#'     (`get_config_path()`) are treated as tsv files; every other value is
#'     lower-cased and treated as a variable. A variable may be written as
#'     `"source::alias"`, where several candidate source names can be
#'     separated by `;`. Without `::` the variable keeps its own name.
#' @param varLabel logical. Whether to add label for variables, default is TRUE
#' @param codebook logical. Whether to convert code into label.
#' @param nrows maximum rows to read from each table. default is infinity
#' @param lowercd logical. Whether to convert codebook labels to lowercase.
#' @param force_rbind logical. Whether to rbind the query list data
#' @param conn connection to nhanes database in PostgreSQL; its `con` element
#'     is passed to DBI. Defaults to the object `nhs_Connect` in the global
#'     environment.
#'
#' @return `NULL` when nothing was selected; otherwise either a list of data
#'     frames named by year, or a single data frame with a leading `Year`
#'     column when the yearly results were row-bound.
#' @export
nhs_pg <- function(...,varLabel=TRUE,codebook=TRUE,nrows=Inf,lowercd = FALSE,
                   force_rbind=FALSE,conn){
    t1 <- Sys.time()
    if (missing(conn)) conn <- get('nhs_Connect',envir = .GlobalEnv)
    th <- c(...)
    # split the arguments into files (under the config path) and variables
    ck <- grepl(get_config_path(),th,ignore.case = TRUE)
    files <- th[ck]
    variable <- tolower(th[!ck])
    if (length(variable)>0){
        # normalise every variable to the "source::alias" form
        noalias <- !grepl('::',variable)
        variable[noalias] <- paste0(variable[noalias],'::',variable[noalias])
    }
    if (do::cnOS()){
        tsv <- tmcn::toUTF8("\u5FC5\u987B\u662Ftsv\u6587\u4EF6\n     ")
    }else{
        tsv <- 'must be tsv file\n     '
    }
    nottsv <- tools::file_ext(files) != 'tsv'
    if (any(nottsv)){
        stop(paste0(tsv,paste0(files[nottsv],collapse = '\n     ')))
    }
    # the row limit applies to every query, with or without a variable list;
    # format() avoids scientific notation such as "1e+05" in the SQL text
    limit <- ''
    if (!is.infinite(nrows)){
        limit <- paste0(' limit ',format(nrows,scientific = FALSE))
    }
    years <- prepare_years(files)
    yearu <- years |> unique() |> do::increase()
    # width used to align the item names in the progress output
    items <- prepare_items(files)
    maxnchar <- if (length(items)>0) max(nchar(items)) else 0
    data <- lapply(yearu, function(i){
        filesi <- files[years==i]
        cat(paste0('\n',crayon::red(do::Replace0(i,'.*/')),'(',length(filesi),')'))
        dfi <- NULL
        for (j in seq_along(filesi)) {
            filej <- do::file.name(filesi[j])
            itemsj <- prepare_items(filesi[j])
            # progress output
            if (j==1){
                cat(paste0(itemsj,do::rep_n(' ',maxnchar-nchar(itemsj))))
            }else{
                if (itemsj != prepare_items(filesi[j-1])) cat('\n           ', paste0(itemsj,do::rep_n(' ',maxnchar-nchar(itemsj))))
            }
            noext <- do::Replace0(filej,paste0('\\.',tools::file_ext(filej)))
            cat(paste0(' ',crayon::blue(noext)))
            # read data: table "<item>---<file>" inside the schema named by year
            pgj <- sprintf('"%s"."%s---%s"',i,itemsj,do::Replace0(filej,'\\.tsv'))
            if (length(variable)==0){
                cmd <- sprintf('select * from %s%s',pgj,limit)
                dfj <- DBI::dbGetQuery(conn = conn$con,statement = cmd)
                pair <- colnames(dfj)
                names(pair) <- pair
            }else{
                # fetch one row only to learn which columns the table has
                cmd <- sprintf('select * from %s limit 1',pgj)
                namej <- DBI::dbGetQuery(conn = conn$con,statement = cmd)  |> colnames()
                varlist <- do::Replace0(variable,' {0,}::.*') |> strsplit(' {0,}; {0,}')
                names(varlist) <- do::Replace0(variable,'.*::')
                pair <- lapply(varlist, function(k) set::and(k,namej))
                pair <- pair[lengths(pair)>0]
                if (length(pair)==0){
                    dfj <- NULL
                }else{
                    # pair: values are source columns, names are output columns
                    pair <- unlist(pair)
                    varsql <- sprintf('"%s" as "%s"',pair,names(pair)) |>
                        paste0(collapse = ', ')
                    cmd <- sprintf('select %s from %s%s',varsql,pgj,limit)
                    dfj <- DBI::dbGetQuery(conn = conn$con,statement = cmd)
                    dfj <- unique(dfj)
                }
            }
            # this file has none of the requested variables: nothing to join
            if (is.null(dfj)) next
            # codebook
            if (codebook){
                ckbkf <- do::Replace(filesi[j],'\\.tsv','.codebook')
                if (file.exists(ckbkf)){
                    ckbk <- read.delim(ckbkf,comment.char = '#')
                    if (nrow(ckbk)>0){
                        ckbk$variable <- do::Trim(ckbk$variable)
                        ckbk$label <- do::Trim(ckbk$label)
                        ckbk$code <- do::Trim(ckbk$code)
                        if (lowercd) ckbk$label <- tolower(ckbk$label)
                        ckbk <- ckbk[ckbk$variable %in% pair,]
                        # keep the names so the output column can be addressed
                        codepair <- pair[pair %in% ckbk$variable]
                        for (k in seq_along(codepair)) {
                            colk <- names(codepair)[k]
                            code <- ckbk[ckbk$variable == codepair[k],]
                            for (cd in seq_len(nrow(code))) {
                                cdjd <- dfj[,colk] == code[cd,'code']
                                cdjd[is.na(cdjd)] <- FALSE
                                dfj[cdjd,colk] <- code[cd,'label']
                            }
                        }
                    }
                }
            }
            # add varLabel
            if (varLabel){
                labefile <- do::Replace(filesi[j],'\\.tsv','.varLabel')
                if (file.exists(labefile)){
                    labelj <- read.delim(labefile,comment.char = '#')
                    if (nrow(labelj)>0){
                        # look labels up by position in `pair` so each label
                        # stays attached to its own output column
                        labs <- labelj$label[match(pair,do::Trim(labelj$variable))]
                        keep <- !is.na(labs) & names(pair) != 'seqn'
                        if (any(keep)){
                            labargs <- as.list(as.character(labs[keep]))
                            names(labargs) <- names(pair)[keep]
                            # pass labels as values instead of parsing code
                            # built from them; labels may contain quotes
                            dfj <- do.call(expss::apply_labels,
                                           c(list(data = dfj),labargs))
                        }
                    }
                }
            }
            if (is.null(dfi)){
                dfi <- dfj
            }else{
                dfi <- dplyr::full_join(dfi,dfj,by='seqn')
            }
        }
        dfi
    })
    # close every open PostgreSQL connection of this driver
    cons <- DBI::dbListConnections(RPostgreSQL::PostgreSQL())
    for(i in cons) DBI::dbDisconnect(i)
    names(data) <- yearu
    # years for which no file contained a requested variable yield NULL
    data <- data[!vapply(data,is.null,logical(1))]
    if (length(data)==0){
        cat('no data selected')
        cat('\nTime: ',format(round(Sys.time()-t1,2)),'\n')
        return(NULL)
    }
    if (length(data)==1){
        if (force_rbind){
            return(cbind(Year=names(data),data[[1]]))
        }
        return(data)
    }
    # Years are row-bound automatically only when they have the same number
    # of rows and the same set of column names.
    # NOTE(review): the row-count test is kept from the original code; it
    # looks like a column-count test may have been intended - confirm.
    nrow1 <- nrow(data[[1]])
    cols1 <- colnames(data[[1]])
    ck <- all(vapply(data[-1], function(d){
        nrow(d) == nrow1 &&
            ncol(d) == length(cols1) &&
            setequal(colnames(d),cols1)
    }, logical(1)))
    if (ck){
        for (i in seq_along(data)) {
            data[[i]] <- cbind(Year=names(data)[i],data[[i]])
        }
        names(data) <- NULL
        df <- do.call(rbind,data) |> as.data.frame()
        row.names(df) <- NULL
        return(df)
    }
    if (force_rbind){
        for (i in seq_along(data)) {
            data[[i]] <- cbind(Year=names(data)[i],data[[i]])
        }
        df <- do.call(plyr::rbind.fill,data)
        row.names(df) <- NULL
        return(df)
    }
    data
}
yikeshu0611/nhanesR documentation built on Jan. 29, 2022, 6:08 a.m.