# driver.R
# tidyr v1.0.0 causes problems with the usage of tidyr::complete
# if(!require("tidyr")) {
#   install.packages("https://github.com/tidyverse/tidyr/archive/v0.8.3.tar.gz", repos = NULL, type = "source", method = "libcurl", dependencies = TRUE)
#   # devtools::install_version("tidyr", version = "0.8.3", dependencies = TRUE)
# } else if(packageVersion("tidyr") == "1.0.0") {
#   remove.packages("tidyr")
#   install.packages("https://github.com/tidyverse/tidyr/archive/v0.8.3.tar.gz", repos = NULL, type = "source", method = "libcurl", dependencies = TRUE)
# }
# Directory prefix marking temporary 'injected' inputs carried over from the old data system
TEMP_DATA_INJECT <- "temp-data-inject/"
#' run_chunk
#'
#' @param chunk Chunk name, character
#' @param all_data Data required by chunk
#' @return Data generated by chunk
#' @details Putting this \code{do.call} into this one-line
#' function lets us mock it when testing.
run_chunk <- function(chunk, all_data) {
do.call(chunk, list(driver.MAKE, all_data))
}
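# A minimal testing sketch (illustrative only, not executed here): because
# run_chunk() is a one-line wrapper, a test can rebind it so that driver() never
# executes real chunks. Assumes testthat >= 3.x's local_mocked_bindings(); the
# chunk and output names below are hypothetical.
#   testthat::local_mocked_bindings(
#     run_chunk = function(chunk, all_data) {
#       list(example_output = tibble::tibble(year = 2020L, value = 1.0))
#     }
#   )
#   driver(stop_after = "module_example_chunk")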
#' check_chunk_outputs
#'
#' Checks chunk outputs for a variety of problems: correct structure,
#' required attributes attached, and a match with the promised outputs.
#' Generates warnings and/or errors for any deviations.
#'
#' @param chunk Chunk name, character
#' @param chunk_data Data produced by chunk, generally a tibble
#' @param chunk_inputs Names of chunk inputs, character
#' @param promised_outputs Names of chunk's promised outputs, character
#' @param outputs_xml Logical vector: are outputs XML?
check_chunk_outputs <- function(chunk, chunk_data, chunk_inputs, promised_outputs, outputs_xml) {
assert_that(is_data_list(chunk_data))
# Check that the chunk has provided required data for all objects
empty_precursors <- TRUE
pc_all <- c()
for(obj in names(chunk_data)) {
obj_flags <- get_flags(chunk_data[[obj]])
# Chunks have to return tibbles, unless they're tagged as being XML
if(!outputs_xml[which(obj == promised_outputs)]) {
assert_that(tibble::is_tibble(chunk_data[[obj]]), msg = paste(obj, "is", class(chunk_data[[obj]])[1]))
assert_that(! FLAG_XML %in% obj_flags, msg = obj)
# Make sure objects have required attributes
for(at in c(ATTR_TITLE, ATTR_UNITS, ATTR_COMMENTS)) {
if(is.null(attr(chunk_data[[obj]], at))) {
warning("No '", at, "' attached to ", obj, " - chunk ", chunk)
}
}
# Any 'year' column should be numeric - this is a frequent source of join problems
if("year" %in% names(chunk_data[[obj]]) && !is.numeric(chunk_data[[obj]]$year)) {
warning("'year' column not numeric in ", obj)
}
# If there's any 'logit.*' column then there HAS to be a 'logit.type' column (see #899)
if(any(grepl("^logit\\.", names(chunk_data[[obj]]))) &&
!(LOGIT_TYPE_COLNAME %in% names(chunk_data[[obj]]))) {
warning("No ", LOGIT_TYPE_COLNAME, " column in ", obj)
}
} else {
assert_that(FLAG_XML %in% obj_flags)
}
# Data precursors should all appear in input list
pc <- attr(chunk_data[[obj]], ATTR_PRECURSORS)
pc_all <- c(pc_all, pc)
empty_precursors <- empty_precursors && is.null(pc)
matches <- pc %in% c(chunk_inputs, promised_outputs)
if(!all(matches)) {
stop("Precursor(s) ", paste(pc[!matches], collapse = ", "),
" for '", obj, "' aren't inputs - chunk ", chunk)
}
if(obj %in% pc) {
stop("Precursors for '", obj, "' include itself - chunk ", chunk)
}
}
# Every input should be a precursor for something
if(!all(chunk_inputs %in% pc_all)) {
message("Inputs ", paste(setdiff(chunk_inputs, pc_all), collapse = ", "),
" don't appear as precursors for any outputs - chunk ", chunk)
}
# If chunk has inputs, some output should have a precursor
if(empty_precursors && length(chunk_inputs) > 0) {
stop("No output precursors, but there are inputs - chunk ", chunk)
}
# Chunk should have returned EXACTLY what it promised
if(!identical(sort(names(chunk_data)), sort(promised_outputs))) {
stop("Chunk ", chunk, "is not returning what it promised!")
}
}
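# Illustrative sketch of an output object that passes these checks (hypothetical
# names and values; assumes the ATTR_* constants hold the attribute names used above):
#   out <- tibble::tibble(year = 2015L, value = 1.0)
#   attr(out, ATTR_TITLE) <- "Example output"
#   attr(out, ATTR_UNITS) <- "EJ"
#   attr(out, ATTR_COMMENTS) <- "For illustration only"
#   attr(out, ATTR_PRECURSORS) <- "example_input"   # must appear among chunk_inputs
#   check_chunk_outputs("module_example_chunk",
#                       chunk_data = list(example_output = out),
#                       chunk_inputs = "example_input",
#                       promised_outputs = "example_output",
#                       outputs_xml = FALSE)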
#' tibbelize_outputs
#'
#' Extract precursor and other metadata from chunk output data and convert to tibble form.
#'
#' @param chunk_data List of chunk outputs, a data list (see \code{\link{is_data_list}})
#' @param chunk_name Name of current chunk, character
#' @return A tibble with chunk name, output name, title, units, flags, precursors, and comments.
#' This table has one row per output name; multiple flags, precursors, etc., are concatenated into single entries.
tibbelize_outputs <- function(chunk_data, chunk_name) {
assert_that(is.character(chunk_name))
assert_that(is_data_list(chunk_data))
metadata <- list()
for(cd in names(chunk_data)) {
if(!is.null(chunk_data[[cd]]) && length(chunk_data[[cd]]) > 0) {
# Here we use paste both to collapse vectors into a single string, and deal with possible NULLs
metadata[[cd]] <- tibble(name = chunk_name,
output = cd,
precursors = paste(get_precursors(chunk_data[[cd]]), collapse = data.SEPARATOR),
title = paste(get_title(chunk_data[[cd]]), collapse = data.SEPARATOR),
units = paste(get_units(chunk_data[[cd]]), collapse = data.SEPARATOR),
comments = paste(get_comments(chunk_data[[cd]]), collapse = data.SEPARATOR),
flags = paste(get_flags(chunk_data[[cd]]), collapse = data.SEPARATOR))
}
}
bind_rows(metadata)
}
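# For illustration, with `out` as in the sketch above,
# tibbelize_outputs(list(example_output = out), "module_example_chunk")
# returns a one-row tibble along the lines of:
#   name                  output          precursors     title           units  ...
#   module_example_chunk  example_output  example_input  Example output  EJ     ...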
#' driver
#'
#' Run the entire data system.
#'
#' @param all_data Data to be pre-loaded into data system
#' @param quiet Suppress output?
#' @param stop_before Stop immediately before this chunk (character)
#' @param stop_after Stop immediately after this chunk (character)
#' @param return_inputs_of Return the data objects that are inputs for these chunks (character).
#' If \code{stop_before} is specified, by default that chunk's inputs are returned
#' @param return_outputs_of Return the data objects that are output from these chunks (character).
#' If \code{stop_after} is specified, by default that chunk's outputs are returned
#' @param return_data_names Return these data objects (character). By default this is the union of \code{return_inputs_of} and \code{return_outputs_of}
#' @param return_data_map_only Return only the precursor information? (logical) This overrides
#' the other \code{return_*} parameters above
#' @param write_outputs Write all chunk outputs to disk?
#' @param write_xml Write XML Batch chunk outputs to disk?
#' @param outdir Location to write output data (ignored if \code{write_outputs} is \code{FALSE})
#' @param xmldir Location to write output XML (ignored if \code{write_outputs} is \code{FALSE})
#' @return A list of all built data (or a data map tibble if requested).
#' @details The driver loads any necessary data from input files,
#' runs all code chunks in an order dictated by their dependencies,
#' does error-checking, and writes outputs. For more details, see
#' the relevant wiki page at \url{https://github.com/bpbond/gcamdata/wiki/Driver}.
#' @importFrom magrittr "%>%"
#' @importFrom assertthat assert_that
#' @importFrom dplyr filter mutate select
#' @export
#' @author BBL
driver <- function(all_data = empty_data(),
stop_before = NULL, stop_after = NULL,
return_inputs_of = stop_before,
return_outputs_of = stop_after,
return_data_names = union(inputs_of(return_inputs_of),
outputs_of(return_outputs_of)),
return_data_map_only = FALSE,
write_outputs = !return_data_map_only,
write_xml = write_outputs,
outdir = OUTPUTS_DIR, xmldir = XML_DIR,
quiet = FALSE) {
# If the user asks to stop after a chunk but also requests particular inputs,
# or asks to stop before a chunk while requesting particular outputs,
# don't apply the conflicting default for the other return_* argument.
if(missing(return_outputs_of) && !missing(return_inputs_of) && !missing(stop_after)) {
return_outputs_of <- NULL # in this case don't want the default of stop_after
}
if(missing(return_inputs_of) && !missing(return_outputs_of) && !missing(stop_before)) {
return_inputs_of <- NULL # in this case don't want the default of stop_before
}
if(missing(return_data_names)) {
return_data_names <- union(inputs_of(return_inputs_of), outputs_of(return_outputs_of))
}
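# For example (hypothetical chunk name): driver(stop_before = "module_example_chunk")
# halts just before that chunk and by default returns its inputs, while
# driver(stop_after = "module_example_chunk") halts just after it and by default
# returns its outputs; passing return_inputs_of / return_outputs_of / return_data_names
# explicitly overrides these defaults.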
optional <- input <- from_file <- name <- NULL # silence notes from package check.
assert_that(is.null(stop_before) || is.character(stop_before))
assert_that(is.null(stop_after) || is.character(stop_after))
assert_that(is.null(return_inputs_of) || is.character(return_inputs_of))
assert_that(is.null(return_outputs_of) || is.character(return_outputs_of))
assert_that(is.null(return_data_names) || is.character(return_data_names))
assert_that(is.logical(return_data_map_only))
assert_that(is.logical(write_outputs))
assert_that(is.logical(write_xml))
assert_that(is.logical(quiet))
if(!quiet) cat("GCAM-Korea Data System v", as.character(utils::packageVersion("gcamkordata")), "\n", sep = "")
# Read the list of chunks
chunklist <- find_chunks()
#write.csv(chunklist,"chunklist.csv")
if(!quiet) cat("Found", nrow(chunklist), "chunks\n")
chunkinputs <- chunk_inputs(chunklist$name)
if(!quiet) cat("Found", nrow(chunkinputs), "chunk data requirements\n")
chunkoutputs <- chunk_outputs(chunklist$name)
if(!quiet) cat("Found", nrow(chunkoutputs), "chunk data products\n")
# write.csv(chunkinputs,"chunkinputs_MEMO.csv")
# write.csv(chunkoutputs,"chunkoutputs_MEMO.csv")
# Keep track of chunk inputs for later pruning
chunkinputs %>%
group_by(input) %>%
summarise(n = n()) ->
chunk_input_counts
cic <- chunk_input_counts$n
names(cic) <- chunk_input_counts$input
warn_data_injects()
warn_datachunk_bypass()
warn_mismarked_fileinputs()
# Outputs should all be unique
dupes <- duplicated(chunkoutputs$output)
if(any(dupes)) {
stop("Outputs appear multiple times: ", chunkoutputs$output[dupes])
}
# If there are any unaccounted for input requirements,
# try to load them from csv files
unfound_inputs <- filter(chunkinputs, !input %in% chunkoutputs$output)
if(nrow(unfound_inputs)) {
# These should all be marked as 'from_file'
ff <- filter(unfound_inputs, !from_file)
if(nrow(ff)) {
stop("Unfound inputs not marked as from file: ", paste(ff$input, collapse = ", "),
" in ", paste(unique(ff$name), collapse = ", "))
}
if(!quiet) cat(nrow(unfound_inputs), "chunk data input(s) not accounted for\n")
}
# Extract metadata from the input data; we'll add output metadata as we run
metadata_info <- list()
# Determine which chunks need to run before we start
if(!missing(stop_before) || !missing(stop_after)) {
if(!missing(stop_after)) {
run_chunks <- stop_after
} else {
run_chunks <- stop_before
}
# Compute the minimal set of chunks needed to reach the stopping point
name.x <- name.y <- NULL # silence package check note
verts <- inner_join(bind_rows(chunkoutputs,
tibble(name = unfound_inputs$input,
output = unfound_inputs$input,
to_xml = FALSE)),
chunkinputs, by = c("output" = "input")) %>%
select(name.x, name.y) %>%
unique()
chunks_to_run <- dstrace_chunks(run_chunks, verts)
} else {
chunks_to_run <- c(unfound_inputs$input, chunklist$name)
}
removed_count <- 0
save_chunkdata(empty_data(), create_dirs = TRUE, write_outputs = write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir) # clear directories
while(length(chunks_to_run)) {
nchunks <- length(chunks_to_run)
# Loop through all chunks and see who can run (i.e. all dependencies are available)
for(chunk in chunks_to_run) {
inputs <- filter(chunkinputs, name == chunk)
input_names <- inputs$input
required_inputs <- filter(inputs, !optional)
if(!all(required_inputs$input %in% names(all_data))) {
next # chunk's required inputs are not all available
}
if(chunk %in% stop_before) {
chunks_to_run <- character(0)
break
}
if(chunk %in% unfound_inputs$input) {
unfound_chunk <- unfound_inputs[unfound_inputs$input == chunk, ]
chunk_data <- load_csv_files(unfound_chunk$input, unfound_chunk$optional, quiet = TRUE)
po <- chunk
if(return_data_map_only) {
if(!quiet) cat("Extracting metadata from inputs...\n")
metadata_info[[chunk]] <- tibbelize_outputs(chunk_data, chunk_name = "INPUT")
}
} else {
if(!quiet) print(chunk)
# Order chunk to build its data
time1 <- Sys.time()
chunk_data <- run_chunk(chunk, all_data[input_names])
tdiff <- as.numeric(difftime(Sys.time(), time1, units = "secs"))
if(!quiet) print(paste("- make", format(round(tdiff, 2), nsmall = 2)))
po <- subset(chunkoutputs, name == chunk)$output # promised outputs
check_chunk_outputs(chunk, chunk_data, input_names,
promised_outputs = po,
outputs_xml = subset(chunkoutputs, name == chunk)$to_xml)
# Save precursor information and other metadata
if(return_data_map_only) {
metadata_info[[chunk]] <- tibbelize_outputs(chunk_data, chunk)
}
}
# Add this chunk's data to the global data store
all_data <- add_data(chunk_data, all_data)
# If any outputs don't appear in the chunk input list, and aren't requested for return,
# they can be immediately written out and removed
prunelist <- !po %in% names(cic) & !po %in% return_data_names
if(any(prunelist)) {
save_chunkdata(all_data[po[prunelist]], write_outputs = write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir)
all_data <- remove_data(po[prunelist], all_data)
removed_count <- removed_count + length(po[prunelist])
}
# Decrement the use counts for this chunk's inputs
cic[input_names] <- cic[input_names] - 1
# ...and remove if not going to be used again, and not requested for return
which_zero <- which(cic[input_names] == 0 & !input_names %in% return_data_names)
save_chunkdata(all_data[names(which_zero)], write_outputs = write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir)
all_data <- remove_data(names(which_zero), all_data)
removed_count <- removed_count + length(which_zero)
if(chunk %in% stop_after) {
chunks_to_run <- character(0)
break
}
# Remove the current chunk from the to-run list
chunks_to_run <- chunks_to_run[chunks_to_run != chunk]
} # for
# We have to be able to run >=1 chunk every loop iteration
if(length(chunks_to_run) == nchunks) {
stop("No chunks were run--we are stuck")
}
} # while
# Finish up: write outputs, determine return data format
if(!quiet && (write_outputs || write_xml)) cat("Writing chunk data...\n")
save_chunkdata(all_data, write_outputs = write_outputs, write_xml = write_xml, outputs_dir = outdir, xml_dir = xmldir)
if(return_data_map_only) {
if(!quiet) cat("Returning data map.\n")
x <- bind_rows(metadata_info)
} else {
all_data <- all_data[return_data_names]
if(!quiet && length(all_data) > 0) cat("Returning", length(all_data), "tibbles.\n")
x <- all_data
}
if(!quiet) cat("All done.\n")
invisible(x)
}
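# Usage sketches (chunk name is hypothetical; assumes the package's input data are in place):
#   all_built <- driver()                               # run everything and write outputs
#   dmap <- driver(return_data_map_only = TRUE)         # just the precursor/metadata map
#   ins <- driver(stop_before = "module_example_chunk") # that chunk's inputs, ready for debugging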
#' warn_data_injects
#'
#' Check whether chunks are using any temporary (old data system data) 'injected' inputs.
#'
#' @return Number of temporary data objects being used inappropriately.
warn_data_injects <- function() {
name <- input <- base_input <- upstream_chunk <- output <- NULL # silence package check
# Are any chunks using temp-data-inject data that are also available to them through the data system?
ci <- chunk_inputs(find_chunks(include_disabled = FALSE)$name)
chunk_outputs(find_chunks(include_disabled = FALSE)$name) %>%
rename(upstream_chunk = name) ->
co
# Look for TEMP_DATA_INJECT pattern in the chunk input list
ci %>%
filter(grepl(TEMP_DATA_INJECT, input)) %>%
mutate(base_input = basename(input)) %>%
# Look for any tdi inputs that appear in the enabled chunks' outputs
filter(base_input %in% co$output) %>%
left_join(select(co, upstream_chunk, output), by = c("base_input" = "output")) ->
ci_tdi
# Print messages
for(i in seq_len(nrow(ci_tdi))) {
message("NOTE: chunk ", ci_tdi$name[i], " reads `", ci_tdi$input[i], "`\n\tbut this is available from ", ci_tdi$upstream_chunk[i])
}
nrow(ci_tdi)
}
#' warn_mismarked_fileinputs
#'
#' Look for mislabeled chunk file inputs
#'
#' @return Number of data chunk objects marked as \code{from_file} but without a directory path.
warn_mismarked_fileinputs <- function() {
from_file <- input <- NULL # silence package check
# Every input marked as from a file should have some directory structure in its name
chunk_inputs() %>%
filter(from_file, input == basename(input)) ->
mismarked
# Print messages
for(i in seq_len(nrow(mismarked))) {
message("NOTE: chunk ", mismarked$name[i], " file input `", mismarked$input[i], "` doesn't appear to be a file")
}
nrow(mismarked)
}
#' warn_datachunk_bypass
#'
#' Check whether chunks are bypassing a data chunk
#'
#' @details Data chunks read one or more particular files that are quirky: typically, files that generate
#' warnings or errors when read using the default \code{\link{get_data}} routine, but also ones that need
#' some special preprocessing. We look for chunks that read such a file directly instead of
#' using the data chunk's output.
#' @return Number of data chunk objects being read directly.
warn_datachunk_bypass <- function() {
from_file <- input <- NULL # silence package check
ci <- chunk_inputs()
co <- chunk_outputs()
ci %>%
filter(from_file,
basename(input) %in% co$output) %>%
mutate(baseinput = basename(input)) %>%
# The only time that chunks should be asking for a file whose name appears as another
# chunk's outputs is if it's ALSO an output of that chunk--this will be a prebuilt dataset
anti_join(co, by = c("name", "baseinput" = "output")) ->
bypassing
# Print messages
for(i in seq_len(nrow(bypassing))) {
message("NOTE: chunk ", bypassing$name[i], " reads `", bypassing$input[i], "` but this is handled by a data chunk")
}
nrow(bypassing)
}
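# For illustration (hypothetical names): if a data chunk both reads and outputs
# "temp-data/quirky_file.csv" (i.e. it ships a prebuilt version of a quirky dataset),
# that is fine, and the anti_join above excludes it. But if a different chunk lists
# that file as a from_file input, it is bypassing the data chunk and gets reported here.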