R/pat_createPATimeseriesObject.R

Defines functions pat_createPATimeseriesObject

Documented in pat_createPATimeseriesObject

#' @export
#' @importFrom rlang .data
#' @importFrom MazamaCoreUtils logger.debug
#' @importFrom tidyselect all_of
#' 
#' @title Combine PurpleAir raw dataframes
#' 
#' @param pat_rawList List of dataframes as returned by \code{pat_downloadParseRawData()}.
#' 
#' @return A PurpleAir Timeseries \emph{pat} object.
#' 
#' @description The \code{pat_downloadParseRawData()} function returns four
#' dataframes of data from ThingSpeak. These must be combined into the single
#' \code{data} dataframe found in a 'pat' object. This process involves selecting
#' data columns to use and bringing all data onto a unified time axis.
#' 
#' Two sets of data values exist in the raw data, one for each of two algorithms
#' that convert particle counts into aerosol density.
#' 
#' PurpleAir has the following description:
#' 
#' \emph{
#' The CF_ATM and CF_1 values are calculated from the particle count data with a 
#' proprietary algorithm developed by the PMS5003 laser counter manufacturer, 
#' PlanTower. The specifics of the calculation are not available to the public 
#' (or us for that matter). However, to convert the particle count data (um/dl) 
#' to a mass concentration (ug/m3) they must use an average particle density. 
#' They do provide 2 different mass concentration conversion options; CF_1 uses 
#' the "average particle density" for indoor particulate matter and CF_ATM uses 
#' the "average particle density" for outdoor particulate matter.
#' }
#' 
#' The \pkg{AirSensor} package and all associated archive data use PlanTower
#' algorithm \code{CF_ATM}. 
#' 
#' @references \url{https://www2.purpleair.com/community/faq#!hc-what-is-the-difference-between-cf-1-and-cf-atm}
#' 
#' @examples
#' \donttest{
#' library(AirSensor)
#' 
#' setArchiveBaseUrl("http://data.mazamascience.com/PurpleAir/v1")
#' 
#' pas <- pas_load()
#' 
#' pat_rawList <- pat_downloadParseRawData(
#'   id = "78df3c292c8448f7_21257",
#'   pas = pas
#' )
#' 
#' pat <- pat_createPATimeseriesObject(pat_rawList)
#' 
#' pat_multiPlot(pat)
#' }

pat_createPATimeseriesObject <- function(
  pat_rawList = NULL
) {
  
  # ----- Validate parameters --------------------------------------------------
  
  MazamaCoreUtils::stopIfNull(pat_rawList)
  
  meta <- pat_rawList$meta
  A_PRIMARY <- pat_rawList$A_PRIMARY
  A_SECONDARY <- pat_rawList$A_SECONDARY
  B_PRIMARY <- pat_rawList$B_PRIMARY
  B_SECONDARY <- pat_rawList$B_SECONDARY
  
  MazamaCoreUtils::stopIfNull(meta)
  MazamaCoreUtils::stopIfNull(A_PRIMARY)
  MazamaCoreUtils::stopIfNull(A_SECONDARY)
  MazamaCoreUtils::stopIfNull(B_PRIMARY)
  MazamaCoreUtils::stopIfNull(B_SECONDARY)
  
  # ----- Simplify meta --------------------------------------------------------
  
  # On 2020-09-15
  # > names(pat_rawList$meta)
  #  [1] "ID"                               "label"                           
  #  [3] "DEVICE_LOCATIONTYPE"              "THINGSPEAK_PRIMARY_ID"           
  #  [5] "THINGSPEAK_PRIMARY_ID_READ_KEY"   "THINGSPEAK_SECONDARY_ID"         
  #  [7] "THINGSPEAK_SECONDARY_ID_READ_KEY" "latitude"                        
  #  [9] "longitude"                        "pm25"                            
  # [11] "lastSeenDate"                     "sensorType"                      
  # [13] "flag_hidden"                      "isOwner"                         
  # [15] "humidity"                         "temperature"                     
  # [17] "pressure"                         "age"                             
  # [19] "parentID"                         "flag_highValue"                  
  # [21] "flag_attenuation_hardware"        "Ozone1"                          
  # [23] "pm25_current"                     "pm25_10min"                      
  # [25] "pm25_30min"                       "pm25_1hr"                        
  # [27] "pm25_6hr"                         "pm25_1day"                       
  # [29] "pm25_1week"                       "statsLastModifiedDate"           
  # [31] "statsLastModifiedInterval"        "countryCode"                     
  # [33] "stateCode"                        "timezone"                        
  # [35] "deviceID"                         "locationID"                      
  # [37] "deviceDeploymentID"               "airDistrict"                     
  # [39] "pwfsl_closestDistance"            "pwfsl_closestMonitorID"          
  # [41] "sensorManufacturer"               "targetPollutant"                 
  # [43] "technologyType"                   "communityRegion"              
  
  meta <- 
    pat_rawList$meta %>%
    dplyr::filter(is.na(.data$parentID)) %>%
    dplyr::select(.data$ID, 
                  .data$label, 
                  .data$sensorType, 
                  .data$DEVICE_LOCATIONTYPE, 
                  .data$THINGSPEAK_PRIMARY_ID, 
                  .data$THINGSPEAK_PRIMARY_ID_READ_KEY, 
                  .data$THINGSPEAK_SECONDARY_ID, 
                  .data$THINGSPEAK_SECONDARY_ID_READ_KEY, 
                  .data$longitude, 
                  .data$latitude, 
                  .data$countryCode, 
                  .data$stateCode, 
                  .data$timezone, 
                  .data$deviceID, 
                  .data$locationID, 
                  .data$deviceDeploymentID, 
                  .data$pwfsl_closestDistance, 
                  .data$pwfsl_closestMonitorID,
                  .data$sensorManufacturer,
                  .data$targetPollutant,
                  .data$technologyType,
                  .data$communityRegion)

  # Guarantee that 'ID' and 'deviceID' fields are <character> as opposed to <int>
  meta$ID <- as.character(meta$ID)
  meta$deviceID <- as.character(meta$deviceID)
  
  # ----- Create A and B channels ----------------------------------------------

  # NOTE:  Here is the structure of the raw data:
  
  # > names(A_PRIMARY)
  # [1] "created_at"  "entry_id"    "pm1.0_cf1"   "pm2.5_cf1"   "pm10.0_cf1" 
  # [6] "uptime"      "rssi"        "temperature" "humidity"    "pm2.5_atm"  
  # > names(A_SECONDARY)
  # [1] "created_at"  "entry_id"    "counts_0.3"  "counts_0.5"  "counts_1.0" 
  # [6] "counts_2.5"  "counts_5.0"  "counts_10.0" "pm1.0_atm"   "pm10.0_atm" 
  # > names(B_PRIMARY)
  # [1] "created_at" "entry_id"   "pm1.0_cf1"  "pm2.5_cf1"  "pm10.0_cf1"     
  # [6] "memory"     "adc0"       "pressure"   "bsec_iaq"   "pm2.5_atm" 
  # > names(B_SECONDARY)
  # [1] "created_at"  "entry_id"    "counts_0.3"  "counts_0.5"  "counts_1.0" 
  # [6] "counts_2.5"  "counts_5.0"  "counts_10.0" "pm1.0_atm"   "pm10.0_atm" 
  
  # NOTE:  Here is the structure of the data we wish to have in the end:
  
  # TODO:  Update this documentation to reflect new columns
  # > names(example_pat$data)
  # [1] "datetime"    "pm25_A"      "pm25_B"      "pm1_atm_A"   "pm1_atm_B"  
  # [6] "pm25_atm_A"  "pm25_atm_B"  "pm10_atm_A"  "pm10_atm_B"  "temperature"
  # [11] "humidity"    "uptime"      "adc0"        "rssi"        "datetime_A" 
  # [16] "datetime_B" 
  
  # NOTE:  When dropouts occur, you will not always have the same number of
  # NOTE:  rows in each dataframe so we cannot use dplyr::bind_rows(). Instead,
  # NOTE:  we first create a new "datetime" column on a 1-minute time axis and
  # NOTE:  then use dplyr::full_join() to create a combined dataframe that has
  # NOTE:  some missing values.
  
  # ----- A channel ------------------------------------------------------------
  
  A_PRIMARY_columns <- c(
    "datetime", "datetime_A",
    "pm25_A", "pm25_atm_A",
    "uptime", "rssi", "temperature", "humidity"
  )
  
  A_PRIMARY <-
    pat_rawList$A_PRIMARY %>%
    dplyr::mutate(
      datetime = lubridate::floor_date(.data$created_at, unit = "min"),
      datetime_A = .data$created_at,
      pm25_atm_A = .data$pm2.5_atm, 
      pm25_A = .data$pm2.5_atm
    ) %>%
    dplyr::arrange(.data$datetime) %>%
    dplyr::select(all_of(A_PRIMARY_columns)) %>%
    .replaceRecordsWithDuplicateTimestamps()
  
  A_SECONDARY_columns <- c(
    "datetime",
    "pm1_atm_A", "pm10_atm_A"
  )
  
  A_SECONDARY <-
    pat_rawList$A_SECONDARY %>%
    dplyr::mutate(
      datetime = lubridate::floor_date(.data$created_at, unit = "min"),
      pm1_atm_A = .data$pm1.0_atm, 
      pm10_atm_A = .data$pm10.0_atm
    ) %>%
    dplyr::arrange(.data$datetime) %>%
    dplyr::select(all_of(A_SECONDARY_columns)) %>%
    .replaceRecordsWithDuplicateTimestamps()
  
  A_data <- 
    dplyr::full_join(A_PRIMARY, A_SECONDARY, by = "datetime") %>%
    dplyr::distinct() %>%
    dplyr::arrange(.data$datetime) %>%
    .replaceRecordsWithDuplicateTimestamps()
  
  # NOTE:  The result is imperfect and the SECONDARY channel occasionally gets
  # NOTE:  assigned to the next 'datetime' minute as seen here:
  
  #   datetime            datetime_A          pm25_A pm25_atm_A uptime  rssi temperature humidity pm1_atm_A pm10_atm_A
  #   <dttm>              <dttm>               <dbl>      <dbl>  <int> <dbl>       <dbl>    <dbl>     <dbl>      <dbl>
  # 1 2018-08-02 14:39:00 2018-08-02 14:39:29   3.14       3.14   6306   -74          65       63      2.05       3.26
  # 2 2018-08-02 14:40:00 2018-08-02 14:40:49   3.21       3.21   6308   -74          65       63     NA         NA   
  # 3 2018-08-02 14:41:00 NA                   NA         NA        NA    NA          NA       NA      2.48       3.21
  # 4 2018-08-02 14:42:00 2018-08-02 14:42:09   2.12       2.12   6309   -74          65       63      1.3        2.12
  
  # NOTE:  We can take of this by separating our single dataframe into 
  # NOTE:  pieces, subtracting a minute from rows with SECONDARY data only and
  # NOTE:  then recombining.
  
  # Separate into records with full data, PRIMARY only or SECONDARY only
  
  PRIMARY_only_mask <- !is.na(A_data$datetime_A) & is.na(A_data$pm1_atm_A)
  SECONDARY_only_mask <- is.na(A_data$datetime_A) & !is.na(A_data$pm1_atm_A)
  full_mask <- !is.na(A_data$datetime_A) & !is.na(A_data$pm1_atm_A)
  
  fullRecords <- A_data[full_mask, ]
  
  primaryOnlyRecords <- A_data[PRIMARY_only_mask, A_PRIMARY_columns]

  secondaryOnlyRecords <- 
    A_data[SECONDARY_only_mask, A_SECONDARY_columns] %>%
    # Subtract one minute from secondary only so that 'datetime' will match up  
    dplyr::mutate(datetime = .data$datetime - lubridate::dminutes(1))

  # Create the repaired records by joining 
  
  repairedRecords <- 
    dplyr::full_join(primaryOnlyRecords, secondaryOnlyRecords, by = "datetime")  

  # Combine fullRecords and repairedRecords and arrange
  
  A_data <-
    dplyr::bind_rows(fullRecords, repairedRecords) %>%
    dplyr::distinct() %>%
    dplyr::arrange(.data$datetime) %>%
    .replaceRecordsWithDuplicateTimestamps()
  

  # ----- B channel ------------------------------------------------------------
  
  B_PRIMARY_columns <- c(
    "datetime", "datetime_B",
    "pm25_B", "pm25_atm_B",
    "memory", "adc0", "pressure", "bsec_iaq"
  )
  
  B_PRIMARY <-
    pat_rawList$B_PRIMARY %>%
    dplyr::mutate(
      datetime = lubridate::floor_date(.data$created_at, unit = "min"),
      datetime_B = .data$created_at,
      pm25_atm_B = .data$pm2.5_atm, 
      pm25_B = .data$pm2.5_atm
    ) %>%
    dplyr::arrange(.data$datetime) %>%
    dplyr::select(all_of(B_PRIMARY_columns)) %>%
    .replaceRecordsWithDuplicateTimestamps()
  
  B_SECONDARY_columns <- c(
    "datetime",
    "pm1_atm_B", "pm10_atm_B"
  )
  
  B_SECONDARY <-
    pat_rawList$B_SECONDARY %>%
    dplyr::mutate(
      datetime = lubridate::floor_date(.data$created_at, unit = "min"),
      pm1_atm_B = .data$pm1.0_atm, 
      pm10_atm_B = .data$pm10.0_atm
    ) %>%
    dplyr::arrange(.data$datetime) %>%
    dplyr::select(all_of(B_SECONDARY_columns)) %>%
    .replaceRecordsWithDuplicateTimestamps()
  
  B_data <- 
    dplyr::full_join(B_PRIMARY, B_SECONDARY, by = "datetime") %>%
    dplyr::distinct() %>%
    dplyr::arrange(.data$datetime) %>%
    .replaceRecordsWithDuplicateTimestamps()
  
  # NOTE:  The result is imperfect and the SECONDARY channel occasionally gets
  # NOTE:  assigned to the next 'datetime' minute as seen here:
  
  # # A tibble: 6 x 10
  #   datetime            datetime_B          pm25_B pm25_atm_B memory  adc0 pressure bsec_iaq pm1_atm_B pm10_atm_B
  #   <dttm>              <dttm>               <dbl>      <dbl>  <int> <dbl>    <dbl>    <dbl>     <dbl>      <dbl>
  # 1 2018-08-01 07:00:00 NA                   NA         NA        NA NA          NA       NA      3.77       5.93
  # 2 2018-08-01 07:01:00 2018-08-01 07:01:18   5.76       5.76  28568  0.06       NA       NA      4.41       6.12
  # 3 2018-08-01 07:02:00 2018-08-01 07:02:38   5.36       5.36  28568  0.06       NA       NA      3.43       5.62
  # 4 2018-08-01 07:03:00 2018-08-01 07:03:58   5.23       5.23  28568  0.06       NA       NA     NA         NA   
  # 5 2018-08-01 07:04:00 NA                   NA         NA        NA NA          NA       NA      3.48       5.77
  # 6 2018-08-01 07:05:00 2018-08-01 07:05:18   5.36       5.36  28568  0.06       NA       NA      4.27       5.57
 
  # NOTE:  We can take of this by separating our single dataframe into 
  # NOTE:  pieces, subtracting a minute from rows with SECONDARY data only and
  # NOTE:  then recombining.
  
  # Separate into records with full data, PRIMARY only or SECONDARY only
  
  PRIMARY_only_mask <- !is.na(B_data$datetime_B) & is.na(B_data$pm1_atm_B)
  SECONDARY_only_mask <- is.na(B_data$datetime_B) & !is.na(B_data$pm1_atm_B)
  full_mask <- !is.na(B_data$datetime_B) & !is.na(B_data$pm1_atm_B)
  
  fullRecords <- B_data[full_mask, ]
  
  primaryOnlyRecords <- B_data[PRIMARY_only_mask, B_PRIMARY_columns]
  
  secondaryOnlyRecords <- 
    B_data[SECONDARY_only_mask, B_SECONDARY_columns] %>%
    # Subtract one minute from secondary only so that 'datetime' will match up  
    dplyr::mutate(datetime = .data$datetime - lubridate::dminutes(1))
  
  # Create the repaired records by joining 
  
  repairedRecords <- 
    dplyr::full_join(primaryOnlyRecords, secondaryOnlyRecords, by = "datetime")  
  
  # Combine fullRecords and repairedRecords and arrange
  
  B_data <-
    dplyr::bind_rows(fullRecords, repairedRecords) %>%
    dplyr::distinct() %>%
    dplyr::arrange(.data$datetime) %>%
    .replaceRecordsWithDuplicateTimestamps()

  # ----- Combine A and B channels ---------------------------------------------
  
  # > names(A)
  # [1] "datetime"    "datetime_A"  "pm25_A"      "pm1_atm_A"   "pm25_atm_A" 
  # [6] "pm10_atm_A"  "uptime"      "rssi"        "temperature" "humidity"   
  # > names(B)
  # [1] "datetime"    "datetime_B"  "pm25_B"      "pm1_atm_B"   "pm25_atm_B"
  # [6] "pm10_atm_B"  "memory"      "adc0"        "pressure"    "bsec_iaq"    
  
  # NOTE:  Here are the columns we wish to have in the end in preferred order:
  # NOTE:  If we just take columns 1-6 we have a very useful dataframe.
  
  # NOTE:  This set of columns must match those defined in
  # NOTE:    pat_createPATimeseriesObject.R
  patData_columnNames <- c(
    "datetime", 
    "pm25_A", "pm25_B", 
    "temperature", "humidity", "pressure",
    "pm1_atm_A", "pm25_atm_A", "pm10_atm_A",
    "pm1_atm_B", "pm25_atm_B", "pm10_atm_B",
    "uptime", "rssi", "memory", "adc0", "bsec_iaq",
    "datetime_A", "datetime_B"
  )
  
  data <-
    dplyr::full_join(A_data, B_data, by = "datetime") %>%
    dplyr::select(all_of(patData_columnNames)) %>%
    # Only keep records with some pm25 data
    dplyr::filter(!is.na(.data$pm25_A) | !is.na(.data$pm25_B)) %>%
    dplyr::distinct() %>%
    dplyr::arrange(.data$datetime) %>%
    .replaceRecordsWithDuplicateTimestamps()
  
  # ----- Return ---------------------------------------------------------------
  
  # Combine meta and data dataframes into a list
  pat <- list(meta = meta, data = data)
  class(pat) <- c("pa_timeseries", class(pat))
  
  return(pat)
  
}

# ===== DEBUGGING ==============================================================

if ( FALSE ) {
  
  library(AirSensor)
  
  setArchiveBaseUrl("http://data.mazamascience.com/PurpleAir/v1") # SCAQMD sensors
  
  pas <- pas_load()
  
  id <- '78df3c292c8448f7_21257'
  label <- NULL
  startdate <- NULL
  enddate <- NULL
  timezone <- NULL
  baseUrl <- "https://api.thingspeak.com/channels/"
  
  pat_rawList <- pat_downloadParseRawData(
    id,
    label,
    pas,
    startdate,
    enddate,
    timezone,
    baseUrl
  )
 
  pat <- pat_createPATimeseriesObject(
    pat_rawList
  )
  
}

Try the AirSensor package in your browser

Any scripts or data that you put into this service are public.

AirSensor documentation built on March 13, 2021, 1:07 a.m.