R/job_tracking_functions.R

Defines functions populate_list_arg get_tracked_job_status update_tracked_job_status add_tracked_job_parent insert_tracked_job create_tracking_db reset_tracking_sqlite_db submit_tracking_query

Documented in add_tracked_job_parent create_tracking_db get_tracked_job_status insert_tracked_job reset_tracking_sqlite_db submit_tracking_query update_tracked_job_status

#' Internal helper function to submit a query to the tracking SQLite database
#'
#' @param str Character string SQLite query
#' @param sqlite_db Path to SQLite database used for tracking
#' @param param List of parameters/arguments to be used in query
#'
#' @keywords internal
submit_tracking_query = function(str, sqlite_db, param = NULL) {
  # previously called submit_sqlite()
  checkmate::assert_string(str)

  # check if tracking db exists; if not, create it
  if (!checkmate::test_file_exists(sqlite_db)) {
    create_tracking_db(sqlite_db)
  }

  # open sqlite connection and execute query
  submit_sqlite_query(str = str, sqlite_db = sqlite_db, param = param)

}

#' Internal helper function to reset tracking SQLite database
#'
#' @param sqlite_db Path to SQLite database used for tracking
#'
#' @keywords internal
reset_tracking_sqlite_db = function(sqlite_db) {
  # this file has the SQL syntax to setup (and reset) the database
  # reset_sql <- "
  # SET foreign_key_checks = 0;
  # DROP TABLE IF EXISTS job_tracking;
  # SET foreign_key_checks = 1;
  # "
  reset_sql <- "DELETE FROM job_tracking" # delete all records
  submit_tracking_query(str = reset_sql, sqlite_db = sqlite_db)
}


#' Internal helper function to create the tracking SQLite database
#'
#' @param sqlite_db Path to SQLite database used for tracking
#'
#' @keywords internal
create_tracking_db = function(sqlite_db) {
  # previously called create_sqlite_db()
  job_spec_sql <- "
    CREATE TABLE job_tracking (
      id INTEGER PRIMARY KEY,
      parent_id INTEGER,
      job_id VARCHAR NOT NULL UNIQUE,
      job_name VARCHAR,
      batch_directory VARCHAR,
      batch_file VARCHAR,
      compute_file VARCHAR,
      code_file VARCHAR,
      n_nodes INTEGER CHECK (n_nodes >= 1),
      n_cpus INTEGER CHECK (n_cpus >= 1),
      wall_time VARCHAR,
      mem_per_cpu VARCHAR,
      mem_total VARCHAR,
      scheduler VARCHAR,
      scheduler_options VARCHAR,
      job_obj BLOB,
      time_submitted INTEGER,
      time_started INTEGER,
      time_ended INTEGER,
      status VARCHAR(24),
      FOREIGN KEY (parent_id) REFERENCES job_tracking (id)
    );
    "
  # open sqlite connection
  submit_sqlite_query(str = job_spec_sql, sqlite_db = sqlite_db)
}


#' Internal helper funciton to insert a job into the tracking SQLite database
#'
#' @param sqlite_db Path to SQLite database used for tracking
#'
#' @keywords internal
insert_tracked_job = function(sqlite_db, job_id, tracking_args = list()) {
  # previously called sqlite_insert_job()
  if (is.null(sqlite_db) || is.null(job_id)) return(invisible(NULL)) # skip out if not using DB or if job_id is NULL
  if (is.numeric(job_id)) job_id <- as.character(job_id)
  if (is.null(tracking_args$status)) tracking_args$status <- "QUEUED" # default value of first status

  sql <- "INSERT INTO job_tracking
    (job_id, job_name, batch_directory,
    batch_file, compute_file, code_file,
    n_nodes, n_cpus, wall_time,
    mem_per_cpu, mem_total,
    scheduler, scheduler_options, job_obj,
    time_submitted, status)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"

  # gather tracking parameters into a list
  param <- list(job_id, tracking_args$job_name, tracking_args$batch_directory, 
                tracking_args$batch_file, tracking_args$compute_file, tracking_args$code_file, 
                tracking_args$n_nodes, tracking_args$n_cpus, tracking_args$wall_time, 
                tracking_args$mem_per_cpu, tracking_args$mem_total, tracking_args$scheduler,
                tracking_args$scheduler_options, tracking_args$job_obj, as.character(Sys.time()), tracking_args$status)
  
  for (i in 1:length(param)) {
    param[[i]] <- ifelse(is.null(param[[i]]), NA, param[[i]]) # convert NULL values to NA for dbExecute
  }
  
  # order the tracking arguments to match the query; status is always 'QUEUED' when first added to the database
  submit_tracking_query(str = sql, sqlite_db = sqlite_db, param = param)
}


#' Add parent/child id relationship to tracking database
#'
#' @param sqlite_db Path to SQLite database used for tracking
#' @param job_id Job id of job for which to add a parent
#' @param parent_job_id Job id of the parent job to job_id
#'
#' @importFrom DBI dbConnect dbExecute dbDisconnect
#' @export
add_tracked_job_parent = function(sqlite_db = NULL, job_id = NULL, parent_job_id = NULL) {
  # skip out if not using DB or job_id/parent_id is NULL
  if (is.null(sqlite_db) || is.null(job_id) || is.null(parent_job_id)) return(invisible(NULL)) 
  if (is.numeric(job_id)) job_id <- as.character(job_id)
  if (is.numeric(parent_job_id)) parent_job_id <- as.character(parent_job_id)

  sql <- "UPDATE job_tracking
  SET parent_id = (SELECT id FROM job_tracking WHERE job_id = ?)
  WHERE job_id = ?"
  
  tryCatch({
    # open sqlite connection and execute query
    submit_sqlite_query(str = sql, sqlite_db = sqlite_db, param = list(parent_job_id, job_id))
  }, error = function(e) { print(e); return(NULL)})
}


#' Update job status in tracking SQLite database
#' 
#' @param sqlite_db Character string specifying the SQLite database used for job tracking
#' @param job_id Character string specifying the job id to update as failed
#' @param status Character string specifying the job status to set. Must be one of: 
#'   "QUEUED", "STARTED", "FAILED", "COMPLETED", "FAILED_BY_EXT"
#' @param exclude Any job ids to ignore when cascading a status
#' @importFrom glue glue
#' @importFrom DBI dbConnect dbExecute dbDisconnect
#' @export
update_tracked_job_status <- function(sqlite_db = NULL, job_id = NULL, status, cascade = FALSE, exclude = NULL) {
  
  if (!checkmate::test_string(sqlite_db)) return(invisible(NULL))
  if (is.numeric(job_id)) job_id <- as.character(job_id)
  if (!checkmate::test_string(job_id)) return(invisible(NULL)) # quiet failure on invalid job id

  checkmate::assert_string(status)
  status <- toupper(status)
  checkmate::assert_subset(status, c("QUEUED", "STARTED", "FAILED", "COMPLETED", "FAILED_BY_EXT"))
  if (cascade & status %in% c("QUEUED", "STARTED", "COMPLETED")) {
    cascade <- FALSE
    warning("Only status FAILED or FAILED_BY_EXT can cascade in `update_tracked_job_status`")
  }
  
  now <- as.character(Sys.time())
  time_field <- switch(status,
                       QUEUED = "time_submitted",
                       STARTED = "time_started",
                       FAILED = "time_ended",
                       COMPLETED = "time_ended",
                       FAILED_BY_EXT = "time_ended"
  )
  
  tryCatch({
    submit_sqlite_query(str = glue("UPDATE job_tracking SET STATUS = ?, {time_field} = ? WHERE job_id = ?"),
                        sqlite_db = sqlite_db, param = list(status, now, job_id))
  }, error = function(e) { print(e); return(NULL)})
  
  # recursive function for "cascading" failures using status "FAILED_BY_EXT"
  if (cascade) {
    if (is.numeric(exclude)) exclude <- as.character(exclude)
    
    status_tree <- fmri.pipeline::get_tracked_job_status(job_id, return_children = TRUE, sqlite_db = sqlite_db) # retreive current job and children
    job_ids <- status_tree$job_id # get list of job ids
    
    for (child_job in setdiff(job_ids, c(job_id, exclude))) {
      child_status <- with(status_tree, status[which(job_ids == child_job)]) # check status
      if (child_status != "FAILED") fmri.pipeline::update_tracked_job_status(child_job, sqlite_db = sqlite_db, status = "FAILED_BY_EXT", cascade = TRUE)
    }
  }
  
  return(invisible(NULL))
  
}

#' Query job status in tracking SQLite database
#' 
#' @param job_id The job id for which to retreive the status
#' @param sqlite_db Character string of sqlite database
#' @param return_children Return child jobs of this job
#' @param return_parent Return parent jobs of this job
#' 
#' @return An R data.frame version of the tracking database
#' @importFrom DBI dbConnect
#' @importFrom RSQLite SQLite
#' @export
get_tracked_job_status <- function(job_id = NULL, return_children = FALSE, return_parent = FALSE, sqlite_db) {
  
  on.exit(try(dbDisconnect(con)))

  if (is.numeric(job_id)) job_id <- as.character(job_id)
  if (!checkmate::test_string(job_id)) return(invisible(NULL))
  checkmate::assert_logical(return_children)
  checkmate::assert_logical(return_parent)
  if (!checkmate::test_file_exists(sqlite_db)) {
    warning("Cannot find SQLite database at: ", sqlite_db)
  }
  
  con <- dbConnect(RSQLite::SQLite(), sqlite_db)
  
  
  str <- paste0("SELECT * FROM job_tracking WHERE job_id = ?", 
                ifelse(return_children, " OR parent_id = (SELECT id FROM job_tracking WHERE job_id = ?)", ""),
                ifelse(return_parent, " OR id = (SELECT parent_id FROM job_tracking WHERE job_id = ?)", ""))
  df <- dbGetQuery(con, str, param = as.list(rep(job_id, 1 + return_children + return_parent)))
  
  # rehydrate job_obj back into R6 class
  if (nrow(df) > 0L) df$job_obj <- lapply(df$job_obj, function(x) if (!is.null(x)) unserialize(x))
  return(df)
}


# Internal helper function to update tracker_args object
#'
#' @param list_to_populate The list whose argument will be populated/updated
#' @param arg_name The named list element to update
#' @param new_value The new value to update the element with
#' @param append If TRUE, appends the new value to the current value using the paste function. Default: FALSE
#'
#' @keywords internal
populate_list_arg = function(list_to_populate, arg_name, new_value = NULL, append = FALSE) {
  
  checkmate::assert_list(list_to_populate)
  if (is.null(new_value)) return(list_to_populate) # if the new_value arg is NULL just return the list as is

  if(is.null(list_to_populate[[arg_name]]) || is.na(list_to_populate[[arg_name]])) {
    list_to_populate[[arg_name]] <- new_value # if the current arg is NULL, update with new value
  } else if (append) {
    # if it's not NULL but append is TRUE, appends new value to beginning of old value
    list_to_populate[[arg_name]] <- paste(list_to_populate[[arg_name]], new_value, sep = "\n")  
  }
  return(list_to_populate)
}
UNCDEPENdLab/fmri.pipeline documentation built on April 3, 2025, 3:21 p.m.