knitr::opts_chunk$set(collapse = TRUE, comment = "#>")
The analysis pipeline for this work can be regenerated by rendering this file,
rmarkdown::render("_targets.Rmd")
The pipeline can then be run using,
tar_make()
The complete pipeline can be visualised using,
tar_visnetwork()
Alternatively the pipeline can be explored interactively using this notebook or updated programmatically using the scripts in bin
. We also provide an archived version of our targets
workflow if only wanting to reproduce sections of our analysis. This can be downloaded using the following,
source(here::here("R", "targets-archive.R")) get_targets_archive()
Set up the workflow pipeline and options. We first load the targets
package and remove the potentially outdated workflow.
library(targets) library(tarchetypes) library(data.table) library(epinowcast) library(ggplot2) library(purrr, quietly = TRUE) library(here) library(future) library(future.callr) tar_unscript()
We now define shared global options across our workflow and load R functions from the R
folder.
```{targets globals, tar_globals = TRUE}
library(targets)
library(tarchetypes)
library(cmdstanr)
library(data.table)
library(epinowcast)
library(ggplot2)
library(purrr, quietly = TRUE)
library(here)
library(future)
library(future.callr)
plan(callr)
functions <- list.files(here("R"), full.names = TRUE)
walk(functions, source)
rm("functions")
set_cmdstan_path()
tar_option_set(
packages = c("data.table", "epinowcast", "scoringutils", "ggplot2", "purrr",
"cmdstanr", "here"),
deployment = "worker",
memory = "transient",
workspace_on_error = TRUE,
error = "continue",
garbage_collection = TRUE
)
# Ingest data and stratify by location and report date * Watch the URL at which hospitalisation data is hosted in order to trigger an update to the workflow when new data is uploaded ```{targets hospitalisation_url} tar_url( hospitalisation_url, "https://raw.githubusercontent.com/KITmetricslab/hospitalization-nowcast-hub/main/data-truth/COVID-19/COVID-19_hospitalizations_preprocessed.csv" # nolint )
```{targets hospitalisations, tar_simple = TRUE} get_germany_hospitalisations(url = hospitalisation_url)
* Define date to start nowcasting ```{targets start_date, tar_simple = TRUE} as.Date("2021-10-01")
```{targets nowcast_dates, tar_simple = TRUE} unique( hospitalisations[reference_date >= start_date]$reference_date )
* Define age groups ```{targets age_groups, tar_simple = TRUE} unique(hospitalisations$age_group)
```{targets locations, tar_simple = TRUE} "DE"
* Define locations in which to nowcast only for the overall count and not for each age group. ```{targets other_locations, tar_simple = TRUE} setdiff(unique(hospitalisations$location), locations)
```{targets max_report_delay, tar_simple = TRUE} 40
* Define hospitalisations by date of report. Note that here the target is defined to never update if data exists for that nowcast date. This is because the truth date for previous days can change slightly over time as negative cases are adjusted for leading to spurious refits. ```{targets hospitalisations_by_date_report} tar_target( hospitalisations_by_date_report, enw_retrospective_data( hospitalisations, rep_date = nowcast_dates, ref_days = max_report_delay )[, nowcast_date := nowcast_dates], map(nowcast_dates), iteration = "list", cue = tar_cue(mode = "never") )
```{targets latest_hospitalisations, tar_simple = TRUE} enw_latest_data(hospitalisations)
* Make latest observations into 7 day incidence. ```{targets, latest_7day_hospitalisations, tar_simple = TRUE} copy(latest_hospitalisations)[, confirm := frollsum(confirm, n = 7), by = c("age_group", "location") ][!is.na(confirm)]
```{targets, complete_hospitalisations, tar_simple = TRUE} latest_hospitalisations[reference_date < (max(reference_date) - 28)]
* Similarly, define "complete" 7 day hospitalisation incidence. ```{targets, complete_7day_hospitalisations, tar_simple = TRUE} latest_7day_hospitalisations[reference_date < (max(reference_date) - 28)]
```{targets, epinowcast_model} tar_file( epinowcast_model, compile_model(), deployment = "main" )
* Define stan settings shared across models and used for fitting ```{targets epinowcast_settings, tar_simple = TRUE} list( save_warmup = FALSE, output_loglik = FALSE, pp = FALSE, iter_warmup = 1000, iter_sampling = 1000, chains = 2, parallel_chains = 2, threads_per_chain = 1, adapt_delta = 0.95, show_messages = FALSE, refresh = 0 )
epinowcast
.```{targets uninformed_priors, tar_simple = TRUE} enw_priors()
* Define a set of observations to use as a source for informed priors. Here we use overall national hospitalisations and data from the 1st of July 2021. ```{targets prior_obs} tar_target( prior_obs, enw_retrospective_data( hospitalisations[location == "DE"][age_group == "00+"], rep_date = as.Date("2021-07-01"), ref_days = max_report_delay ), cue = tar_cue(mode = "never") )
```{targets priors, tar_simple = TRUE} do.call(prior_epinowcast, c( list( prior_obs, max_delay = max_report_delay, scale = 10, priors = uninformed_priors ), epinowcast_settings ))
## Models * Intercept only model. ```{targets fixed_nowcast} tar_target( fixed_nowcast, nowcast( obs = hospitalisations_by_date_report, tar_loc = locations, model = fixed_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, locations), cue = tar_cue(mode = "never") )
```{targets dow_nowcast} tar_target( dow_nowcast, nowcast( obs = hospitalisations_by_date_report, tar_loc = locations, model = dow_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, locations), cue = tar_cue(mode = "never") )
* Age group random effect model with day of the week effect reporting effects. ```{targets age_nowcast} tar_target( age_nowcast, nowcast( obs = hospitalisations_by_date_report, tar_loc = locations, model = age_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, locations), cue = tar_cue(mode = "never") )
```{targets week_nowcast} tar_target( week_nowcast, nowcast( obs = hospitalisations_by_date_report, tar_loc = locations, model = week_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, locations), cue = tar_cue(mode = "never") )
* Age group random effect model with an age group specific weekly random walk, and a day of the week reporting effect. ```{targets age_week_nowcast} tar_target( age_week_nowcast, nowcast( obs = hospitalisations_by_date_report, tar_loc = locations, model = age_week_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, locations), cue = tar_cue(mode = "never") )
```{targets independent_nowcast} tar_target( independent_nowcast, nowcast( obs = hospitalisations_by_date_report[age_group == age_groups], tar_loc = locations, model = independent_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, locations, age_groups), cue = tar_cue(mode = "never") )
* Independent age group model with a weekly random walk and a day of the week reference and reporting model. ```{targets independent_ref_dow_nowcast} tar_target( independent_ref_dow_nowcast, nowcast( obs = hospitalisations_by_date_report[age_group == age_groups], tar_loc = locations, model = independent_ref_dow_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, locations, age_groups), cue = tar_cue(mode = "never") )
cue
settings here so that once a model has been fit it is never run again for that combination of locations and dates regardless of upstream changes).```{targets overall_only_nowcast} tar_target( overall_only_nowcast, nowcast( obs = hospitalisations_by_date_report[age_group == "00+"], tar_loc = other_locations, model = independent_ref_dow_epinowcast, priors = priors, max_delay = max_report_delay, settings = epinowcast_settings ), cross(hospitalisations_by_date_report, other_locations), cue = tar_cue(mode = "never") )
# Postprocess * Combine nowcasts from each model ```{targets combined_nowcasts, tar_simple = TRUE} rbindlist(list( fixed_nowcast, dow_nowcast, age_nowcast, week_nowcast, age_week_nowcast, independent_nowcast, overall_only_nowcast, independent_ref_dow_nowcast ), use.names = TRUE, fill = TRUE)[, model := factor( model, levels = c("Reference: Fixed, Report: Fixed", "Reference: Fixed, Report: Day of week", "Reference: Age, Report: Day of week", "Reference: Age and week, Report: Day of week", "Reference: Age and week by age, Report: Day of week", "Independent by age, Reference: Week, Report: Day of week", "Independent by age, Reference: Week and day of week, Report: Day of week") ) ][, id := 1:.N]
```{targets summarised_nowcast, tar_simple = TRUE} combined_nowcasts |> adjust_posteriors( target = "daily", max_ratio = 0.25, rhat_bound = 1.1, per_dt_bound = 0.2 ) |> unnest_nowcasts(target = "daily")
* Extract summarised 7 day nowcast. As a temporary measure here we adjust quantiles that are more than 25% of the median when there is evidence of a fitting issue (based on divergent transistions and R hat values). ```{targets summarised_7day_nowcast, tar_simple = TRUE} combined_nowcasts |> adjust_posteriors( target = "seven_day", max_ratio = 0.25, rhat_bound = 1.1, per_dt_bound = 0.2 ) |> unnest_nowcasts(target = "seven_day")
```{targets save_daily_nowcasts} tar_file( save_daily_nowcasts, summarised_nowcast[nowcast_date == nowcast_dates] |> save_csv( filename = paste0(nowcast_dates, ".csv"), path = here("data/nowcasts/daily"), allow_empty = FALSE ), map(nowcast_dates), cue = tar_cue(mode = "never") )
* Save 7 day nowcasts stratified by nowcasting date ```{targets save_7day_nowcasts} tar_file( save_7day_nowcasts, summarised_7day_nowcast[nowcast_date == nowcast_dates] |> save_csv( filename = paste0(nowcast_dates, ".csv"), path = here("data/nowcasts/seven_day"), allow_empty = FALSE ), map(nowcast_dates), cue = tar_cue(mode = "never") )
```{targets hierarchical_submission_nowcast} tar_target( hierarchical_submission_nowcast, age_week_nowcast[nowcast_date == nowcast_dates] |> adjust_posteriors( target = "seven_day", max_ratio = 0.25, rhat_bound = 1.1, per_dt_bound = 0.2 ) |> select_var("seven_day") |> rbindlist() |> format_for_submission(), map(nowcast_dates), iteration = "list", cue = tar_cue(mode = "never") )
* Format and save 7 day hierarchical nowcasts for hub submission. ```{targets save_hierarchical_submission} tar_file( save_hierarchical_submission, save_csv( hierarchical_submission_nowcast, filename = paste0(nowcast_dates, ".csv"), path = here("data/nowcasts/submission/hierarchical") ), map(hierarchical_submission_nowcast, nowcast_dates) )
```{targets independent_submission_nowcast} tar_target( independent_submission_nowcast, rbind( independent_ref_dow_nowcast[nowcast_date == nowcast_dates], overall_only_nowcast[nowcast_date == nowcast_dates] ) |> adjust_posteriors( target = "seven_day", max_ratio = 0.25, rhat_bound = 1.1, per_dt_bound = 0.2 ) |> select_var("seven_day") |> rbindlist() |> format_for_submission(), map(nowcast_dates), iteration = "list", cue = tar_cue(mode = "never") )
* Format and save 7 day independent nowcasts for hub submission. ```{targets save_independent_submission} tar_file( save_independent_submission, save_csv( independent_submission_nowcast, filename = paste0(nowcast_dates, ".csv"), path = here("data/nowcasts/submission/independent") ), map(independent_submission_nowcast, nowcast_dates) )
```{targets save_latest_daily_hospitalisations} tar_file( save_latest_daily_hospitalisations, save_csv( latest_hospitalisations, filename = paste0("daily.csv"), path = here("data/observations") ) )
* Save 7 day hospitalisation data ```{targets save_latest_7day_hospitalisations} tar_file( save_latest_7day_hospitalisations, save_csv( latest_7day_hospitalisations, filename = paste0("seven_day.csv"), path = here("data/observations") ) )
```{targets diagnostics, tar_simple = TRUE} combined_nowcasts[, c("daily", "seven_day") := NULL]
```{targets save_diagnostics} list( tar_file( save_all_diagnostics, save_csv( diagnostics, filename = "all.csv", path = here("data/diagnostics") ) ), tar_file( save_high_rhat_diagnostics, save_csv( diagnostics[max_rhat > 1.05], filename = "high-rhat.csv", path = here("data/diagnostics") ) ), tar_file( save_high_divergent_transitions, save_csv( diagnostics[per_divergent_transitions > 0.1], filename = "high-divergent-transitions.csv", path = here("data/diagnostics") ) ), tar_file( save_failures, save_csv( diagnostics[failure == TRUE], filename = "fitting-failed.csv", path = here("data/diagnostics") ) ) )
```{targets save-run-time} tar_file( save_run_time, combined_nowcasts |> summarise_runtimes() |> save_csv( filename = "run-times.csv", path = here("data/diagnostics") ) )
* Filter nowcasts to only include those with "complete" (more than 28 days of reports) data for all age groupsand with horizons between 0 days and -7 days from the nowcast date. ```{targets scored_nowcasts, tar_simple = TRUE} summarised_nowcast[location == locations][ reference_date < (max(nowcast_date) - 28)][, holiday := NULL][, horizon := as.numeric(as.Date(reference_date) - nowcast_date)][ horizon >= -7 ]
data/scores
.```{targets, score} tar_map( list(score_by = list( "overall", "age_group", "horizon", "reference_date", "nowcast_date" )), tar_target( scores, enw_score_nowcast( scored_nowcasts, complete_hospitalisations, summarise_by = drop_string(c(score_by, "model"), "overall"), log = FALSE ) ), tar_target( log_scores, enw_score_nowcast( scored_nowcasts, complete_hospitalisations, summarise_by = drop_string(c(score_by, "model"), "overall"), log = TRUE ) ), tar_file( save_scores, save_csv( rbind(scores[, scale := "natural"], log_scores[, scale := "log"]), filename = paste0(paste(score_by, sep = "-"), ".csv"), path = here("data/scores") ) ) )
# Visualise **Currently these are not produced. See the real time report instead** * Plot most recent daily nowcast by location, age group, and model. ```r tar_target( plot_latest_nowcast, enw_plot_nowcast_quantiles( summarised_nowcast[nowcast_date == max(nowcast_date)][ location == locations][ reference_date >= (nowcast_date - 28)] ) + facet_grid(vars(age_group), vars(model), scales = "free_y"), map(locations), iteration = "list" )
tar_target( plot_latest_7day_nowcast, enw_plot_nowcast_quantiles( summarised_7day_nowcast[nowcast_date == max(nowcast_date)][ location == locations][ reference_date >= (nowcast_date - 28)] ) + facet_grid(vars(age_group), vars(model), scales = "free_y"), map(locations), iteration = "list" )
tar_map( list(horizons = 0:7), tar_target( plot_nowcast_horizon, enw_plot_nowcast_quantiles( scored_nowcasts[location == locations][horizon == -horizons][, confirm := NA], latest_obs = latest_hospitalisations[ location == locations][ reference_date >= min(scored_nowcasts$reference_date)][ reference_date <= max(scored_nowcasts$reference_date) ], log = TRUE ) + facet_grid(vars(age_group), vars(model), scales = "free_y"), map(locations), iteration = "list" ) )
tar_map( list(horizons = 0:7), tar_target( plot_7day_nowcast_horizon, enw_plot_nowcast_quantiles( summarised_7day_nowcast[ reference_date < (max(nowcast_date) - 28)][, holiday := NULL][, horizon := as.numeric(as.Date(reference_date) - nowcast_date)][ location == locations][ horizon == -horizons][, confirm := NA], latest_obs = latest_7day_hospitalisations[ location == locations][ reference_date >= min(scored_nowcasts$reference_date)][ reference_date <= max(scored_nowcasts$reference_date) ], log = TRUE ) + facet_grid(vars(age_group), vars(model), scales = "free_y"), map(locations), iteration = "list" ) )
Plot daily nowcasts for locations without age groups stratified nowcasts.
Plot 7 day nowcasts for locations without age group stratified nowcasts.
Plot scores relative to the baseline by location and age group on both the natural and log scale.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.