#--- batch p-value calculations ------------------------------------------------
use_server <- FALSE
if(!use_server) {
# can't install package on mfcf server because of rvest dependency
require(aq2020)
} else {
.libPaths("/u/mlysy/R/x86_64-pc-linux-gnu-library/3.6")
source("../../R/fisher_pv.R")
source("../../R/Tdiff.R")
source("../../R/Tvar.R")
source("../../R/Trange.R")
load("../../data/pollutant_data.rda")
load("../../data/pollutant_info.rda")
}
require(tidyr)
require(dplyr)
require(lubridate)
# where to save pvalue calculations
data_path <- file.path("data", "pollutant", "pvalue")
# helper function to create file names for saving data.
get_filename <- function(path, ..., ext = "rds") {
fname <- paste0(c(...), collapse = "_")
file.path(path, paste0(fname, ".", ext))
}
# Test statistics for two-group comparison: 2017-2019 vs 2020
Tbin <- function(value, group) {
if(n_distinct(group) < 2) return(rep(NA, 4))
c(Tdiff(value, group), Trange(value, group))
}
# Test statistics for multi-group comparisons: 2017 vs 2018 vs 2019
Tmulti <- function(value, group) {
if(n_distinct(group) < 2) return(rep(NA, 4))
c(Tvar(value, group), Trange(value, group))
}
# helper function for summarizing
set_tibble <- function(x, nm) {
tibble(Value = setNames(x, NULL),
Name = nm)
}
#' P-value calculation.
#'
#' @param station Station name.
#' @param pollutant Pollutant name.
#' @param months Vector of integers between 1 and 12.
#' @param no_wknd Logical, whether to exclude weekends.
#' @param nsim Integer number of simulations.
#'
#' @return A tibble with columns:
#' \describe{
#' \item{`Month`}{Month of the year.}
#' \item{`Value`}{Value of the computed statistic.}
#' \item{`Name`}{Name of the computed statistic. These are: `{Tobs/pval}_{mean/med}_{old/new}`, `stat_{mean/med}`, and `n`.}
#' \item{`Period`}{Either "2017-2019" or "2020".}
#' \item{`Station`}{Station name.}
#' \item{`Pollutant`}{Pollutant name.}
#' }
pval_calc <- function(station, pollutant, months, no_wknd = TRUE, nsim) {
message("Station: ", station, ", Pollutant: ", pollutant)
# prepare data for p-values
poll_data <- pollutant_data %>%
filter(Station == station & Pollutant == pollutant) %>%
mutate(Year = year(Date),
Month = month(Date, label = TRUE, abbr = FALSE),
Day = day(Date)) %>%
filter(Month %in% month(months, label = TRUE, abbr = FALSE)) %>%
filter(!is.na(Concentration))
if(no_wknd) {
poll_data <- poll_data %>%
mutate(Weekday = wday(Date, label = TRUE)) %>%
filter(!Weekday %in% c("Sat", "Sun")) %>%
select(-Weekday)
}
# pvalue calculations
stat_names <- c("Tobs_mean_old", "Tobs_med_old",
"Tobs_mean_new", "Tobs_med_new",
"pval_mean_old", "pval_med_old",
"pval_mean_new", "pval_med_new",
"stat_mean", "stat_med", "n")
tm <- system.time({
# 2017-2019
pv_ref <- poll_data %>%
filter(Year != 2020) %>%
group_by(Month) %>%
summarize(set_tibble(
x = c(fisher_pv(group = Year,
value = Concentration,
nsim = nsim,
Tfun = Tmulti),
mean(Concentration, na.rm = TRUE),
median(Concentration, na.rm = TRUE),
sum(!is.na(Concentration))),
nm = stat_names),
.groups = "drop") %>%
mutate(Period = "2017-2019")
# 2020
pv_2020 <- poll_data %>%
mutate(Year = ifelse(Year == 2020, "2020", "2017-2019")) %>%
group_by(Month) %>%
summarize(set_tibble(
x = c(fisher_pv(group = Year,
value = Concentration,
nsim = nsim,
Tfun = Tbin),
mean(Concentration[Year == "2020"], na.rm = TRUE),
median(Concentration[Year == "2020"], na.rm = TRUE),
sum(!is.na(Concentration[Year == "2020"]))),
nm = stat_names),
.groups = "drop") %>%
mutate(Period = "2020")
pv_data <- bind_rows(pv_ref, pv_2020)
})
message("Time: ", round(tm[3], 1), " seconds")
# append station and pollutant information
pv_data <- pv_data %>%
mutate(Station = station,
Pollutant = pollutant)
pv_data
}
# test
pv <- pval_calc(station = "Milton", pollutant = "O3",
months = 12, no_wknd = TRUE, nsim = 100)
pv
pv %>% pivot_wider(names_from = "Name", values_from = "Value")
require(parallel)
ncores <- detectCores(logical = FALSE) # number of cores to use
RNGkind("L'Ecuyer-CMRG") # parallel processing seed
cl <- makeCluster(spec = ncores)
nsim <- 1e4 # number of random permutations
months <- 1:12 # months for which to calculate p-values
no_wknd <- TRUE # exclude weekends
data_path <- file.path("data", "pollutant", "pvalue") # where to save data
# which station/pollutant combinations
# from pollutant_info to run
job_ind <- which(pollutant_info$has_poll)
# cluster setup
# load packages
if(!use_server) {
invisible(
clusterEvalQ(cl, {
require(aq2020)
})
)
} else {
invisible(
clusterEvalQ(cl, {
.libPaths("/u/mlysy/R/x86_64-pc-linux-gnu-library/3.6")
})
)
}
invisible(
clusterEvalQ(cl, {
require(tidyr)
require(dplyr)
require(lubridate)
})
)
# send workspace to each worker
clusterExport(cl, varlist = ls()[ls() != "cl"])
system.time({
bad_ind <- parLapply(cl, job_ind, fun = function(ii) {
station <- pollutant_info$station[ii]
pollutant <- pollutant_info$pollutant[ii]
## pv_data <- pval_calc(station = station, pollutant = pollutant,
## months = months, no_wknd = no_wknd, nsim = nsim)
pv_data <- tryCatch(
pval_calc(station = station, pollutant = pollutant,
months = months, no_wknd = no_wknd, nsim = nsim),
error = function(e) NULL
)
if(!is.null(pv_data)) {
saveRDS(pv_data,
file = get_filename(path = data_path,
station, pollutant))
}
is.null(pv_data)
})
})
# kill cluster
stopCluster(cl)
#--- check if there were any issues --------------------------------------------
bad_ind <- lapply(job_ind, function(ii) {
station <- pollutant_info$station[ii]
pollutant <- pollutant_info$pollutant[ii]
pv_data <- tryCatch(
readRDS(file = get_filename(path = data_path,
station, pollutant)),
error = function(e) NULL
)
is.null(pv_data)
})
any(unlist(bad_ind))
#--- combine datasets ----------------------------------------------------------
#' The dataset will contain the following columns:
#'
#' - Station: station name.
#' - Pollutant: pollutant name.
#' - Period: 2017-2019 or 2020.
#' - Month: name of the month.
#' - `Ndays`: number of days in the given Month and Period.
#' - Median concentration over all days in the given Month and Period.
#' - `Pval`: p-value of the randomization test.
pollutant_pval <- lapply(job_ind, function(ii) {
station <- pollutant_info$station[ii]
pollutant <- pollutant_info$pollutant[ii]
pv_data <- readRDS(file = get_filename(path = data_path,
station, pollutant))
pv_data %>%
pivot_wider(names_from = "Name", values_from = "Value") %>%
select(Station, Pollutant, Period, Month,
Ndays = n, Median = stat_med, Pval = pval_med_new)
}) %>% bind_rows()
#--- check boxplot -------------------------------------------------------------
poll_label <- c(O3 = "O[3]*' Concentration '*(ppb)",
PM25 = "PM[2.5]*' Concentration '*(mu*g/m^3)",
NO2 = "NO[2]*' Concentration '*(ppb)",
SO2 = "SO[2]*' Concentration '*(ppb)",
CO = "CO*' Concentration '*(ppm)")
poll_data %>%
mutate(Year = factor(Year),
Period = ifelse(Year == 2020, "2020", "2017-2019")) %>%
ggplot(aes(x = Month, y = Concentration)) +
geom_jitter(aes(fill = Year, group = Period),
pch = 21, position = position_jitterdodge(.5),
size = 2) +
geom_boxplot(aes(color = Period), outlier.shape = NA, alpha = 0) +
geom_text(mapping = aes(x = Month, group = Period,
label = signif(pval*100, 2), y = 0),
data = pv_data, position = position_dodge(.8),
size = 4) +
scale_fill_manual(values = c("red", "blue", "orange", "darkgreen")) +
ylab(parse(text = poll_label[pollutant])) +
ggtitle(paste0("Station: ", station)) +
multiplot::theme_min2()
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.