_targets.md

Analysis pipeline: Evaluating Semi-Parametric Nowcasts of COVID-19 Hospital Admissions in Germany ================ Sam Abbott

Pipeline

The analysis pipeline for this work can be regenerated by rendering this file,

rmarkdown::render("_targets.Rmd")

The pipeline can then be run using,

tar_make()

The complete pipeline can be visualised using,

tar_visnetwork()

Alternatively the pipeline can be explored interactively using this notebook or updated programmatically using the scripts in bin. We also provide an archived version of our targets workflow if only wanting to reproduce sections of our analysis. This can be downloaded using the following,

source(here::here("R", "targets-archive.R"))
get_targets_archive()

Setup

Set up the workflow pipeline and options. We first load the targets package and remove the potentially outdated workflow.

library(targets)
library(tarchetypes)
library(data.table)
library(epinowcast)
library(ggplot2)
library(purrr, quietly = TRUE)
#> 
#> Attaching package: 'purrr'
#> The following object is masked from 'package:data.table':
#> 
#>     transpose
library(here)
#> here() starts at /eval-germany-sp-nowcasting
library(future)
library(future.callr)
tar_unscript()

We now define shared global options across our workflow and load R functions from the R folder.

library(targets)
library(tarchetypes)
library(cmdstanr)
library(data.table)
library(epinowcast)
library(ggplot2)
library(purrr, quietly = TRUE)
library(here)
library(future)
library(future.callr)
plan(callr)
functions <- list.files(here("R"), full.names = TRUE)
walk(functions, source)
rm("functions")
set_cmdstan_path()  
tar_option_set(
  packages = c("data.table", "epinowcast", "scoringutils", "ggplot2", "purrr",
               "cmdstanr", "here"),
  deployment = "worker",
  memory = "transient",
  workspace_on_error = TRUE,
  error = "continue",
  garbage_collection = TRUE
)
#> Establish _targets.R and _targets_r/globals/globals.R.

Ingest data and stratify by location and report date

tar_url(
  hospitalisation_url,
  "https://raw.githubusercontent.com/KITmetricslab/hospitalization-nowcast-hub/main/data-truth/COVID-19/COVID-19_hospitalizations_preprocessed.csv" # nolint
)
#> Establish _targets.R and _targets_r/targets/hospitalisation_url.R.
tar_target(hospitalisations, {
  get_germany_hospitalisations(url = hospitalisation_url)
})
#> Define target hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/hospitalisations.R.
tar_target(start_date, {
  as.Date("2021-10-01")
})
#> Define target start_date from chunk code.
#> Establish _targets.R and _targets_r/targets/start_date.R.
tar_target(nowcast_dates, {
  unique(
    hospitalisations[reference_date >= start_date]$reference_date
  )
})
#> Define target nowcast_dates from chunk code.
#> Establish _targets.R and _targets_r/targets/nowcast_dates.R.
tar_target(age_groups, {
  unique(hospitalisations$age_group)
})
#> Define target age_groups from chunk code.
#> Establish _targets.R and _targets_r/targets/age_groups.R.
tar_target(locations, {
  "DE"
})
#> Define target locations from chunk code.
#> Establish _targets.R and _targets_r/targets/locations.R.
tar_target(other_locations, {
  setdiff(unique(hospitalisations$location), locations)
})
#> Define target other_locations from chunk code.
#> Establish _targets.R and _targets_r/targets/other_locations.R.
tar_target(max_report_delay, {
  40
})
#> Define target max_report_delay from chunk code.
#> Establish _targets.R and _targets_r/targets/max_report_delay.R.
tar_target(
  hospitalisations_by_date_report,
  enw_retrospective_data(
    hospitalisations,
    rep_date = nowcast_dates,
    ref_days = max_report_delay
  )[, nowcast_date := nowcast_dates],
  map(nowcast_dates),
  iteration = "list",
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/hospitalisations_by_date_report.R.
tar_target(latest_hospitalisations, {
   enw_latest_data(hospitalisations)
})
#> Define target latest_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/latest_hospitalisations.R.
tar_target(latest_7day_hospitalisations, {
  copy(latest_hospitalisations)[,
    confirm := frollsum(confirm, n =  7), by = c("age_group", "location")
  ][!is.na(confirm)]
})
#> Define target latest_7day_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/latest_7day_hospitalisations.R.
tar_target(complete_hospitalisations, {
  latest_hospitalisations[reference_date < (max(reference_date) - 28)]
})
#> Define target complete_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/complete_hospitalisations.R.
tar_target(complete_7day_hospitalisations, {
  latest_7day_hospitalisations[reference_date < (max(reference_date) - 28)]
})
#> Define target complete_7day_hospitalisations from chunk code.
#> Establish _targets.R and _targets_r/targets/complete_7day_hospitalisations.R.

Fit models and produce nowcasts

Model and model settings

tar_file(
  epinowcast_model,
  compile_model(),
  deployment = "main"
)
#> Establish _targets.R and _targets_r/targets/epinowcast_model.R.
tar_target(epinowcast_settings, {
  list(
    save_warmup = FALSE,
    output_loglik = FALSE,
    pp = FALSE,
    iter_warmup = 1000,
    iter_sampling = 1000,
    chains = 2,
    parallel_chains = 2,
    threads_per_chain = 1,
    adapt_delta = 0.95,
    show_messages = FALSE,
    refresh = 0
  )
})
#> Define target epinowcast_settings from chunk code.
#> Establish _targets.R and _targets_r/targets/epinowcast_settings.R.

Priors

tar_target(uninformed_priors, {
  enw_priors()
})
#> Define target uninformed_priors from chunk code.
#> Establish _targets.R and _targets_r/targets/uninformed_priors.R.
tar_target(
  prior_obs,
  enw_retrospective_data(
    hospitalisations[location == "DE"][age_group == "00+"],
    rep_date = as.Date("2021-07-01"), ref_days = max_report_delay
  ),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/prior_obs.R.
tar_target(priors, {
  do.call(prior_epinowcast, c(
    list(
      prior_obs, max_delay = max_report_delay, scale = 10,
      priors = uninformed_priors
    ),
    epinowcast_settings
  ))
})
#> Define target priors from chunk code.
#> Establish _targets.R and _targets_r/targets/priors.R.

Models

tar_target(
  fixed_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = fixed_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/fixed_nowcast.R.
tar_target(
  dow_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = dow_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/dow_nowcast.R.
tar_target(
  age_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = age_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/age_nowcast.R.
tar_target(
  week_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = week_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/week_nowcast.R.
tar_target(
  age_week_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report,
    tar_loc = locations,
    model = age_week_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/age_week_nowcast.R.
tar_target(
  independent_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report[age_group == age_groups],
    tar_loc = locations,
    model = independent_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations, age_groups),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/independent_nowcast.R.
tar_target(
  independent_ref_dow_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report[age_group == age_groups],
    tar_loc = locations,
    model = independent_ref_dow_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, locations, age_groups),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/independent_ref_dow_nowcast.R.
tar_target(
  overall_only_nowcast,
  nowcast(
    obs = hospitalisations_by_date_report[age_group == "00+"],
    tar_loc = other_locations,
    model = independent_ref_dow_epinowcast,
    priors = priors,
    max_delay = max_report_delay,
    settings = epinowcast_settings
  ),
  cross(hospitalisations_by_date_report, other_locations),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/overall_only_nowcast.R.

Postprocess

tar_target(combined_nowcasts, {
  rbindlist(list(
    fixed_nowcast,
    dow_nowcast,
    age_nowcast,
    week_nowcast,
    age_week_nowcast,
    independent_nowcast,
    overall_only_nowcast,
    independent_ref_dow_nowcast
  ), use.names = TRUE, fill = TRUE)[,
     model := factor(
      model,
      levels = c("Reference: Fixed, Report: Fixed",
                 "Reference: Fixed, Report: Day of week",
                 "Reference: Age, Report: Day of week",
                 "Reference: Age and week, Report: Day of week",
                 "Reference: Age and week by age, Report: Day of week",
                 "Independent by age, Reference: Week, Report: Day of week",
                 "Independent by age, Reference: Week and day of week, Report: Day of week")
     )
    ][, id := 1:.N]
})
#> Define target combined_nowcasts from chunk code.
#> Establish _targets.R and _targets_r/targets/combined_nowcasts.R.
tar_target(summarised_nowcast, {
  combined_nowcasts |> 
    adjust_posteriors(
      target = "daily", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
  ) |> 
    unnest_nowcasts(target = "daily") 
})
#> Define target summarised_nowcast from chunk code.
#> Establish _targets.R and _targets_r/targets/summarised_nowcast.R.
tar_target(summarised_7day_nowcast, {
  combined_nowcasts |> 
    adjust_posteriors(
      target = "seven_day", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
  ) |> 
    unnest_nowcasts(target = "seven_day") 
})
#> Define target summarised_7day_nowcast from chunk code.
#> Establish _targets.R and _targets_r/targets/summarised_7day_nowcast.R.
tar_file(
  save_daily_nowcasts,
  summarised_nowcast[nowcast_date == nowcast_dates] |>
    save_csv(
      filename = paste0(nowcast_dates, ".csv"),
      path = here("data/nowcasts/daily"),
      allow_empty = FALSE
    ),
  map(nowcast_dates),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/save_daily_nowcasts.R.
tar_file(
  save_7day_nowcasts,
  summarised_7day_nowcast[nowcast_date == nowcast_dates] |>
    save_csv(
      filename = paste0(nowcast_dates, ".csv"),
      path = here("data/nowcasts/seven_day"),
      allow_empty = FALSE
    ),
  map(nowcast_dates),
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/save_7day_nowcasts.R.
tar_target(
  hierarchical_submission_nowcast,
  age_week_nowcast[nowcast_date == nowcast_dates] |>
    adjust_posteriors(
      target = "seven_day", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
    ) |>
    select_var("seven_day") |>
    rbindlist() |>
    format_for_submission(),
  map(nowcast_dates),
  iteration = "list",
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/hierarchical_submission_nowcast.R.
tar_file(
  save_hierarchical_submission,
  save_csv(
    hierarchical_submission_nowcast,
      filename = paste0(nowcast_dates, ".csv"),
      path = here("data/nowcasts/submission/hierarchical")
  ),
  map(hierarchical_submission_nowcast, nowcast_dates)
)
#> Establish _targets.R and _targets_r/targets/save_hierarchical_submission.R.
tar_target(
  independent_submission_nowcast,
  rbind(
    independent_ref_dow_nowcast[nowcast_date == nowcast_dates],
    overall_only_nowcast[nowcast_date == nowcast_dates]
  ) |> 
   adjust_posteriors(
      target = "seven_day", 
      max_ratio = 0.25, 
      rhat_bound = 1.1,
      per_dt_bound = 0.2
    ) |>
    select_var("seven_day") |>
    rbindlist() |>
    format_for_submission(),
  map(nowcast_dates),
  iteration = "list",
  cue = tar_cue(mode = "never")
)
#> Establish _targets.R and _targets_r/targets/independent_submission_nowcast.R.
tar_file(
  save_independent_submission,
  save_csv(
    independent_submission_nowcast,
    filename = paste0(nowcast_dates, ".csv"),
    path = here("data/nowcasts/submission/independent")
  ),
  map(independent_submission_nowcast, nowcast_dates)
)
#> Establish _targets.R and _targets_r/targets/save_independent_submission.R.
tar_file(
  save_latest_daily_hospitalisations,
  save_csv(
    latest_hospitalisations,
    filename = paste0("daily.csv"),
    path = here("data/observations")
  )
)
#> Establish _targets.R and _targets_r/targets/save_latest_daily_hospitalisations.R.
tar_file(
  save_latest_7day_hospitalisations,
  save_csv(
    latest_7day_hospitalisations,
    filename = paste0("seven_day.csv"),
    path = here("data/observations")
  )
)
#> Establish _targets.R and _targets_r/targets/save_latest_7day_hospitalisations.R.

Evaluation

tar_target(diagnostics, {
  combined_nowcasts[, c("daily", "seven_day") := NULL]
})
#> Define target diagnostics from chunk code.
#> Establish _targets.R and _targets_r/targets/diagnostics.R.
list(
  tar_file(
    save_all_diagnostics,
      save_csv(
        diagnostics,
        filename = "all.csv",
        path = here("data/diagnostics")
      )
  ),
  tar_file(
    save_high_rhat_diagnostics,
      save_csv(
        diagnostics[max_rhat > 1.05],
        filename = "high-rhat.csv",
        path = here("data/diagnostics")
      )
  ),
  tar_file(
    save_high_divergent_transitions,
      save_csv(
        diagnostics[per_divergent_transitions > 0.1],
        filename = "high-divergent-transitions.csv",
        path = here("data/diagnostics")
      )
  ),
  tar_file(
    save_failures,
      save_csv(
        diagnostics[failure == TRUE],
        filename = "fitting-failed.csv",
        path = here("data/diagnostics")
      )
  )
)
#> Establish _targets.R and _targets_r/targets/save_diagnostics.R.
tar_file(
  save_run_time,
  combined_nowcasts |>
    summarise_runtimes() |>
    save_csv(
      filename = "run-times.csv",
      path = here("data/diagnostics")
    )
)
#> Establish _targets.R and _targets_r/targets/save-run-time.R.
tar_target(scored_nowcasts, {
  summarised_nowcast[location == locations][
    reference_date < (max(nowcast_date) - 28)][,
    holiday := NULL][,
    horizon := as.numeric(as.Date(reference_date) - nowcast_date)][
    horizon >= -7
  ]
})
#> Define target scored_nowcasts from chunk code.
#> Establish _targets.R and _targets_r/targets/scored_nowcasts.R.
tar_map(
  list(score_by = list(
    "overall", "age_group", "horizon", "reference_date", "nowcast_date"
  )),
  tar_target(
    scores,
    enw_score_nowcast(
      scored_nowcasts, complete_hospitalisations, 
      summarise_by = drop_string(c(score_by, "model"), "overall"),
      log = FALSE
    )
  ),
  tar_target(
    log_scores,
    enw_score_nowcast(
      scored_nowcasts, complete_hospitalisations, 
      summarise_by = drop_string(c(score_by, "model"), "overall"),
      log = TRUE
    )
  ),
  tar_file(
    save_scores,
    save_csv(
      rbind(scores[, scale := "natural"], log_scores[, scale := "log"]),
      filename = paste0(paste(score_by, sep = "-"), ".csv"),
      path = here("data/scores")
    )
  )
)
#> Establish _targets.R and _targets_r/targets/score.R.

Visualise

Currently these are not produced. See the real time report instead

tar_target(
  plot_latest_nowcast,
  enw_plot_nowcast_quantiles(
    summarised_nowcast[nowcast_date == max(nowcast_date)][
                       location == locations][
                       reference_date >= (nowcast_date - 28)]
  ) +
  facet_grid(vars(age_group), vars(model), scales = "free_y"),
  map(locations),
  iteration = "list"
)
tar_target(
  plot_latest_7day_nowcast,
  enw_plot_nowcast_quantiles(
    summarised_7day_nowcast[nowcast_date == max(nowcast_date)][
                            location == locations][
                            reference_date >= (nowcast_date - 28)]
  ) +
  facet_grid(vars(age_group), vars(model), scales = "free_y"),
  map(locations),
  iteration = "list"
)
tar_map(
  list(horizons = 0:7),
  tar_target(
    plot_nowcast_horizon,
    enw_plot_nowcast_quantiles(
      scored_nowcasts[location == locations][horizon == -horizons][,
                      confirm := NA],
      latest_obs = latest_hospitalisations[
        location == locations][
        reference_date >= min(scored_nowcasts$reference_date)][
        reference_date <= max(scored_nowcasts$reference_date)
      ],
      log = TRUE
    ) +
    facet_grid(vars(age_group), vars(model), scales = "free_y"),
    map(locations),
    iteration = "list"
  )
)
tar_map(
  list(horizons = 0:7),
  tar_target(
    plot_7day_nowcast_horizon,
    enw_plot_nowcast_quantiles(
      summarised_7day_nowcast[
        reference_date < (max(nowcast_date) - 28)][,
        holiday := NULL][,
        horizon := as.numeric(as.Date(reference_date) - nowcast_date)][
        location == locations][
        horizon == -horizons][,
        confirm := NA],
      latest_obs = latest_7day_hospitalisations[
        location == locations][
        reference_date >= min(scored_nowcasts$reference_date)][
        reference_date <= max(scored_nowcasts$reference_date)
      ],
      log = TRUE
    ) +
    facet_grid(vars(age_group), vars(model), scales = "free_y"),
    map(locations),
    iteration = "list"
  )
)


epiforecasts/eval-germany-sp-nowcasting documentation built on July 7, 2022, 8:56 p.m.