# Copyright 2019 Battelle Memorial Institute; see the LICENSE file.

# driver.R


TEMP_DATA_INJECT <- "temp-data-inject/"

#' run_chunk
#'
#' @param chunk Chunk name, character
#' @param all_data Data required by chunk
#' @return Data generated by chunk
#' @details Putting this \code{do.call} into this one-line
#' function lets us mock it when testing.
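#' @examples
#' \dontrun{
#' # Minimal sketch of calling a chunk directly (the chunk name below is
#' # hypothetical; real names come from find_chunks()$name):
#' all_data <- add_data(load_csv_files("common/GCAM_region_names", FALSE, quiet = TRUE),
#'                      empty_data())
#' result <- run_chunk("module_energy_example_chunk", all_data)
#' }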
run_chunk <- function(chunk, all_data) {
  do.call(chunk, list(driver.MAKE, all_data))
}


#' check_chunk_outputs
#'
#' Checks chunk outputs for a variety of problems: correct structure,
#' correct attributes attached, matches promised outputs. Generates
#' warnings and/or errors for any deviations.
#'
#' @param chunk Chunk name, character
#' @param chunk_data Data produced by chunk, generally a tibble
#' @param chunk_inputs Names of chunk inputs, character
#' @param promised_outputs Names of chunk's promised outputs, character
#' @param outputs_xml Logical vector: are outputs XML?
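#' @examples
#' \dontrun{
#' # Sketch of an output that passes these checks (all names are illustrative;
#' # the ATTR_* constants are package-internal):
#' out <- tibble::tibble(region = "USA", year = 2010, value = 1.0)
#' attr(out, ATTR_TITLE) <- "Example data"
#' attr(out, ATTR_UNITS) <- "unitless"
#' attr(out, ATTR_COMMENTS) <- "Illustration only"
#' attr(out, ATTR_PRECURSORS) <- "example_input"
#' check_chunk_outputs("example_chunk", list(example_output = out),
#'                     chunk_inputs = "example_input",
#'                     promised_outputs = "example_output",
#'                     outputs_xml = FALSE)
#' }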
check_chunk_outputs <- function(chunk, chunk_data, chunk_inputs, promised_outputs, outputs_xml) {
  assert_that(is_data_list(chunk_data))

  # Check that the chunk has provided required data for all objects
  empty_precursors <- TRUE
  pc_all <- c()
  for(obj in names(chunk_data)) {
    obj_flags <- get_flags(chunk_data[[obj]])
    # Chunks have to return tibbles, unless they're tagged as being XML
    if(!outputs_xml[which(obj == promised_outputs)]) {
      assert_that(tibble::is_tibble(chunk_data[[obj]]), msg = paste(obj, "is", paste(class(chunk_data[[obj]]), collapse = "/")))
      assert_that(! FLAG_XML %in% obj_flags, msg = obj)

      # Make sure objects have required attributes
      for(at in c(ATTR_TITLE, ATTR_UNITS, ATTR_COMMENTS)) {
        if(is.null(attr(chunk_data[[obj]], at))) {
          warning("No '", at, "' attached to ", obj, " - chunk ", chunk)
        }
      }

      # Any 'year' column should be numeric - this is a frequent source of join problems
      if("year" %in% names(chunk_data[[obj]]) && !is.numeric(chunk_data[[obj]]$year)) {
        warning("'year' column not numeric in ", obj)
      }

      # If there's any 'logit.*' column then there HAS to be a 'logit.type' column (see #899)
      if(any(grepl("^logit\\.", names(chunk_data[[obj]]))) &&
         !(LOGIT_TYPE_COLNAME %in% names(chunk_data[[obj]]))) {
        warning("No ", LOGIT_TYPE_COLNAME, " column in ", obj)
      }

    } else {
      assert_that(FLAG_XML %in% obj_flags)
    }
    # Data precursors should all appear in input list
    pc <- attr(chunk_data[[obj]], ATTR_PRECURSORS)
    pc_all <- c(pc_all, pc)
    empty_precursors <- empty_precursors & is.null(pc)
    # Remove data.USER_MOD_POSTFIX from input names, as these won't be present in precursors
    matches <- pc %in% c(gsub(data.USER_MOD_POSTFIX, '', chunk_inputs), promised_outputs)
    if(!all(matches)) {
      stop("Some precursors for '", obj, "' aren't inputs - chunk ", chunk)
    }
    if(obj %in% pc) {
      stop("Precursors for '", obj, "' include itself - chunk ", chunk)
    }
  }

  # Every input should be a precursor for something
  if(!all(gsub(data.USER_MOD_POSTFIX, '', chunk_inputs) %in% pc_all)) {
    message("Inputs ", paste(dplyr::setdiff(chunk_inputs, pc_all), collapse = ", "),
            " don't appear as precursors for any outputs - chunk ", chunk)
  }

  # If chunk has inputs, some output should have a precursor
  if(empty_precursors && length(chunk_inputs) > 0) {
    stop("No output precursors, but there are inputs - chunk ", chunk)
  }
  # Chunk should have returned EXACTLY what it promised
  if(!identical(sort(names(chunk_data)), sort(promised_outputs))) {
    stop("Chunk ", chunk, "is not returning what it promised!")
  }
}


#' tibbelize_outputs
#'
#' Extract precursor and other metadata from chunk output data and convert to tibble form.
#'
#' @param chunk_data List of chunk outputs, a data list (see \code{\link{is_data_list}})
#' @param chunk_name Name of current chunk, character
#' @return A tibble with chunk name, output name, title, units, flags, precursors, and comments.
#' This table has one row per output name; multiple flags, precursors, etc., are concatenated into single entries.
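#' @examples
#' \dontrun{
#' # Sketch with one hypothetical output carrying title/units metadata:
#' out <- tibble::tibble(year = 2010, value = 1.0)
#' attr(out, ATTR_TITLE) <- "Example data"
#' attr(out, ATTR_UNITS) <- "unitless"
#' tibbelize_outputs(list(example_output = out), chunk_name = "example_chunk")
#' # -> a one-row tibble with name, output, title, units, flags, precursors, comments
#' }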
tibbelize_outputs <- function(chunk_data, chunk_name) {
  assert_that(is.character(chunk_name))
  assert_that(is_data_list(chunk_data))

  metadata <- list()
  for(cd in names(chunk_data)) {
    if(!is.null(chunk_data[[cd]]) && length(chunk_data[[cd]]) > 0) {
      # Here we use paste both to collapse vectors into a single string, and deal with possible NULLs
      metadata[[cd]] <- tibble(name = chunk_name,
                               output = cd,
                               precursors = paste(get_precursors(chunk_data[[cd]]), collapse = data.SEPARATOR),
                               title = paste(get_title(chunk_data[[cd]]), collapse = data.SEPARATOR),
                               units = paste(get_units(chunk_data[[cd]]), collapse = data.SEPARATOR),
                               comments = paste(get_comments(chunk_data[[cd]]), collapse = data.SEPARATOR),
                               flags = paste(get_flags(chunk_data[[cd]]), collapse = data.SEPARATOR))
    }
  }
  bind_rows(metadata)
}


#' driver
#'
#' Run the entire data system.
#'
#' @param all_data Data to be pre-loaded into data system
#' @param quiet Suppress output?
#' @param stop_before Stop immediately before this chunk (character)
#' @param stop_after Stop immediately after this chunk  (character)
#' @param return_inputs_of Return the data objects that are inputs for these chunks (character).
#' If \code{stop_before} is specified, by default that chunk's inputs are returned
#' @param return_outputs_of Return the data objects that are output from these chunks (character).
#' If \code{stop_after} is specified, by default that chunk's outputs are returned
#' @param return_data_names Return these data objects (character). By default this is the union of \code{return_inputs_of} and \code{return_outputs_of}
#' @param return_data_map_only Return only the precursor information? (logical) This overrides
#' the other \code{return_*} parameters above
#' @param write_outputs Write all chunk outputs to disk?
#' @param write_xml Write XML Batch chunk outputs to disk?
#' @param outdir Location to write output data (ignored if \code{write_outputs} is \code{FALSE})
#' @param xmldir Location to write output XML (ignored if \code{write_outputs} is \code{FALSE})
#' @param user_modifications A list of function names which implement a user mod chunk. See vignettes/usermod_vignette.Rmd for more details and examples.
#' @param xml_suffix A suffix to be appended to the end of all XML file names, if not \code{NULL}. Such a feature is
#' useful when using \code{user_modifications} to generate alternative scenarios.
#' @return A list of all built data (or a data map tibble if requested).
#' @details The driver loads any necessary data from input files,
#' runs all code chunks in an order dictated by their dependencies,
#' does error-checking, and writes outputs. For more details, see
#' the relevant wiki page at \url{https://github.com/bpbond/gcamdata/wiki/Driver}.
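#' @examples
#' \dontrun{
#' # Run the full data system, writing all outputs to disk
#' driver()
#' # Build only the precursor map, without writing outputs
#' dm <- driver(return_data_map_only = TRUE)
#' # Stop before a chunk (name is hypothetical) and get its inputs back
#' inputs <- driver(stop_before = "module_energy_example_chunk")
#' }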
#' @importFrom magrittr "%>%"
#' @importFrom assertthat assert_that
#' @importFrom dplyr bind_rows filter group_by inner_join select summarise
#' @export
#' @author BBL
driver <- function(all_data = empty_data(),
                   stop_before = NULL, stop_after = NULL,
                   return_inputs_of = stop_before,
                   return_outputs_of = stop_after,
                   return_data_names = union(inputs_of(return_inputs_of),
                                             outputs_of(return_outputs_of)),
                   return_data_map_only = FALSE,
                   write_outputs = !return_data_map_only,
                   write_xml = write_outputs,
                   outdir = OUTPUTS_DIR, xmldir = XML_DIR,
                   quiet = FALSE,
                   user_modifications = NULL,
                   xml_suffix = NULL) {

  # If users ask to stop after a chunk, but also specify they want particular inputs,
  # or if they ask to stop before a chunk, while asking for outputs, that's confusing.
  if(missing(return_outputs_of) && !missing(return_inputs_of) && !missing(stop_after)) {
    return_outputs_of <- NULL  # in this case don't want default of stop_before
  }
  if(missing(return_inputs_of) && !missing(return_outputs_of) && !missing(stop_before)) {
    return_inputs_of <- NULL  # in this case don't want default of stop_after
  }
  if(missing(return_data_names)) {
    return_data_names <- union(inputs_of(return_inputs_of), outputs_of(return_outputs_of))
  }

  optional <- input <- from_file <- name <- NULL    # silence notes from package check.

  assert_that(is.null(stop_before) | is.character(stop_before))
  assert_that(is.null(stop_after) | is.character(stop_after))
  assert_that(is.null(return_inputs_of) | is.character(return_inputs_of))
  assert_that(is.null(return_outputs_of) | is.character(return_outputs_of))
  assert_that(is.null(return_data_names) | is.character(return_data_names))
  assert_that(is.logical(return_data_map_only))
  assert_that(is.logical(write_outputs))
  assert_that(is.logical(write_xml))
  assert_that(is.logical(quiet))

  # We set this via package data so that it takes effect without drake noticing
  # and concluding that all XML files need to be rebuilt with the suffix
  if(!is.null(xml_suffix)) {
    xml.XML_SUFFIX <<- xml_suffix
  }
  if(!is.null(user_modifications) && is.null(xml_suffix)) {
    warning("It is highly recommended to use `xml_suffix` to distinguish XML inputs derived from `user_modifications`")
  }

  if(!quiet) cat("GCAM Data System v", as.character(utils::packageVersion("gcamdata")), "\n", sep = "")

  chunklist <- find_chunks()
  if(!quiet) cat("Found", nrow(chunklist), "chunks\n")
  chunkinputs <- chunk_inputs(chunklist$name)
  if(!quiet) cat("Found", nrow(chunkinputs), "chunk data requirements\n")
  chunkoutputs <- chunk_outputs(chunklist$name)
  if(!quiet) cat("Found", nrow(chunkoutputs), "chunk data products\n")

  # check if any user chunks are set in which case we need to adjust the
  # chunklist/chunkinputs/chunkoutputs to shim the user chunk in place
  if(!is.null(user_modifications)) {
    # a user modification chunk uses a special command: driver.DECLARE_MODIFY
    # to indicate which ds "objects" it wants to modify
    # that chunk will then require those objects as inputs AND produce those
    # objects as output
    # these chunks will also be allowed to specify regular driver.DECLARE_INPUTS
    # which will be passed as input but NOT output back out

    # in order to shim the user modification in we adjust all chunk_inputs
    # for any object to be modified to input the object with the constant
    # data.USER_MOD_POSTFIX appended to the end instead
    # and the user chunk will input the original object and output the appended
    # data name
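
    # For reference, a user modification chunk is just a function following the
    # chunk protocol; a minimal sketch (the object name below is hypothetical):
    #   my_mod <- function(command, ...) {
    #     if(command == driver.DECLARE_MODIFY) {
    #       c("L100.example_data")            # objects to modify in place
    #     } else if(command == driver.DECLARE_INPUTS) {
    #       NULL                              # extra read-only inputs, if any
    #     } else if(command == driver.MAKE) {
    #       all_data <- list(...)[[1]]
    #       L100.example_data <- get_data(all_data, "L100.example_data")
    #       # ...modify L100.example_data here, preserving its attributes...
    #       return_data(L100.example_data)
    #     }
    #   }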

    # first get a list of all objects that are to be modified
    lapply(user_modifications, chunk_inputs, driver.DECLARE_MODIFY) %>%
      bind_rows() %>%
      # generate the mod data name by appending data.USER_MOD_POSTFIX
      mutate(data_mod = paste0(input, data.USER_MOD_POSTFIX)) ->
      modify_table

    # adjust chunkinputs so chunks that require as input any object that is
    # to be modified will now input the data.USER_MOD_POSTFIX appended name
    chunkinputs %>%
      left_join(select(modify_table, input, data_mod), by = c("input")) %>%
      mutate(input = if_else(is.na(data_mod), input, data_mod),
             from_file = if_else(is.na(data_mod), from_file, FALSE)) %>%
      select(-data_mod) %>%
      # add on the input requirements for the user mod chunk which are the
      # original object names as well as any other driver.DECLARE_INPUTS they
      # require
      bind_rows(select(modify_table, -data_mod),
                lapply(user_modifications, chunk_inputs, driver.DECLARE_INPUTS)) ->
      chunkinputs

    # add in the outputs from the user mod chunks which are the modified object names
    # appended with data.USER_MOD_POSTFIX
    lapply(user_modifications, chunk_outputs, driver.DECLARE_MODIFY) %>%
      bind_rows() %>%
      mutate(output = paste0(output, data.USER_MOD_POSTFIX)) %>%
      bind_rows(chunkoutputs) ->
      chunkoutputs

    # now we just need to add the user mod chunks to the chunklist
    bind_rows(chunklist,
              tibble(name = user_modifications,
                     module = "user",
                     chunk = user_modifications,
                     disabled = FALSE)) ->
      chunklist
  }

  # Keep track of chunk inputs for later pruning
  chunkinputs %>%
    group_by(input) %>%
    summarise(n = dplyr::n()) ->
    chunk_input_counts
  cic <- chunk_input_counts$n
  names(cic) <- chunk_input_counts$input

  warn_data_injects()
  warn_datachunk_bypass()
  warn_mismarked_fileinputs()

  # Outputs should all be unique
  dupes <- duplicated(chunkoutputs$output)
  if(any(dupes)) {
    stop("Outputs appear multiple times: ", chunkoutputs$output[dupes])
  }

  # If there are any unaccounted for input requirements,
  # try to load them from csv files
  unfound_inputs <- filter(chunkinputs, !input %in% chunkoutputs$output)
  if(nrow(unfound_inputs)) {

    # These should all be marked as 'from_file'
    ff <- filter(unfound_inputs, !from_file)
    if(nrow(ff)) {
      stop("Unfound inputs not marked as from file: ", paste(ff$input, collapse = ", "),
           " in ", paste(unique(ff$name), collapse = ", "))
    }

    if(!quiet) cat(nrow(unfound_inputs), "chunk data input(s) not accounted for\n")
  }

  # Extract metadata from the input data; we'll add output metadata as we run
  metadata_info <- list()

  # Initialize some stuff before we start to run the chunks
  if(!missing(stop_before) || !missing(stop_after)) {
    if(!missing(stop_after)) {
      run_chunks <- stop_after
    } else {
      run_chunks <- stop_before
    }
    # calc min list
    name.x <- name.y <- NULL  # silence package check note
    verts <- inner_join(bind_rows(chunkoutputs,
                                  tibble(name = unfound_inputs$input,
                                         output = unfound_inputs$input,
                                         to_xml = FALSE)),
                        chunkinputs, by = c("output" = "input")) %>%
      select(name.x, name.y) %>%
      unique()

    chunks_to_run <- dstrace_chunks(run_chunks, verts)
  } else {
    chunks_to_run <- c(unfound_inputs$input, chunklist$name)
  }
  removed_count <- 0
  save_chunkdata(empty_data(), create_dirs = TRUE, write_outputs=write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir) # clear directories

  while(length(chunks_to_run)) {
    nchunks <- length(chunks_to_run)
    if(!quiet) {
      message("Chunks left: ", nchunks)
    }

    # Loop through all chunks and see who can run (i.e. all dependencies are available)
    for(chunk in chunks_to_run) {

      inputs <- filter(chunkinputs, name == chunk)
      input_names <- inputs$input
      required_inputs <- filter(inputs, !optional)
      if(!all(required_inputs$input %in% names(all_data))) {
        next  # chunk's required inputs are not all available
      }

      if(chunk %in% stop_before) {
        chunks_to_run <- character(0)
        break
      }

      if(chunk %in% unfound_inputs$input) {
        unfound_chunk <- unfound_inputs[unfound_inputs$input == chunk, ]
        chunk_data <- load_csv_files(unfound_chunk$input, unfound_chunk$optional, quiet = TRUE)
        po <- chunk
        if(return_data_map_only) {
          if(!quiet) cat("Extracting metadata from inputs...\n")
          metadata_info[[chunk]] <- tibbelize_outputs(chunk_data, chunk_name = "INPUT")
        }
      } else {

        if(!quiet) print(chunk)

        # Order chunk to build its data
        time1 <- Sys.time()
        chunk_data <- run_chunk(chunk, all_data[input_names])
        tdiff <- as.numeric(difftime(Sys.time(), time1, units = "secs"))
        if(!quiet) print(paste("- make", format(round(tdiff, 2), nsmall = 2)))
        po <- subset(chunkoutputs, name == chunk)$output  # promised outputs

        check_chunk_outputs(chunk, chunk_data, input_names,
                            promised_outputs = po,
                            outputs_xml = subset(chunkoutputs, name == chunk)$to_xml)

        # Save precursor information and other metadata
        if(return_data_map_only) {
          metadata_info[[chunk]] <- tibbelize_outputs(chunk_data, chunk)
        }
      }

      # Add this chunk's data to the global data store
      all_data <- add_data(chunk_data, all_data)

      # If any outputs don't appear in the chunk input list, and aren't requested for return,
      # they can be immediately written out and removed
      prunelist <- !po %in% names(cic) & !po %in% return_data_names
      if(any(prunelist)) {
        save_chunkdata(all_data[po[prunelist]], write_outputs = write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir)
        all_data <- remove_data(po[prunelist], all_data)
        removed_count <- removed_count + length(po[prunelist])
      }

      # Decrement the file input count
      cic[input_names] <- cic[input_names] - 1
      # ...and remove if not going to be used again, and not requested for return
      which_zero <- which(cic[input_names] == 0 & !input_names %in% return_data_names)
      save_chunkdata(all_data[names(which_zero)], write_outputs = write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir)
      all_data <- remove_data(names(which_zero), all_data)
      removed_count <- removed_count + length(which_zero)

      if(chunk %in% stop_after) {
        chunks_to_run <- character(0)
        break
      }

      # Remove the current chunk from the to-run list
      chunks_to_run <- chunks_to_run[chunks_to_run != chunk]
    } # for

    # We have to be able to run >=1 chunk every loop iteration
    if(length(chunks_to_run) == nchunks) {
      stop("No chunks were run--we are stuck")
    }
  } # while

  # Finish up: write outputs, determine return data format

  if(!quiet && (write_outputs || write_xml)) cat("Writing chunk data...\n")
  save_chunkdata(all_data, write_outputs = write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir)

  if(return_data_map_only) {
    if(!quiet) cat("Returning data map.\n")
    x <- bind_rows(metadata_info)
  } else {
    all_data <- all_data[return_data_names]

    if(!quiet && length(all_data) > 0) cat("Returning", length(all_data), "tibbles.\n")
    x <- all_data
  }
  if(!quiet) cat("All done.\n")
  invisible(x)
}

#' driver_drake
#'
#' Run the entire data system using drake to manage the build process. This
#' gives us a number of features, most notably "make"-like behavior: subsequent
#' invocations only need to rebuild targets that have actually changed.
#'
#' The interface mostly mimics that of \code{driver}, with the exception
#' that writing outputs is not optional, since drake needs to include all
#' outputs in the cache as a matter of course. In addition, users may pass
#' additional arguments to \code{driver_drake}, which will be
#' forwarded on to \code{drake::make}.
#'
#' @param stop_before Stop immediately before this chunk (character)
#' @param stop_after Stop immediately after this chunk  (character)
#' @param return_inputs_of Return the data objects that are inputs for these chunks (character).
#' If \code{stop_before} is specified, by default that chunk's inputs are returned
#' @param return_outputs_of Return the data objects that are output from these chunks (character).
#' If \code{stop_after} is specified, by default that chunk's outputs are returned
#' @param return_data_names Return these data objects (character). By default this is the union of \code{return_inputs_of} and \code{return_outputs_of}
#' @param return_data_map_only Return only the precursor information? (logical) This overrides
#' the other \code{return_*} parameters above
#' @param return_plan_only Return only the drake plan (logical)
#' @param write_xml Write XML Batch chunk outputs to disk?
#' @param xmldir Location to write output XML (ignored if \code{write_xml} is \code{FALSE})
#' @param quiet Suppress output?
#' @param user_modifications A list of function names which implement a user mod chunk. See vignettes/usermod_vignette.Rmd for more details and examples.
#' @param xml_suffix A suffix to be appended to the end of all XML file names, if not \code{NULL}. Such a feature is
#' useful when using \code{user_modifications} to generate alternative scenarios.
#' @param ... Additional arguments to be forwarded on to \code{make}
#' @return A list of all built data (or a data map tibble if requested).
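#' @examples
#' \dontrun{
#' # Run the data system with drake, rebuilding only out-of-date targets
#' driver_drake()
#' # Inspect the generated plan without building anything
#' plan <- driver_drake(return_plan_only = TRUE)
#' }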
#' @importFrom magrittr "%>%"
#' @importFrom assertthat assert_that
#' @importFrom dplyr bind_rows filter group_by inner_join select summarise
#' @export
driver_drake <- function(
  stop_before = NULL,
  stop_after = NULL,
  return_inputs_of = stop_before,
  return_outputs_of = stop_after,
  return_data_names = union(inputs_of(return_inputs_of),
                            outputs_of(return_outputs_of)),
  return_data_map_only = FALSE,
  return_plan_only = FALSE,
  write_xml = !return_data_map_only,
  xmldir = XML_DIR,
  quiet = FALSE,
  user_modifications = NULL,
  xml_suffix = NULL,
  ...){


  # We merely suggest drake, as the data system can still be run via driver()
  # without it. Ensure we have it before proceeding.
  if(!requireNamespace('drake')) {
    stop("The `drake` package is required to run `driver_drake()`.  Either install it or fall back to `driver()`")
  }

  # If users ask to stop after a chunk, but also specify they want particular inputs,
  # or if they ask to stop before a chunk, while asking for outputs, that's confusing.
  if(missing(return_outputs_of) && !missing(return_inputs_of) && !missing(stop_after)) {
    return_outputs_of <- NULL  # in this case don't want default of stop_before
  }
  if(missing(return_inputs_of) && !missing(return_outputs_of) && !missing(stop_before)) {
    return_inputs_of <- NULL  # in this case don't want default of stop_after
  }
  if(missing(return_data_names)) {
    return_data_names <- union(inputs_of(return_inputs_of), outputs_of(return_outputs_of))
  }

  optional <- input <- from_file <- name <- to_xml <- NULL    # silence notes from package check.

  assert_that(is.null(stop_before) | is.character(stop_before))
  assert_that(is.null(stop_after) | is.character(stop_after))
  assert_that(is.null(return_inputs_of) | is.character(return_inputs_of))
  assert_that(is.null(return_outputs_of) | is.character(return_outputs_of))
  assert_that(is.null(return_data_names) | is.character(return_data_names))
  assert_that(is.logical(return_data_map_only))
  assert_that(is.logical(return_plan_only))
  assert_that(is.logical(write_xml))
  assert_that(is.logical(quiet))

  # We set this via package data so that it takes effect without drake noticing
  # and concluding that all XML files need to be rebuilt with the suffix
  if(!is.null(xml_suffix)) {
    xml.XML_SUFFIX <<- xml_suffix
  }
  if(!is.null(user_modifications) && is.null(xml_suffix)) {
    warning("It is highly recommended to use `xml_suffix` to distinguish XML inputs derived from `user_modifications`")
  }

  if(return_plan_only) {
    assert_that(!return_data_map_only)
  }

  if(!quiet) message("GCAM Data System v", as.character(utils::packageVersion("gcamdata")), sep = "")

  chunklist <- find_chunks()
  if(!quiet) message("Found ", nrow(chunklist), " chunks")
  chunkinputs <- chunk_inputs(chunklist$name)
  if(!quiet) message("Found ", nrow(chunkinputs), " chunk data requirements")
  chunkoutputs <- chunk_outputs(chunklist$name)
  if(!quiet) message("Found ", nrow(chunkoutputs), " chunk data products")

  # check if any user chunks are set in which case we need to adjust the
  # chunklist/chunkinputs/chunkoutputs to shim the user chunk in place
  if(!is.null(user_modifications)) {
    # a user modification chunk uses a special command: driver.DECLARE_MODIFY
    # to indicate which ds "objects" it wants to modify
    # that chunk will then require those objects as inputs AND produce those
    # objects as output
    # these chunks will also be allowed to specify regular driver.DECLARE_INPUTS
    # which will be passed as input but NOT output back out

    # in order to shim the user modification in we adjust all chunk_inputs
    # for any object to be modified to input the object with the constant
    # data.USER_MOD_POSTFIX appended to the end instead
    # and the user chunk will input the original object and output the appended
    # data name

    # first get a list of all objects that are to be modified
    lapply(user_modifications, chunk_inputs, driver.DECLARE_MODIFY) %>%
      bind_rows() %>%
      # generate the mod data name by appending data.USER_MOD_POSTFIX
      mutate(data_mod = paste0(input, data.USER_MOD_POSTFIX)) ->
      modify_table

    # adjust chunkinputs so chunks that require as input any object that is
    # to be modified will now input the data.USER_MOD_POSTFIX appended name
    chunkinputs %>%
      left_join(select(modify_table, input, data_mod), by = c("input")) %>%
      mutate(input = if_else(is.na(data_mod), input, data_mod),
             from_file = if_else(is.na(data_mod), from_file, FALSE)) %>%
      select(-data_mod) %>%
      # add on the input requirements for the user mod chunk which are the
      # original object names as well as any other driver.DECLARE_INPUTS they
      # require
      bind_rows(select(modify_table, -data_mod),
                lapply(user_modifications, chunk_inputs, driver.DECLARE_INPUTS)) ->
      chunkinputs

    # add in the outputs from the user mod chunks which are the modified object names
    # appended with data.USER_MOD_POSTFIX
    lapply(user_modifications, chunk_outputs, driver.DECLARE_MODIFY) %>%
      bind_rows() %>%
      mutate(output = paste0(output, data.USER_MOD_POSTFIX)) %>%
      bind_rows(chunkoutputs) ->
      chunkoutputs

    # now we just need to add the user mod chunks to the chunklist
    bind_rows(chunklist,
              tibble(name = user_modifications,
                     module = "user",
                     chunk = user_modifications,
                     disabled = FALSE)) ->
      chunklist
  }

  # Keep track of chunk inputs for later pruning
  chunkinputs %>%
    group_by(input) %>%
    summarise(n = dplyr::n()) ->
    chunk_input_counts
  cic <- chunk_input_counts$n
  names(cic) <- chunk_input_counts$input

  warn_data_injects()
  warn_datachunk_bypass()
  warn_mismarked_fileinputs()

  # Outputs should all be unique
  dupes <- duplicated(chunkoutputs$output)
  if(any(dupes)) {
    stop("Outputs appear multiple times: ", chunkoutputs$output[dupes])
  }

  # If there are any unaccounted for input requirements,
  # try to load them from csv files
  unfound_inputs <- filter(chunkinputs, !input %in% chunkoutputs$output)
  if(nrow(unfound_inputs)) {

    # These should all be marked as 'from_file'
    ff <- filter(unfound_inputs, !from_file)
    if(nrow(ff)) {
      stop("Unfound inputs not marked as from file: ", paste(ff$input, collapse = ", "),
           " in ", paste(unique(ff$name), collapse = ", "))
    }

    if(!quiet) message(nrow(unfound_inputs), " chunk data input(s) not accounted for")
  }

  # Extract metadata from the input data; we'll add output metadata as we run
  metadata_info <- list()

  # Initialize some stuff before we start to run the chunks
  if(!missing(stop_before) || !missing(stop_after)) {
    if(!missing(stop_after)) {
      run_chunks <- stop_after
    } else {
      run_chunks <- stop_before
    }
    # calc min list
    name.x <- name.y <- NULL  # silence package check note
    verts <- inner_join(bind_rows(chunkoutputs,
                                  tibble(name = unfound_inputs$input,
                                         output = unfound_inputs$input,
                                         to_xml = FALSE)),
                        chunkinputs, by = c("output" = "input")) %>%
      select(name.x, name.y) %>%
      unique()

    chunks_to_run <- dstrace_chunks(run_chunks, verts)
  } else {
    chunks_to_run <- c(unfound_inputs$input, chunklist$name)
  }

  dir.create(xmldir, showWarnings = FALSE, recursive = TRUE)

  # Loop over each chunk and add a target for it and the command to build it
  # as appropriate for if it is just loading a FILE or running an actual chunk.
  target <- c()
  command <- c()
  for(chunk in chunks_to_run) {

    inputs <- filter(chunkinputs, name == chunk)
    input_names <- inputs$input

    # chunk is "unfound" aka loaded from file
    if(chunk %in% unfound_inputs$input) {
      # get the file details including if it was optional and the actual file name
      unfound_chunk <- unfound_inputs[unfound_inputs$input == chunk, ]
      optional <- all(unfound_chunk$optional)
      fqfn <- find_csv_file(chunk, optional, quiet = TRUE)
      # add the chunk to the target list
      target <- c(target, make.names(chunk))
      if(is.null(fqfn)) {
        assert_that(optional)
        # In the case of optional missing data just set it to missing with command:
        # `target <- list( chunk = missing_data() )`
        command <- c(command, paste0("list('", chunk, "' = missing_data())"))
      }
      else {
        # We have a file to load so generate the load_csv_files command:
        # `target <- load_csv_files( chunk, optional, quiet = TRUE, dummy = file_in(fqfn) )`
        # Note the dummy = file_in is a work around so that we can signal to drake that
        # target is actually coming from a file in the file system since the chunk names
        # as we use them in gcamdata would not themselves be sufficient to tell drake where
        # to find it.
        command <- c(command, paste0("load_csv_files('", chunk, "', ", optional, ", quiet = TRUE, dummy = file_in('", fqfn, "'))"))
      }
    }
    else {
      po <- subset(chunkoutputs, name == chunk)$output  # promised outputs
      if(length(po) == 0) {
        # chunks may get disabled due to configuration options in constants.R
        # so if they are not currently configured to return any outputs we can just
        # skip it
        next
      }
      # add the chunk to the target list
      target <- c(target, chunk)
      # Generate the command to run the chunk as:
      # `target <- gcamdata:::chunk( "MAKE", c(input1, input2, inputN) )`
      # Note we bypass run_chunk here otherwise drake isn't able to associate
      # the source code for the chunk with the command and the "make" functionality
      # would break.
      # Also note we explicitly list just the inputs required for the chunk which is
      # different than in driver where we give `all_data`, again this is for drake so it
      # can match up target names to commands and develop the dependencies between them.
      nsprefix <- if_else(chunk %in% user_modifications, "", "gcamdata:::")
      command <- c(command, paste0(nsprefix, chunk, "('", driver.MAKE, "', c(", paste(make.names(input_names), collapse = ","), "))"))

      # A chunk should in principle generate many output targets however drake assumes
      # one target per command.  We get around this by unpacking the list of outputs
      # from a chunk as an explicit target+command:
      # ```
      # output1 <- chunk["output1"]
      # output2 <- chunk["output2"]
      # outputN <- chunk["outputN"]
      #```
      # The downside is data is likely to be duplicated in the cache.
      target <- c(target, make.names(po))
      command <- c(command, paste(chunk, '["', po, '"]', sep = ""))

      # We need to separate out XML outputs so that we can add commands
      # to actually run the XML conversion and write out the GCAM inputs
      po_xml <- subset(chunkoutputs, name == chunk & to_xml)$output
      if(write_xml && length(po_xml) > 0) {
        # Add the xmldir to the XML output name and include those in the
        # target list.
        # Don't want multiple consecutive slashes, as drake views those as separate objects
        po_xml_path <- file.path(xmldir, po_xml) %>% gsub("/{2,}", "/", .)
        target <- c(target, make.names(po_xml_path))
        # Generate the command to run the XML conversion:
        # `xml/out1.xml <- run_xml_conversion(set_xml_file_helper(out1.xml, file_out("xml/out1.xml")))`
        # Note, the `file_out()` wrapper notifies drake the XML file is an output
        # of this plan and allows it to know to re-produce missing/altered XML files
        command <- c(command, paste0("run_xml_conversion(set_xml_file_helper(", po_xml, "[[1]],
             file_out('", po_xml_path, "')))"))
      }
    }


  } # for



  # At this point we can pull the target and command list together
  # as a "plan" tibble which can be passed to drake::make.
  # We may have duplicate targets at this point from loading files,
  # which drake will not allow. We can "summarize" with unique to
  # collapse those while still maintaining some error checking
  # in case we somehow specified different commands for them.
  gcamdata_plan <- tibble(target = target, command = command) %>%
    group_by(target) %>%
    summarise(command = unique(command)) %>%
    ungroup()
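
  # For illustration, rows in the resulting plan might look like this
  # (names hypothetical):
  #   target              command
  #   module_x_example    gcamdata:::module_x_example('MAKE', c(input1))
  #   L100.example_data   module_x_example["L100.example_data"]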

  # Create separate plan for constants
  # Doing this allows us to build constants first
  # If it is part of gcamdata_plan, it will be the last to build, so if there are any errors,
  # everything will get re-written, which is unnecessary
  constants_plan <- tibble(target = "constants", command = "file_in('R/constants.R')")

  # Separate plan for prebuilt_data -- we will track each name and clear the cache for its chunk if
  # there are any changes
  prebuilt_data_plan <- tibble(target = names(PREBUILT_DATA)) %>%
    mutate(command = paste0("gcamdata::PREBUILT_DATA[['", target, "']]"),
           target = paste0("PREBUILT_", target))
  # Still bind to gcamdata_plan so that the first time drake is used, constants will be cached
  gcamdata_plan <- bind_rows(constants_plan, prebuilt_data_plan, gcamdata_plan)

  # Have drake figure out what needs to be done and do it!
  # Any additional arguments given are passed directly on to make
  if(!return_plan_only) {
    # Clean if the cache exists and constants is out of date
    if(!is.null(drake::find_cache()) && length(drake::outdated(constants_plan, do_prework = FALSE)) > 0) {
      if(!quiet) message("NOTE: constants.R has been changed, all cached data will be removed and the data system will be re-run")
      drake::clean()
      # Make here in case there is an error making gcamdata_plan, in which case constants might not
      # have been made yet, and then we'd clear the cache unnecessarily
      drake::make(constants_plan, ...)
    } else if(!is.null(drake::find_cache()) && length(drake::outdated(prebuilt_data_plan, do_prework = FALSE)) > 0) {
      # Clean specific chunks if the cache exists and prebuilt_data is out of date
      if(!quiet) message("NOTE: PREBUILT_DATA has been changed, the relevant cached data will be removed")
      for(nm in drake::outdated(prebuilt_data_plan, do_prework = FALSE)) {
        nm_fix <- gsub("PREBUILT_", "", nm)
        chunk_to_clear <- filter(gcamdata_plan, target == nm_fix)$command
        chunk_to_clear <- gsub('\\[.*$', '', chunk_to_clear)
        if(length(chunk_to_clear) == 1) {
          drake::clean(chunk_to_clear)
        }
      }
      # Make here in case there is an error making gcamdata_plan, in which case the prebuilt data might not
      # have been made yet, and then we'd clear the cache unnecessarily
      drake::make(prebuilt_data_plan, ...)
    }
    drake::make(gcamdata_plan, ...)
  }

  if(return_data_map_only) {
    # user requested data map only so create it from the cache
    x <- create_datamap_from_cache(gcamdata_plan, ...)
  } else if(return_plan_only) {
    x <- gcamdata_plan
  } else {
    # return any requested data by loading it from the cache
    x <- load_from_cache(return_data_names, ...)
  }

  if(!quiet) cat("All done.\n")

  invisible(x)
}

#' warn_data_injects
#'
#' Check whether chunks are using any temporary (old data system data) 'injected' inputs.
#'
#' @return Number of temporary data objects being used inappropriately.
warn_data_injects <- function() {

  name <- input <- base_input <- upstream_chunk <- output <- NULL # silence package check

  # Are any chunks using temp-data-inject data that are also available to them through the data system?
  ci <- chunk_inputs(find_chunks(include_disabled = FALSE)$name)
  chunk_outputs(find_chunks(include_disabled = FALSE)$name) %>%
    rename(upstream_chunk = name) ->
    co

  # Look for TEMP_DATA_INJECT pattern in the chunk input list
  ci %>%
    filter(grepl(TEMP_DATA_INJECT, input)) %>%
    mutate(base_input = basename(input)) %>%
    # Look for any tdi inputs that appear in the enabled chunks' outputs
    filter(base_input %in% co$output) %>%
    left_join(select(co, upstream_chunk, output), by = c("base_input" = "output")) ->
    ci_tdi

  # Print messages
  for(i in seq_len(nrow(ci_tdi))) {
    message("NOTE: chunk ", ci_tdi$name[i], " reads `", ci_tdi$input[i], "`\n\tbut this is available from ", ci_tdi$upstream_chunk[i])
  }
  nrow(ci_tdi)
}

#' warn_mismarked_fileinputs
#'
#' Look for mislabeled chunk file inputs
#'
#' @return Number of data chunk objects marked as \code{from_file} but without a directory path.
warn_mismarked_fileinputs <- function() {

  from_file <- input <- NULL  # silence package check

  # Every input marked as from a file should have some directory structure in its name
  chunk_inputs() %>%
    filter(from_file, input == basename(input)) ->
    mismarked

  # Print messages
  for(i in seq_len(nrow(mismarked))) {
    message("NOTE: chunk ", mismarked$name[i], " file input `", mismarked$input[i], "` doesn't appear to be a file")
  }
  nrow(mismarked)
}

#' warn_datachunk_bypass
#'
#' Check whether chunks are bypassing a data chunk
#'
#' @details Data chunks read one or more particular files that are quirky: typically, files that generate
#' warnings or errors when read using the default \code{\link{get_data}} routine, but also ones that need
#' some special preprocessing. We look for chunks that read such a file directly,
#' bypassing the data chunk that handles it.
#' @return Number of data chunk objects being read directly.
warn_datachunk_bypass <- function() {

  from_file <- input <- NULL  # silence package check

  ci <- chunk_inputs()
  co <- chunk_outputs()
  ci %>%
    filter(from_file,
           basename(input) %in% co$output) %>%
    mutate(baseinput = basename(input)) %>%
    # The only time that chunks should be asking for a file whose name appears as another
    # chunk's outputs is if it's ALSO an output of that chunk--this will be a prebuilt dataset
    anti_join(co, c("name", "baseinput" = "output")) ->
    bypassing

  # Print messages
  for(i in seq_len(nrow(bypassing))) {
    message("NOTE: chunk ", bypassing$name[i], " reads `", bypassing$input[i], "` but this is handled by a data chunk")
  }
  nrow(bypassing)
}