R/pipeline.R

#' Run an analysis pipeline
#'
#' Run the analyses specified in the json file on the data
#'
#' @param json The path to the pipeline json file or a list in pipeline format
#' @param data A vector of data tables or paths to data files. If null, read from the json file
#' @return A list of study parameters (class pipeline_study)
#'
#' @examples
#'
#' iat_json_path <- system.file("extdata", "iat.json", package = "pipeline")
#' iat_data_path <- system.file("extdata", "iat.csv", package = "pipeline")
#' pipeline(iat_json_path, iat_data_path)
#'
#' @export
#'
pipeline <- function(json, data = NULL) {
  if (is.character(json)) {
    study <- jsonlite::read_json(json)
  } else if (is.list(json)) {
    study <- json
  } else {
    stop("json needs to be a list or a file path for a .json file")
  }

  class(study) <- c(class(study), "pipeline_study")

  dataname <- "Data" # default name for the data frame

  if (is.character(data)) {
    dataname <- data
    data <- rio::import(dataname)
  }

  if (is.data.frame(data)) {
    # FIX: only handles one data frame for now
    if (length(study$data) == 0) {
      study$data = list(list(name = dataname))
    }
    study$data[[1]]$data <- data
  } else if (exists("study$data[[1]]$data")) {
    if (is.list(study$data[[1]]$data)) {
      # read data from json file
      listdata <- lapply(study$data[[1]]$data, function(x) {
        x[sapply(x, is.null)] <- NA
        unlist(x)
      })
      data <- do.call("rbind", listdata) %>%
        as.data.frame()
    }
  }

  if (is.null(data)) return(study)

  # run each analysis
  analysis_n <- length(study$analyses)
  for (i in 1:analysis_n) {
    func <- study$analyses[[i]]$func
    params <- study$analyses[[i]]$params
    # replace any params equal to ".data" with the data frame
    replace_data <- grep("^\\.data$", params)
    if (length(replace_data)) {
      for (j in replace_data) {
        params[[j]] <- data
      }
    }
    # replace any params equal to ".data$col" with the column vector
    replace_data_cols <- grep("^\\.data\\$", params)
    if (length(replace_data_cols)) {
      for (j in replace_data_cols) {
        col <- gsub("^\\.data\\$", "", params[j])
        params[[j]] <- data[[col]]
      }
    }

    study$analyses[[i]]$results <- do.call(func,params)
  }

  # evaluate each hypothesis
  hypothesis_n <- length(study$hypotheses)
  for (i in 1:hypothesis_n) {
    h <- study$hypotheses[[i]]

    # evaluate each criterion
    criteria_n <- length(h$criteria)
    criteria <- vector()
    for (j in 1:criteria_n) {
      criterion <- h$criteria[[j]]
      analysis <- grep(criterion$analysis, study$analyses, fixed = TRUE)

      value <- study$analyses[[analysis]]$results[[criterion$result]]
      if (criterion$direction == "<") {
        conclusion <- value < criterion$comparator
      } else if (criterion$direction == ">") {
        conclusion <- value > criterion$comparator
      } else if (criterion$direction == "=") {
        conclusion <- value == criterion$comparator
      } else if (criterion$direction == "!=") {
        conclusion <- value != criterion$comparator
      } else {
        conclusion <- NA
      }
      criteria[j] <- conclusion
      study$hypotheses[[i]]$criteria[[j]]$conclusion <- conclusion
    }

    # evaluate hypothesis
    eval <- tolower(study$hypotheses[[i]]$evaluation)
    if (eval %in% c("&", "and")) {
      study$hypotheses[[i]]$conclusion = (mean(criteria) == TRUE)
    } else if (eval %in% c("|", "or")) {
      study$hypotheses[[i]]$conclusion = (mean(criteria) > 0)
    } else {
      study$hypotheses[[i]]$conclusion = NA
    }
  }

  return(study)
}
debruine/pipeline documentation built on May 8, 2019, 8:59 a.m.