R/driver.R

Defines functions warn_datachunk_bypass warn_mismarked_fileinputs warn_data_injects driver tibbelize_outputs check_chunk_outputs run_chunk

Documented in check_chunk_outputs driver run_chunk tibbelize_outputs warn_datachunk_bypass warn_data_injects warn_mismarked_fileinputs

# driver.R
# # tidyr v1.0.0 causes problems with tidyr::complete; this (disabled) block pinned tidyr to v0.8.3
# if(!require("tidyr")){
#   install.packages( "https://github.com/tidyverse/tidyr/archive/v0.8.3.tar.gz", repos = NULL, type = "libcurl", dependencies = TRUE)
#   #devtools::install_version("tidyr", version="0.8.3", dependencies=TRUE)
# } else if(packageVersion("tidyr") == "1.0.0"){
#   remove.packages("tidyr")
#   install.packages( "https://github.com/tidyverse/tidyr/archive/v0.8.3.tar.gz", repos = NULL, type = "libcurl", dependencies = TRUE)
# } else {
# }


# Path fragment that marks temporary 'injected' inputs (old data system data);
# warn_data_injects() searches chunk input names for it.
TEMP_DATA_INJECT <- "temp-data-inject/"

#' run_chunk
#'
#' Invoke a single chunk, handing it the driver's MAKE command and its input data.
#'
#' @param chunk Chunk name, character
#' @param all_data Data required by chunk
#' @return Data generated by chunk
#' @details Isolating the \code{do.call} in this tiny wrapper
#' lets tests mock chunk execution.
run_chunk <- function(chunk, all_data) {
  chunk_args <- list(driver.MAKE, all_data)
  do.call(what = chunk, args = chunk_args)
}


#' check_chunk_outputs
#'
#' Checks chunk outputs for a variety of problems: correct structure,
#' correct attributes attached, matches promised outputs. Generates
#' warnings and/or errors if any deviance.
#'
#' @param chunk Chunk name, character
#' @param chunk_data Data produced by chunk, generally a tibble
#' @param chunk_inputs Names of chunk inputs, character
#' @param promised_outputs Names of chunk's promised outputs, character
#' @param outputs_xml Logical vector, parallel to \code{promised_outputs}: are outputs XML?
#' @return Invisible \code{NULL}; called for its warnings, messages, and errors.
check_chunk_outputs <- function(chunk, chunk_data, chunk_inputs, promised_outputs, outputs_xml) {
  assert_that(is_data_list(chunk_data))

  # Chunk should have returned EXACTLY what it promised. Checked up front so that an
  # unpromised object raises this clear error instead of breaking the XML lookup below
  if(!identical(sort(names(chunk_data)), sort(promised_outputs))) {
    stop("Chunk ", chunk, " is not returning what it promised!")
  }

  # Check that the chunk has provided required data for all objects
  empty_precursors <- TRUE
  pc_all <- character(0)
  for(obj in names(chunk_data)) {
    obj_data <- chunk_data[[obj]]
    obj_flags <- get_flags(obj_data)
    is_xml <- outputs_xml[match(obj, promised_outputs)]

    # Chunks have to return tibbles, unless they're tagged as being XML
    if(!is_xml) {
      assert_that(tibble::is_tibble(obj_data),
                  msg = paste(obj, "is", paste(class(obj_data), collapse = ", ")))
      assert_that(!(FLAG_XML %in% obj_flags), msg = obj)

      # Make sure objects have required attributes
      for(at in c(ATTR_TITLE, ATTR_UNITS, ATTR_COMMENTS)) {
        if(is.null(attr(obj_data, at))) {
          warning("No '", at, "' attached to ", obj, " - chunk ", chunk)
        }
      }

      obj_cols <- names(obj_data)

      # Any 'year' column should be numeric - this is a frequent source of join problems
      if("year" %in% obj_cols && !is.numeric(obj_data$year)) {
        warning("'year' column not numeric in ", obj)
      }

      # If there's any 'logit.*' column then there HAS to be a 'logit.type' column (see #899)
      if(any(grepl("^logit\\.", obj_cols)) && !(LOGIT_TYPE_COLNAME %in% obj_cols)) {
        warning("No ", LOGIT_TYPE_COLNAME, " column in ", obj)
      }
    } else {
      assert_that(FLAG_XML %in% obj_flags)
    }

    # Data precursors should all appear in the input (or promised output) list
    pc <- attr(obj_data, ATTR_PRECURSORS)
    pc_all <- c(pc_all, pc)
    empty_precursors <- empty_precursors && is.null(pc)
    if(!all(pc %in% c(chunk_inputs, promised_outputs))) {
      stop("Some precursors for '", obj, "' aren't inputs - chunk ", chunk)
    }
    if(obj %in% pc) {
      stop("Precursors for '", obj, "' include itself - chunk ", chunk)
    }
  }

  # Every input should be a precursor for something
  unused_inputs <- setdiff(chunk_inputs, pc_all)
  if(length(unused_inputs) > 0) {
    message("Inputs ", paste(unused_inputs, collapse = ", "),
            " don't appear as precursors for any outputs - chunk ", chunk)
  }

  # If chunk has inputs, some output should have a precursor
  if(empty_precursors && length(chunk_inputs) > 0) {
    stop("No output precursors, but there are inputs - chunk ", chunk)
  }

  invisible(NULL)
}


#' tibbelize_outputs
#'
#' Extract precursor and other metadata from chunk output data and convert to tibble form.
#'
#' @param chunk_data List of chunk outputs, a data list (see \code{\link{is_data_list}})
#' @param chunk_name Name of current chunk, character
#' @return A tibble with columns name (chunk), output, precursors, title, units, comments, and flags.
#' There is one row per non-empty output; multi-valued fields are concatenated into
#' single entries separated by \code{data.SEPARATOR}.
tibbelize_outputs <- function(chunk_data, chunk_name) {
  assert_that(is.character(chunk_name))
  assert_that(is_data_list(chunk_data))

  # paste() both collapses multi-valued metadata and turns a NULL into ""
  squash <- function(x) paste(x, collapse = data.SEPARATOR)

  rows <- list()
  for(output_name in names(chunk_data)) {
    obj <- chunk_data[[output_name]]
    if(is.null(obj) || length(obj) == 0) {
      next  # NULL or empty outputs contribute no metadata row
    }
    rows[[output_name]] <- tibble(name = chunk_name,
                                  output = output_name,
                                  precursors = squash(get_precursors(obj)),
                                  title = squash(get_title(obj)),
                                  units = squash(get_units(obj)),
                                  comments = squash(get_comments(obj)),
                                  flags = squash(get_flags(obj)))
  }
  bind_rows(rows)
}


#' driver
#'
#' Run the entire data system.
#'
#' @param all_data Data to be pre-loaded into data system
#' @param stop_before Stop immediately before this chunk (character)
#' @param stop_after Stop immediately after this chunk  (character)
#' @param return_inputs_of Return the data objects that are inputs for these chunks (character).
#' If \code{stop_before} is specified, by default that chunk's inputs are returned
#' @param return_outputs_of Return the data objects that are output from these chunks (character).
#' If \code{stop_after} is specified, by default that chunk's outputs are returned
#' @param return_data_names Return these data objects (character). By default this is the union of \code{return_inputs_of} and \code{return_outputs_of}
#' @param return_data_map_only Return only the precursor information? (logical) This overrides
#' the other \code{return_*} parameters above
#' @param write_outputs Write all chunk outputs to disk?
#' @param write_xml Write XML Batch chunk outputs to disk?
#' @param outdir Location to write output data (passed to \code{save_chunkdata} as \code{outputs_dir})
#' @param xmldir Location to write output XML (passed to \code{save_chunkdata} as \code{xml_dir})
#' @param quiet Suppress output?
#' @return Invisibly, a list of the requested built data (\code{return_data_names}),
#' or a data map tibble if \code{return_data_map_only} is \code{TRUE}.
#' @details The driver loads any necessary data from input files,
#' runs all code chunks in an order dictated by their dependencies,
#' does error-checking, and writes outputs. For more details, see
#' the relevant wiki page at \url{ https://github.com/bpbond/gcamdata/wiki/Driver}.
#' @importFrom magrittr "%>%"
#' @importFrom assertthat assert_that
#' @importFrom dplyr filter mutate select
#' @export
#' @author BBL
driver <- function(all_data = empty_data(),
                   stop_before = NULL, stop_after = NULL,
                   return_inputs_of = stop_before,
                   return_outputs_of = stop_after,
                   return_data_names = union(inputs_of(return_inputs_of),
                                             outputs_of(return_outputs_of)),
                   return_data_map_only = FALSE,
                   write_outputs = !return_data_map_only,
                   write_xml = write_outputs,
                   outdir = OUTPUTS_DIR, xmldir = XML_DIR,
                   quiet = FALSE) {

  # If users ask to stop after a chunk, but also specify they want particular inputs,
  # or if they ask to stop before a chunk, while asking for outputs, that's confusing.
  if(missing(return_outputs_of) && !missing(return_inputs_of) && !missing(stop_after)) {
    return_outputs_of <- NULL  # in this case don't want default of stop_after
  }
  if(missing(return_inputs_of) && !missing(return_outputs_of) && !missing(stop_before)) {
    return_inputs_of <- NULL  # in this case don't want default of stop_before
  }
  if(missing(return_data_names)) {
    return_data_names <- union(inputs_of(return_inputs_of), outputs_of(return_outputs_of))
  }

  optional <- input <- from_file <- name <- NULL    # silence notes from package check.

  assert_that(is.null(stop_before) | is.character(stop_before))
  assert_that(is.null(stop_after) | is.character(stop_after))
  assert_that(is.null(return_inputs_of) | is.character(return_inputs_of))
  assert_that(is.null(return_outputs_of) | is.character(return_outputs_of))
  assert_that(is.null(return_data_names) | is.character(return_data_names))
  assert_that(is.logical(return_data_map_only))
  assert_that(is.logical(write_outputs))
  assert_that(is.logical(write_xml))
  assert_that(is.logical(quiet))

  if(!quiet) cat("GCAM-Korea Data System v", as.character(utils::packageVersion("gcamkordata")), "\n", sep = "")

  # Discover the chunks and their declared inputs and outputs
  chunklist <- find_chunks()
  if(!quiet) cat("Found", nrow(chunklist), "chunks\n")
  chunkinputs <- chunk_inputs(chunklist$name)
  if(!quiet) cat("Found", nrow(chunkinputs), "chunk data requirements\n")
  chunkoutputs <- chunk_outputs(chunklist$name)
  if(!quiet) cat("Found", nrow(chunkoutputs), "chunk data products\n")

  # Keep track of how many chunks consume each input, for later pruning
  chunkinputs %>%
    group_by(input) %>%
    summarise(n = n()) ->
    chunk_input_counts

  cic <- chunk_input_counts$n
  names(cic) <- chunk_input_counts$input

  warn_data_injects()
  warn_datachunk_bypass()
  warn_mismarked_fileinputs()

  # Outputs should all be unique
  dupes <- duplicated(chunkoutputs$output)
  if(any(dupes)) {
    stop("Outputs appear multiple times: ",
         paste(unique(chunkoutputs$output[dupes]), collapse = ", "))
  }

  # If there are any unaccounted for input requirements,
  # try to load them from csv files
  unfound_inputs <- filter(chunkinputs, !input %in% chunkoutputs$output)
  if(nrow(unfound_inputs) > 0) {

    # These should all be marked as 'from_file'
    ff <- filter(unfound_inputs, !from_file)
    if(nrow(ff) > 0) {
      stop("Unfound inputs not marked as from file: ", paste(ff$input, collapse = ", "),
           " in ", paste(unique(ff$name), collapse = ", "))
    }

    if(!quiet) cat(nrow(unfound_inputs), "chunk data input(s) not accounted for\n")
  }

  # Per-chunk metadata tibbles; only filled in when return_data_map_only is TRUE
  metadata_info <- list()

  # Decide which chunks (and file inputs, which are treated as pseudo-chunks) to run
  if(!missing(stop_before) || !missing(stop_after)) {
    if(!missing(stop_after)) {
      run_chunks <- stop_after
    } else {
      run_chunks <- stop_before
    }
    # Build the producer -> consumer edge list and trace back the minimal set of chunks
    name.x <- name.y <- NULL  # silence package check note
    verts <- inner_join(bind_rows(chunkoutputs,
                                  tibble(name = unfound_inputs$input,
                                         output = unfound_inputs$input,
                                         to_xml = FALSE)),
                        chunkinputs, by = c("output" = "input")) %>%
      select(name.x, name.y) %>%
      unique()

    chunks_to_run <- dstrace_chunks(run_chunks, verts)
  } else {
    chunks_to_run <- c(unfound_inputs$input, chunklist$name)
  }

  save_chunkdata(empty_data(), create_dirs = TRUE, write_outputs = write_outputs, write_xml = write_xml,
                 outputs_dir = outdir, xml_dir = xmldir) # clear directories

  while(length(chunks_to_run) > 0) {
    nchunks <- length(chunks_to_run)

    # Loop through all chunks and see who can run (i.e. all dependencies are available)
    for(chunk in chunks_to_run) {

      inputs <- filter(chunkinputs, name == chunk)
      input_names <- inputs$input
      required_inputs <- filter(inputs, !optional)
      if(!all(required_inputs$input %in% names(all_data))) {
        next  # chunk's required inputs are not all available
      }

      if(chunk %in% stop_before) {
        chunks_to_run <- character(0)
        break
      }

      if(chunk %in% unfound_inputs$input) {
        # A file input: load it from disk rather than running a chunk
        unfound_chunk <- unfound_inputs[unfound_inputs$input == chunk, ]
        chunk_data <- load_csv_files(unfound_chunk$input, unfound_chunk$optional, quiet = TRUE)
        po <- chunk
        if(return_data_map_only) {
          if(!quiet) cat("Extracting metadata from inputs...\n")
          metadata_info[[chunk]] <- tibbelize_outputs(chunk_data, chunk_name = "INPUT")
        }
      } else {
        if(!quiet) print(chunk)

        # Order chunk to build its data
        time1 <- Sys.time()
        chunk_data <- run_chunk(chunk, all_data[input_names])
        tdiff <- as.numeric(difftime(Sys.time(), time1, units = "secs"))
        if(!quiet) print(paste("- make", format(round(tdiff, 2), nsmall = 2)))

        promised <- subset(chunkoutputs, name == chunk)
        po <- promised$output  # promised outputs

        check_chunk_outputs(chunk, chunk_data, input_names,
                            promised_outputs = po,
                            outputs_xml = promised$to_xml)

        # Save precursor information and other metadata
        if(return_data_map_only) {
          metadata_info[[chunk]] <- tibbelize_outputs(chunk_data, chunk)
        }
      }

      # Add this chunk's data to the global data store
      all_data <- add_data(chunk_data, all_data)

      # If any outputs don't appear in the chunk input list, and aren't requested for return,
      # they can be immediately written out and removed
      prunelist <- !po %in% names(cic) & !po %in% return_data_names
      if(any(prunelist)) {
        save_chunkdata(all_data[po[prunelist]], write_outputs = write_outputs, write_xml = write_xml,
                       outputs_dir = outdir, xml_dir = xmldir)
        all_data <- remove_data(po[prunelist], all_data)
      }

      # Decrement the input use counts...
      cic[input_names] <- cic[input_names] - 1
      # ...and write out/remove inputs not going to be used again, and not requested for return
      which_zero <- which(cic[input_names] == 0 & !input_names %in% return_data_names)
      save_chunkdata(all_data[names(which_zero)], write_outputs = write_outputs, write_xml = write_xml,
                     outputs_dir = outdir, xml_dir = xmldir)
      all_data <- remove_data(names(which_zero), all_data)

      if(chunk %in% stop_after) {
        chunks_to_run <- character(0)
        break
      }

      # Remove the current chunk from the to-run list
      chunks_to_run <- chunks_to_run[chunks_to_run != chunk]
    } # for

    # We have to be able to run >=1 chunk every loop iteration
    if(length(chunks_to_run) == nchunks) {
      stop("No chunks were run--we are stuck")
    }
  } # while

  # Finish up: write outputs, determine return data format
  if(!quiet && (write_outputs || write_xml)) cat("Writing chunk data...\n")
  save_chunkdata(all_data, write_outputs = write_outputs, write_xml = write_xml,
                 outputs_dir = outdir, xml_dir = xmldir)

  if(return_data_map_only) {
    if(!quiet) cat("Returning data map.\n")
    x <- bind_rows(metadata_info)
  } else {
    x <- all_data[return_data_names]
    if(!quiet && length(x) > 0) cat("Returning", length(x), "tibbles.\n")
  }
  if(!quiet) cat("All done.\n")
  invisible(x)
}


#' warn_data_injects
#'
#' Check whether chunks are using any temporary (old data system data) 'injected' inputs
#' that are also produced by an enabled chunk, printing a note for each one found.
#'
#' @return Number of temporary data objects being used inappropriately.
warn_data_injects <- function() {

  name <- input <- base_input <- upstream_chunk <- output <- NULL # silence package check

  # Are any chunks using temp-data-inject data that are also available to them through the data system?
  # Look up the enabled chunks once; both the input and output tables derive from this list
  enabled_chunks <- find_chunks(include_disabled = FALSE)$name
  ci <- chunk_inputs(enabled_chunks)
  chunk_outputs(enabled_chunks) %>%
    rename(upstream_chunk = name) ->
    co

  # Look for the TEMP_DATA_INJECT path fragment (a literal, not a regex) in the chunk input list
  ci %>%
    filter(grepl(TEMP_DATA_INJECT, input, fixed = TRUE)) %>%
    mutate(base_input = basename(input)) %>%
    # Look for any tdi inputs that appear in the enabled chunks' outputs
    filter(base_input %in% co$output) %>%
    left_join(select(co, upstream_chunk, output), by = c("base_input" = "output")) ->
    ci_tdi

  # Print messages
  for(i in seq_len(nrow(ci_tdi))) {
    message("NOTE: chunk ", ci_tdi$name[i], " reads `", ci_tdi$input[i], "`\n\tbut this is available from ", ci_tdi$upstream_chunk[i])
  }
  nrow(ci_tdi)
}

#' warn_mismarked_fileinputs
#'
#' Look for mislabeled chunk file inputs: inputs flagged \code{from_file}
#' whose name has no directory component. A note is printed for each one.
#'
#' @return Number of data chunk objects marked as \code{from_file} but without a directory path.
warn_mismarked_fileinputs <- function() {

  from_file <- input <- NULL  # silence package check

  all_inputs <- chunk_inputs()
  # A real file path changes under basename(); a bare name does not
  mismarked <- filter(all_inputs, from_file, input == basename(input))

  for(row in seq_len(nrow(mismarked))) {
    message("NOTE: chunk ", mismarked$name[row],
            " file input `", mismarked$input[row], "` doesn't appear to be a file")
  }
  nrow(mismarked)
}

#' warn_datachunk_bypass
#'
#' Check whether chunks are bypassing a data chunk
#'
#' @details Data chunks read one or more particular files that are quirky: typically, files that generate
#' warnings or errors when read using the default \code{\link{get_data}} routine, but also ones that need
#' some special preprocessing. We look for chunks that read, directly from file, a dataset whose
#' name matches another chunk's output, and print a note for each one.
#' @return Number of data chunk objects being read directly.
warn_datachunk_bypass <- function() {

  from_file <- input <- baseinput <- NULL  # silence package check

  ci <- chunk_inputs()
  co <- chunk_outputs()

  # Strip directories so file inputs can be compared against chunk output names
  ci_named <- mutate(ci, baseinput = basename(input))
  file_hits <- filter(ci_named, from_file, baseinput %in% co$output)

  # The only time that chunks should be asking for a file whose name appears as another
  # chunk's outputs is if it's ALSO an output of that chunk--this will be a prebuilt dataset
  bypassing <- anti_join(file_hits, co, by = c("name", "baseinput" = "output"))

  for(idx in seq_len(nrow(bypassing))) {
    message("NOTE: chunk ", bypassing$name[idx], " reads `", bypassing$input[idx],
            "` but this is handled by a data chunk")
  }
  nrow(bypassing)
}
rohmin9122/gcam-korea-release documentation built on Nov. 26, 2020, 8:11 a.m.