get_weather <- function(location_ids,
date_from,
date_to,
weather_vars,
weather_sources = c("era5"),
read_weather_filename = NULL,
save_weather_filename = NULL,
n_per_location = 4,
years = seq(2015, 2020),
years_force_refresh = NULL,
add_pbl = T,
add_sunshine = T,
add_fire = F,
fire_source = "viirs",
fire_mode = "oriented",
fire_buffer_km = NULL,
fire_split_days = F,
fire_split_regions = NULL,
fire_split_regions_res = "low",
trajs_parallel = T,
trajs_cores = parallel::detectCores() - 1,
trajs_height = NULL,
trajs_hours = seq(0, 23, 4),
trajs_duration_hour = 72,
trajs_met_type = "gdas1",
use_trajs_cache = T,
use_weather_cache = T,
upload_trajs = F,
upload_weather = F,
update_era5 = T,
save_trajs_filename = NULL) {
if (!is.null(read_weather_filename) && file.exists(read_weather_filename)) {
weather <- read_weather(read_weather_filename)
} else {
weather <- collect_weather(
location_ids = location_ids,
date_from = date_from,
date_to = date_to,
weather_vars = weather_vars,
weather_sources = weather_sources,
years = years,
years_force_refresh = years_force_refresh,
n_per_location = n_per_location,
add_sunshine = F,
add_fire = add_fire,
fire_source = fire_source,
fire_mode = fire_mode,
fire_buffer_km = fire_buffer_km,
fire_split_days = fire_split_days,
fire_split_regions = fire_split_regions,
fire_split_regions_res = fire_split_regions_res,
trajs_parallel = trajs_parallel,
trajs_cores = trajs_cores,
trajs_height = trajs_height,
trajs_duration_hour = trajs_duration_hour,
trajs_hours = trajs_hours,
trajs_met_type = trajs_met_type,
use_trajs_cache = use_trajs_cache,
use_weather_cache = use_weather_cache,
upload_trajs = upload_trajs,
upload_weather = upload_weather,
save_trajs_filename = save_trajs_filename,
update_era5 = update_era5
)
if (!is.null(save_weather_filename)) {
dir.create(dirname(save_weather_filename), recursive = T, showWarnings = F)
saveRDS(weather, save_weather_filename)
}
}
return(weather)
}
#' Collect weather (and atmospheric) data from various sources
#'
#' @param location_ids
#' @param n_per_station how many NOAA stations do we fetch per AQ measurement station (default 2)
#' @param years which years are we getting measurements from
#' @param years_force_refresh ignoring cache files for these years
#' @param add_pbl Adding planet boundary layer (not ready)
#' @param add_sunshine Adding sunshine information
#' @param trajs_height receptor height for trajectories in meter.
#' If null, pbl average will be considered.
#' @return
#' @export
#'
#' @examples
collect_weather <- function(location_ids,
date_from,
date_to,
weather_vars,
weather_sources = c("era5", "noaa"),
n_per_location = 4,
years = seq(2015, 2020),
years_force_refresh = year(today()),
add_pbl = T,
add_sunshine = T,
add_fire = F,
fire_source = "viirs",
fire_mode = "trajectory",
fire_buffer_km = NULL,
fire_split_days = F,
fire_split_regions = NULL,
fire_split_regions_res = "low",
trajs_parallel = T,
trajs_cores = parallel::detectCores() - 1,
trajs_hours = seq(0, 23, 4),
trajs_duration_hour = 120,
trajs_height = NULL,
trajs_met_type = "gdas1",
use_trajs_cache = T,
use_weather_cache = T,
upload_trajs = F,
upload_weather = F,
save_trajs_filename = NULL,
update_era5 = T) {
if (is.null(location_ids)) {
stop("missing location_ids")
}
location_dates <- rcrea::locations(id = location_ids, with_source = F) %>%
distinct(location_id = id, country, geometry) %>%
mutate(date_from = date_from, date_to = date_to)
if (nrow(location_dates) == 0) {
stop("no valid location_id found")
}
weather_noaa <- NULL
weather_era5 <- NULL
weather_sirad <- NULL
if ("noaa" %in% weather_sources) {
weather_noaa <- noaa.collect_weather(
location_dates = location_dates,
weather_vars = weather_vars,
n_per_location = n_per_location,
years_force_refresh = years_force_refresh
)
}
if ("era5" %in% weather_sources) {
weather_era5 <- era5.collect_weather(
location_dates = location_dates,
weather_vars = weather_vars,
update = update_era5
)
}
if (add_sunshine){
weather_vars <- unique(c(weather_vars, "sunshine"))
weather_sources <- unique(c(weather_sources, "sirad"))
weather_sirad <- sirad.collect_weather(
location_dates = location_dates
)
}
weather <- combine_weathers(
weathers = list(
"noaa" = weather_noaa,
"era5" = weather_era5,
"sirad" = weather_sirad
),
weather_sources = weather_sources
)
# Add fire radiative power
if (add_fire) {
print("Getting Fire Radiative Power (will compute trajectories if required)")
weather <- fire.add_fire(weather,
source = fire_source,
mode = fire_mode,
duration_hour = trajs_duration_hour,
met_type = trajs_met_type,
buffer_km = fire_buffer_km,
trajs_hours = trajs_hours,
trajs_height = trajs_height,
trajs_parallel = trajs_parallel,
trajs_cores = trajs_cores,
split_days = fire_split_days,
split_regions = fire_split_regions,
split_regions_res = fire_split_regions_res,
use_trajs_cache = use_trajs_cache,
upload_trajs = upload_trajs,
upload_weather = upload_weather,
use_weather_cache = use_weather_cache,
save_trajs_filename = save_trajs_filename
)
}
return(weather)
}
available_weather_vars <- function(weather){
setdiff(names(ungroup(weather) %>% select(weather) %>% unnest(weather)), "date")
}
#' Combine weather from different sources by selecting
#' for each variable the first one in weather_sources
#' that has at least min_share_available available
#'
#' @param weathers
#' @param weather_sources
#' @param min_pct
#'
#' @return
#' @export
#'
#' @examples
combine_weathers <- function(
weathers,
weather_sources=names(weathers),
min_share_available = 0.5) {
w <- lapply(weather_sources, function(weather_source) {
w <- weathers[[weather_source]]
if (is.null(w)) {
return(NULL)
}
w %>%
select(location_id, weather) %>%
tidyr::unnest(weather) %>%
tidyr::gather(key = "weather_var", value = "value", -c(location_id, date)) %>%
mutate(source = weather_source)
}) %>%
bind_rows()
selected <- w %>%
ungroup() %>%
tidyr::complete(location_id, date, tidyr::nesting(weather_var, source),
fill=list(value=NA)) %>%
group_by(location_id, weather_var, source) %>%
summarise(n_nonna = sum(!is.na(value)),
n_total=n(),
share_available = n_nonna / n_total) %>%
# Arrange based on weather_sources
mutate(source = factor(source, levels = weather_sources, ordered=T)) %>%
# Pick first one above min_pct
filter(share_available >= min_share_available) %>%
select(-c(n_nonna, n_total, share_available)) %>%
arrange(source) %>%
dplyr::slice_head(n=1)
selected %>%
left_join(w) %>%
select(-c(source)) %>%
tidyr::spread(weather_var, value) %>%
group_by(location_id) %>%
tidyr::nest() %>%
rename(weather=data)
}
combine_meas_weather <- function(meas, weather) {
print("Attaching weather to measurements")
meas %>%
tibble() %>%
select(setdiff(names(.), "geometry")) %>%
dplyr::left_join(
weather %>%
tibble() %>%
select(location_id, weather),
by = "location_id"
) %>%
dplyr::rowwise() %>%
dplyr::filter(
!is.null(weather),
!is.null(meas)
) %>%
dplyr::mutate(meas = list(
meas %>%
dplyr::mutate(date = date(date)) %>%
dplyr::left_join(weather, by = "date")
)) %>%
dplyr::rename(meas_weather = meas) %>%
dplyr::select(-c(weather))
}
read_weather <- function(filename) {
readRDS(filename) %>%
select_at(setdiff(names(.), "value"))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.