#' Read forecast and observations and verify.
#'
#' This is a wrapper for the verification process. Forecasts and observations
#' are read in, filtered down to common cases, errors checked, and a full
#' verification is done for all scores. To minimise memory usage, the
#' verification can be done for one lead time at a time. It would also be possible
#' to parallelise the process using for example \link[parallel]{mclapply}, or
#' \link[furrr]{future_map}.
#'
#' @param start_date Start date for the verification. Should be numeric or
#'   character. YYYYMMDD(HH)(mm).
#' @param end_date End date for the verification. Should be numeric or
#' character.
#' @param parameter The parameter to verify.
#' @param fcst_model The forecast model(s) to verify. Can be a single string or
#' a character vector of model names.
#' @param fcst_path The path to the forecast FCTABLE files.
#' @param obs_path The path to the observation OBSTABLE files.
#' @param lead_time The lead times to verify.
#' @param num_iterations The number of iterations per verification calculation.
#' The default is to do the same number of iterations as there are lead times.
#' If a small number of iterations is set, it may be useful to set
#' \code{show_progress = TRUE}. The higher the number of iterations, the
#' smaller the amount of data that is held in memory at any one time.
#' @param verify_members Whether to verify the individual members of the
#' ensemble. Even if thresholds are supplied, only summary scores are
#' computed. If you wish to compute categorical scores, the separate
#' \link[harpPoint]{det_verify} function must be used.
#' @param thresholds The thresholds to compute categorical scores for.
#' @param members The members to retrieve if reading an EPS forecast. To select
#' the same members for all forecast models, this should be a numeric vector.
#' For specific members from specific models a named list with each element
#' having the name of the forecast model and containing a numeric vector.
#' e.g. \cr \code{members = list(eps_model1 = seq(0, 3), eps_model2 = c(2,
#' 3))}. \cr For multi model ensembles, each element of this named list should
#' contain another named list with sub model name followed by the desired
#' members, e.g. \cr \code{members = list(eps_model1 = list(sub_model1 =
#' seq(0, 3), sub_model2 = c(2, 3)))}
#' @param obsfile_template The template for OBSTABLE files - the default is
#' "obstable", which is \code{OBSTABLE_{YYYY}.sqlite}.
#' @param groupings The groups to verify for. The default is "leadtime". Another
#' common grouping might be \code{groupings = c("leadtime", "fcst_cycle")}.
#' @param by The frequency of forecast cycles to verify.
#' @param climatology The climatology to use for the Brier Skill Score. Can be
#' "sample" for the sample climatology (the default), a named list with
#' elements eps_model and member to use a member of an eps model in the
#' harp_fcst object for the climatology, or a data frame with columns for
#' threshold and climatology and also optionally leadtime.
#' @param stations The stations to verify for. The default is to use all
#' stations from \link[harpIO]{station_list} that are common to all
#' \code{fcst_model} domains.
#' @param spread_drop_member Which members to drop for the calculation of the
#' ensemble variance and standard deviation. For harp_fcst objects, this can
#' be a numeric scalar - in which case it is recycled for all forecast models;
#' a list or numeric vector of the same length as the harp_fcst object, or a
#' named list with the names corresponding to names in the harp_fcst object.
#' @param jitter_fcst A function to perturb the forecast values by. This is used
#' to account for observation error in the rank histogram. For other
#' statistics it is likely to make little difference since it is expected that
#' the observations will have a mean error of zero.
#' @param gross_error_check Logical of whether to perform a gross error check.
#' @param min_allowed The minimum value of observation to allow in the gross
#' error check. If set to NULL the default value for the parameter is used.
#' @param max_allowed The maximum value of observation to allow in the gross
#' error check. If set to NULL the default value for the parameter is used.
#' @param num_sd_allowed The number of standard deviations of the forecast that
#'   the observations should be within. Set to NULL for an automatic value
#'   depending on the parameter.
#' @param show_progress Logical - whether to show a progress bar. Defaults to
#' FALSE.
#' @param verif_path If set, verification files will be saved to this path.
#' @param fctable_file_template The template for the file names of the files to be read
#' from. This would normally be one of the "fctable_*" templates that can be
#' seen in \link{show_file_templates}. Can be a single string, a
#' character vector or list of the same length as \code{fcst_model}. If not
#' named, the order of templates is assumed to be the same as in
#' \code{fcst_model}. If named, the names must match the entries in
#' \code{fcst_model}.
#' @param lags Lag times for each forecast model, passed on to
#'   \link[harpIO]{read_point_forecast}. A single unnamed value is recycled
#'   for all forecast models.
#' @param lag_fcst_models The forecast models to construct lagged ensembles
#'   for. Only used when \code{merge_lags_on_read = FALSE}.
#' @param parent_cycles The parent cycles to base lagged ensembles on. Must
#'   be given together with \code{lag_fcst_models}.
#' @param lag_direction The direction in which to lag the forecasts. The
#'   default is 1.
#' @param fcst_shifts A named list of time shifts (in hours) to apply to
#'   forecast models, with names matching entries in \code{fcst_model}.
#' @param keep_unshifted Logical. Whether to also verify the unshifted
#'   version of each shifted forecast model. The default is FALSE.
#' @param drop_neg_leadtimes Logical. Whether to drop negative lead times
#'   that can result from shifting forecasts. The default is TRUE.
#' @param scale_fcst A named list, with names matching entries in
#'   \code{fcst_model}, where each element is a list with elements
#'   \code{scale_factor}, \code{new_units} and \code{multiplicative} used to
#'   scale the forecasts. A single unnamed scaling is applied to all models.
#' @param scale_obs A list with elements \code{scale_factor},
#'   \code{new_units} and \code{multiplicative} used to scale the
#'   observations. See \code{scale_point_obs}.
#' @param common_cases_only Logical. Whether to restrict the verification to
#'   cases that are common to all forecast models. The default is TRUE.
#' @param common_cases_xtra_cols Extra columns to take into account when
#'   selecting common cases. Must be unquoted column names wrapped in
#'   \code{vars()}, e.g. \code{common_cases_xtra_cols = vars(p)}.
#' @param check_obs_fcst Logical. Whether to check the observations against
#'   the forecasts and remove those that are more than \code{num_sd_allowed}
#'   standard deviations from the ensemble. The default is TRUE.
#' @param vertical_coordinate The vertical coordinate of the parameter. One
#'   of NA (the default), "pressure", "model" or "height".
#' @param merge_lags_on_read Logical. Whether to merge lagged members when
#'   the forecasts are read. The default is TRUE.
#'
#' @return A list containing two data frames: \code{ens_summary_scores} and
#'   \code{ens_threshold_scores}.
#' @export
#'
#' @examples
ens_read_and_verify <- function(
  start_date,
  end_date,
  parameter,
  fcst_model,
  fcst_path,
  obs_path,
  lead_time = seq(0, 48, 3),
  num_iterations = length(lead_time),
  verify_members = TRUE,
  thresholds = NULL,
  members = NULL,
  vertical_coordinate = c(NA_character_, "pressure", "model", "height"),
  fctable_file_template = "fctable_eps",
  obsfile_template = "obstable",
  groupings = "leadtime",
  by = "6h",
  lags = "0s",
  merge_lags_on_read = TRUE,
  lag_fcst_models = NULL,
  parent_cycles = NULL,
  lag_direction = 1,
  fcst_shifts = NULL,
  keep_unshifted = FALSE,
  drop_neg_leadtimes = TRUE,
  climatology = "sample",
  stations = NULL,
  scale_fcst = NULL,
  scale_obs = NULL,
  spread_drop_member = NULL,
  jitter_fcst = NULL,
  common_cases_only = TRUE,
  common_cases_xtra_cols = NULL,
  check_obs_fcst = TRUE,
  gross_error_check = TRUE,
  min_allowed = NULL,
  max_allowed = NULL,
  num_sd_allowed = NULL,
  show_progress = FALSE,
  verif_path = NULL
) {

  # Observations are needed from the first forecast start time up to the
  # last start time plus the longest lead time (lead_time is in hours).
  first_obs <- start_date
  last_obs <- (
    suppressMessages(harpIO::str_datetime_to_unixtime(end_date)) +
      3600 * max(lead_time)
  ) %>%
    harpIO::unixtime_to_str_datetime(harpIO::YMDhm)

  vertical_coordinate <- match.arg(vertical_coordinate)

  # Read all observations up front - they are reused in every iteration.
  obs_data <- harpIO::read_point_obs(
    start_date          = first_obs,
    end_date            = last_obs,
    parameter           = parameter,
    obs_path            = obs_path,
    obsfile_template    = obsfile_template,
    gross_error_check   = gross_error_check,
    min_allowed         = min_allowed,
    max_allowed         = max_allowed,
    stations            = stations,
    vertical_coordinate = vertical_coordinate
  )

  parameter_sym <- rlang::sym(parameter)

  # Optionally rescale the observations (e.g. unit conversion) before they
  # are joined to the forecasts.
  if (!is.null(scale_obs)) {
    stopifnot(is.list(scale_obs))
    check_scale_data(scale_obs, "obs")
    obs_data <- do.call(
      scale_point_obs,
      c(list(.obs = obs_data, parameter = parameter_sym), scale_obs)
    )
  }

  verif_data <- list()

  # Never run more iterations than there are lead times.
  if (num_iterations > length(lead_time)) {
    num_iterations <- length(lead_time)
  }

  # Split the lead times into (near) equally sized chunks - one chunk of
  # forecast data is held in memory per iteration.
  lead_list     <- split(lead_time, sort(seq_along(lead_time) %% num_iterations))
  stations_used <- list()

  # The parsed parameter only depends on loop-invariant inputs, so parse it
  # once before the loop rather than on every iteration.
  harp_parameter <- harpIO::parse_harp_parameter(
    parameter,
    vertical_coordinate = vertical_coordinate
  )

  for (i in seq_len(num_iterations)) {

    cat("Lead time:", lead_list[[i]], "( Iteration", i, "of", num_iterations, ")\n")
    cat(rep("=", 80), "\n", sep = "")

    # Accumulated parameters cannot be computed for lead times shorter than
    # the accumulation period. FIX: the comparison yields one value per lead
    # time in the chunk; a vector condition in `if` is an error in R >= 4.2,
    # so any() makes the "skip this chunk" decision an explicit scalar.
    if (harp_parameter$accum > 0 && any(lead_list[[i]] * 3600 < harp_parameter$accum)) {
      warning_message <- paste0(
        "Cannot accumulate ",
        harpIO:::parse_accum(harp_parameter),
        harp_parameter$acc_unit,
        " ",
        harp_parameter$basename,
        " for lead time: ",
        lead_list[[i]],
        "h. Skipping to next iteration."
      )
      warning(warning_message, call. = FALSE, immediate. = TRUE)
      next
    }

    # Recycle a single unnamed lag specification for all forecast models.
    # Runs once - after this, lags is a named list of length(fcst_model).
    if (length(lags) == 1 && is.null(names(lags))) {
      lags <- rep(lags, length(fcst_model)) %>%
        as.list() %>%
        purrr::set_names(fcst_model)
    }

    if (!is.null(fcst_shifts)) {

      if (keep_unshifted) {
        # Add "<model>_unshifted" copies of the shifted models so that both
        # the shifted and unshifted forecasts are verified. The grepl guard
        # ensures this set-up only happens on the first iteration.
        if (!any(grepl("_unshifted$", names(lags)))) {
          unshifted_names <- paste0(names(fcst_shifts), "_unshifted")
          fcst_model <- c(fcst_model, unshifted_names)
          lags[unshifted_names] <- lags[names(fcst_shifts)]
          # Propagate any per-model scalings, member selections and file
          # name templates to the unshifted copies.
          if (!is.null(scale_fcst)) {
            shifted_and_scaled <- intersect(names(scale_fcst), names(fcst_shifts))
            if (length(shifted_and_scaled) > 0) {
              scale_fcst[paste0(shifted_and_scaled, "_unshifted")] <- scale_fcst[shifted_and_scaled]
            }
          }
          if (!is.null(members)) {
            shifted_and_select_members <- intersect(names(members), names(fcst_shifts))
            if (length(shifted_and_select_members) > 0) {
              members[paste0(shifted_and_select_members, "_unshifted")] <- members[shifted_and_select_members]
            }
          }
          if (!is.null(names(fctable_file_template))) {
            shifted_and_template <- intersect(names(fctable_file_template), names(fcst_shifts))
            if (length(shifted_and_template) > 0) {
              fctable_file_template[paste0(shifted_and_template, "_unshifted")] <- fctable_file_template[shifted_and_template]
            }
          }
        }
      }

      # Shifts are implemented as extra lags when the forecasts are read.
      lags[names(fcst_shifts)] <- lapply(fcst_shifts, paste0, "h")
    }

    # Read the forecasts for this chunk of lead times and merge any
    # multimodel ensembles into single ensembles.
    fcst_data <- harpIO::read_point_forecast(
      start_date          = start_date,
      end_date            = end_date,
      fcst_model          = fcst_model,
      fcst_type           = "EPS",
      parameter           = parameter,
      lead_time           = lead_list[[i]],
      lags                = lags,
      merge_lags          = merge_lags_on_read,
      by                  = by,
      file_path           = fcst_path,
      stations            = stations,
      members             = members,
      file_template       = fctable_file_template,
      vertical_coordinate = vertical_coordinate
    ) %>%
      merge_multimodel()

    # Optionally rescale the forecasts. 'scale_fcst' must resolve to a named
    # list with one scaling specification per forecast model to be scaled.
    if (!is.null(scale_fcst)) {
      stopifnot(is.list(scale_fcst))
      if (is.null(names(scale_fcst))) {
        if (length(scale_fcst) == 1 && length(fcst_model) > 1) {
          warning(
            "Only one scaling given in 'scale_fcst'. Applying scaling to all elements of 'fcst_model'.",
            immediate. = TRUE, call. = FALSE
          )
          scale_fcst <- rep(scale_fcst, length(fcst_model)) %>%
            purrr::set_names(fcst_model)
        } else if (length(scale_fcst) == length(fcst_model)) {
          warning(
            "No names given in 'scale_fcst'. Assuming same order as elements of 'fcst_model'.",
            immediate. = TRUE, call. = FALSE
          )
          names(scale_fcst) <- fcst_model
        } else {
          stop("'scale_fcst' must be a named list with names as in 'fcst_model'.", call. = FALSE)
        }
      } else {
        bad_names <- setdiff(names(scale_fcst), fcst_model)
        if (length(bad_names) > 0) {
          stop(
            paste(bad_names, collapse = ", "),
            "supplied in 'scale_fcst', but do not exist in 'fcst_model'.",
            call. = FALSE
          )
        }
      }
      purrr::walk(scale_fcst, check_scale_data, "fcst")
      fcst_data[names(scale_fcst)] <- purrr::map2(
        fcst_data[names(scale_fcst)],
        scale_fcst,
        ~ do.call(scale_point_forecast, c(list(.fcst = .x), .y))
      )
    }

    # If lagged members were not merged on read, construct the lagged
    # ensembles now.
    if (!merge_lags_on_read) {
      if (!is.null(lag_fcst_models)) {
        if (is.null(parent_cycles)) {
          stop("'parent_cycles' must be passed as well as 'lag_fcst_models'.")
        }
        fcst_data <- lag_forecast(
          fcst_data,
          lag_fcst_models,
          parent_cycles,
          direction = lag_direction
        )
      }
    }

    if (!is.null(fcst_shifts)) {
      if (merge_lags_on_read) {
        # Shifts were already applied via lags on read - rename the models
        # so the shift is visible in the verification output.
        shifted_models <- names(fcst_data)[names(fcst_data) %in% names(fcst_shifts)]
        names(fcst_data)[names(fcst_data) %in% names(fcst_shifts)] <- paste(
          shifted_models,
          "shifted",
          paste0(fcst_shifts, "h"),
          sep = "_"
        )
      } else {
        fcst_data <- shift_forecast(
          fcst_data,
          fcst_shifts,
          keep_unshifted = FALSE,
          drop_negative_lead_times = drop_neg_leadtimes
        )
      }
    }

    # Lagging / shifting can introduce lead times outside this chunk.
    fcst_data <- fcst_data %>%
      dplyr::filter(.data$leadtime %in% lead_list[[i]])

    # Restrict the verification to cases common to all forecast models.
    if (common_cases_only) {
      col_names <- unique(unlist(lapply(fcst_data, colnames)))
      xtra_cols_err <- paste(
        "common_cases_xtra_cols must be wrapped in vars and unquoted,\n",
        "e.g. common_cases_xtra_cols = vars(p)."
      )
      # Forcing a bare (unquoted) argument inside is.null() can throw an
      # "object not found" error - trap it and give a helpful message.
      xtra_cols_null <- try(is.null(common_cases_xtra_cols), silent = TRUE)
      if (inherits(xtra_cols_null, "try-error")) {
        stop(xtra_cols_err, call. = FALSE)
      } else {
        if (xtra_cols_null) {
          fcst_data <- common_cases(fcst_data)
        } else {
          if (inherits(common_cases_xtra_cols, "quosures")) {
            xtra_cols <- purrr::map_chr(rlang::eval_tidy(common_cases_xtra_cols), rlang::quo_name)
            # FIX: was `> 1`, which silently accepted exactly one missing
            # column and failed later with a confusing error downstream.
            if (length(setdiff(xtra_cols, col_names)) > 0) {
              stop(
                "Column(s) '", paste(setdiff(xtra_cols, col_names), collapse = "','"), "' ",
                "for selecting common cases not found.",
                call. = FALSE
              )
            } else {
              fcst_data <- common_cases(fcst_data, !!!common_cases_xtra_cols)
            }
          } else {
            stop(xtra_cols_err, call. = FALSE)
          }
        }
      }
    }

    # FIX: was `all(sapply(fcst_data, nrow)) < 1`, which compares a single
    # logical with 1 and therefore skipped whenever *any* model had zero
    # rows. The message says the intent is to skip when *no* model has data.
    if (all(purrr::map_int(fcst_data, nrow) < 1)) {
      message("No forecast data available. Skipping to next iteration.")
      next
    }

    # Check column names for parameter - if not found, try the base name.
    if (!is.element(harp_parameter[["fullname"]], colnames(obs_data))) {
      if (is.element(harp_parameter[["basename"]], colnames(obs_data))) {
        names(obs_data)[names(obs_data) == harp_parameter[["basename"]]] <- harp_parameter[["fullname"]]
      } else {
        stop("Don't know what to do with parameter '", harp_parameter[["fullname"]], "'.", call. = FALSE)
      }
    }

    fcst_data <- join_to_fcst(fcst_data, obs_data)

    stations_used[[i]] <- unique(unlist(dplyr::pull(fcst_data, .data[["SID"]])))

    if (any(purrr::map_int(fcst_data, nrow) == 0)) next

    # Optionally screen out observations that are too many ensemble standard
    # deviations away from the forecast.
    if (check_obs_fcst) {
      fcst_data <- check_obs_against_fcst(fcst_data, !!parameter_sym, num_sd_allowed = num_sd_allowed)
    }

    if (any(purrr::map_int(fcst_data, nrow) == 0)) next

    verif_data[[i]] <- ens_verify(
      fcst_data,
      !!parameter_sym,
      verify_members     = verify_members,
      thresholds         = thresholds,
      groupings          = groupings,
      spread_drop_member = spread_drop_member,
      jitter_fcst        = jitter_fcst,
      climatology        = climatology,
      show_progress      = show_progress
    )

  } # end loop over lead times

  # Skipped iterations leave NULL elements behind - drop them.
  verif_data <- verif_data[purrr::map_lgl(verif_data, ~!is.null(.x))]
  if (length(verif_data) < 1) {
    stop("No data to verify", call. = FALSE)
  }
  stations_used <- unique(unlist(stations_used))

  verif_data <- bind_point_verif(verif_data)

  # Strip the "_unshifted" suffix from model names in the output, taking
  # care to restore the verification attributes that mutate() would drop.
  verif_attributes <- attributes(verif_data)
  verif_data <- purrr::map(
    verif_data,
    ~ dplyr::mutate(
      .x,
      mname = case_when(
        grepl("_unshifted$", .data$mname) ~ gsub("_unshifted", "", .data$mname),
        TRUE                              ~ .data$mname
      )
    )
  )
  attributes(verif_data) <- verif_attributes
  attr(verif_data, "num_stations") <- as.character(length(stations_used))
  attr(verif_data, "stations") <- sort(stations_used)

  if (!is.null(verif_path)) {
    harpIO::save_point_verif(verif_data, verif_path = verif_path)
  }

  verif_data
}
#' Validate a scaling specification for observations or forecasts.
#'
#' Checks that `scale_data` is a named list with exactly the names
#' `scale_factor`, `new_units` and `multiplicative` (in any order), which
#' are the arguments forwarded to `scale_point_obs()` /
#' `scale_point_forecast()`. Stops with an informative error otherwise.
#'
#' @param scale_data The scaling specification to check.
#' @param scale_what Either "obs" or "fcst" - used only to tailor the error
#'   message to the argument the user supplied.
#' @return `NULL`, invisibly via the untaken `if`; called for its side
#'   effect of raising an error on invalid input.
#' @noRd
check_scale_data <- function(scale_data, scale_what) {
  # Already sorted, so names(scale_data) only needs sorting once below.
  expected_args <- c("multiplicative", "new_units", "scale_factor")
  # Scalar branching: `if`/`else` rather than ifelse() on scalars.
  if (scale_what == "obs") {
    ref_function <- "'scale_point_obs'"
    text_start   <- "'scale_obs'"
  } else {
    ref_function <- "'scale_point_forecast'"
    # FIX: previously said "Elements of 'scale_point_forecast'", naming the
    # scaling *function* instead of the user-facing 'scale_fcst' argument.
    text_start   <- "Elements of 'scale_fcst'"
  }
  if (!identical(sort(names(scale_data)), expected_args)) {
    stop(
      text_start, " must be a named list with names scale_factor, new_units and multiplicative\n",
      "See ", ref_function, " for more details.",
      call. = FALSE
    )
  }
}