```r
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
```
The randomization test p-value proposed by @alabadleh.etal21 is useful for establishing the statistical significance of differences in air quality measures under minimal modeling assumptions. However, the p-value computation can be time-consuming when repeated over numerous pollutants, stations, years, months, etc.
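To fix ideas, here is a minimal conceptual sketch of a Monte Carlo randomization p-value. This is not the aq2020 implementation: the vignette below uses `aq2020::fisher_pv()` for this step, and `rand_pv()` is a hypothetical name used only for illustration.

```r
# conceptual sketch of a Monte Carlo randomization p-value: compare the
# observed statistic to its distribution under random permutations of the
# group labels (rand_pv() is a hypothetical helper, not an aq2020 function)
rand_pv <- function(group, value, nsim, Tfun) {
  Tobs <- Tfun(value, group)                          # observed statistic
  Tsim <- replicate(nsim, Tfun(value, sample(group))) # permuted statistics
  mean(c(Tsim, Tobs) >= Tobs)                         # p-value, observed included
}
```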
The following R code provides an example of computing these p-values on multiple cores in parallel (e.g., on a server). Essentially, this consists of a for-loop over all combinations of pollutants, stations, years, and months in the `pollutant_data` dataset. Each iteration of the for-loop is independent of the others, such that the whole loop is embarrassingly parallel and thus can easily be run on parallel cores.
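As a purely illustrative toy example of this pattern (the real computation follows below):

```r
# toy example of an embarrassingly parallel loop: each iteration depends
# only on its own input, so iterations can be farmed out to separate cores
library(parallel)
cl <- makeCluster(2)                       # two worker processes
res <- parLapply(cl, 1:4, function(i) i^2) # runs the four iterations in parallel
stopCluster(cl)
unlist(res)
#> [1]  1  4  9 16
```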
First we load the required packages:
```r
# packages
require(aq2020)
require(tidyr)
require(dplyr)
require(lubridate)
require(parallel)
```
In large-scale computations, it is best to save results after each iteration of the for-loop rather than wait for the entire loop to terminate. This way, if anything goes wrong partway through, we don't need to restart from scratch.
```r
# where to save pvalue calculations
data_path <- file.path("data", "pollutant", "pvalue")

#' Helper function to standardize file names.
#'
#' @param path Path to the file.
#' @param ... Elements of the file name, to be concatenated into a single
#'   string separated by "_".
#' @param ext File extension.
#' @return A string representing the file name.
get_filename <- function(path, ..., ext = "rds") {
  fname <- paste0(c(...), collapse = "_")
  file.path(path, paste0(fname, ".", ext))
}
```
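For example, with the `data_path` defined above, and the Milton station and O3 pollutant used later in this vignette:

```r
# file name for the Milton/O3 p-value results
get_filename(data_path, "Milton", "O3")
#> [1] "data/pollutant/pvalue/Milton_O3.rds"
```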
```r
# true data path
data_path <- system.file("extdata", "data", "pollutant", "pvalue",
                         package = "aq2020")
```
Now let's create the functions to compute the absolute-difference-of-medians and range-of-medians test statistics. For more information on these, please see `vignette("aq2020")`.
```r
#' Calculate the absolute difference in medians statistic.
#'
#' @param value A vector of length `n_obs` of daily median data.
#' @param group A grouping vector of length `n_obs` indicating whether the
#'   observation is pre or post lockdown data. Any vector for which
#'   `length(unique(group)) == 2` will work.
#' @return The difference in medians statistic.
#' @details When `length(unique(group)) = n_group < 2`, the function returns
#'   `NA`. This is so the p-value calculation doesn't crash when e.g., there
#'   is no data in one of the groups. When `n_group > 2` an error is thrown.
med_diff <- function(value, group) {
  n_group <- length(unique(group))
  if(n_group > 2) {
    stop("group must consist of at most 2 unique groups.")
  }
  if(n_group < 2) return(NA)
  meds <- tapply(value, group, median)
  dmeds <- abs(meds[1] - meds[2])
  setNames(dmeds, nm = "med_diff")
}

#' Calculate the range between group medians statistic.
#'
#' @param value A vector of length `n_obs` of daily median data.
#' @param group A grouping vector of length `n_obs` indicating whether the
#'   observation is pre or post lockdown data. Any vector for which
#'   `length(unique(group)) >= 2` will work.
#' @return The range between group medians statistic.
#' @details When `length(unique(group)) = n_group < 2`, the function returns
#'   `NA`. This is so the p-value calculation doesn't crash when e.g., there
#'   is no data in one of the groups.
med_range <- function(value, group) {
  if(length(unique(group)) < 2) return(NA)
  meds <- tapply(value, group, median)
  rmeds <- max(meds) - min(meds)
  setNames(rmeds, nm = "med_range")
}
```
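As a quick sanity check, here is what the two statistics return on some made-up toy data:

```r
value <- c(10, 12, 11, 20, 22, 21)       # fake daily medians
group <- rep(c("pre", "post"), each = 3) # two groups of three days each
med_diff(value, group)  # |median(post) - median(pre)| = |21 - 11|
#> med_diff 
#>       10 
med_range(value, group) # max(medians) - min(medians); equals med_diff for 2 groups
#> med_range 
#>        10 
```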
Now we define the core function `pval_calc()`, which will be applied at each step of the for-loop, i.e., to each station/pollutant combination.
```r
#' Compute the randomization p-value for each combination of station,
#' pollutant and month.
#'
#' @param station Station name.
#' @param pollutant Pollutant name.
#' @param months Vector of integers between 1 and 12.
#' @param no_wknd Logical, whether to exclude weekends.
#' @param nsim Number of permutations to use in the Monte Carlo calculation.
#'
#' @return A tibble with columns:
#' \describe{
#'   \item{`Station`}{Station name.}
#'   \item{`Pollutant`}{Pollutant name.}
#'   \item{`Month`}{Month of the year.}
#'   \item{`Period`}{Either "2017-2019" or "2020".}
#'   \item{`Stat`}{Name of the computed statistic. These are: `Tobs`, `pval`,
#'     `median`, and `n`.}
#'   \item{`Value`}{Value of the computed statistic.}
#' }
pval_calc <- function(station, pollutant, months, no_wknd = TRUE, nsim) {
  # helper function for summarize() with tibbles
  set_tibble <- function(x, nm) {
    tibble(Value = setNames(x, NULL), Stat = nm)
  }
  message("Station: ", station, ", Pollutant: ", pollutant)
  # prepare data for p-values
  poll_data <- pollutant_data %>%
    filter(Station == station & Pollutant == pollutant) %>%
    mutate(Year = year(Date),
           Month = month(Date, label = TRUE, abbr = FALSE),
           Day = day(Date)) %>%
    filter(Month %in% month(months, label = TRUE, abbr = FALSE)) %>%
    filter(!is.na(Concentration))
  if(no_wknd) {
    poll_data <- poll_data %>%
      mutate(Weekday = wday(Date, label = TRUE)) %>%
      filter(!Weekday %in% c("Sat", "Sun")) %>%
      select(-Weekday)
  }
  # pvalue calculations
  stat_names <- c("Tobs", "pval", "median", "n")
  tm <- system.time({
    # 2017-2019
    pv_ref <- poll_data %>%
      filter(Year != 2020) %>%
      group_by(Month) %>%
      summarize(set_tibble(
        x = c(fisher_pv(group = Year, value = Concentration,
                        nsim = nsim, Tfun = med_range),
              median(Concentration),
              length(Concentration)),
        nm = stat_names), .groups = "drop") %>%
      mutate(Period = "2017-2019")
    # 2020
    pv_2020 <- poll_data %>%
      mutate(Year = ifelse(Year == 2020, "2020", "2017-2019")) %>%
      group_by(Month) %>%
      summarize(set_tibble(
        x = c(fisher_pv(group = Year, value = Concentration,
                        nsim = nsim, Tfun = med_diff),
              median(Concentration[Year == "2020"]),
              length(Concentration[Year == "2020"])),
        nm = stat_names), .groups = "drop") %>%
      mutate(Period = "2020")
    pv_data <- bind_rows(pv_ref, pv_2020)
  })
  message("Time: ", round(tm[3], 1), " seconds")
  # append station and pollutant information
  pv_data %>%
    mutate(Station = station, Pollutant = pollutant) %>%
    select(Station, Pollutant, Month, Period, Stat, Value)
}
```
Now let's check what `pval_calc()` computes:
```r
pv <- pval_calc(station = "Milton", pollutant = "O3",
                months = 11:12, no_wknd = TRUE, nsim = 100)
pv %>%
  pivot_wider(names_from = "Stat", values_from = "Value")
```
As we can see, the output consists of the station, pollutant, and months requested, the period, and four computed statistics:

- `Tobs`: The value of the test statistic, which is the difference of medians for the 2020 period and the range of medians for the 2017-2019 period.
- `pval`: The corresponding randomization p-value.
- `median`: The median concentration value during either period.
- `n`: The sample size in either period.

Note that `Tobs` and `pval` for November 2017-2019 are `NA`. Let's take a closer look at why this is:
```r
pollutant_data %>%
  mutate(Year = factor(year(Date))) %>%
  filter(
    Station == "Milton",
    Pollutant == "O3",
    month(Date) == 11
  ) %>%
  group_by(Year, .drop = FALSE) %>%
  summarize(n = n())
```
As we can see, this is because there is no $\text{O}_3$ pollutant data for Milton in November of 2017 and 2018, such that the 2017-2019 reference set consists of only one year, and thus the range-of-medians statistic becomes meaningless.
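Indeed, with only one year contributing data there is a single group, and `med_range()` returns `NA` by construction (toy values made up for illustration):

```r
# a single group (one year of data) yields NA rather than an error
med_range(value = c(30, 32, 31), group = rep(2019, 3))
#> [1] NA
```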
We are now ready to calculate the p-value for all station/pollutant/month combinations. Since `pval_calc()` accepts a vector of months, the for-loop need only run over all `r sum(pollutant_info$has_poll)` possible station/pollutant combinations:
```r
# all station/pollutant combinations
pollutant_info %>%
  filter(has_poll)
```
First let's set up our R session to run the for-loop calculations in parallel:
```r
# parallel job specification
nsim <- 1e4     # number of random permutations
months <- 1:12  # months for which to calculate p-values
no_wknd <- TRUE # exclude weekends
# which station/pollutant combinations from pollutant_info to run
job_ind <- which(pollutant_info$has_poll)
```
```r
# cluster setup

# number of cores to use.
# this uses all available cores, but a different number can be set.
ncores <- detectCores(logical = FALSE)

# parallel processing seed
RNGkind("L'Ecuyer-CMRG")

# create the parallel cluster
cl <- makeCluster(spec = ncores)

# load all packages on each cluster worker.
invisible(
  clusterEvalQ(cl, {
    require(aq2020)
    require(tidyr)
    require(dplyr)
    require(lubridate)
    require(parallel)
  })
)

# copy all R objects required for the calculation onto each core.
# it's often simplest to just copy everything in the workspace.
clusterExport(cl, varlist = ls()[ls() != "cl"])
```
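One optional addition, not part of the run above: since `RNGkind("L'Ecuyer-CMRG")` was selected, the workers' random number streams can be seeded with `parallel::clusterSetRNGStream()` so that the permutation p-values are reproducible across runs:

```r
# optional: seed the L'Ecuyer-CMRG streams on the workers; the iseed
# value is arbitrary, but fixing it makes the p-values reproducible
clusterSetRNGStream(cl, iseed = 2020)
```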
Now we're ready to run the p-value computation in parallel. This is done with the function `parallel::parLapply()`. Unlike regular R for-loops, `parLapply()` does the entire for-loop computation in a single function call, so if an error occurs at any step, all previous computations are lost. Therefore, the parallel computation is set up so that:

i. The p-value calculation is saved at the end of each step.
ii. `parLapply()` is instructed to flag an error at each step rather than throw one, using `tryCatch()`. This way `parLapply()` will never throw an error, but you can find out after the fact which steps of the computation failed by inspecting the contents of `bad_ind`.
```r
system.time({
  bad_ind <- parLapply(cl, job_ind, fun = function(ii) {
    station <- pollutant_info$station[ii]
    pollutant <- pollutant_info$pollutant[ii]
    # calculate p-value in tryCatch block to catch/flag any errors
    pv_data <- tryCatch(
      pval_calc(station = station, pollutant = pollutant,
                months = months, no_wknd = no_wknd, nsim = nsim),
      error = function(e) NULL
    )
    # save data
    if(!is.null(pv_data)) {
      saveRDS(pv_data,
              file = get_filename(path = data_path, station, pollutant))
    }
    # return TRUE if p-value computation failed and FALSE otherwise
    is.null(pv_data)
  })
})
```
Once the calculation is done, we can check whether any of the jobs failed:
```r
# if you are in an interactive R session, you can simply do this:
## any(unlist(bad_ind))

# if you ran R as a batch job which exits after the computation is done,
# you can regenerate the contents of bad_ind like this:
bad_ind <- lapply(job_ind, function(ii) {
  station <- pollutant_info$station[ii]
  pollutant <- pollutant_info$pollutant[ii]
  pv_data <- tryCatch(
    readRDS(file = get_filename(path = data_path, station, pollutant)),
    error = function(e) NULL
  )
  is.null(pv_data)
})
any(unlist(bad_ind))
```
If the result is anything other than `FALSE`, you can rerun `parLapply()` over the jobs in `which(unlist(bad_ind))`.
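A minimal sketch of such a rerun, assuming the cluster `cl` is still running and using `pval_job` as a hypothetical name for the worker function passed to `parLapply()` above:

```r
# positions of the failed jobs within job_ind
redo_ind <- job_ind[which(unlist(bad_ind))]
# rerun only those jobs (pval_job is a hypothetical stand-in for the
# anonymous worker function used in the parLapply() call above)
bad_redo <- parLapply(cl, redo_ind, fun = pval_job)
any(unlist(bad_redo)) # FALSE once every job has succeeded
```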
Finally, when the parallel computation is done, you must shut down the cluster:
```r
stopCluster(cl)
```
The computations above were saved in separate files, one for each station/pollutant combination. The following code puts these together into a single dataset containing the following columns:
- `Station`: Station name.
- `Pollutant`: Pollutant name.
- `Period`: Either "2017-2019" or "2020".
- `Month`: Name of the month.
- `Ndays`: Number of days in the given Month and Period.
- `Median`: Median concentration over all days in the given Month and Period.
- `Pval`: P-value of the randomization test.

```r
pollutant_pval <- lapply(job_ind, function(ii) {
  station <- pollutant_info$station[ii]
  pollutant <- pollutant_info$pollutant[ii]
  pv_data <- readRDS(file = get_filename(path = data_path, station, pollutant))
  pv_data %>%
    pivot_wider(names_from = "Stat", values_from = "Value") %>%
    select(Station, Pollutant, Period, Month,
           Ndays = n, Median = median, Pval = pval)
}) %>%
  bind_rows()
```
This dataset is available in the aq2020 package as `pollutant_pval`:
```r
aq2020::pollutant_pval # dataset inside the aq2020 package
```