R/add_pm_data.R

Defines functions add_pm get_unique_h3_3_year read_chunk_join prep_data

Documented in add_pm

#' @import data.table

prep_data <- function(d, type = "coords") {
  d$row_index <- 1:nrow(d)

  if (type == "coords") {
    dht::check_for_column(d, "lon", d$lon)
    dht::check_for_column(d, "lat", d$lat)
  }

  if (type == "h3") {
    dht::check_for_column(d, "h3", d$h3)
  }

  dht::check_for_column(d, "start_date", d$start_date)
  dht::check_for_column(d, "end_date", d$end_date)

  d$start_date <- dht::check_dates(d$start_date)
  d$end_date <- dht::check_dates(d$end_date)
  dht::check_end_after_start_date(d$start_date, d$end_date)
  return(d)
}

read_chunk_join <- function(d_split, fl_path, verbose = FALSE) {
  if (verbose) {
    message("processing ",
            stringr::str_split(fl_path, "/")[[1]][length(stringr::str_split(fl_path, "/")[[1]])],
            " ...")
  }

  chunk <- fst::read_fst(fl_path, as.data.table = TRUE)

  d_split_pm <- dplyr::left_join(d_split, chunk, by = c("h3", "date"))
  rm(chunk)
  return(d_split_pm)
}

get_unique_h3_3_year <- function(pm_chunks) {
  pm_chunks %>%
    dplyr::mutate(split_str = stringr::str_split(uri, "/")) %>%
    tidyr::unnest(split_str) %>%
    dplyr::group_by(uri) %>%
    dplyr::slice_tail() %>%
    dplyr::mutate(h3_3_year = stringr::str_sub(split_str, 1, -10))
}


#' add PM2.5 concentrations to geocoded data based on h3 geohash or lat/lon coords
#'
#' @param d dataframe with columns called "lat", "lon", "start_date" and "end_date"
#' @param type either "coords" (if d contains lat/lon) or "h3" (if d contains
#'             resolution 8 h3 ids)
#' @param verbose if TRUE a statement is printed to the console telling the user
#'                which chunk file is currently being processed. Defaults to FALSE.
#' @param ... arguments passed to \code{\link[s3]{s3_get_files}}
#'
#' @return the input dataframe, expanded to include one row per day between the given "start_date"
#'         and "end_date", with appended columns for h3_3 (resolution 3), h3 (resolution 8),
#'         year, pm_pred, and pm_se.
#'
#' @examples
#' d <- tibble::tribble(
#'   ~id, ~lat, ~lon, ~start_date, ~end_date,
#'   "55000100280", 39.2, -84.6, "2008-09-09", "2008-09-11",
#'   "55000100281", 39.2, -84.6, "2007-08-05", "2007-08-08",
#'   "55000100282", 39.2, -84.6, "2015-08-31", "2015-09-02"
#' )
#'
#' add_pm(d)
#' @export
add_pm <- function(d, type = "coords", verbose = FALSE, ...) {
  d <- prep_data(d, type)

  # dates - expand, extract year, filter out of range
  d <- dht::expand_dates(d, by = "day")
  d$year <- lubridate::year(d$date)
  out_of_range_year <- sum(d$year < 2000 | d$year > 2020)
  if (out_of_range_year > 0) {
    cli::cli_alert_warning("Data is currently available from 2000 through 2020.")
    cli::cli_alert_info(glue::glue("PM estimates for {out_of_range_year} rows will be NA due to unavailable data.\n"))
    d_missing_date <- dplyr::filter(d, !year %in% 2000:2020)
    d <- dplyr::filter(d, year %in% 2000:2020)
  }

  # coords - filter out missing, match to h3 ids
  if (type == "coords") {
    n_missing_coords <- nrow(d %>% dplyr::filter(is.na(lat) | is.na(lon)))
    if (n_missing_coords > 0) {
      cli::cli_alert_warning(glue::glue("PM estimates for {n_missing_coords} rows will be NA due to missing coordinates in input data.\n"))
      d_missing_coords <- dplyr::filter(d, is.na(lat) | is.na(lon))
      d <- dplyr::filter(d, !is.na(lat), !is.na(lon))
    }

    message("matching lat/lon to h3 cells...")
    d$h3 <- suppressMessages(h3jsr::point_to_h3(dplyr::select(d, lon, lat), res = 8))
  } else {
    n_missing_coords <- 0
  }

  # h3 res 3 - and match to safe harbor
  d$h3_3 <- h3jsr::get_parent(d$h3, res = 3)

  d <- d %>%
    dplyr::left_join(safe_hex_lookup, by = "h3_3") %>%
    dplyr::mutate(h3_3 = ifelse(!is.na(safe_hex), safe_hex, h3_3)) %>%
    dplyr::select(-safe_hex)

  # h3 availability  - check and filter
  ## h3 ids that do not exist within the scope of this model
  invalid_h3 <- sum(!d$h3_3 %in% safe_harbor_h3)
  if (invalid_h3 > 0) {
    cli::cli_alert_info(glue::glue("PM estimates for {invalid_h3} rows will be NA due to invalid h3 ids.\n"))
    d_invald_h3 <- dplyr::filter(d, !d$h3_3 %in% safe_harbor_h3)
    d <- dplyr::filter(d, d$h3_3 %in% safe_harbor_h3)
  }

  message("downloading PM chunk files...")
  pm_chunks <-
    glue::glue("s3://pm25-brokamp/{d$h3_3}_{d$year}_h3pm.fst") %>%
    unique() %>%
    s3::s3_get_files(...) %>%
    get_unique_h3_3_year()

  d_split <- split(d, f = list(d$h3_3, d$year), drop = TRUE)

  message("Reading in and joining PM data...")
  xs <- 1:length(d_split)
  progressr::with_progress({
    p <- progressr::progressor(along = xs)
    d_split_pm <- purrr::map(xs, function(x) {
      p(sprintf("x=%g", x))
      read_chunk_join(
        d_split[[x]],
        # ensure d_split chunk matches file path
        pm_chunks[pm_chunks$h3_3_year == paste0(unique(d_split[[x]]$h3_3), "_", unique(d_split[[x]]$year)), ]$file_path,
        verbose
      )
    })
  })

  d_pm <- dplyr::bind_rows(d_split_pm)
  if (out_of_range_year > 0) d_pm <- dplyr::bind_rows(d_missing_date, d_pm)
  if (invalid_h3 > 0) d_pm <- dplyr::bind_rows(d_invald_h3, d_pm)
  if (n_missing_coords > 0) d_pm <- dplyr::bind_rows(d_missing_coords, d_pm)
  d_pm <- d_pm %>%
    dplyr::arrange(row_index, date) %>%
    dplyr::select(-row_index)
  return(d_pm)
}
geomarker-io/addPmData documentation built on March 19, 2022, 4:18 p.m.