R/sge_make_week_index.R

Defines functions sge_make_week_index

Documented in sge_make_week_index

#' Index the SGE accounting file for ISO-8601 weeks
#'
#' @param file (character) The SGE \file{accounting} file to read.
#'
#' @param index (numeric vector) File byte offsets of job entries.
#'
#' @param until (character vector) If a week read that is part of this
#' set, the terminate the scanning.  The weeks should be specified
#' using `%GW%V` week format, e.g. `2019W01`.
#'
#' @param n_max (numeric) The maximum number of weeks to read.
#'
#' @param delta (numeric) Maximum number of rows to skip forward and
#' backword in the divide-and-conquer search strategy.
#'
#' @param debug (logical) If TRUE, display debug messages.
#'
#' @return A [tibble::tibble] with fields:
#'  * `week` (character) - the ISO-8601 week name, e.g. `"2021W48"`
#'  * `nbr_of_jobs` (numeric) - number of jobs
#'  * `file_offset` (numeric) - the file byte offset
#' The week names uses `%GW%V` week format.
#'
#' @importFrom progressr progressor
#' @importFrom tibble as_tibble
#' @importFrom utils str
#' @export
sge_make_week_index <- function(file, index, until = NULL, n_max = Inf, delta = 100000, debug = FALSE) {
  ntry <- function(expr, envir = parent.frame(), tries = 5L, wait = 1.0) {
    expr <- substitute(expr)
    for (kk in seq_len(tries)) {
      value <- tryCatch({
        eval(expr, envir = envir)
      }, error = identity)
      if (!inherits(value, "error")) return(value)
      Sys.sleep(wait)
    }
    eval(expr, envir = envir)
  } # ntry()
  
  weeks <- list()
  delta_org <- delta

  by <- "end_time"

  parse_time <- local({
    origin <- as.POSIXct("1970-01-01 00:00.00 UTC", tz = "GMT")
    function(value) {
      if (value == 0) value <- NA_real_
      as.POSIXct(value, origin = origin)
    }
  })
  
  ## Report on progress along 'index' every 1000:th entries
  p <- progressor(length(index) / 1000 + 2)
  
  pos <- 1

  con <- file(file, open = "rb")
  on.exit(close(con))
  
  ## Assume first read is the start of the first week
  last <- pos
  offset <- index[pos]
  job <- read_raw_sge_accounting(con, offset = offset, n_max = 1L, progress = FALSE)
  time <- parse_time(job[[by]])
  job <- NULL ## Not needed anymore
  week <- format(time, "%GW%V")
  weeks[[week]] <- offset
  last_week <- week
  p(week)

  ## Find the beginning of next week
  delta <- delta_org
  last_same <- last
  pos <- last + delta
  count <- 0L
  forward <- TRUE
  while (pos <= length(index)) {
    offset <- index[pos]
    job <- read_raw_sge_accounting(con, offset = offset, n_max = 1L, progress = FALSE)
    time <- parse_time(job[[by]])
    job <- NULL ## Not needed anymore
    week <- format(time, "%GW%V")
    if (debug) str(list(count = count, pos = pos, week  = week, last_week = last_week))
    if (identical(week, last_week)) {
      last_same <- pos
      if (debug) message("Move forward")
      if (!forward) delta <- max(floor(delta / 2), 1)
      pos <- pos + delta
      forward <- TRUE
      p(amount = 0)
    } else {
      ## Invalid entry?
      if (is.na(week)) {
        pos <- pos + if (forward) +1 else -1
        if (pos <= last_same) {
          pos <- last_same + 1
          forward <- TRUE
        } else if (pos >= length(index)) {
          pos <- length(index) - 1
          forward <- FALSE
        }
        if (pos <= last_same) {
          attr(weeks, "terminated") <- "NA"
          break
        }
        p(amount = 0)
        next
      }
      
      if (delta == 1) {
        if (debug) message("Found next week")
        weeks[[week]] <- c(weeks[[week]], offset)
        amount <- (pos - last)/1000
        if (debug) message(sprintf("%s: amount = %.0f / %.0f", week, amount, length(index)/1000))
        p(week, amount = amount)
        last_week <- week
        delta <- delta_org
        last_same <- pos
        last <- pos
        pos <- pos + delta
        forward <- TRUE
        if (length(weeks) >= n_max) {
          attr(weeks, "terminated") <- "n_max"
          if (debug) message("n_max fulfilled: ", n_max)
          break
        } else if (week %in% until) {
          attr(weeks, "terminated") <- "until"
          if (debug) message("until fulfilled: ", week)
          break
        }
      }
      if (debug) message("Move back")
      if (forward) delta <- max(floor(delta / 2), 1)
      pos <- pos - delta
      forward <- FALSE
      if (pos <= last_same) {
        pos <- last_same + 1
        forward <- TRUE
      }
      p(amount = 0)
    }
    if (debug) str(list(count = count, pos = pos, last_same = last_same, delta = delta))
    stopifnot(pos > last_same)
    count <- count + 1L
  } ## while (pos <= length(index)
  p(step = length(index) / 1000)

  ## Coerce to a named vector
  stopifnot(all(lengths(weeks) == 1))
  weeks <- unlist(weeks, use.names = TRUE)
  weeks <- weeks[!duplicated(weeks)]
  p("number of jobs per week")

  week_index <- data.frame(
    week        = names(weeks),
    nbr_of_jobs = c(diff(match(weeks, index)), +Inf),
    file_offset = unname(weeks)
  )
  week_index <- as_tibble(week_index)
  class(week_index) <- c("sge_accounting_index_by_week", "sge_accounting_index", class(week_index))

  p()

  week_index
}
UCSF-HPC/wyntonquery documentation built on March 6, 2025, 1:12 a.m.