R/WorkDatabase.R

Defines functions dbWork_update_status dbWork_update_granular dbWork_update_job recreate_dbWork dbWork_check_design dbWork_check_status dbWork_check_granular dbWork_report_completion dbWork_check_run dbWork_redo dbWork_agg_timing dbWork_timing dbWork_Ntodo dbWork_todos dbWork_clean dbWork_checkpoint setup_dbWork dbWork_update_IncludeYN add_status_dbWork add_granular_dbWork create_job_df colnames_need_outputs_dbWork colnames_modification_status_dbWork colnames_work_dbWork create_dbWork fname_dbWork add_dbWork_index

Documented in add_granular_dbWork add_status_dbWork create_dbWork dbWork_agg_timing dbWork_check_design dbWork_check_granular dbWork_checkpoint dbWork_check_run dbWork_check_status dbWork_clean dbWork_Ntodo dbWork_redo dbWork_report_completion dbWork_timing dbWork_todos dbWork_update_granular dbWork_update_IncludeYN dbWork_update_job dbWork_update_status recreate_dbWork setup_dbWork

########################
#------ dbWork functions

# Add lookup indices to the tables of `dbWork` unless they exist already.
#
# `con` must be a valid database connection. Index `i_runIDs` is created on
# table `work` and index `i_needIDs` on table `need_outputs`; a table that
# does not exist is skipped.
add_dbWork_index <- function(con) {
  stopifnot(dbIsValid(con))

  # Indices that are registered in the database at the time of the call
  existing <- dbGetQuery(con,
    "SELECT * FROM sqlite_master WHERE type = 'index'")

  index_is_missing <- function(index_name) {
    NROW(existing) == 0L || !(index_name %in% existing[, "name"])
  }

  if (dbExistsTable(con, "work") && index_is_missing("i_runIDs")) {
    dbExecute(con,
      paste("CREATE INDEX i_runIDs ON work (runID_total, runID_sites,",
        "include_YN)"))
  }

  if (dbExistsTable(con, "need_outputs") && index_is_missing("i_needIDs")) {
    dbExecute(con,
      paste("CREATE INDEX i_needIDs ON need_outputs (Pid, runID_total,",
        "include_YN)"))
  }
}


# Construct the file path of `dbWork`.
#
# `path` is the folder that holds (or will hold) the database. If the base
# name of `path` contains ".sql", i.e., `path` points to a database file
# itself, then the folder of that file is used instead. `dbname` is the file
# name of the database. Returns `file.path(<folder>, dbname)`.
fname_dbWork <- function(path, dbname = "dbWork.sqlite3") {
  # Match ".sql" literally: as a regular expression, the dot matches any
  # character and a folder such as "mysql_runs" would be mistaken for a file
  if (grepl(".sql", basename(path), fixed = TRUE)) {
    path <- dirname(path)
  }

  file.path(path, dbname)
}



#' Create a \var{SQLite}-database \code{dbWork} to manage runs of a \pkg{rSFSW2}
#' simulation project
#'
#' @param path A character string. Path to the folder where the database will be
#'   created.
#' @param jobs An integer matrix. Each row corresponds to one call of the
#'   simulation function \code{do_OneSite}, i.e., \code{runsN_master} x
#'   \code{expN}. The columns \code{runID_total}, \code{runID_sites},
#'   \code{include_YN} represent a running ID, the \code{site_id} (row number in
#'   master input file), and a flag whether site is being simulated or not. See
#'   \code{\link{indices}}.
#'
#' @return Invisibly \code{TRUE}
create_dbWork <- function(path, jobs) {

  # Every field of table 'work' must be available as a column of `jobs`
  stopifnot(colnames_work_dbWork() %in% dimnames(jobs)[[2]])

  # Open (and create if needed) the database file; close it again on exit
  con <- dbConnect(SQLite(), dbname = fname_dbWork(path), flags = SQLITE_RWC)
  on.exit(dbDisconnect(con), add = TRUE)

  # Table 'work': one record per run
  sql_work <- paste("CREATE TABLE work(runID_total INTEGER PRIMARY KEY,",
    "runID_sites INTEGER NOT NULL, include_YN INTEGER NOT NULL,",
    "completed INTEGER NOT NULL, failed INTEGER NOT NULL,",
    "inwork INTEGER NOT NULL, time_s REAL)")
  dbExecute(con, sql_work)
  dbWriteTable(con, "work", append = TRUE, value = as.data.frame(jobs))

  # Indices and the one-row table 'modification_status'
  add_dbWork_index(con)
  add_status_dbWork(con)

  # Request WAL-mode (http://www.sqlite.org/wal.html); the pragma reports the
  # journal mode that is in effect afterwards
  temp_jmode <- dbGetQuery(con, "PRAGMA journal_mode=WAL;")
  if (tolower(as.character(temp_jmode)) != "wal") {
    print(paste("'create_dbWork': setting WAL mode for dbWork failed;",
      "journal mode is instead", shQuote(temp_jmode)))
  }

  # The following PRAGMAs are not set here, because `dbConnect` does so by
  # default:
  #   - synchronous mode OFF=0: fast but data may be lost if power fails
  #     (https://www.sqlite.org/pragma.html#pragma_synchronous)
  #   - busy timeout turned off
  #     (https://www.sqlite.org/pragma.html#pragma_busy_timeout)

  invisible(TRUE)
}


# Field names of table `work` of `dbWork`, in column order
colnames_work_dbWork <- function() {
  id_fields <- c("runID_total", "runID_sites", "include_YN")
  status_fields <- c("completed", "failed", "inwork")

  c(id_fields, status_fields, "time_s")
}

# Field names of table `modification_status` of `dbWork`
colnames_modification_status_dbWork <- function() {
  fields <- character(2L)
  fields[1L] <- "status"
  fields[2L] <- "time_stamp"

  fields
}

# Fixed field names of table `need_outputs` of `dbWork`
colnames_need_outputs_dbWork <- function() {
  key_field <- "Pid"
  link_fields <- c("runID_total", "include_YN")

  c(key_field, link_fields)
}


# Create the content of table `work` as an integer matrix.
#
# `sim_size` is a list with elements `runsN_total` (number of rows),
# `runsN_master` (number of sites), and `expN` (number of repeats of the site
# sequence; at least one repeat is used). `include_YN` flags, per site, whether
# the site is included; it is repeated like the site sequence.
#
# Returns a matrix with one row per run and the columns of
# `colnames_work_dbWork()`; all fields are 0 except `runID_total`,
# `runID_sites`, and `include_YN` (1 for included sites).
create_job_df <- function(sim_size, include_YN) {
  fields <- colnames_work_dbWork()
  n_total <- sim_size[["runsN_total"]]
  n_repeats <- max(sim_size[["expN"]], 1L)

  jobs <- matrix(data = 0L, nrow = n_total, ncol = length(fields),
    dimnames = list(NULL, fields))

  jobs[, "runID_total"] <- seq_len(n_total)
  jobs[, "runID_sites"] <- rep(seq_len(sim_size[["runsN_master"]]),
    times = n_repeats)

  # Coerce to logical so that numeric 0/1 flags are used as a mask and are not
  # interpreted as row positions by the subscript below
  is_included <- rep(as.logical(include_YN), times = n_repeats)
  jobs[is_included, "include_YN"] <- 1L

  jobs
}


#' Create new table \code{need_outputs} for granular control of individual
#' output elements for each \code{Pid x table} combination
#'
#' The table \code{need_outputs} has one row/record for each \code{P_id = Pid}
#' and contains columns \code{Pid}, \code{runID_total} (that links to table
#' \code{work}), \code{include_YN}, and one column for each output table of
#' \code{dbOutput} with the same names. Those fields contain values \itemize{
#' \item -1 No output is requested, e.g., site is excluded from simulation
#' project by input setting of \code{include_YN == 0}, \item 0 FALSE, output is
#' not needed (any more), i.e., output is already produced, \item 1 TRUE, output
#' is needed, i.e., output has not yet been generated. }
#'
#' If the table \code{need_outputs} exists already, then it is left unchanged.
#' In either case, indices are added to \code{dbWork} if they are missing.
#'
#' @inheritParams create_dbWork
#' @param SFSW2_prj_meta An environment (or list). The function reads the
#'   elements \code{project_paths$dir_out} (folder of \code{dbWork}),
#'   \code{fnames_out$dbOutput} (file name of \code{dbOutput}),
#'   \code{sim_size$runsN_Pid} (number of \code{Pid}s), and
#'   \code{sim_scens$N}. Both \code{dbWork} and, if the table is to be
#'   created, \code{dbOutput} must exist.
#' @return A logical value (invisibly): \code{TRUE} if the table existed
#'   already; otherwise, the value returned by writing the table content.
add_granular_dbWork <- function(SFSW2_prj_meta) {
  dbWork <- fname_dbWork(SFSW2_prj_meta[["project_paths"]][["dir_out"]])
  stopifnot(file.exists(dbWork))

  # Read-write connection to the existing dbWork; closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork,
    flags = SQLITE_RW)
  on.exit(dbDisconnect(con), add = TRUE)

  if (dbExistsTable(con, "need_outputs")) {
    # Table is already present: nothing to create
    res <- TRUE

  } else {
    #--- Create new table 'need_outputs' for granular control;
    # with default -1 = no output requested (e.g., Include_YN == 0)
    stopifnot(file.exists(SFSW2_prj_meta[["fnames_out"]][["dbOutput"]]))

    # One INTEGER field (default -1) per output table of dbOutput
    out_tables <- dbOutput_ListOutputTables(
      dbname = SFSW2_prj_meta[["fnames_out"]][["dbOutput"]])
    temp_tables <- paste(dbQuoteIdentifier(con, out_tables),
      "INTEGER DEFAULT -1", collapse = ", ")

    sql <- paste0("CREATE TABLE need_outputs(Pid INTEGER PRIMARY KEY, ",
      "runID_total INTEGER, include_YN INTEGER, ", temp_tables, ")")
    dbExecute(con, sql)

    #--- Create table content: add one row per Pid
    temp_pids <- seq_len(SFSW2_prj_meta[["sim_size"]][["runsN_Pid"]])
    # presumably `it_sim2` translates each Pid into its runID given the number
    # of scenarios -- TODO confirm against the definition of `it_sim2`
    temp_runIDs <- it_sim2(temp_pids, SFSW2_prj_meta[["sim_scens"]][["N"]])

    # Look up `include_YN` of table 'work' for the runIDs
    # NOTE(review): one placeholder is bound to a vector of runIDs; presumably
    # the statement is executed once per bound value and the results are
    # combined, in which case the ORDER BY acts per execution only -- confirm
    # against the DBI/RSQLite documentation of `dbBind`
    sql <- paste("SELECT runID_total, include_YN FROM work",
      "WHERE runID_total IN (?)",
      "ORDER BY runID_total")
    rs <- dbSendStatement(con, sql)
    dbBind(rs, list(unique(temp_runIDs)))
    temp_include_YN <- dbFetch(rs)
    dbClearResult(rs)

    # Template with one row per Pid and the same fields as the new table
    df_granular <- data.frame(matrix(NA, nrow = length(temp_pids),
      ncol = 3L + length(out_tables), dimnames = list(NULL,
        c("Pid", "runID_total", "include_YN", out_tables))))

    df_granular[, "Pid"] <- temp_pids
    df_granular[, "runID_total"] <- temp_runIDs
    # Map each Pid's runID to the looked-up record of table 'work'
    # NOTE(review): with `nomatch = 0`, runIDs that are not found are dropped by
    # the subscript below so that the number of values no longer equals the
    # number of rows -- presumably every runID is present in 'work'; verify
    ids <- match(temp_runIDs, temp_include_YN[, "runID_total"], nomatch = 0)
    df_granular[, "include_YN"] <- temp_include_YN[ids, "include_YN"]
    # Output fields: 1 (output needed) if included, -1 (not requested) otherwise
    df_granular[, out_tables] <-
      ifelse(df_granular[, "include_YN"] > 0, 1L, -1L)

    res <- dbWriteTable(con, "need_outputs", value = df_granular,
      row.names = FALSE, append = TRUE)
  }

  # Add indices if they do not exist yet
  add_dbWork_index(con)

  invisible(res)
}

#' Create a new table \code{modification_status} with one row for status control
#' of entire \code{dbWork}
#'
#' First check whether such a table with one row exists, if so, don't overwrite.
#'
#' The table \code{modification_status} consists of one row/record/entry and has
#' two fields:
#' \itemize{
#'   \item \code{status} with value FALSE/0 for \code{not modified} and
#'     TRUE/1 for \code{modified}.
#'   \item \code{time_stamp} with the \code{\link{POSIXct}} value of the
#'     last time the \code{status} was updated. }
#'
#' @param con A valid \code{SQLiteConnection} database connection to
#'   \code{dbWork}.
add_status_dbWork <- function(con) {
  has_table <- dbExistsTable(con, "modification_status")

  # The table is usable only if it holds exactly one record. `dbGetQuery()`
  # returns a data.frame, so the count must be read from its first cell:
  # `length()` of the result is its number of columns, which is always 1.
  has_one_record <- has_table && isTRUE(
    dbGetQuery(con, "SELECT COUNT(*) FROM modification_status")[1, 1] == 1L)

  if (!has_one_record) {
    if (has_table) {
      # Table with zero or several records: discard and start over
      dbExecute(con, "DROP TABLE modification_status")
    }

    sql <- paste("CREATE TABLE modification_status(status INTEGER,",
      "time_stamp INTEGER)")
    dbExecute(con, sql)

    # Set initial status and assume that dbWork is in sync,
    # i.e., dbOut is not modified
    sql <- paste0("INSERT INTO modification_status ",
      "VALUES (", 0L, ", ", as.integer(Sys.time()), ")")
    dbExecute(con, sql)
  }
}


#' Update \var{\sQuote{include_YN}}
#'
#' Records whose flag differs between \code{has_include_YN} and
#' \code{should_include_YN} are updated; nothing is done if the two vectors are
#' identical.
#'
#' @param con A database connection to \code{dbWork}.
#' @param table A character string. The name of the table to update.
#' @param id_name A character string. The name of the field that identifies
#'   the records; the positions in \code{has_include_YN} are used as the
#'   values of this field.
#' @param has_include_YN A vector of the current flags (1 = included,
#'   0 = excluded).
#' @param should_include_YN A vector of the requested flags (1 = included,
#'   0 = excluded).
dbWork_update_IncludeYN <- function(con, table, id_name, has_include_YN,
  should_include_YN) {

  # Nothing to update if current and requested flags agree
  if (identical(has_include_YN, should_include_YN)) {
    return(invisible(NULL))
  }

  flag_yes <- 1L
  flag_no <- 0L

  stmt <- dbSendStatement(con,
    paste("UPDATE", dbQuoteIdentifier(con, table),
      "SET include_YN = :yn WHERE", dbQuoteIdentifier(con, id_name),
      "= :x"))

  # now excluded and previously included
  ids_drop <- which(has_include_YN == flag_yes & should_include_YN == flag_no)
  if (length(ids_drop) > 0) {
    dbBind(stmt,
      params = list(yn = rep(flag_no, length(ids_drop)), x = ids_drop))
  }

  # now included and previously excluded
  ids_add <- which(has_include_YN == flag_no & should_include_YN == flag_yes)
  if (length(ids_add) > 0) {
    dbBind(stmt,
      params = list(yn = rep(flag_yes, length(ids_add)), x = ids_add))
  }

  dbClearResult(stmt)
}


#' Setup or connect to \var{SQLite}-database \code{dbWork} to manage runs of a
#' \pkg{rSFSW2} simulation project
#'
#' \code{dbWork} tracks completion of each \code{runID} with table \code{work},
#' i.e., an entire call to \code{\link{do_OneSite}}. If your project requires a
#' finer granularity of output management, then set the
#' \code{use_granular_control} in the project description and pass
#' \code{SFSW2_prj_meta}; in that case, the function calls
#' \code{\link{add_granular_dbWork}} to add the table \code{need_outputs} to
#' \code{dbWork}.
#'
#' @inheritParams create_dbWork
#' @param resume A logical value. If \code{TRUE} and \code{dbWork} exists, then
#'   function connects to the existing database. If \code{FALSE}, then a new
#'   database is created (possibly overwriting an existing one).
#' @param SFSW2_prj_meta An environment. Required if \code{use_granular_control}
#'   is set or if \code{sim_size} or \code{path} are missing. If passed as
#'   argument and \code{resume} and \code{dbWork} exists, then
#'   \code{\link{recreate_dbWork}} is called.
#'
#' @return A logical value indicating success/failure of setting up/connecting
#'   to \code{dbWork} and initializing with \code{runIDs}.
#' @export
setup_dbWork <- function(path, sim_size, include_YN, resume = FALSE,
  SFSW2_prj_meta = NULL) {

  # Fall back on the project description if `path` is not provided
  if (missing(path)) {
    if (is.null(SFSW2_prj_meta)) {
      stop("'setup_dbWork': argument 'path' is missing.")
    }
    path <- SFSW2_prj_meta[["project_paths"]][["dir_out"]]
  }

  dbWork <- fname_dbWork(path)
  success <- FALSE
  create <- TRUE

  if (resume) {
    if (file.exists(dbWork)) {
      # Continue with the existing dbWork; bring it up to date if the project
      # description is available
      create <- FALSE
      success <- if (is.null(SFSW2_prj_meta)) {
          TRUE
        } else {
          recreate_dbWork(SFSW2_prj_meta)
        }
    }

  } else {
    # Start from scratch: remove a previous dbWork if there is one
    unlink(dbWork)
  }

  if (create) {
    if (missing(sim_size)) {
      if (is.null(SFSW2_prj_meta)) {
        stop("'setup_dbWork': argument 'sim_size' is missing.")
      }
      sim_size <- SFSW2_prj_meta[["sim_size"]]
    }

    # Catch failures so that they are reported via the return value as
    # documented; the error message is still printed (`silent = FALSE`)
    temp <- try(
      create_dbWork(path, jobs = create_job_df(sim_size, include_YN)),
      silent = FALSE)
    success <- !inherits(temp, "try-error")

    if (success && !is.null(SFSW2_prj_meta) &&
      isTRUE(SFSW2_prj_meta[["opt_out_fix"]][["use_granular_control"]])) {
      success <- add_granular_dbWork(SFSW2_prj_meta)
    }
  }

  success
}


#' Initiate a checkpoint operation on a \var{SQLite}-database \code{dbWork} of a
#' \pkg{rSFSW2} simulation project
#'
#' @param path A character string. Path to the folder of \code{dbWork}.
#'   Required if \code{con} is \code{NULL}.
#' @param con A valid database connection to \code{dbWork} or \code{NULL}. If
#'   \code{NULL}, then a connection is opened based on \code{path} and closed
#'   again on exit.
#' @param mode A character string. The checkpoint mode that is passed to
#'   \var{\dQuote{PRAGMA wal_checkpoint}}; an empty string requests the pragma
#'   without a mode argument.
#' @param failure A character string. The reaction if the checkpoint query
#'   fails or reports a value of \var{busy} larger than zero: do nothing
#'   (\var{\dQuote{silent}}), issue a warning, or signal an error.
#' @param verbose A logical value. Passed on when a connection is opened.
#'
#' @return Invisibly, the result of the checkpoint query or an object of class
#'   \var{\dQuote{try-error}} if the query failed.
#'
#' @references \url{https://www.sqlite.org/pragma.html#pragma_wal_checkpoint}
dbWork_checkpoint <- function(path = NULL, con = NULL,
  mode = c("PASSIVE", "FULL", "RESTART", "TRUNCATE", ""),
  failure = c("silent", "warning", "error"), verbose = FALSE) {

  # `match.arg()` never matches an empty string; accept it explicitly so that
  # the pragma without mode argument can be requested
  mode <- if (identical(mode, "")) "" else match.arg(mode)
  failure <- match.arg(failure)

  if (is.null(con)) {
    dbWork <- fname_dbWork(path)
    stopifnot(file.exists(dbWork))

    # need write privilege to run wal_checkpoint
    con <- dbConnect2(dbWork, verbose = verbose)

    stopifnot(inherits(con, "SQLiteConnection"))
    on.exit(dbDisconnect(con), add = TRUE)

    db_label <- basename(dbWork)

  } else {
    stopifnot(dbIsValid(con))

    # `path` may be NULL if a connection is passed: use a generic label for
    # messages in that case
    db_label <- if (is.null(path)) "dbWork" else basename(fname_dbWork(path))
  }

  sql <- if (nchar(mode) > 0) {
      paste0("PRAGMA wal_checkpoint(", mode, ")")
    } else {
      "PRAGMA wal_checkpoint"
    }

  res <- try(dbGetQuery(con, sql), silent = TRUE)

  if (!identical(failure, "silent")) {
    has_failed <- inherits(res, "try-error") || isTRUE(res[1, "busy"] > 0)

    if (has_failed) {
      msg <- paste("'dbWork_checkpoint': failed to run wal-checkpoint for",
        shQuote(db_label), "with", shQuote(paste(res, collapse = "/")))
      if (identical(failure, "warning")) {
        warning(msg)
      } else {
        stop(msg)
      }
    }
  }

  invisible(res)
}


#' Do maintenance work on a \var{SQLite}-database \code{dbWork} if it exists
#'
#' Some code, power, and system failures may leave \code{dbWork} in an
#' incomplete state, e.g., a rollback journal is present, or runs are marked as
#' \var{\dQuote{inwork}} even though no runs are currently being worked on. This
#' function cleans such situations up.
#'
#' @inheritParams create_dbWork
#' @return A logical value
#' @export
dbWork_clean <- function(path, verbose = FALSE) {
  dbWork <- fname_dbWork(path)

  # Nothing to clean up if there is no dbWork
  if (!file.exists(dbWork)) {
    return(invisible(TRUE))
  }

  con <- dbConnect2(dbWork, verbose = verbose)
  is_connected <- inherits(con, "SQLiteConnection")
  if (!is_connected) {
    return(invisible(is_connected))
  }
  on.exit(dbDisconnect(con), add = TRUE)

  # Journal clean-up depends on the journal mode of the database
  journal_mode <- tolower(as.character(dbGetQuery(con,
    "PRAGMA journal_mode;")))

  if (journal_mode == "wal") {
    dbWork_checkpoint(con = con, mode = "TRUNCATE", failure = "error")

  } else {
    dbVacuumRollack(con, dbWork)
  }

  # Are there 'inwork' records? Reset them in this order:
  sql_resets <- c(
    # - mark non-complete and non-failed records as incomplete
    paste("UPDATE work SET completed = 0, failed = 0, inwork = 0,",
      "time_s = 0 WHERE inwork = 1 AND completed != 1 AND failed != 1"),
    # - mark completed records as complete
    paste("UPDATE work SET completed = 1, failed = 0, inwork = 0",
      "WHERE inwork = 1 AND completed = 1"),
    # - mark failed records as failed
    paste("UPDATE work SET completed = 0, failed = 1, inwork = 0",
      "WHERE inwork = 1 AND failed = 1"))

  for (sql in sql_resets) {
    dbExecute(con, sql)
  }

  invisible(is_connected)
}


#' Extract identification numbers of runs of a \pkg{rSFSW2} simulation project
#' which are uncompleted and not \var{\dQuote{inwork}}
#'
#' @inheritParams create_dbWork
#' @return An integer vector of \code{runIDs}.
#' @export
dbWork_todos <- function(path) {
  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Included runs that are neither completed nor currently worked on
  sql <- paste("SELECT runID_total FROM work ",
    "WHERE include_YN = 1 AND completed = 0 AND inwork = 0",
    "ORDER BY runID_total")
  todos <- dbGetQuery(con, sql)

  as.integer(todos[[1]])
}


#' Numbers of runs of a \pkg{rSFSW2} simulation project which are uncompleted
#' and not \var{\dQuote{inwork}}
#'
#' @inheritParams create_dbWork
#' @return An integer value.
#' @export
dbWork_Ntodo <- function(path) {
  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Count included runs that are neither completed nor currently worked on
  sql <- paste("SELECT COUNT(*) FROM work",
    "WHERE include_YN = 1 AND completed = 0 AND inwork = 0",
    "ORDER BY runID_total")
  n_todo <- dbGetQuery(con, sql)

  as.integer(n_todo[[1]])
}


#' Extract stored execution times of completed runs of a \pkg{rSFSW2}
#' simulation project
#'
#' @inheritParams create_dbWork
#' @return A numeric vector of execution time in seconds.
#' @export
dbWork_timing <- function(path) {
  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Stored execution times of included and completed runs
  sql <- paste("SELECT time_s FROM work",
    "WHERE include_YN = 1 AND completed > 0")
  timing <- dbGetQuery(con, sql)

  as.numeric(timing[[1]])
}


#' Calculate mean and standard deviation of stored execution times of
#' completed runs
#'
#' @inheritParams create_dbWork
#' @return A numeric vector with mean and standard deviation of execution
#'   time in seconds and number of completed runs.
#' @seealso \code{\link{dbWork_timing}}
#' @export
dbWork_agg_timing <- function(path) {
  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Let the database calculate mean, count, and sum of squared deviations
  # from the mean for the execution times of included and completed runs
  sql <- paste(
    "WITH done(time_s) AS (SELECT time_s FROM work",
      "WHERE include_YN = 1 AND completed > 0)",
    "SELECT sub.mean AS mean, COUNT(*) AS n,",
      "SUM((time_s - sub.mean) * (time_s - sub.mean)) AS ss from done,",
      "(SELECT AVG(time_s) AS mean FROM done) AS sub")
  agg <- dbGetQuery(con, sql)

  n_done <- agg[, "n"]
  # Sample standard deviation from the sum of squares
  sd_time <- sqrt(agg[, "ss"] / (n_done - 1))

  c(mean = agg[, "mean"], sd = sd_time, n = n_done)
}




#' Set runs information that need to be redone / simulated (again)
#'
#' @inheritParams create_dbWork
#' @inheritParams dbWork_update_job
#' @param runIDs An integer vector. The identification numbers of queried runs,
#'  i.e., values out of \code{runIDs_total}, see \code{\link{indices}}.
#' @return A logical vector indicating success.
#' @export
dbWork_redo <- function(path, runIDs, verbose = FALSE) {
  # No runs requested: trivially successful
  if (length(runIDs) == 0) {
    return(invisible(TRUE))
  }

  dbWork <- fname_dbWork(path)
  if (!file.exists(dbWork)) {
    return(invisible(FALSE))
  }

  con <- dbConnect2(dbWork, verbose = verbose)
  if (!inherits(con, "SQLiteConnection")) {
    return(invisible(FALSE))
  }
  on.exit(dbDisconnect(con), add = TRUE)

  # Reset status and timing of the requested (and included) runs
  stmt <- dbSendStatement(con,
    paste("UPDATE work SET completed = 0, failed = 0,",
      "inwork = 0, time_s = 0 WHERE include_YN = 1 AND runID_total = :x"))
  dbBind(stmt, params = list(x = runIDs))

  invisible(dbClearResult(stmt))
}


#' Check run status
#'
#' @inheritParams create_dbWork
#' @inheritParams dbWork_update_job
#' @inheritParams dbWork_redo
#' @return A data.frame with three columns \var{\dQuote{completed}},
#'   \var{\dQuote{failed}}, and \var{\dQuote{inwork}} and one row per
#'   \code{runIDs}.
#' @export
dbWork_check_run <- function(path, runIDs) {
  # No runs requested: return an empty result without touching dbWork
  if (length(runIDs) == 0) {
    return(data.frame(completed = numeric(0), failed = numeric(0),
      inwork = numeric(0)))
  }

  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Query the status fields for the requested runs
  stmt <- dbSendStatement(con,
    "SELECT completed, failed, inwork FROM work WHERE runID_total = :x")
  dbBind(stmt, params = list(x = runIDs))
  status <- dbFetch(stmt)
  dbClearResult(stmt)

  status
}


#' Estimate percentage of completed runs
#'
#' @inheritParams create_dbWork
#' @param use_granular_control A logical value. If \code{TRUE} and the granular
#'   table is present (see \code{\link{add_granular_dbWork}}), then calculate
#'   percentage of completed runs based on granular table instead of \code{work}
#'   table.
#' @param SFSW2_prj_meta An environment. If \code{use_granular_control}, then
#'   required as well as presence of \code{dbOutput}.
#'
#' @return A numeric value in \code{[0,100]} in percent. The proportion of
#'   output units that are reported as complete.
#'
#' @examples
#' \dontrun{
#' # `SFSW2_prj_meta` object as produced, e.g., for `TestPrj4`
#' if (exists("SFSW2_prj_meta")) {
#'   dbWork_report_completion(
#'     path = SFSW2_prj_meta[["project_paths"]][["dir_out"]],
#'     use_granular_control =
#'       SFSW2_prj_meta[["opt_out_fix"]][["use_granular_control"]],
#'     SFSW2_prj_meta = SFSW2_prj_meta)
#' }}
#'
#' @export
dbWork_report_completion <- function(path, use_granular_control = FALSE,
  SFSW2_prj_meta = NULL) {

  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Run a counting query and extract the count as a scalar
  count_records <- function(sql) dbGetQuery(con, sql)[1, 1]

  # Granular reporting requires the request, the granular table, and dbOutput
  report_granular <- use_granular_control &&
    "need_outputs" %in% dbListTables(con) &&
    !is.null(SFSW2_prj_meta) &&
    file.exists(SFSW2_prj_meta[["fnames_out"]][["dbOutput"]])

  if (report_granular) {
    # Output units are combinations of included Pids and output tables
    n_pids <- count_records(
      "SELECT COUNT(*) FROM need_outputs WHERE include_YN = 1")
    out_tables <- dbOutput_ListOutputTables(
      dbname = SFSW2_prj_meta[["fnames_out"]][["dbOutput"]])
    n_units <- n_pids * length(out_tables)

    # A unit is complete if its field is 0, i.e., output not needed any more
    n_done <- 0
    for (k in dbQuoteIdentifier(con, out_tables)) {
      n_done <- n_done + count_records(
        paste("SELECT COUNT(*) FROM need_outputs WHERE include_YN = 1 AND",
          k, "= 0"))
    }

  } else {
    # Output units are the included runs of table 'work'
    n_units <- count_records(
      "SELECT COUNT(*) FROM work WHERE include_YN = 1")
    n_done <- count_records(
      "SELECT COUNT(*) FROM work WHERE include_YN = 1 AND completed = 1")
  }

  as.numeric(100 * n_done / n_units)
}


#' Check granular run status
#'
#' @inheritParams create_dbWork
#' @inheritParams dbWork_update_job
#' @inheritParams dbWork_redo
#' @return A data.frame with columns as created by
#'   \code{\link{add_granular_dbWork}} and one row per \code{runIDs}.
#' @export
dbWork_check_granular <- function(path, runIDs) {
  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  if (length(runIDs) == 0) {
    # No runs requested: zero records with the fields of 'need_outputs'
    return(dbGetQuery(con, "SELECT * FROM need_outputs LIMIT 0"))
  }

  # Query the granular records of the requested runs
  stmt <- dbSendStatement(con,
    "SELECT * FROM need_outputs WHERE runID_total = :x")
  dbBind(stmt, params = list(x = runIDs))
  granular <- dbFetch(stmt)
  dbClearResult(stmt)

  granular
}


#' Check modification status of \code{dbWork} against \code{dbOut} and
#' \code{dbTempOut}
#'
#' @inheritParams create_dbWork
#' @param SFSW2_prj_meta An environment.
#'
#' @return A logical value. \code{TRUE} if the status of \code{dbWork} claims to
#'   be modified (out of sync) or if its time stamp is older (which suggests
#'   that it is out of sync) than any modification time of the output generated
#'   for \code{dbOut} and/or \code{dbTempOut} -- the latter is only checked if
#'   \code{SFSW2_prj_meta} is not null; \code{FALSE} if \code{dbWork} is deemed
#'   to be in sync with output progress of project.
#' @export
dbWork_check_status <- function(path, SFSW2_prj_meta = NULL) {
  dbWork <- fname_dbWork(path)
  stopifnot(file.exists(dbWork))

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork, flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Consider modified if status is set to 'modified'
  ms <- dbGetQuery(con, "SELECT * FROM modification_status LIMIT 1")
  is_modified <- as.logical(ms[1, "status"])

  # Without project description, file modification times cannot be checked
  if (is_modified || is.null(SFSW2_prj_meta)) {
    return(is_modified)
  }

  # Status claims to be not modified: check that time_stamp is also younger
  # than modification times of dbOut and dbTempOut(s)
  time_status <- as.POSIXct(ms[1, "time_stamp"], origin = "1970-01-01")

  fnames <- c(SFSW2_prj_meta[["fnames_out"]][["dbOutput"]],
    get_fnames_dbTempOut(SFSW2_prj_meta[["project_paths"]][["dir_out_temp"]]))
  # `file.mtime` returns NA if a file doesn't exist; those are ignored
  is_modified <- any(time_status < file.mtime(fnames), na.rm = TRUE)

  if (is_modified) {
    # Status is determined to be modified -> update dbWork
    dbWork_update_status(path = path, status = TRUE)
  }

  is_modified
}




#' Check that \code{dbWork} has an up-to-date structure of tables and fields
#'
#' @inheritParams create_dbWork
#' @param use_granular_control A logical value. If \code{TRUE} and the granular
#'   table is present (see \code{\link{add_granular_dbWork}}), then include the
#'   optional table \code{need_outputs} in check.
#' @return A logical value. \code{FALSE} if no \code{dbWork} can be located or
#'   if any required table is missing or if not all required fields are present.
#'
#' @export
dbWork_check_design <- function(path, use_granular_control = FALSE) {
  dbWork <- fname_dbWork(path)

  # No dbWork, no valid design
  if (!file.exists(dbWork)) {
    return(FALSE)
  }

  # Read-only access is sufficient; connection is closed on exit
  con <- dbConnect(SQLite(), dbname = dbWork,
    flags = SQLITE_RO)
  on.exit(dbDisconnect(con), add = TRUE)

  # Check that required tables exist
  need_tables <- c("work", "modification_status",
    if (use_granular_control) "need_outputs")

  if (!all(need_tables %in% dbListTables(con))) {
    return(FALSE)
  }

  # All tables exist: check that each table has its required fields
  for (table_name in need_tables) {
    # Required fields are provided by function `colnames_<table>_dbWork`.
    # Resolve it with `get()` starting from this function's environment;
    # `match.fun()` would search from the caller of the caller instead and
    # fail to locate these functions if called from top level.
    get_required_fields <- get(
      paste("colnames", table_name, "dbWork", sep = "_"), mode = "function")

    has_fields <- dbListFields(con, table_name)
    if (!all(get_required_fields() %in% has_fields)) {
      return(FALSE)
    }
  }

  TRUE
}



#' Re-create or update \var{\sQuote{dbWork}} based on \var{\sQuote{dbOutput}}
#'
#' @inheritParams create_dbWork
#' @param dbOutput A character string. Full name to the output database.
#' @param use_granular_control A logical vector.
#' @param SFSW2_prj_meta An environment. If not \code{NULL}, then \code{path},
#'  \code{dbOutput}, and/or \code{use_granular_control} may be missing.
#'  If not \code{NULL}, then code checks that no temporary output files remain
#'  unprocessed. This is because this function only checks output in
#'  the database \var{\sQuote{dbOutput}}, but not in the database(s) files
#'  \var{\sQuote{dbTempOut}}.
#'
#' @return A logical vector indicating success.
#'
#' @examples
#' \dontrun{
#' if (exists("SFSW2_prj_meta")) {
#'   # `SFSW2_prj_meta` object as produced, e.g., for `TestPrj4`
#'   recreate_dbWork(SFSW2_prj_meta = SFSW2_prj_meta)
#' }}
#'
#' @export
recreate_dbWork <- function(path, dbOutput, use_granular_control,
  SFSW2_prj_meta = NULL, verbose = FALSE, print.debug = FALSE) {

  res <- FALSE

  if (verbose) {
    t1 <- Sys.time()
    temp_call <- shQuote(match.call()[1])
    print(paste0("rSFSW2's ", temp_call, ": started at ", t1))

    on.exit(print(paste0("rSFSW2's ", temp_call, ": ended after ",
      round(difftime(Sys.time(), t1, units = "secs"), 2))), add = TRUE)
  }

  if (missing(path)) {
    path <- if (!is.null(SFSW2_prj_meta)) {
        SFSW2_prj_meta[["project_paths"]][["dir_out"]]
      } else stop("'recreate_dbWork': argument 'path' is missing.")
  }

  if (missing(dbOutput)) {
    dbOutput <- if (!is.null(SFSW2_prj_meta)) {
        SFSW2_prj_meta[["fnames_out"]][["dbOutput"]]
      } else stop("'recreate_dbWork': argument 'dbOutput' is missing.")
  }

  if (missing(use_granular_control)) {
    use_granular_control <- if (!is.null(SFSW2_prj_meta) &&
      !is.null(SFSW2_prj_meta[["opt_out_fix"]][["use_granular_control"]])) {
        SFSW2_prj_meta[["opt_out_fix"]][["use_granular_control"]]
      } else {
        stop("'recreate_dbWork': argument 'use_granular_control' is missing.")
      }
  }

  if (!is.null(SFSW2_prj_meta)) {
    # check that no temporary output files remain unprocessed
    temp1 <- get_fnames_temporaryOutput(
      dir_out_temp = SFSW2_prj_meta[["project_paths"]][["dir_out_temp"]],
      concatFile = file.path(
        SFSW2_prj_meta[["project_paths"]][["dir_out_temp"]],
        "sqlFilesInserted.txt"))
    temp2 <- get_fnames_dbTempOut(
      SFSW2_prj_meta[["project_paths"]][["dir_out_temp"]])

    tempN_todo <- length(temp1) + length(temp2)

    if (tempN_todo > 0) {
      stop("'recreate_dbWork' can only correctly re-create `dbWork` if",
        " all temporary output files have been moved to the database: \n",
        "Currently, n(unfinished temporary files) = ", tempN_todo, ".\n",
        "Use first, for instance, function `move_output_to_dbOutput` before ",
        "trying again.")
    }
  }


  if (file.exists(dbOutput)) {
    #--- Infer design of simulation experiment based on dbOutput
    con_dbOut <- dbConnect(SQLite(), dbname = dbOutput,
      flags = SQLITE_RO)
    on.exit(dbDisconnect(con_dbOut), add = TRUE)

    if (!all(sapply(c("runs", "sites"), function(x)
      dbExistsTable(con_dbOut, x)))) {
      stop("'recreate_dbWork': OutputDB ", shQuote(dbOutput), " has ",
        "incomplete structure; dbWork cannot be recreated from it.")
    }

    # List of output 'aggregation' tables
    out_tables <- dbOutput_ListOutputTables(con = con_dbOut)

    # Extract information from dbOutput table 'runs'
    infer_scN <- as.integer(dbGetQuery(con_dbOut,
      "SELECT MAX(scenario_id) FROM runs"))
    infer_runsN_total <- as.integer(dbGetQuery(con_dbOut,
      "SELECT MAX(label_id) FROM runs")) # TODO(drs): this field should really
      # be called 'runID'

    # Extract information from dbOutput table 'sites'
    infer_runsN_master <- as.integer(dbGetQuery(con_dbOut,
      "SELECT COUNT(*) FROM sites"))
    infer_include_YN <- as.logical(dbGetQuery(con_dbOut,
      "SELECT Include_YN FROM sites")[, 1])
    infer_runIDmax_sites <- as.integer(dbGetQuery(con_dbOut,
      "SELECT MAX(site_id) FROM sites"))

    # Extract information from dbOutput table 'experimental_labels'
    infer_expN <- if (dbExistsTable(con_dbOut, "experimental_labels")) {
        as.integer(dbGetQuery(con_dbOut,
          "SELECT COUNT(*) FROM experimental_labels"))
      } else 1


    #--- If dbWork exists, then copy timing information if design is current
    dbWork <- fname_dbWork(path)
    old_timing_s <- NULL
    dbWork_backup <- paste0(dbWork, "_backup")

    if (file.exists(dbWork)) {
      con_dbWork <- dbConnect(SQLite(), dbname = dbWork,
        flags = SQLITE_RW)
      on.exit(dbDisconnect(con_dbWork), add = TRUE)

      if (dbExistsTable(con_dbWork, "work")) {
        # Check whether to create new dbWork: number of total runs,
        # number of sites
        has_runsN_total <- as.integer(dbGetQuery(con_dbWork,
          "SELECT MAX(runID_total) FROM work"))
        has_count_work <- as.integer(dbGetQuery(con_dbWork,
          "SELECT COUNT(*) FROM work"))
        has_runIDmax_sites <- as.integer(dbGetQuery(con_dbWork,
          "SELECT MAX(runID_sites) FROM work"))

        if (has_runsN_total == has_count_work &&
            has_runsN_total == infer_runsN_total &&
            has_runIDmax_sites == infer_runIDmax_sites) {

          if (verbose) {
            print(paste0(Sys.time(), ": dbWork is present and of ",
              "adequate design, backup run timing data"))
          }

          old_timing_s <- dbGetQuery(con_dbWork,
            "SELECT runID_total, time_s FROM work ORDER BY runID_total")
        }
      }

      dbDisconnect(con_dbWork)

      if (verbose) {
        print(paste0(Sys.time(), ": previous dbWork is backed-up."))
      }
      file.rename(from = dbWork, to = dbWork_backup)
    }


    #--- Create new dbWork
    infer_sim_size <- list(runsN_master = infer_runsN_master,
      runsN_total = infer_runsN_total, expN = infer_expN)

    stopifnot(create_dbWork(path,
      jobs = create_job_df(infer_sim_size, infer_include_YN)))

    con_dbWork <- dbConnect(SQLite(), dbname = dbWork,
      flags = SQLITE_RW)
    on.exit(dbDisconnect(con_dbWork), add = TRUE)

    if (use_granular_control) {
      stopifnot(!is.null(SFSW2_prj_meta))

      if (verbose) {
        print(paste0(Sys.time(), ": add 'granular-level' table for ",
          "new dbWork."))
      }
      stopifnot(add_granular_dbWork(SFSW2_prj_meta))
    }

    # If dbWork existed previously and there was timing information, then insert
    ids_use <- old_timing_s[, "time_s"] > 0
    if (length(ids_use) > 0) {
      if (verbose) {
        print(paste0(Sys.time(), ": previous timing data is re-inserted ",
          "into new dbWork"))
      }
      sql <- paste("UPDATE work SET time_s = :t WHERE runID_total = :x")
      rs <- dbSendStatement(con_dbWork, sql)
      dbBind(rs, params = list(t = old_timing_s[ids_use, "time_s"],
        x = old_timing_s[ids_use, "runID_total"]))
      dbClearResult(rs)
    }


    #--- Update dbWork based on completed runs stored in dbOutput

    if (!use_granular_control) {
      # Note: below code is not memory-efficient because of large numbers of
      # Pids for large projects --> use `use_granular_control`
      if (verbose) {
        print(paste0(Sys.time(), ": update information on completed runs"))
      }

      # Get Pids for which simulation output is in each table of the dbOutput
      has_pids_per_table <- lapply(out_tables, function(x) {
          sql <- paste0("SELECT DISTINCT P_id FROM",
            dbQuoteIdentifier(con_dbOut, x), "ORDER BY P_id")
          dbGetQuery(con_dbOut, sql)[, 1]
        })

      #-- Update table 'work'
      has_pids_complete <- rSW2utils::intersect2(has_pids_per_table)

      if (length(has_pids_complete) > 0) {
        # Get runID from Pid
        temp_runIDs <- it_sim2(has_pids_complete, infer_scN)

        # runID is complete if present and if
        # count(runID) == number of scenarios
        has_complete_runIDs <- which(tabulate(temp_runIDs) == infer_scN)

        if (length(has_complete_runIDs) > 0) {
          # set complete runIDs
          sql <- paste("UPDATE work SET completed = 1, failed = 0, inwork = 0",
            "WHERE runID_total = :x")
          rs <- dbSendStatement(con_dbWork, sql)
          dbBind(rs, params = list(x = has_complete_runIDs))
          dbClearResult(rs)
        }
      }

    } else {
      quoted_tables <- dbQuoteIdentifier(con_dbWork, out_tables)

      sql <- paste("ATTACH", dbQuoteIdentifier(con_dbWork, dbOutput),
        "AS dbOut")
      dbExecute(con_dbWork, sql)

      #-- Update table 'need_outputs'
      for (k in seq_along(out_tables)) {
        if (verbose) {
          print(paste0(Sys.time(), ": update granular-level information for",
            " output table ", quoted_tables[k]))
        }

        # don't need to generate output (anymore) where output is present in
        # dbOut
        # TODO(drs): we currently assume that one entry per Pid even
        # for mean daily tables with soil layers is enough
        sql <- paste0("UPDATE need_outputs SET ", quoted_tables[k], " = 0 ",
          "WHERE Pid IN ",
          "(SELECT DISTINCT P_id FROM dbOut.", quoted_tables[k], ")")
        dbExecute(con_dbWork, sql)
      }

      dbExecute(con_dbWork, "DETACH dbOut")

      #-- Update table 'work' based on updated 'need_outputs'
      # a runID is complete if all associated Pids are complete, i.e.,
      # count(Pids) == infer_scN, for each output table
      if (verbose) {
        print(paste0(Sys.time(), ": update information on completed runs"))
      }

      sql <- paste0("UPDATE work SET completed = 1, failed = 0, inwork = 0 ",
        "WHERE runID_total IN (",
        "SELECT oa.runID_total FROM ",
        "(SELECT runID_total, COUNT(*) FROM need_outputs ",
        "WHERE ", paste(quoted_tables, "= 0", collapse = " AND "), " ",
        "GROUP BY runID_total) AS oa ",
        "WHERE oa.`COUNT(*)` = ", infer_scN, ")")
      dbExecute(con_dbWork, sql)
    }

    # Set modification status: up-to-date
    dbWork_update_status(path, status = FALSE, verbose = print.debug)

    unlink(dbWork_backup)
    res <- file.exists(dbWork)

  } else {
    stop("OutputDB ", shQuote(dbOutput), " not found on disk.")
  }

  invisible(res)
}


#' Update run information of a \pkg{rSFSW2} simulation project
#'
#' Sets the flags \code{completed}, \code{failed}, and \code{inwork} as well as
#' \code{time_s} of the record \code{runID} in table \var{work} of
#' \code{dbWork}. The update is re-attempted after short random pauses until
#' the statement executes without error.
#'
#' @details A request for \var{\dQuote{inwork}} is carried out inside a
#'   \code{BEGIN IMMEDIATE} transaction and is granted only if the record
#'   exists and its current value of \code{inwork} is 0; otherwise, no record
#'   is modified and \code{FALSE} is returned.
#'
#' @inheritParams create_dbWork
#' @param runID An integer value. The identification number of the current run,
#'   i.e., a value out of \code{runIDs_total}, see \code{\link{indices}}.
#' @param status A character string. One of \var{\dQuote{completed}},
#'   \var{\dQuote{failed}}, \var{\dQuote{inwork}}. Any other value leaves
#'   \code{dbWork} unchanged.
#' @param time_s A numeric value. The execution time in seconds; used if
#'   \code{status} is one of \var{\dQuote{completed}} and \var{\dQuote{failed}}.
#'   The default \code{"NULL"} is pasted into the statement as is.
#' @param verbose A logical value. If \code{TRUE}, status messages about file
#'   lock and database access are printed
#'
#' @return A logical value whether the status was successfully updated, i.e.,
#'   \code{TRUE} if exactly one record was modified.
dbWork_update_job <- function(path, runID, status, time_s = "NULL",
  verbose = FALSE) {

  dbWork <- fname_dbWork(path)
  success <- FALSE
  res <- 0L

  # Connect to dbWork
  con <- dbConnect2(dbWork, verbose = verbose)

  if (inherits(con, "SQLiteConnection")) {
    on.exit(dbDisconnect(con), add = TRUE)

    if (verbose) {
      # Reference time for the elapsed-time part of the status messages
      t0 <- Sys.time()
    }

    # Update job record
    # NOTE(review): the loop retries after any error of the statement, not
    # only after errors caused by a locked database
    repeat {
      if (verbose) {
        print(paste0("'dbWork_update_job': ", Sys.time(), " (", runID, "-",
          status, ") attempt to update after ",
          round(difftime(Sys.time(), t0, units = "secs"), 2), " s"))
      }

      if (status == "completed") {
        res <- try(dbExecute(con, paste("UPDATE work SET completed = 1,",
          "failed = 0, inwork = 0, time_s =", time_s,
          "WHERE runID_total =", runID)), silent = !verbose)

      } else if (status == "failed") {
        res <- try(dbExecute(con, paste("UPDATE work SET completed = 0,",
          "failed = 1, inwork = 0, time_s =", time_s,
          "WHERE runID_total =", runID)), silent = !verbose)

      } else if (status == "inwork") {
        # https://sqlite.org/lang_transaction.html: "After a BEGIN IMMEDIATE,
        # no other database connection will be able to write to the database or
        # do a BEGIN IMMEDIATE or BEGIN EXCLUSIVE. Other processes can continue
        # to read from the database, however."
        res <- try(dbExecute(con, "BEGIN IMMEDIATE"), silent = !verbose)

        can_claim <- FALSE
        prev_status <- integer(0)

        if (!inherits(res, "try-error")) {
          # Transaction with reserved lock established
          prev_status <- dbGetQuery(con,
            paste("SELECT inwork FROM work WHERE runID_total =", runID))$inwork

          # `isTRUE()` is FALSE for a missing record (zero-length result), for
          # NA, and for more than one matching record; a bare comparison would
          # raise an error in the `if` condition in the first two cases
          can_claim <- isTRUE(prev_status == 0)

          res <- if (can_claim) {
              try(dbExecute(con, paste("UPDATE work SET completed = 0,",
                "failed = 0, inwork = 1, time_s = 0",
                "WHERE runID_total =", runID)), silent = !verbose)
            } else {
              0L
            }
        }

        # End transaction
        if (inherits(res, "try-error")) {
          try(dbExecute(con, "ROLLBACK"), silent = !verbose)
        } else {
          try(dbExecute(con, "COMMIT"), silent = !verbose)

          if (verbose && !can_claim) {
            if (length(prev_status) == 0L) {
              print(paste("'dbWork_update_job':", runID,
                "has no record in table 'work'"))
            } else {
              print(paste("'dbWork_update_job':", runID, "is already in work"))
            }
          }
        }

      } else {
        if (verbose) {
          print(paste("'dbWork_update_job': value", shQuote(status),
            "of argument 'status' is not implemented."))
        }

        res <- 0L
      }

      success <- !inherits(res, "try-error")

      if (success) {
        if (verbose) {
          print(paste0("'dbWork_update_job': ", Sys.time(), " (", runID, "-",
            status, ") transaction confirmed after ",
            round(difftime(Sys.time(), t0, units = "secs"), 2), " s"))
        }
        break

      } else {
        if (verbose) {
          print(paste0("'dbWork_update_job': ", Sys.time(), " (", runID, "-",
            status, ") 'dbWork' is locked after ",
            round(difftime(Sys.time(), t0, units = "secs"), 2), " s"))
        }

        # Random pause before the next attempt
        Sys.sleep(stats::runif(1, 0.02, 0.1))
      }
    }
  }

  # `res` is the number of modified records (or 0L if nothing was attempted)
  identical(res, 1L)
}


#' Update granular run information of a \pkg{rSFSW2} simulation project
#'
#' Sets the value of column \code{table} in table \var{need_outputs} of
#' \code{dbWork} for the record \code{Pid}. The update is re-attempted after
#' short random pauses until the statement executes without error.
#'
#' @inheritParams create_dbWork
#' @param table A character string. The name of the column of table
#'   \var{need_outputs} that is updated.
#' @param Pid An integer value. The identification number of the current output,
#'   i.e., see \code{\link{indices}}.
#' @param status A logical value. \code{FALSE} indicates "no longer needed",
#'   i.e., that the output for \code{table} has successfully been generated (see
#'   \code{\link{add_granular_dbWork}}).
#' @param verbose A logical value. If \code{TRUE}, status messages about
#'   update attempts are printed.
#'
#' @return A logical value whether the status was successfully updated, i.e.,
#'   \code{TRUE} if exactly one record was modified.
dbWork_update_granular <- function(path, table, Pid, status, verbose = FALSE) {
  dbWork <- fname_dbWork(path)
  success <- FALSE
  res <- 0L

  # Connect to dbWork
  con <- dbConnect2(dbWork, verbose = verbose)

  if (inherits(con, "SQLiteConnection")) {
    on.exit(dbDisconnect(con), add = TRUE)

    if (verbose) {
      # Reference time for the elapsed-time part of the status messages
      t0 <- Sys.time()
    }

    # The statement does not change between attempts: build it once.
    # The column name is quoted as SQL identifier (and not with `shQuote()`
    # whose shell-quoting rules depend on the platform)
    sql <- paste("UPDATE need_outputs SET", dbQuoteIdentifier(con, table), "=",
      if (isTRUE(status)) "1" else "0",
      "WHERE Pid =", as.integer(Pid))

    # Update job record
    repeat {
      if (verbose) {
        print(paste0("'dbWork_update_granular': ", Sys.time(), " (", Pid, "-",
          status, ") attempt to update after ",
          round(difftime(Sys.time(), t0, units = "secs"), 2), " s"))
      }

      res <- try(dbExecute(con, sql), silent = !verbose)

      success <- !inherits(res, "try-error")

      if (success) {
        if (verbose) {
          print(paste0("'dbWork_update_granular': ", Sys.time(), " (", Pid, "-",
            status, ") transaction confirmed after ",
            round(difftime(Sys.time(), t0, units = "secs"), 2), " s"))
        }
        break

      } else {
        if (verbose) {
          print(paste0("'dbWork_update_granular': ", Sys.time(), " (", Pid, "-",
            status, ") 'dbWork' is locked after ",
            round(difftime(Sys.time(), t0, units = "secs"), 2), " s"))
        }

        # Random pause before the next attempt
        Sys.sleep(stats::runif(1, 0.02, 0.1))
      }
    }
  }

  # `res` is the number of modified records (or 0L if nothing was attempted)
  identical(res, 1L)
}



#' Update modification status of a \pkg{rSFSW2} simulation project
#'
#' Writes \code{status} and the current time stamp to table
#' \var{modification_status} of \code{dbWork}. The update is re-attempted after
#' short random pauses until the statement executes without error.
#'
#' @inheritParams create_dbWork
#' @param status A logical value. \code{FALSE} indicates "not modified", i.e.,
#'   that \code{dbWork} and generated output \code{dbOutput} and
#'   \code{dbTempOut} is synchronized; \code{TRUE} indicates "modified".
#' @param verbose A logical value. If \code{TRUE}, status messages about
#'   update attempts are printed.
#'
#' @seealso \code{\link{add_status_dbWork}}
#' @return A logical value whether the status was successfully updated, i.e.,
#'   \code{TRUE} if exactly one record was modified.
dbWork_update_status <- function(path, status, verbose = FALSE) {
  # Connect to dbWork
  con <- dbConnect2(fname_dbWork(path), verbose = verbose)

  # Nothing can be updated without a connection
  if (!inherits(con, "SQLiteConnection")) {
    return(FALSE)
  }

  on.exit(dbDisconnect(con), add = TRUE)

  if (verbose) {
    # Reference time for the elapsed-time part of the status messages
    t0 <- Sys.time()
  }

  # Print a time-stamped progress message; only called if `verbose`
  report <- function(event) {
    print(paste0("'dbWork_update_status': ", Sys.time(), " (", status, ") ",
      event, " after ",
      round(difftime(Sys.time(), t0, units = "secs"), 2), " s"))
  }

  n_updated <- 0L

  repeat {
    if (verbose) {
      report("attempt to update")
    }

    # Statement is rebuilt for each attempt so that the time stamp is current
    sql <- paste("UPDATE modification_status SET ",
      "status =", if (isTRUE(status)) "1" else "0", ",",
      "time_stamp =", as.integer(Sys.time()))
    n_updated <- try(dbExecute(con, sql), silent = !verbose)

    if (!inherits(n_updated, "try-error")) {
      if (verbose) {
        report("transaction confirmed")
      }
      break
    }

    if (verbose) {
      report("'dbWork' is locked")
    }

    # Random pause before the next attempt
    Sys.sleep(stats::runif(1, 0.02, 0.1))
  }

  identical(n_updated, 1L)
}

#------ End of dbWork functions
########################
Burke-Lauenroth-Lab/rSFSW2 documentation built on Aug. 14, 2020, 5:20 p.m.