##' Upload the reduced csvs produced by stoner_stochastic_process into
##' the annex database.
##' @export
##' @title Upload stochastic data to annex
##' @import data.table
##' @import readr
##' @importFrom utils read.csv
##' @param file A qs or csv file generated by stone_stochastic_process
##' @param con DBI connection to production (for argument validation)
##' @param annex DBI connection to annex, to receive stochastic uploads.
##' @param modelling_group The modelling group id
##' @param disease The disease
##' @param touchstone The touchstone (including version) for these estimates
##' @param is_cohort Set this to TRUE if the csv file is cohort-oriented, or
##' FALSE for calendar-year oriented.
##' @param is_under5 Set this to TRUE if the csv file only considers ages
##' 0-4 inclusive; FALSE if all ages are included.
##' @param allow_new_database Create the stochastic_file database if it
##' does not exist. Should only be needed first time on a new database.
##' @param testing For internal use only.
stone_stochastic_upload <- function(file, con, annex, modelling_group,
disease, touchstone, is_cohort,
is_under5, allow_new_database = FALSE,
testing = FALSE) {
initialise_stochastic_file_db <- function(con) {
DBI::dbExecute(con, '
CREATE TABLE "stochastic_file" (
"id" SERIAL ,
"touchstone" TEXT NOT NULL ,
"modelling_group" TEXT NOT NULL ,
"disease" TEXT NOT NULL ,
"is_cohort" BOOLEAN NOT NULL ,
"is_under5" BOOLEAN NOT NULL,
"version" INTEGER NOT NULL DEFAULT 1,
"creation_date" DATE NOT NULL DEFAULT NOW())')
}
stopifnot(file.exists(file))
assert_connection(con)
assert_connection(annex)
assert_scalar_character(modelling_group)
if (!db_exists(con, "modelling_group", "id", modelling_group)) {
stop(sprintf("Unknown modelling group: %s", modelling_group))
}
assert_scalar_character(disease)
if (!db_exists(con, "disease", "id", disease)) {
stop(sprintf("Unknown disease: %s", disease))
}
assert_scalar_character(touchstone)
if (!db_exists(con, "touchstone", "id", touchstone)) {
stop(sprintf("Unknown touchstone: %s", touchstone))
}
if (!"stochastic_file" %in% DBI::dbListTables(annex)) {
if (allow_new_database) {
initialise_stochastic_file_db(annex)
} else {
stop("stochastic_file database table not found")
}
}
assert_scalar_logical(is_cohort)
assert_scalar_logical(is_under5)
data <- read_processed_stochastic_data(file, is_cohort)
# See if we've got previous history for this combo...
annex_meta <- DBI::dbGetQuery(annex, "
SELECT * FROM stochastic_file
WHERE (modelling_group=$1)
AND (disease=$2)
AND (touchstone=$3)
AND (is_cohort=$4)
AND (is_under5=$5)",
list(modelling_group, disease, touchstone, is_cohort, is_under5))
# New data. Add an entry and get ID.
if (nrow(annex_meta) == 0) {
id <- as.integer(DBI::dbGetQuery(annex, "
INSERT INTO stochastic_file (touchstone, modelling_group, disease,
is_cohort, is_under5, version)
VALUES ($1,$2,$3,$4,$5,$6)
RETURNING id",
list(touchstone, modelling_group, disease,
is_cohort, is_under5, 1))$id)
# Then create the new stochastic table
sqlcols <- ""
for (col in seq_along(names(data))) {
if (col!=1) sqlcols <- paste0(sqlcols, ", ")
colname <- names(data)[col]
if (colname %in% c("year", "country", "run_id", "cohort")) {
sqlcols <- sprintf(' %s "%s" INTEGER ', sqlcols, colname)
} else {
sqlcols <- sprintf(' %s "%s" DECIMAL ', sqlcols, colname)
}
}
table_name <- sprintf("stochastic_%s", id)
message(sprintf("Creating table %s",table_name))
DBI::dbExecute(annex, sprintf('
CREATE TABLE "%s" (%s)', table_name, sqlcols))
} else {
id <- as.integer(annex_meta$id)
version <- as.integer(annex_meta$version) + 1L
DBI::dbExecute(annex, "
UPDATE stochastic_file
SET version = $1,
creation_date = NOW()
WHERE id = $2", list(version, id))
table_name <- sprintf("stochastic_%s", id)
message("Overwriting table with id ",id)
if (table_name %in% DBI::dbListTables(annex)) {
DBI::dbExecute(annex, sprintf("DROP TABLE %s", table_name))
}
}
# All ready - write the data.
message(sprintf("Writing data"))
DBI::dbWriteTable(annex, table_name, data, overwrite = TRUE)
# On annex, we need to make the new tables public - but not
# when we're testing. Want to do this in a way that preserves
# code coverage...
grant <- "GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly"
if (testing) {
grant <- paste("/*", grant, " */")
}
DBI::dbExecute(annex, grant)
invisible()
}
read_processed_stochastic_data <- function(file, is_cohort) {
type <- tools::file_ext(file)
if (type == "csv") {
read_stochastic_csv(file, is_cohort)
} else if (type == "qs") {
read_stochastic_qs(file, is_cohort)
} else {
stop(paste0("Can only read csv or qs format stochastic data, got ", type))
}
}
read_stochastic_csv <- function(file, is_cohort) {
if (is_cohort) {
col_types <- readr::cols(cohort = readr::col_integer(),
country = readr::col_integer(),
run_id = readr::col_integer(),
.default = readr::col_guess())
expected_cols <- c("cohort", "country", "run_id")
} else {
col_types <- readr::cols(year = readr::col_integer(),
country = readr::col_integer(),
run_id = readr::col_integer(),
.default = readr::col_guess())
expected_cols <- c("year", "country", "run_id")
}
# Test columns before reading...
first_line <- read.csv(file, nrows = 1)
if (anyNA(match(expected_cols, names(first_line)))) {
stop("Columns in csv file not as expected")
}
message(sprintf("Reading %s", file))
readr::read_csv(file, col_types = col_types, progress = FALSE)
}
read_stochastic_qs <- function(file, is_cohort) {
if (is_cohort) {
expected_cols <- c("cohort", "country", "run_id")
} else {
expected_cols <- c("year", "country", "run_id")
}
## Read data
message(sprintf("Reading %s", file))
data <- qs::qread(file)
if (anyNA(match(expected_cols, colnames(data)))) {
stop("Columns in qs file not as expected")
}
data
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.