R/stochastic_upload.R

Defines functions read_stochastic_qs read_stochastic_csv read_processed_stochastic_data stone_stochastic_upload

Documented in stone_stochastic_upload

##' Upload the reduced csvs produced by stoner_stochastic_process into
##' the annex database.
##' @export
##' @title Upload stochastic data to annex
##' @import data.table
##' @import readr
##' @importFrom utils read.csv
##' @param file A qs or csv file generated by stone_stochastic_process
##' @param con DBI connection to production (for argument validation)
##' @param annex DBI connection to annex, to receive stochastic uploads.
##' @param modelling_group The modelling group id
##' @param disease The disease
##' @param touchstone The touchstone (including version) for these estimates
##' @param is_cohort Set this to TRUE if the csv file is cohort-oriented, or
##' FALSE for calendar-year oriented.
##' @param is_under5 Set this to TRUE if the csv file only considers ages
##' 0-4 inclusive; FALSE if all ages are included.
##' @param allow_new_database Create the stochastic_file database if it
##' does not exist. Should only be needed first time on a new database.
##' @param testing For internal use only.

stone_stochastic_upload <- function(file, con, annex, modelling_group,
                                    disease, touchstone, is_cohort,
                                    is_under5, allow_new_database = FALSE,
                                    testing = FALSE) {

  initialise_stochastic_file_db <- function(con) {
    DBI::dbExecute(con, '
    CREATE TABLE "stochastic_file" (
    "id" SERIAL ,
    "touchstone" TEXT NOT NULL ,
    "modelling_group" TEXT NOT NULL ,
    "disease" TEXT NOT NULL ,
    "is_cohort" BOOLEAN NOT NULL ,
    "is_under5" BOOLEAN NOT NULL,
    "version" INTEGER NOT NULL DEFAULT 1,
    "creation_date" DATE NOT NULL DEFAULT NOW())')
  }

  stopifnot(file.exists(file))

  assert_connection(con)
  assert_connection(annex)

  assert_scalar_character(modelling_group)
  if (!db_exists(con, "modelling_group", "id", modelling_group)) {
    stop(sprintf("Unknown modelling group: %s", modelling_group))
  }

  assert_scalar_character(disease)
  if (!db_exists(con, "disease", "id", disease)) {
    stop(sprintf("Unknown disease: %s", disease))
  }

  assert_scalar_character(touchstone)
  if (!db_exists(con, "touchstone", "id", touchstone)) {
    stop(sprintf("Unknown touchstone: %s", touchstone))
  }

  if (!"stochastic_file" %in% DBI::dbListTables(annex)) {
    if (allow_new_database) {
      initialise_stochastic_file_db(annex)
    } else {
      stop("stochastic_file database table not found")
    }
  }

  assert_scalar_logical(is_cohort)
  assert_scalar_logical(is_under5)

  data <- read_processed_stochastic_data(file, is_cohort)

  # See if we've got previous history for this combo...

  annex_meta <- DBI::dbGetQuery(annex, "
    SELECT * FROM stochastic_file
     WHERE (modelling_group=$1)
       AND (disease=$2)
       AND (touchstone=$3)
       AND (is_cohort=$4)
       AND (is_under5=$5)",
    list(modelling_group, disease, touchstone, is_cohort, is_under5))

  # New data. Add an entry and get ID.

  if (nrow(annex_meta) == 0) {
    id <- as.integer(DBI::dbGetQuery(annex, "
      INSERT INTO stochastic_file (touchstone, modelling_group, disease,
                                   is_cohort, is_under5, version)
           VALUES ($1,$2,$3,$4,$5,$6)
        RETURNING id",
      list(touchstone, modelling_group, disease,
           is_cohort, is_under5, 1))$id)

  # Then create the new stochastic table

    sqlcols <- ""

    for (col in seq_along(names(data))) {
      if (col!=1) sqlcols <- paste0(sqlcols, ", ")
      colname <- names(data)[col]

      if (colname %in% c("year", "country", "run_id", "cohort")) {
        sqlcols <- sprintf(' %s "%s" INTEGER ', sqlcols, colname)
      } else {
        sqlcols <- sprintf(' %s "%s" DECIMAL ', sqlcols, colname)
      }
    }

    table_name <- sprintf("stochastic_%s", id)

    message(sprintf("Creating table %s",table_name))
    DBI::dbExecute(annex, sprintf('
      CREATE TABLE "%s" (%s)', table_name, sqlcols))

  } else {
    id <- as.integer(annex_meta$id)
    version <- as.integer(annex_meta$version) + 1L

    DBI::dbExecute(annex, "
      UPDATE stochastic_file
         SET version = $1,
             creation_date = NOW()
       WHERE id = $2", list(version, id))

    table_name <- sprintf("stochastic_%s", id)
    message("Overwriting table with id ",id)
    if (table_name %in% DBI::dbListTables(annex)) {
      DBI::dbExecute(annex, sprintf("DROP TABLE %s", table_name))
    }
  }

  # All ready - write the data.

  message(sprintf("Writing data"))
  DBI::dbWriteTable(annex, table_name, data, overwrite = TRUE)

  # On annex, we need to make the new tables public - but not
  # when we're testing. Want to do this in a way that preserves
  # code coverage...

  grant <- "GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly"

  if (testing) {
    grant <- paste("/*", grant, " */")
  }

  DBI::dbExecute(annex, grant)
  invisible()
}

read_processed_stochastic_data <- function(file, is_cohort) {
  type <- tools::file_ext(file)

  if (type == "csv") {
    read_stochastic_csv(file, is_cohort)
  } else if (type == "qs") {
    read_stochastic_qs(file, is_cohort)
  } else {
    stop(paste0("Can only read csv or qs format stochastic data, got ", type))
  }
}

read_stochastic_csv <- function(file, is_cohort) {
  if (is_cohort) {
    col_types <- readr::cols(cohort = readr::col_integer(),
                             country = readr::col_integer(),
                             run_id = readr::col_integer(),
                             .default = readr::col_guess())
    expected_cols <- c("cohort", "country", "run_id")
  } else {
    col_types <- readr::cols(year = readr::col_integer(),
                             country = readr::col_integer(),
                             run_id = readr::col_integer(),
                             .default = readr::col_guess())
    expected_cols <- c("year", "country", "run_id")
  }
  # Test columns before reading...

  first_line <- read.csv(file, nrows = 1)
  if (anyNA(match(expected_cols, names(first_line)))) {
    stop("Columns in csv file not as expected")
  }

  message(sprintf("Reading %s", file))
  readr::read_csv(file, col_types = col_types, progress = FALSE)
}

read_stochastic_qs <- function(file, is_cohort) {
  if (is_cohort) {
    expected_cols <- c("cohort", "country", "run_id")
  } else {
    expected_cols <- c("year", "country", "run_id")
  }

  ## Read data
  message(sprintf("Reading %s", file))
  data <- qs::qread(file)
  if (anyNA(match(expected_cols, colnames(data)))) {
    stop("Columns in qs file not as expected")
  }
  data
}
vimc/stoner documentation built on May 16, 2024, 11:09 a.m.