#' Query data from PostgreSQL database
#'
#' @param ... files and variables
#' @param varLabel logical. Whether to add label for variables, default is TRUE
#' @param codebook logical. Whether to convert code into label.
#' @param nrows maximum rows to read. default is infinity
#' @param lowercd logical. Whether to convert codebook to little letter.
#' @param force_rbind logical. Whether to rbind the query list data
#' @param conn connection to nhanes database in PostgreSQL
#'
#' @return query data
#' @export
nhs_pg <- function(...,varLabel=TRUE,codebook=TRUE,nrows=Inf,lowercd = FALSE,
force_rbind=FALSE,conn){
if (missing(conn)) conn <- get('nhs_Connect',envir = .GlobalEnv)
th <- c(...)
ck <- grepl(get_config_path(),th,ignore.case = TRUE)
(files <- th[ck])
(variable <- tolower(th[!ck]))
if (length(variable)>0){
variable[!grepl('::',variable)] <- paste0(variable[!grepl('::',variable)],'::',variable[!grepl('::',variable)])
variable
}
if (do::cnOS()){
tsv <- tmcn::toUTF8("\u5FC5\u987B\u662Ftsv\u6587\u4EF6\n ")
}else{
tsv <- 'must be tsv file\n '
}
if (any(tools::file_ext(files) != 'tsv')){
files <- paste0(files[tools::file_ext(files) != 'tsv'],collapse = '\n ')
tsv <- paste0(tsv,files)
stop(tsv)
}
(years <- prepare_years(files))
(yearu <- years |> unique() |> do::increase())
# file
maxnchar <- max(nchar(prepare_items(files)))
data <- lapply(yearu, function(i){
(filesi <- files[years==i])
cat(paste0('\n',crayon::red(do::Replace0(i,'.*/')),'(',length(filesi),')'))
for (j in 1:length(filesi)) {
(filej <- do::file.name(filesi[j]))
(itemsj <- prepare_items(filesi[j]))
# read data
if (j==1){
cat(paste0(itemsj,do::rep_n(' ',maxnchar-nchar(itemsj))))
}else{
if (itemsj != prepare_items(filesi[j-1])) cat('\n ', paste0(itemsj,do::rep_n(' ',maxnchar-nchar(itemsj))))
}
noext <- do::Replace0(filej,paste0('\\.',tools::file_ext(filej)))
cat(paste0(' ',crayon::blue(noext)))
(pgj <- sprintf('"%s"."%s---%s"',i,itemsj,do::Replace0(filej,'\\.tsv')))
if (length(variable)==0){
cmd <- sprintf('select * from %s',pgj)
dfj <- DBI::dbGetQuery(conn = conn$con,statement = cmd)
head(dfj)
pair=colnames(dfj)
names(pair) <- pair
}else{
cmd <- sprintf('select * from %s limit 1',pgj)
namej <- DBI::dbGetQuery(conn = conn$con,statement = cmd) |> colnames()
varlist <- do::Replace0(variable,' {0,}::.*') |> strsplit(' {0,}; {0,}')
names(varlist) <- do::Replace0(variable,'.*::')
pair <- lapply(varlist, function(k) set::and(k,namej))
(pair <- pair[sapply(pair, length)>0])
if (length(pair)==0){
dfj <- NULL
}else{
pair <- unlist(pair)
varsql <- sprintf('"%s" as "%s"',pair,names(pair)) |>
paste0(collapse = ', ')
cmd <- sprintf('select %s from %s',varsql,pgj)
if (!is.infinite(nrows)) cmd <- paste0(cmd,' limit ', nrows)
dfj <- DBI::dbGetQuery(conn = conn$con,statement = cmd)
dfj <- unique(dfj)
head(dfj)
}
}
# codebook
if (codebook & !is.null(dfj)){
(ckbkf <- do::Replace(filesi[j],'\\.tsv','.codebook'))
if (file.exists(ckbkf)){
ckbk <- read.delim(ckbkf,comment.char = '#')
if (nrow(ckbk)>0){
ckbk$variable <- do::Trim(ckbk$variable)
ckbk$label <- do::Trim(ckbk$label)
ckbk$code <- do::Trim(ckbk$code)
if (lowercd) ckbk$label <- tolower(ckbk$label)
head(ckbk)
(ckbk <- ckbk[ckbk$variable %in% pair,])
codepair <- set::and(pair,ckbk$variable)
if (nrow(ckbk)>0){
for (k in 1:length(codepair)) {
(code <- ckbk[ckbk$variable == codepair[k],])
for (cd in 1:nrow(code)) {
cdjd <- dfj[,names(codepair[k])] == code[cd,'code']
cdjd[is.na(cdjd)] <- FALSE
dfj[cdjd,names(codepair[k])] <- code[cd,'label']
}
}
}
}
}
}
# add varLabel
if (varLabel & !is.null(dfj)){
(labefile <- do::Replace(filesi[j],'\\.tsv','.varLabel'))
if (file.exists(labefile)){
labelj <- read.delim(labefile,comment.char = '#')
labelj <- labelj[labelj$variable %in% pair,]
if (nrow(labelj)>0){
row.names(labelj) <- do::Trim(labelj$variable)
(labelj <- labelj[pair,'label'])
labelj <- labelj[!is.na(labelj)]
(labelj <- set::not(labelj,'seqn'))
if (length(labelj)>0){
dfj <- sprintf('"%s" = "%s"',
set::not(names(pair),'seqn'),
labelj) |>
paste0(collapse = ', ') |>
sprintf(fmt = 'expss::apply_labels(dfj,%s)') |>
parse(file='',n=NULL) |>
eval()
}
}
}
}
if (j==1){
dfi <- dfj
}else{
dfi <- dplyr::full_join(dfi,dfj,by='seqn')
}
}
dfi
})
cons <- DBI::dbListConnections(RPostgreSQL::PostgreSQL())
for(i in cons) DBI::dbDisconnect(i)
names(data) <- yearu
if (length(data)==0){
if (cat) cat('no data selected')
if (cat) cat('\nTime: ',time_diff(Sys.time(),t1),'\n')
return()
}else if (length(data)==1){
if (force_rbind){
return(cbind(Year=names(data),data[[1]]))
}else{
return(data)
}
}else{
(ck <- all(sapply(2:length(data), function(i) nrow(data[[1]]) == nrow(data[[i]]))))
if (ck){
(ck <- all(sapply(2:length(data), function(i) any(do::increase(colnames(data[[1]])) == do::increase(colnames(data[[i]]))))))
}
if (ck){
for (i in 1:length(data)) {
di <- cbind(Year=names(data)[i],data[[i]])
data[[i]] <- di
}
names(data)=NULL
df <- do.call(rbind,data) |> as.data.frame()
row.names(df)=NULL
return(df)
}else{
if (force_rbind){
for (i in 1:length(data)) {
data[[i]] <- cbind(Year=names(data)[i],data[[i]])
}
df <- do.call(plyr::rbind.fill,data)
row.names(df)=NULL
return(df)
}else{
return(data)
}
}
}
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.