
#' NIflow Class
#' @docType class
#' @importFrom R6 R6Class
#' @export
#' @keywords data
#' @return Object of \code{\link{R6Class}} and \code{NIflow}.
#' @format \code{\link{R6Class}} object.
#' @examples
#' NIflow$new()
#' @section Methods:
#' \describe{
#'   \item{Documentation}{For full documentation of each method follow the corresponding link. }
#'   \item{\code{initialize(name, inputs = list())}}{Create a new flow with the given name and inputs. Documented in \link{NIflow.initialize}.}
#'   \item{\code{name()}}{Returns the name of the flow. Documented in \link{NIflow.name}.}
#'   \item{\code{get_private()}}{Returns the private environment inside the class object. Documented in \link{NIflow.get_private}.}
#'   \item{\code{set_name(new_name)}}{Changes the name of the flow. Documented in \link{NIflow.set_name}.}
#'   \item{\code{get_inputs()}}{Returns the inputs of the flow. Documented in \link{NIflow.get_inputs}.}
#'   \item{\code{get_outputs()}}{Returns the outputs of the flow. Documented in \link{NIflow.get_outputs}.}
#'   \item{\code{get_required_inputs(outputs)}}{Returns the inputs required to compute given outputs. Documented in \link{NIflow.get_required_inputs}.}
#'   \item{\code{get_process(output)}}{Returns the function that computes the given output. Documented in \link{NIflow.get_process}.}
#'   \item{\code{get_dependencies()}}{Returns the list of package dependencies of the flow. Documented in \link{NIflow.get_dependencies}.}
#'   \item{\code{check_dependencies()}}{Checks if all flow dependencies are installed. Documented in \link{NIflow.check_dependencies}.}
#'   \item{\code{replace(output, with)}}{Replace a function that computes an output
#'   with another function. Documented in \link{NIflow.replace}.}
#'   \item{\code{add(what = NULL, inputs = NULL, output = NULL, ...)}}{Adds a function to the flow,
#'   that transforms the given inputs into the given output. Documented in \link{NIflow.add}.}
#'   \item{\code{execute(inputs = list(), desired_outputs = NULL, initialize_outputs = TRUE, ...)}}{Execute the flow to obtain some outputs, given input files. Documented in \link{NIflow.execute}.}
#'   \item{\code{run(...)}}{Just a wrapper for \code{execute}. Documented in \link{NIflow.run}.}
#'   \item{\code{log(level = c("DEBUG", "INFO", "WARNING", "ERROR"), message = '...')}}{Add a message to the flow log, including the level of the message and a timestamp. Documented in \link{NIflow.log}.}
#'   \item{\code{print_log(level = c("DEBUG", "WARNING", "INFO", "ERROR"))}}{Prints the log of the flow. Documented in \link{NIflow.print_log}.}
#'   \item{\code{errors()}}{Just prints the errors in the flow log. Documented in \link{NIflow.errors}.}
#'   \item{\code{warnings()}}{Just prints the warnings in the flow log. Documented in \link{NIflow.warnings}.}
#'   \item{\code{save_log(filename, level = c("DEBUG", "WARNING", "INFO", "ERROR"))}}{Saves a the flow log to file. Documented in \link{NIflow.save_log}.}
#'   \item{\code{graph()}}{Returns the graph of the flow. Documented in \link{NIflow.graph}.}
#'   \item{\code{plot()}}{Plots the graph of the flow. Documented in \link{NIflow.plot}.}
#'   \item{\code{memory_used()}}{Total memory used by the flow, including its internal variables. Documented in \link{NIflow.memory_used}.}
#'   \item{\code{save(path = tempdir(), file_prefix = self$name())}}{Save the flow to disk. Documented in \link{NIflow.save}.}
#'   \item{\code{load(filename)}}{Load a flow from disk. Documented in \link{NIflow.load}.}
#'   \item{\code{deep_clone()}}{Clone a flow. Documented in \link{NIflow.deep_clone}.}
#'   \item{\code{subset(outputs)}}{Take a subset of the flow, for given outputs. Documented in \link{NIflow.subset}.}
#'   \item{\code{to_package(path, package_name = self$name())}}{Creates a package with functions provided by the execution of the flow. Documented in \link{NIflow.to_package}.}
#'  }
NIflow <- R6::R6Class(

  classname = "NIflow",

  public = list(

    initialize = function(name = "",
                          work_dir = tempdir(),
                          inputs = list()) {

      E <- try(eval(inputs), silent = TRUE)

      if (!inherits(E, "try-error")) {

        inputs <- E

      } else {

        expr <- substitute(inputs)
        inputs <- as.character(expr)

        if (class(expr) == "call")
          inputs <- inputs[-1]

        env <- parent.frame(2)
        inputs <- inputs %>% .search_names(envir = env)


      flow_env <- .create_flow(name = name,
                               work_dir = normalizePath(work_dir),
                               inputs = inputs)

      self$.__enclos_env__$private <- flow_env


    name = function() {



    get_workdir = function() {



    clean_workdir = function(all = FALSE) {

      my_dir <- private$work_dir

      if (all && dir.exists(my_dir)) {

        unlink(my_dir, recursive = TRUE, force = TRUE)



      names_to_keep <- self$get_outputs()

      all_files <- list.files(path = my_dir,
                              full.names = TRUE)

      output_files <- lapply(names_to_keep,
                             function(s) {

                               list.files(path = my_dir,
                                          pattern = s,
                                          full.names = TRUE)


      output_files <- unlist(output_files)

      files_to_remove <- setdiff(all_files, output_files)



    get_private = function() {



    set_name = function(new_name) {

      private$name <- new_name


    get_inputs = function() {



    get_outputs = function() {



    get_required_inputs = function(outputs) {

      my_flow <- self$get_private()

      E <- try(eval(outputs), silent = TRUE)

      if (!inherits(E, "try-error") & is.character(E)) {

        outputs <- E

      } else {

        expr <- substitute(outputs)
        outputs <- as.character(expr)

        if (class(expr) == "call")
          outputs <- outputs[-1]

        env <- parent.frame(2)
        outputs <- outputs %>% .search_names(envir = env)


             function(s) my_flow$outputs[s] %>% unname())


    get_process = function(output) {

      E <- try(eval(output), silent = TRUE)

      if (!inherits(E, "try-error")) {

        output <- E

      } else {

        expr <- substitute(output)
        output <- as.character(expr)

        if (class(expr) == "call")
          output <- output[-1]

        env <- parent.frame(2)
        output <- output %>% .search_names(envir = env)

      my_flow <- self$get_private()



    get_dependencies = function() {

      my_flow <- self$get_private()


    check_dependencies = function() {

      pkgs <- self$get_dependencies()

      res <- all(pkgs %in% row.names(installed.packages()))

      if (!res) {

        has_crayon <- requireNamespace("crayon", quietly = TRUE)

        cat("Currently, not all required packages for this flow are installed. Please install\n them before executing this flow:\n") #nocov

        missing <- pkgs[!(pkgs %in% installed.packages())] #nocov

        missing_deps <- str_flatten(missing, collapse = ", ") #nocov
        if (has_crayon) missing_deps <- str_flatten(red(missing), collapse = ", ") #nocov

        cat(missing_deps, "\n\n") #nocov




    replace = function(output, with) {

      E <- try(eval(output), silent = TRUE)

      if (!inherits(E, "try-error")) {

        output <- E

      } else {

        expr <- substitute(output)
        output <- as.character(expr)

        if (class(expr) == "call")
          output <- output[-1]

        env <- parent.frame(2)
        output <- output %>% .search_names(envir = env)


      if (!(output %in% self$get_outputs())) {

        message <- paste0("No current definition for output = ", output)


      my_flow <- self$get_private()
      my_flow$processes[[output]] <- with


    add = function(what = NULL, inputs = NULL, output = NULL, ...) {

      my_flow <- self$get_private()

      E <- try(eval(inputs), silent = TRUE)

      if (!inherits(E, "try-error") & is.character(E)) {

        inputs <- E

      } else {

        expr <- substitute(inputs)
        inputs <- as.character(expr)

        if (class(expr) == "call")
          inputs <- inputs[-1]

        env <- parent.frame(2)
        inputs <- inputs %>% .search_names(envir = env)


      E <- try(eval(output), silent = TRUE)

      if (!inherits(E, "try-error") & is.character(E)) {

        output <- E

      } else {

        expr <- substitute(output)
        output <- as.character(expr)

        if (class(expr) == "call")
          output <- output[-1]

        env <- parent.frame(2)
        output <- output %>% .search_names(envir = env)


      if (length(output) == 0L) {

        output <- NULL


      # Add an input
      if (is.null(what) & is.null(output)) {

        if (is.null(inputs)) {

          stop("At least inputs must be specified.")


        my_flow %>% .add_inputs(inputs = inputs)



      # Add a function
      if (inherits(what, "function")) {

        if (is.null(output)) {

          stop("An output name must be provided to add a function.")


        if (is.null(inputs)) {

          my_flow %>% .add_process(proc = what, output = output, ...)

        } else {

          my_flow %>% .add_process(proc = what, inputs = inputs, output = output, ...)




    execute = function(inputs = list(),
                       desired_outputs = NULL,
                       initialize_outputs = TRUE,
                       ...) {

      expr <- substitute(desired_outputs)
      desired_outputs <- as.character(expr)

      if (class(expr) == "call")
        desired_outputs <- desired_outputs[-1]

      env <- parent.frame(2)
      desired_outputs <- desired_outputs %>% .search_names(envir = env)

      my_flow <- self$get_private()
      my_flow %>% .execute_flow(inputs = inputs,
                                desired_outputs = desired_outputs,
                                initialize_outputs = initialize_outputs,


    run = function(...) {



    compute = function(what = NULL,
                       from = list(),
                       ...) {

      expr <- substitute(what)
      what <- as.character(expr)

      if (class(expr) == "call")
        what <- what[-1]

      env <- parent.frame(2)
      what <- what %>% .search_names(envir = env)

      my_flow <- self$get_private()
      my_flow %>% .execute_flow(inputs = from,
                                desired_outputs = what,
                                initialize_outputs = TRUE,


    log = function(level = c("DEBUG", "INFO", "WARNING", "ERROR"),
                   message = "...") {

      line_to_add <- paste0("(", format(Sys.time(), "%Y-%m-%d %H:%M:%S"), ") [",
                            level[1], "] ",

      private$log_lines <- c(private$log_lines, line_to_add)


    print_log = function(level = c("DEBUG", "WARNING", "INFO", "ERROR")) {

      all_lines <- private$log_lines

      lines <- c()

      if ("DEBUG" %in% level)
        level <- c("DEBUG", "INFO", "WARNING", "ERROR")

      for (i in seq_along(level)) {

        lines <- c(lines, grep(all_lines, pattern = level[i]))


      lines <- sort(lines)

      lines <- private$log_lines[lines]

      cat(lines, sep = "\n")


    errors = function() {

      self$print_log(level = "ERROR")


    warnings = function() {

      self$print_log(level = "WARNING")


    save_log = function(filename, level = c("DEBUG", "WARNING", "INFO", "ERROR")) {

      all_lines <- private$log_lines

      lines <- c()

      if ("DEBUG" %in% level)
        level <- c("DEBUG", "INFO", "WARNING", "ERROR")

      for (i in seq_along(level)) {

        lines <- c(lines, grep(all_lines, pattern = level[i]))


      lines <- sort(lines)

      lines <- private$log_lines[lines]

      cat(lines, sep = "\n", file = filename)


    graph = function() {

      my_flow <- self$get_private()


    plot = function(to_file = "") {

      my_flow <- self$get_private()
      my_flow %>% .plot_flow(to_file)


    memory_used = function() {

      object_size(private) + object_size(self)


    save = function(path = tempdir(), file_prefix = self$name()) {

      my_flow <- self$get_private()
      my_flow %>% .save_flow(path = path, file_prefix = file_prefix)


    load = function(filename) {

      self$.__enclos_env__$private <- .load_flow(filename)


    deep_clone = function() {

      self$get_private() %>% .clone_flow()


    subset = function(outputs) {

      expr <- substitute(outputs)
      outputs <- as.character(expr)

      if (class(expr) == "call")
        outputs <- outputs[-1]

      my_flow <- self$get_private()
      new_flow_env <- my_flow %>% .subset_flow(outputs = outputs)

      new_flow <- NIflow$new(name = self$name())
      new_flow$.__enclos_env__$private <- new_flow_env



    to_package = function(path, package_name = self$name()) {

      empty_env <- new.env()

      my_outputs <- self$get_outputs()
      my_inputs <- self$get_inputs()
      my_flow <- self$get_private()

      # Add specific functions to compute outputs
      for (output in setdiff(my_outputs, my_inputs)) {

        required_inputs <- unlist(my_flow$outputs[my_flow$required_inputs[[output]]])
        f <- .build_compute_function(args = required_inputs,
                                     output = output)

        eval(expr = parse(text = f), envir = empty_env)


      # Create package
      package.skeleton(name = package_name,
                       path = path,
                       environment = empty_env)

      pkg <- file.path(path, package_name)

      # Remove "Read-and-delete-me" file
      unlink(x = file.path(pkg, "Read-and-delete-me"), force = TRUE)

      # Add dependencies to the DESCRIPTION file
      deps <- self$get_dependencies()

      invisible(sapply(deps, usethis::use_package, pkg = pkg))

      # Save the flow
      data_folder <- file.path(pkg, "inst", "flow")

      self$save(path = data_folder, file_prefix = package_name)

      # Add zzz.R which loads the flow as "flow"
      zzz_file <- file.path(pkg, "R", "zzz.R")

      string <- paste0("flow_file <- system.file('flow', '",
                       package_name , "_flow.zip', package = '",
                       package_name, "')\n\n",
                       "flow <- wf4ni::load_flow(flow_file)\n")

      cat(string, file = zzz_file)



  lock_objects = FALSE

