R/ws_data_na_span_2_pgdb.R

#' ws_data_na_span_2_pgdb() retrieves the subset of ws with data available in the intersecting ws
#'
#' Retrieves the weather data from the intersecting stations. It is known that NOAA listed
#' available data for certain years, months, or days; however, some of this information is missing. 
#' This function replaces the missing data with a NA value of the dataset. These value is read by 
#' the Postgres database as an empty cell. You can confirm this using pgAdmin3 or other database manager.
#' The purpose of this function is to store the raw data, as it is, so that, the user can decide
#' what to do with missing points ( i.e. extrapolate). The example shows the use of ws_metadata_span_2_pgdb()
#' , but it can be very well be replaced by ws_metadata_2_pgdb().  
#'
#' @keywords NOAA, weather station, rgdal, RPostgreSQL, rnoaa
#' @param ghcnd    String that refers to the Global Historical Climate Network (daily) dataset
#' @param geoid    FIPS number from census tables
#' @param type     Variable under investigation i.e. TMAX, TMIN, PRCP
#' @param span     Look-back time to search and retrieve weather data information
#' @param ws_metadata  The table name of the subset of weather stations with intersecting data
#' @return Returns the name of the table that has been created. When fail, it returns 1.

#' @examples
#' ghcnd        <- 'GHCND'
#' geoid        <- '12087'
#' type         <- 'TMAX'
#' stations     <- as.data.frame( all_coor_ws( ghcnd, geoid, type) )
#' span         <- '2'
#' ws_metadata  <-  ws_metadata_span_2_pgdb( geoid, type, stations, span)
#' ws_data_na_span_2_pgdb(ghcnd, geoid, type, span, ws_metadata)
#'
#' @export
ws_data_na_span_2_pgdb <- function( ghcnd, geoid, type, span, ws_metadata){

  file   <- base::paste(Sys.getenv("HOME"), "/","pg_config.yml", sep="")
  config <- yaml::yaml.load_file( file )

  driver <- "PostgreSQL"
  drv    <- RPostgres::Postgres()
  conn   <- RPostgres::dbConnect(drv, host= config$dbhost, port= config$dbport, dbname= config$dbname, user= config$dbuser, password= config$dbpwd)

  
  if ( as.integer(geoid) < 100) {
    q1     <- base::paste("select NAME from cb_2013_us_state_20m where GEOID='", geoid,"'", sep="")
    res    <- RPostgres::dbSendQuery(conn, q1)
    state  <- RPostgres::dbFetch(res)
    RPostgres::dbClearResult(res)
    tableName <- base::paste(state, "_",geoid,"_ws_data_span_",span,"_na_", sep="")

  } else {

    q2     <- base::paste("select NAME from cb_2013_us_county_20m where GEOID='", geoid,"'", sep="")
    res    <- RPostgres::dbSendQuery(conn, q2)
    county <- RPostgres::dbFetch(res)
    RPostgres::dbClearResult(res)
    q3     <- base::paste("select NAME from cb_2013_us_state_20m where GEOID='", substr(geoid, 1, 2),"'", sep="")
    res    <- RPostgres::dbSendQuery(conn, q3)
    state  <- RPostgres::dbFetch(res)
    RPostgres::dbClearResult(res)
    tableName <- base::paste(state, "_",county,"_",geoid,"_ws_data_span_",span,"_na_",sep="")

  }# endIF/ELSE

  type      <- tolower(  type  )
  tableName <- gsub(" ", "_", tableName)
  tableName <- tolower( tableName ) 
  tableName <- base::paste( tableName, type, sep="")      

  cat("Creating Postgres weather information table \t\t\t\t")
  
  if (RPostgres::dbExistsTable(conn, tableName)) {

    msg <- base::paste("Done - Table ", tableName, " exists.\t\t\t\t\n", sep="")
    cat(msg)
    cat("Check Postgres table!\n")
    RPostgres::dbDisconnect(conn)
    return(tableName)
    
  } else {
    
    q1      <- base::paste("select name as id, mindate as mindate, maxdate as maxdate, longitude as longitude, latitude as latitude from ", ws_metadata, sep="")
    res     <- RPostgres::dbSendQuery(conn, q1)
    station <- data.frame(RPostgres::dbFetch(res))
    RPostgres::dbClearResult(res)
    minDate <- base::as.Date( lubridate::ymd( station$mindate[1] ) )
    maxDate <- base::as.Date( lubridate::ymd( station$maxdate[1] ) )
	  
    # One is end in a new year.  
    ndays <- ( lubridate::int_length( lubridate::interval( base::as.Date( minDate ) , base::as.Date( maxDate )))/(3600*24))+1 
    dates <- as.character( seq.Date( minDate , by ="days", length.out= ndays) )

    # The NOAA API only allows queries from users to retrieve data sets by year; 
    # thus, this code section limits each query to a different year for each iteration.  
    # WARNING: There is 500 limit request per day with the current token 
	
    for (i in 1:nrow( station ) ) {
	
      ws	 <- station$id[i]
      # startDate
      sDate	 <- minDate
      # finishDate                 
      fDate	 <- minDate
	    
      # Used for testing Comment this out after testing ( maybe deprecated)        
      # year( sDate )	<- year( maxDate ) - 5   
      # year( fDate )	<- year( maxDate ) - 5
	    
      lubridate::year( fDate ) <- lubridate::year( fDate ) + 1    #Interval (sDate, fDate) is one year.

      # Temporal strings attached to the dates for consistency with NOAA 	    
      ssDate <- base::paste(sDate, "T00:00:00", sep="")
      ffDate <- base::paste(fDate, "T00:00:00", sep="")
      estacion <- base::paste("GHCND:", ws, sep="")
	  
      weatherVar <- rnoaa::ncdc(datasetid=ghcnd, stationid=estacion, datatypeid=type, startdate=ssDate, enddate=ffDate , limit=366, token=config$token)

      # Verify that available information exist
      if (length(as.character( weatherVar$meta$totalCount)) == 0 ) {
        msg <- base::paste("Not Data Available for station ", estacion, " for variable type:  ", type, ".", sep="" )
        cat( msg )
        colnames( valores ) <- station$id[1:(i-1)]
        RPostgres::dbWriteTable( conn, tableName, as.data.frame( valores ) )
        RPostgres::dbDisconnect( conn )
        return(tableName) 
      }#endIF

      weatherYear<- weatherVar$data

	    repeat
	    {              

	      # Replaces starting year, year( sDate ), with a value of new time series
	      lubridate::year( sDate )  <- lubridate::year( sDate )  + 1

	      # Replaces starting day, day( sDate ), with a value of new time
	      lubridate::day(  sDate )  <- lubridate::day(  fDate )  + 1    

	      # Replaces ending year, year( fDate ), with a value of new time series
	      lubridate::year( fDate )  <- lubridate::year( fDate )  + 1

	      ssDate         <- base::paste(sDate, "T00:00:00", sep="")
	      ffDate         <- base::paste(fDate, "T00:00:00", sep="")
	           
	      if ( isTRUE( lubridate::year( fDate ) < lubridate::year( maxDate ) ) ) {
	 
	         weatherVar<-rnoaa::ncdc(datasetid = ghcnd, stationid = estacion, datatypeid=type, startdate = ssDate, enddate= ffDate , limit=366, token=config$token)

	        #Verify that available information exist
      		if (length(as.character( weatherVar$meta$totalCount)) == 0 ) {
                  msg <- base::paste("Not Data Available for statino ", estacion, " for variable type:  ", type, ".\t\n", sep="" )
                  cat( msg )
        	  colnames( valores ) <- station$id[1:(i-1)]
	          RPostgres::dbWriteTable( conn, tableName, as.data.frame( valores ) )
	          RPostgres::dbDisconnect( conn )
	          return(tableName) 
                }#endIF

	        # Delay needed since NOAA service accepts a maximum of 5 request per second.
	        Sys.sleep(0.2)											   	    
	        weatherYear <- rbind(weatherYear, weatherVar$data)
	 
	      } else {
	      
	        lubridate::month( fDate ) <- lubridate::month( maxDate ) 
	        lubridate::day(   fDate ) <- lubridate::day(   maxDate ) 
	        ssDate         <- base::paste( sDate, "T00:00:00", sep="")
	        ffDate         <- base::paste( fDate, "T00:00:00", sep="")                      
	      
	        weatherVar<-rnoaa::ncdc(datasetid = ghcnd, stationid = estacion, datatypeid=type, startdate = ssDate, enddate= ffDate , limit=366, token=config$token)

	        # Verify that available information exist
      		if (length(as.character( weatherVar$meta$totalCount)) == 0 ) {
                  msg <- base::paste("Not Data Available for station ", estacion, " for variable type:  ", type, ".\t\n", sep="" )
                  cat( msg )
        	  colnames( valores ) <- station$id[1:(i-1)]
	          RPostgres::dbWriteTable( conn, tableName, as.data.frame( valores ) )
	          RPostgres::dbDisconnect( conn )
	          return(tableName)
                }# endIF
	        weatherYear    <- rbind( weatherYear, weatherVar$data)      
	        break
	      }# endIF/ELSE	      
	    }# endRepeat

	    # TMAX = Maximum temperature (tenths of degrees C)	    
	    # promedio <- sprintf( "%.4f", mean( weatherYear$value/10 ) )     
	    
	    # Return year mean value  
	    # print( promedio )                                               

	    # NOAA specification of available data within a range often is not complete. 
	    # Individual dates are missing even within an specified range.
	    # SOLUTIONS:  
	    #  1) Average from the available number of samples.  
	    #  2) Interpolation when possible

	
	    if ( i == 1) {
	    
	      intervalAsDate     <-  base::as.Date( weatherYear$date ) 
              # TMAX = Maximum temperature (tenths of degrees C)
	      intervalAsValue    <- sprintf("%.4f", weatherYear$value/10)
	      stationDataHash    <- hash::hash( intervalAsDate , intervalAsValue )               
	      allDatesHash       <- hash::hash( dates ,  rep(1, ndays ) )         # h1 <- hash( dates , rep(1, ndays ) )   
	      hash::values( allDatesHash, keys=intervalAsDate ) <- intervalAsValue
	      v1.df              <- as.data.frame( hash::has.key( dates , stationDataHash ) )
	      colnames(v1.df)    <- "value"
	      # Section that guarantees that v1.df has at least one value
	      if ( length( which( v1.df$value == FALSE) ) > 0 ) {            
	      
	        sub_v1.df       <- as.data.frame( subset( v1.df, value == FALSE ) )

		# Dates from station which values do not exist in allDatasHash
	        dateKeys        <- as.character( rownames( sub_v1.df ) )  #print(dateKeys)
	        sizeSubV1       <- nrow( sub_v1.df )
	        
	        # Main change from conn_rnooa_postgresql_writeOGR_locs_and_weatherData_v2 (v2)
		# is the following line. We replace average for Interpolated values.
	        # In v2 the vecPromedio variable hold the average value of the available data.
		# Currently as can be observed, it is replaced by NA that is replaced by an empty
		# cell when exported to a database with column holding a variable of type double
	        vecPromedio     <- rep( NA, sizeSubV1  )	
	        
		#Update allDatesHash values found in 'dataKeys' with NA 
	        hash::values( allDatesHash, keys = dateKeys) <- vecPromedio
	                 
	        valores <- as.data.frame( hash::values( allDatesHash, keys=NULL ) )

	      } else { valores  <- data.frame( intervalAsValue )  }#endIF/ELSE	      
	    } else {
	    
	      intervalAsDate     <-  base::as.Date( weatherYear$date ) 

	      # TMAX = Maximum temperature (tenths of degrees C)
	      intervalAsValue    <- sprintf( "%.4f", weatherYear$value/10 ) 
	      stationDataHash    <- hash::hash( intervalAsDate , intervalAsValue )
	      hash::values( allDatesHash, keys=dates) <- rep(1, ndays )	                
	      hash::values( allDatesHash, keys=intervalAsDate) <- intervalAsValue
	      v1.df              <- as.data.frame( hash::has.key(  dates , stationDataHash ) ) 	      
	      colnames(v1.df)    <- "value"
	      
	      # Section guarantees that v1.df has at least one value
	      if ( length( which( v1.df$value == FALSE) ) > 0 ) {            

	        sub_v1.df        <- as.data.frame( subset( v1.df, value == FALSE) )
	        dateKeys         <- as.character( rownames( sub_v1.df ) )  #print(dateKeys)
	        sizeSubV1        <- nrow( sub_v1.df)
	        
	        # Main change from v2 is this line. We replace average for Interpolated values.
	        # In v2 the vecPromedio the current NA values were replaced instead by an average value.
	        vecPromedio      <- rep( NA, sizeSubV1  )
	        hash::values( allDatesHash, keys=dateKeys) <- vecPromedio
	        subValores       <- as.data.frame( hash::values( allDatesHash, keys=NULL ) )            
	        valores          <- as.data.frame( cbind( valores, subValores ) )	        

	      } else { valores     <- as.data.frame( cbind( valores, intervalAsValue ) )  }#endIF/ELSE
	    }# endIF/ELSE
            rm( stationDataHash )	
	  }# endFOR   (stick time series of different stations together)		  
	  colnames( valores ) <- station$id
          msg <- base::paste("Creating table ", tableName, ".\t\t\t\t\t", sep="")
          cat(msg)
          RPostgres::dbWriteTable( conn, tableName, as.data.frame( valores ) )
          Sys.sleep(4)
          cat("Check Postgres table!")	  
  }# endIF/ELSE
   
  RPostgres::dbDisconnect( conn )
  return(tableName) 

}
rabravo/ws2pgdb documentation built on May 26, 2019, 8:51 p.m.