##' Convert a modelling group's stochastic files into the summary format,
##' ready for later uploading to the Montagu data annex. Four files are
##' produced which reduce age to all-age-total, and under-5-total, by
##' calendar year, or birth-cohort year.
##' @export
##' @title Process stochastic data
##' @import data.table
##' @import readr
##' @importFrom utils write.csv
##' @param con DBI connection to production. Used for verifying certificate
##' against expected properties
##' @param modelling_group The modelling group id
##' @param disease The disease
##' @param touchstone The touchstone (including version) for these estimates
##' @param scenarios A vector of scenario_descriptions. If the files
##' parameter is of length more than 1, then it must be the same length as
##' the number of scenarios, and a one-to-one mapping between the two is
##' assumed.
##' @param in_path The folder containing the stochastic files
##' @param files Either a single string containing placeholders to indicate
##' filenames, or a vector of files, one for each scenario. Placeholders can
##' include :group :touchstone :scenario :disease and :index
##' @param cert Name of the certificate file accompanying the estimates
##' @param index_start A scalar or vector matching the length of scenarios.
##' Each entry is either an integer or NA, indicating the first number in a
##' sequence of files. NA implies there is a single file with no sequence.
##' The placeholder :index in the filenames will be replaced with this.
##' @param index_end Similar to index_start, indicating the last number
##' in a sequence of a files. Can be scalar, applying to all scenarios, or
##' a vector with an entry for each scenario, with an integer value or NA
##' in each case.
##' @param out_path Path to writing output files into
##' @param pre_aggregation_path Path to dir to write out pre age-disaggregated
##' data into. If NULL then this is skipped.
##' @param outcomes A list of names vectors, where the name is the burden
##' outcome, and the elements of the list are the column names in the
##' stochastic files that should be summed to compute that outcome. The
##' default is to expect outcomes `deaths`, `cases`, `dalys`, and `yll`,
##' with single columns with the same names in the stochastic files.
##' @param dalys_recipe If DALYs must be calculated, you can supply a data
##' frame here, and stoner will calculate DALYs using that recipe. The
##' data frame must have names `outcome`, `proportion`, `average_duration`
##' and `disability_weight`. See [stoner_calculate_dalys].
##' @param runid_from_file Occasionally groups have omitted the run_id
##' from the stochastic file, and provided 200 files, one per run_id. Set
##' runid_from_file to TRUE if this is the case, to deduce the run_id from
##' the filenames. The index_start and index_end must be 1 and 200 in
##' this case.
##' @param allow_missing_disease Occasionally groups have omitted the
##' disease column from their stochastic data. Set this to TRUE to expect
##' that circumstance, and avoid generating warnings.
##' @param upload_to_annex Set to TRUE if you want to upload the results
##' straight into annex. (Files will still be created, as the upload is
##' relatively fast; creating the csvs is slower and worth caching)
##' @param annex DBI connection to annex, used if upload_to_annex is TRUE.
##' @param allow_new_database If uploading, then set this to TRUE to enable
##' creating the stochastic_file table if it is not found.
##' @param bypass_cert_check If TRUE, then no checks are carried out on the
##' parameter certificate (if provided).
##' @param testing For internal use only.
##' @param lines Number of lines to read from each file, Inf by default to
##' read all lines. Set a lower number for testing subset of process before
##' doing the full run.
##' @param log_file Path to file to save logs to, NULL to not log to file.
##' If file exists it will be appended to, otherwise file will be created.
##' @param silent TRUE to silence console logs.
stone_stochastic_process <- function(con, modelling_group, disease,
touchstone, scenarios, in_path, files,
cert, index_start, index_end, out_path,
pre_aggregation_path = NULL,
outcomes = list(deaths = "deaths",
cases = "cases",
dalys = "dalys",
yll = "yll"),
dalys_recipe = NULL,
runid_from_file = FALSE,
allow_missing_disease = FALSE,
upload_to_annex = FALSE,
annex = NULL,
allow_new_database = FALSE,
bypass_cert_check = FALSE,
testing = FALSE,
lines = Inf,
log_file = NULL,
silent = FALSE) {
start <- Sys.time()
## Initialise logger
if (!is.null(log_file)) {
if (dir.exists(log_file)) {
stop(sprintf("Log file '%s' is a directory, must be a path to a file",
if (!file.exists(log_file)) {
file.create(log_file, showWarnings = FALSE, overwrite = FALSE)
lg <- get_logger()
if (silent) {
threshold <- lg$inherited_appenders$console$threshold
on.exit(lg$inherited_appenders$console$set_threshold(threshold), add = TRUE)
if (!is.null(log_file)) {
lg$add_appender(lgr::AppenderFile$new(log_file), name = "file")
on.exit(lg$remove_appender("file"), add = TRUE)
## Setup life table cache
cache$life_table <- NULL
# Do all the parameter and file testing upfront, as this can be a very
# time-consuming process...
touchpoint <- list(
modelling_group = modelling_group,
disease = disease,
touchstone = touchstone
files <- stochastic_process_validate(con,
touchpoint = touchpoint,
scenarios = scenarios,
in_path = in_path,
files = files,
index_start = index_start,
index_end = index_end,
out_path = out_path,
pre_aggregation_path = pre_aggregation_path,
outcomes = outcomes,
dalys_recipe = dalys_recipe,
runid_from_file = runid_from_file,
upload_to_annex = upload_to_annex,
annex = annex,
cert = cert,
bypass_cert_check = bypass_cert_check,
lines = lines),
error = function(e) {
lg$fatal(paste0("Processing for modelling_group: %s, disease: %s ",
"failed with error \n %s"),
modelling_group, disease, e$message)
read_params <- list(
in_path = in_path,
files = files,
runid_from_file = runid_from_file,
allow_missing_disease = allow_missing_disease,
lines = lines
lg$info(paste0("Validated inputs, processing scenario data for ",
"modelling_group: %s, disease: %s"), modelling_group, disease)
scenario_data <- timed(
touchpoint = touchpoint,
scenarios = scenarios,
read_params = read_params,
outcomes = outcomes,
dalys_recipe = dalys_recipe),
"Processed %s scenarios for modelling group: %s, disease: %s",
length(scenarios), touchpoint$modelling_group, touchpoint$disease)
if (!is.null(pre_aggregation_path)) {
timed(write_pre_aggregated_to_disk(scenario_data, touchpoint,
"Wrote all pre-aggregated output to dir %s", pre_aggregation_path)
all_aggregated <- timed(aggregate_data(scenario_data),
"Finished aggregating all scenario data")
paths <- timed(write_output_to_disk(all_aggregated, out_path,
modelling_group, disease),
"Wrote all output to dir %s", out_path)
# Upload to Annex. Only allow possibility of creating new stochastic_file
# table on the first one; it will either fail there, or exist for the later
# three uploads.
if (upload_to_annex) {
timed(write_output_to_annex(paths, con, annex, modelling_group,
disease, touchstone, allow_new_database,
"Wrote outputs to annex")
elapsed <- Sys.time() - start
lg$info("Processing for modelling_group: %s, disease: %s completed in %s",
modelling_group, disease, human_readable_time(elapsed))
all_scenarios <- function(con,
dalys_recipe) {
all_scenarios <- NULL
all_countries <- DBI::dbGetQuery(con, "SELECT id, nid FROM country")
lg <- get_logger()
lg$info("Processing %s scenarios for modelling group: %s, disease: %s",
length(scenarios), touchpoint$modelling_group, touchpoint$disease)
for (scenario_no in seq_along(scenarios)) {
scenario_name <- scenarios[scenario_no]
files <- read_params$files[[scenario_name]]
lg$info("Processing modelling group: %s, disease: %s, scenario (%s/%s): %s",
scenario_data <- process_scenario(con, scenario_name, files,
touchpoint, read_params, outcomes,
dalys_recipe, all_countries)
# If this is the first scenario, then it's easy...
if (is.null(all_scenarios)) {
all_scenarios <- scenario_data
# Otherwise, we need to add new columns with the new scenario
# HOWEVER: there could be different countries in different
# scenarios, so we may need to add countries with NA data to
# make the rows line up.
} else {
all_scenarios <- dplyr::full_join(
all_scenarios, scenario_data,
by = c("country", "year", "run_id", "age"))
process_scenario <- function(con, scenario, files, touchpoint,
read_params, outcomes, dalys_recipe,
countries) {
scenario_data <- list()
lines <- read_params$lines
lg <- get_logger()
for (i in seq_along(files)) {
the_file <- files[i]
lg$info("Reading %s", the_file)
scenario_data[[i]] <-
read_xz_csv(con, the_file, outcomes, dalys_recipe,
read_params$runid_from_file, i,
touchpoint$touchstone, countries,
lines = lines)
# We now have a full scenario. Eliminate age, splitting into
# the four files (calendar/cohort, and u5/all age).
# For now, in the cohort files, I'm going to call cohort
# 'year' - just to keep code tidier, as the code is common...
if (length(scenario_data) == 1) {
scenario_data <- scenario_data[[1]]
} else {
scenario_data <- rbindlist(scenario_data)
for (i in seq_along(outcomes)) {
outcome <- names(outcomes[i])
names(scenario_data)[names(scenario_data) == outcome] <-
paste0(outcome, "_", scenario)
if (!is.null(dalys_recipe)) {
names(scenario_data)[names(scenario_data) == 'dalys'] <-
paste0("dalys_", scenario)
aggregate_data <- function(scenario_data) {
agg_and_sort <- function(data) {
## Define run_id, year and country as NULL to avoid
## R CMD note about no visible binding for global variable
run_id <- year <- country <- age <- NULL
data %>%
dplyr::group_by(run_id, year, country) %>%
dplyr::summarise_all(sum) %>%
dplyr::select(-age) %>% ## Drop age as we have aggregated over it
dplyr::arrange(run_id, country, year)
scen_u5 <- scenario_data[scenario_data$age <= 4 , ]
scen_u5_cal <- agg_and_sort(scen_u5)
scen_u5$year <- scen_u5$year - scen_u5$age
scen_u5_coh <- agg_and_sort(scen_u5)
scen_cal <- agg_and_sort(scenario_data)
scenario_data$year <- scenario_data$year - scenario_data$age
scen_coh <- agg_and_sort(scenario_data)
scen_u5_coh <- scen_u5_coh %>%
dplyr::rename(cohort = year)
scen_coh <- scen_coh %>%
dplyr::rename(cohort = year)
u5_calendar_year = scen_u5_cal,
u5_cohort = scen_u5_coh,
all_calendar_year = scen_cal,
all_cohort = scen_coh
# Read a single csv.xz file, summing the outcomes into the three
# we want, ignoring the columns we don't want, and
read_large_file <- function(...) {
tmp <- tempfile()
unlink(tmp, recursive = TRUE)
readr::read_csv(..., lazy = FALSE))
calc_outcomes <- function(csv, outcomes, single_outcome) {
# If the outcome we want is the sum of other outcomes...
if (!identical(outcomes, single_outcome)) {
total <- csv[[outcomes[1]]]
if (length(outcomes) > 1) {
for (i in 2:length(outcomes)) {
total <- total + csv[[outcomes[i]]]
csv[[single_outcome]] <- total
read_xz_csv <- function(con, the_file, outcomes, dalys_recipe, allow_missing_disease,
runid_from_file, run_id, touchstone, countries,
lines) {
if (is.data.frame(dalys_recipe)) {
dalys_cols <- unique(dalys_recipe$outcome)
meta_cols <- unique(unlist(outcomes))
col_list <- list(
year = readr::col_integer(),
age = readr::col_integer(),
country = readr::col_character(),
country_name = readr::col_skip(),
cohort_size = readr::col_skip()
if (!runid_from_file) {
col_list[['run_id']] = readr::col_integer()
if (!allow_missing_disease) {
col_list[['disease']] <- col_skip()
for (outcome in meta_cols) {
if (!is.na(outcome)) {
col_list[[outcome]] <- readr::col_double()
columns <- do.call(readr::cols_only, col_list)
csv <- suppressMessages(as.data.table(
col_types = columns,
progress = FALSE, na = "NA",
n_max = lines)
for (n in names(csv)) {
csv[[n]][is.na(csv[[n]])] <- 0
if (runid_from_file) {
csv[['run_id']] <- run_id
csv$country <- countries$nid[match(csv$country, countries$id)]
if (is.data.frame(dalys_recipe)) {
res <- stoner_calculate_dalys(con, touchstone, csv,
dalys_recipe, cache$life_table)
csv <- res$data
if (is.null(cache$life_table)) {
cache$life_table <- res$life_table
for (i in seq_along(outcomes)) {
csv <- calc_outcomes(csv, outcomes[[i]], names(outcomes)[i])
csv <- as.data.frame(csv)
cols <- unique(c("run_id", "year", "age", "country", names(outcomes), if (!is.null(dalys_recipe)) "dalys"))
csv[, cols]
write_pre_aggregated_to_disk <- function(data, touchpoint,
pre_aggregation_path) {
countries <- unique(data$country)
for (country in countries) {
path <- file.path(pre_aggregation_path,
data <- as.data.frame(data)
qs::qsave(data[data$country == country, ], path)
}, "Saved %s size %s", path, prettyunits::pretty_bytes(file.size(path)))
write_output_to_disk <- function(output, out_path, modelling_group, disease) {
all_u5_cal_file <- file.path(out_path, sprintf("%s_%s_calendar_u5.qs",
modelling_group, disease))
timed(qs::qsave(x = as.data.frame(output$u5_calendar_year),
file = all_u5_cal_file),
"Saved %s size %s", all_u5_cal_file,
all_cal_file <- file.path(out_path, sprintf("%s_%s_calendar.qs",
modelling_group, disease))
timed(qs::qsave(x = as.data.frame(output$all_calendar_year),
file = all_cal_file),
"Saved %s size %s", all_u5_cal_file,
all_u5_coh_file <- file.path(out_path, sprintf("%s_%s_cohort_u5.qs",
modelling_group, disease))
timed(qs::qsave(x = as.data.frame(output$u5_cohort),
file = all_u5_coh_file),
"Saved %s size %s", all_u5_cal_file,
all_coh_file <- file.path(out_path, sprintf("%s_%s_cohort.qs",
modelling_group, disease))
timed(qs::qsave(x = as.data.frame(output$all_cohort),
file = all_coh_file),
"Saved %s size %s", all_u5_cal_file,
all_u5_cal_file = all_u5_cal_file,
all_u5_coh_file = all_u5_coh_file,
all_cal_file = all_cal_file,
all_coh_file = all_coh_file
write_output_to_annex <- function(paths, con, annex, modelling_group,
disease, touchstone, allow_new_database,
testing) {
paths$all_u5_cal_file, con, annex, modelling_group, disease,
touchstone, is_cohort = FALSE, is_under5 = TRUE,
allow_new_database = allow_new_database, testing = testing)
paths$all_u5_coh_file, con, annex, modelling_group, disease,
touchstone, is_cohort = TRUE, is_under5 = TRUE, testing = testing)
paths$all_cal_file, con, annex, modelling_group, disease,
touchstone, is_cohort = FALSE, is_under5 = FALSE, testing = testing)
paths$all_coh_file, con, annex, modelling_group, disease,
touchstone, is_cohort = TRUE, is_under5 = FALSE, testing = testing)
stochastic_process_validate <- function(con, touchpoint, scenarios, in_path,
files, index_start, index_end,
cert, bypass_cert_check,
logger) {
if (upload_to_annex) {
assert_db_value_exists(con, "modelling_group", "id", touchpoint$modelling_group)
assert_db_value_exists(con, "disease", "id", touchpoint$disease)
assert_db_value_exists(con, "touchstone", "id", touchpoint$touchstone)
if (!is.null(dalys_recipe)) {
c("average_duration", "disability_weight", "outcome", "proportion")))
# Can't specify both a DALYs sum and a recipe.
stopifnot(!"dalys" %in% names(outcomes))
if (!file.exists(in_path)) {
stop(sprintf("Input path not found: %s", in_path))
if (!file.exists(out_path)) {
stop(sprintf("Output path not found: %s", out_path))
if (!is.null(pre_aggregation_path)) {
if (!file.exists(pre_aggregation_path)) {
stop(sprintf("Pre aggregation output path not found: %s",
# Certificate check (if enabled).
if (!is.na(cert)) {
cert <- file.path(in_path, cert)
if (!bypass_cert_check) {
if (!file.exists(cert)) {
stop(sprintf("Certificate not found: %s", cert))
stone_stochastic_cert_verify(con, cert, touchpoint$modelling_group,
touchpoint$touchstone, touchpoint$disease)
# Check number of file patterns is 1 for all, or 1 per scenario
if (!(length(files) %in% c(1, length(scenarios)))) {
stop(sprintf("Incorrect files param - length should be 1 or %s",
# index_start and index_end are either NA, or a single integer,
# or a vector of integer for each scenario
if (!(length(index_start) %in% c(1, length(scenarios)))) {
stop(sprintf("Incorrect index_start - can be NA, or length 1 or %s",
if (!(length(index_end) %in% c(1, length(scenarios)))) {
stop(sprintf("Incorrect index_end - can be NA, or length 1 or %s",
# Check index_start and index_end are all integer or NA
all_na_or_int <- function(x) {
is.numeric(x) || all(is.na(x))
if (!all_na_or_int(index_start)) {
stop("index_start must be all NA or integers")
if (!all_na_or_int(index_end)) {
stop("index_end must be all NA or integers")
# Where index_start is NA, index_end must also be NA
if (!identical(which(is.na(index_start)),
which(is.na(index_end)))) {
stop("Mismatches of NA between index_start and index_end")
# Check whenever index_start is !NA, files contains :index
# (and the converse also holds)
if (length(files) == 1) {
files <- rep(files, length(scenarios))
if (length(index_start) == 1) {
index_start <- rep(index_start, length(files))
if (length(index_end) == 1) {
index_end <- rep(index_end, length(files))
if (!identical(grepl(":index", files), !is.na(index_start))) {
stop("Mismatch between NA in index_start, and :index placeholder in files")
if (runid_from_file) {
if (any(index_start != 1) || any(index_end != 200)) {
stop("Must have index_start and index_end as 1..200 to imply run_id")
for (scenario in scenarios) {
stochastic_validate_scenario(con, touchpoint, scenario)
for (i in seq_along(outcomes)) {
check_outcome(con, names(outcomes)[i], outcomes[[i]])
validate_paths(file.path(in_path, files), scenarios,
touchpoint, index_start, index_end)
validate_paths <- function(files, scenarios, touchpoint,
index_start, index_end) {
scenario_substitute_path <- function(scenario_no) {
if (length(index_start) != 1) {
index_from <- index_start[scenario_no]
index_to <- index_end[scenario_no]
} else {
index_from <- index_start
index_to <- index_end
if (is.na(index_from)) {
index_from <- 1
index_to <- 1
scenario <- scenarios[scenario_no]
files <- lapply(index_from:index_to, function(i) {
the_file <- files[scenario_no]
the_file <- gsub(":index", i, the_file)
the_file <- gsub(":group", touchpoint$modelling_group, the_file)
the_file <- gsub(":touchstone", touchpoint$touchstone, the_file)
the_file <- gsub(":disease", touchpoint$disease, the_file)
the_file <- gsub(":scenario", scenario, the_file)
if (!file.exists(the_file)) {
stop(sprintf("File not found: %s", the_file))
paths <- lapply(seq_along(scenarios), scenario_substitute_path)
names(paths) <- scenarios
check_outcome <- function(con, type, options) {
if (any(duplicated(options))) {
stop(sprintf("Duplicated outcome in %s", type))
res <- DBI::dbGetQuery(con, sprintf("
SELECT * FROM burden_outcome
WHERE code IN %s", sql_in(options)))
missing <- options[!options %in% res$code]
if (length(missing) > 0) {
stop(sprintf("Outcomes not found, %s %s",
type, sql_in(missing)))
stochastic_validate_scenario <- function(con, touchpoint, scenario) {
# Scenario-specific tests:
# 1. (touchstone, scenario_description) exists in scenario table
# 2. (scenario_description, disease) exists in scenario_description table
# 3. (touchstone, modelling_group) exists in responsibility_set table
# 4. (responsibility_set, scenario) exist in responsibility, where
# scenario id comes from #1, and responsibility_set id from #3
scenario_id <- DBI::dbGetQuery(con, "
SELECT id FROM scenario WHERE touchstone = $1
AND scenario_description = $2",
list(touchpoint$touchstone, scenario))$id
if (length(scenario_id) != 1) {
stop(sprintf("scenario %s not found in touchstone %s",
scenario, touchpoint$touchstone))
scenario_descs <- DBI::dbGetQuery(con, "
SELECT count(*) FROM scenario_description WHERE id = $1
AND disease = $2", list(scenario, touchpoint$disease))$count
if (scenario_descs != 1) {
stop(sprintf("scenario_description %s not valid for disease %s",
scenario, touchpoint$disease))
respset_id <- DBI::dbGetQuery(con, "
SELECT id FROM responsibility_set WHERE touchstone = $1
AND modelling_group = $2", list(touchpoint$touchstone,
if (length(respset_id) != 1) {
stop(sprintf("No responsibility_set for group %s in touchstone %s",
touchpoint$modelling_group, touchpoint$touchstone))
resp_id <- DBI::dbGetQuery(con, "
SELECT id FROM responsibility WHERE responsibility_set = $1
AND scenario = $2", list(respset_id, scenario_id))$id
if (length(resp_id) != 1) {
stop(sprintf("No responsibility for group %s, scenario %s, touchstone %s",
touchpoint$modelling_group, scenario, touchpoint$touchstone))
# Possibly, we could check that all scenarios are included, but
# the exceptions are the groups that have to do a VIS report, as they
# have several additional scenarios for which they provide central but
# not stochastic estimates. So not sure if this is so easy to insist on.
