# R/flow-r6.R

# R6Flow class and methods


# !diagnostics suppress=self, public, .STATE_KEY


# R6Flow ----
# R6 class that wraps a function `fn` and keeps track of its input / output
# states. Every method is declared here as an empty stub that fixes the public
# interface; the implementations are attached further down in this file with
# `R6Flow$set(..., overwrite = TRUE)`.
R6Flow <- R6::R6Class(
    classname = "R6Flow",
    public = list(
        # original fn to be flow-ed
        fn = NULL,
        # hash of fn's arguments (as defined) and its body (excludes comments)
        fn_key = character(),
        # original function name, mostly for debug purposes
        fn_name = character(),
        # the fields below are copied from `flow_options` in $initialize()
        # (except fn_id, which is a separate $initialize() argument)
        fn_id = NULL,
        excluded_arg = character(),
        eval_arg_fn = NULL,
        split_bare_list = TRUE,
        split_dataframe = FALSE,
        split_fn = NULL,
        # link to R6Eddy obj where data is stored
        eddy = NULL,
        # internal states: data frame with one row per (fn_key, in_hash)
        state = NULL,
        # row of `state` that is considered current
        state_index = 0L,   # 0 ==> missing (do not use NA)
        # data frame to store elements of fn output
        state_output = NULL,
        # a local cache used for lazy calc (environment keyed by in_hash)
        state_env = NULL,
        # functions with the same arguments as fn
        # (functions declared as obj to bypass locking of R6 methods)
        calc_in_hash = NULL,
        rf_fn = NULL,
        # init
        initialize = function(fn,
                              fn_key,
                              fn_name,
                              fn_id,
                              flow_options = get_flow_options()) {},
        # state: lookup, creation, update and removal of rows in `state`
        which_state = function(in_hash) {},
        get_state = function(index = NULL) {},
        add_state = function(in_hash,
                             out_hash,
                             elem_args,
                             make_current = TRUE) {},
        update_state = function(index,
                                in_hash,
                                out_hash,
                                out_data,
                                make_current = TRUE) {},
        forget_state = function(index) {},
        delete_state = function(index) {},
        add_state_output = function(out_hash,
                                    elem_name,
                                    elem_hash,
                                    elem_data) {},
        delete_state_output = function(out_hash) {},
        # elements
        get_out_hash = function(name = NULL) {},
        get_element = function(name = NULL) {},
        # eval & collect
        compute = function() {},
        collect = function(name = NULL) {},
        # misc
        check_all = function() {},
        forget_all = function() {},
        save = function() {},
        # NOTE: the implementation set below has the signature function(n = NULL)
        print = function() {},
        is_good_index = function(index = NULL) {},
        require_good_index = function(index = NULL) {},
        is_valid_at_index = function(index = NULL) {},
        require_valid_at_index = function(index = NULL) {}
    ),
    active = list(
        # read-only status flags for the current state
        is_current = function() {},
        is_valid = function() {}
    )
)


# calc_in_hash ----
R6Flow$set("public", "calc_in_hash_default", function(rf_env = parent.frame()) {
    # Hash the arguments found in the calling frame (`rf_env$elem_args`).
    # Excluded arguments are dropped and every Element is replaced by a small
    # identifying list, so the hash can be computed even when the upstream
    # flow has not been computed yet.
    
    caller_args <- rf_env$elem_args
    
    # stand-in for an Element: identifies fn, its input state, and which elem
    describe_element <- function(elem) {
        elem$self$require_good_index()
        upstream_state <- elem$self$get_state()
        list(
            fn_key = upstream_state$fn_key,
            in_hash = upstream_state$in_hash,
            elem_name = elem$elem_name
        )
    }
    is_element <- function(arg) inherits(arg, "Element")
    
    hashable_args <- discard_at(caller_args, self$excluded_arg)
    hashable_args <- purrr::map_if(
        hashable_args,
        .p = is_element,
        .f = describe_element
    )
    
    self$eddy$digest(hashable_args)
}, overwrite = TRUE)


R6Flow$set("public", "calc_in_hash_custom", function(rf_env = parent.frame()) {
    # Hash the value returned by the user supplied `eval_arg_fn`, called with
    # the same (non-excluded) arguments as recorded in `rf_env$match_call`.
    
    # re-target the recorded call towards eval_arg_fn
    # (fn and eval_arg_fn are expected to share the same formals)
    custom_call <- rf_env$match_call
    custom_call[[1L]] <- self$eval_arg_fn
    
    # excluded arguments take no part in the hash
    for (arg_name in self$excluded_arg %if_in% names(custom_call)) {
        custom_call[[arg_name]] <- NULL
    }
    
    # called from $rf_fn, need to go back two frames when evaluating
    arg_values <- eval(custom_call, envir = parent.frame(n = 2))
    
    self$eddy$digest(arg_values)
}, overwrite = TRUE)


# rf_fn ----
R6Flow$set("public", "rf_fn_default", function(...) {
    # Template for the user-facing flow function. $initialize() copies it into
    # $rf_fn and replaces its formals with those of the original fn.
    # It does not run fn: it only registers (or finds) the state matching the
    # received arguments and returns the R6Flow object itself.
    #
    # NOTE: the locals `match_call` and `elem_args` are read by name from this
    # frame by $calc_in_hash_custom() / $calc_in_hash_default() (through their
    # `rf_env = parent.frame()` argument); do not rename them.
    
    # when called, the formals already match the original fn
    match_call <- match.call()
    
    # follow memoise logic to separate supplied and default arguments
    # we are still at symbolic stage, have not yet evaluated them
    # https://cran.r-project.org/doc/manuals/r-release/R-lang.html
    #     #Argument-evaluation
    
    # supplied arguments, in the order received, might not be named
    supplied_args <- as.list(match_call)[-1]
    
    # default arguments that have not been supplied
    # (formals without a default value, i.e. the empty symbol, are discarded)
    # excluded arguments have defaults, drop them from hash / eval / saving
    default_args <-
        as.list(formals()) %>%
        purrr::discard(~ identical(., quote(expr = ))) %>%      # nolint
        discard_at(names(supplied_args))
    # supplied args eval in the evaluation frame of the calling function
    # default args eval in the evaluation frame of the original function
    eval_args <- c(
        lapply(supplied_args, eval, envir = parent.frame()),
        lapply(default_args, eval, envir = environment(self$fn))
    )
    
    # for consistency, transform any R6Flow into its Element
    elem_args <-
        eval_args %>%
        purrr::map_if(
            .p = ~ inherits(., "R6Flow"),
            .f = ~ .$get_element(name = NULL)
        )
    
    # prevent simple recursivity issues; TODO update @ reactivity
    # (an argument that is an Element of a flow with this very fn_key)
    is_recursive_lgl <- elem_args %>%
        purrr::keep(~ inherits(., "Element")) %>%
        purrr::map_lgl(~ identical(.$self$fn_key, self$fn_key))
    if (any(is_recursive_lgl)) {
        rlang::abort("Recursive calls cannot be processed.")
    }
    
    # reads `elem_args` or `match_call` from this frame (see NOTE above)
    in_hash <- self$calc_in_hash()
    
    # check if there is a state associated with in_hash
    found_state_idx <- self$which_state(in_hash)
    if (found_state_idx > 0L) {
        # known input: make that state the current one
        if (found_state_idx != self$state_index) {
            self$state_index <- found_state_idx
        }
        if (!self$is_valid_at_index(found_state_idx)) {
            # state exists but no output cached ==> prep for re-compute
            self$state_env[[in_hash]] <- elem_args
        }
    } else {
        # not in cache, prepare for lazy eval: save args to be called later
        self$add_state(
            in_hash = in_hash, 
            out_hash = NA_character_,
            elem_args = elem_args,
            make_current = TRUE
        )
    }
    
    # return the R6Flow obj instead of its data, use $collect() to get data
    # we could have returned a structure similar to $element(), but
    # - $collect() would require $self$collect(), or
    # - adding a new collect function preserves its encl envir, takes memory
    self
}, overwrite = TRUE)


# initialize ----
R6Flow$set("public", "initialize", function(
        fn,
        fn_key,
        fn_name,
        fn_id,
        flow_options = get_flow_options()
) {
    # Constructor.
    #   fn           function to be wrapped (must not already be a flow fn)
    #   fn_key       key identifying fn in the eddy / cache
    #   fn_name      name of fn, used when printing
    #   fn_id        id stored as is in $fn_id
    #   flow_options list providing excluded_arg, eval_arg_fn, split_bare_list,
    #                split_dataframe, split_fn and eddy
    # Aborts if the flow cannot be registered in the eddy; if $check_all()
    # fails, the flow is removed from the eddy before re-throwing the error.
    # Returns NULL, invisibly.
    
    stopifnot(is_not_flow_fn(fn))
    require_keys(fn_key, fn_name)
    
    # init self$
    self$fn <- fn
    self$fn_key <- fn_key
    self$fn_name <- fn_name
    self$fn_id <- fn_id
    self$excluded_arg <- flow_options$excluded_arg
    self$eval_arg_fn <- flow_options$eval_arg_fn
    self$split_bare_list <- flow_options$split_bare_list
    self$split_dataframe <- flow_options$split_dataframe
    self$split_fn <- flow_options$split_fn
    self$eddy <- flow_options$eddy
    
    # 'group' in cache (do not use $rflow_ yet); does it have state data?
    if (self$eddy$cache$has_key(fn_key, .STATE_KEY)) {
        # restore the states saved by a previous $save()
        flow_data <- self$eddy$cache$get_data(fn_key, .STATE_KEY)
        self$state <- flow_data$state
        self$state_output <- flow_data$state_output
    } else {
        # start with zero-row tables that already have the right column types
        # (tibble::tibble() replaces the deprecated tibble::data_frame())
        self$state <- tibble::tibble(
            fn_key = character(),
            in_hash = character(),
            out_hash = character(),
            time_stamp = now_utc(0L)
        )
        # output state
        self$state_output <- tibble::tibble(
            out_hash = character(),
            elem_name = character(),
            elem_hash = character()
        )
    }
    # no current state until rf_fn is called
    self$state_index <- 0L
    self$state_env <- new.env(hash = TRUE, parent = emptyenv())
    
    # calc_in_hash: custom version only when eval_arg_fn was provided
    if (!is.null(self$eval_arg_fn)) {
        self$calc_in_hash <- self$calc_in_hash_custom
    } else {
        self$calc_in_hash <- self$calc_in_hash_default
    }
    
    # rf_fn: its enclosing envir is not changed to preserve access to self$
    self$rf_fn <- self$rf_fn_default
    formals(self$rf_fn) <- formals(args(fn))
    
    # register itself in eddy (error if fn_key already present)
    if (!self$eddy$add_flow(fn_key, self)) {
        rlang::abort(paste("Failed to register flow:", fn_key))
    }
    
    # after registering into eddy, remove itself if error
    tryCatch({
        self$check_all()
    }, error = function(e) {
        self$eddy$remove_flow(fn_key)
        stop(e)
    })
    
    invisible(NULL)
}, overwrite = TRUE)


# which_state ----
R6Flow$set("public", "which_state", function(in_hash) {
    # Row index in $state of the state having this fn_key and `in_hash`;
    # 0L when there is no such state. Only the index is looked up: whether
    # the eddy still holds the cached values is not checked here.
    
    is_match_lgl <-
        self$state$fn_key == self$fn_key &
        self$state$in_hash == in_hash
    match_idx <- which(is_match_lgl)
    
    # there should not be duplicate states for the same fn_key and in_hash
    len <- length(match_idx)
    stopifnot(len <= 1L)
    
    if (len == 1L) match_idx else 0L
}, overwrite = TRUE)


# get_state ----
R6Flow$set("public", "get_state", function(index = NULL) {
    # One-row slice of $state at `index` (default: the current state).
    # If the index is not usable, a zero-row slice is returned instead, so
    # that the result always has the same columns and types.
    
    row_idx <- if (is.null(index)) self$state_index else index
    if (!self$is_good_index(row_idx)) {
        row_idx <- 0L
    }
    
    self$state[row_idx, , drop = FALSE]
}, overwrite = TRUE)


# add_state ----
R6Flow$set("public", "add_state", function(in_hash,
                                           out_hash,
                                           elem_args,
                                           make_current = TRUE) {
    # Append a new state for `in_hash` and keep `elem_args` in $state_env
    # so that the output can be computed later. Aborts if `in_hash` is
    # already present. Returns TRUE if the new state is found at the last row.
    
    require_keys(in_hash)
    already_known <- in_hash %in% self$state$in_hash
    if (already_known) {
        rlang::abort(paste("`in_hash` already present:", in_hash))
    }
    
    self$state <- tibble::add_row(
        self$state,
        fn_key = self$fn_key,
        in_hash = in_hash,
        out_hash = out_hash,
        time_stamp = now_utc()
    )
    # arguments needed by the lazy computation
    self$state_env[[in_hash]] <- elem_args
    
    last_row <- nrow(self$state)
    if (make_current) self$state_index <- last_row
    
    self$which_state(in_hash) == last_row
}, overwrite = TRUE)


# update_state ----
R6Flow$set("public", "update_state", function(index,
                                              in_hash,
                                              out_hash,
                                              out_data,
                                              make_current = TRUE) {
    # Store `out_data` in the cache under `out_hash` and overwrite the state
    # at `index`. Returns FALSE if the data cannot be stored, otherwise
    # TRUE when the state for `in_hash` is found at `index`.
    
    self$require_good_index(index)
    require_keys(in_hash, out_hash)
    
    # the cache goes first: no state change if the data cannot be stored
    if (!self$eddy$add_data(self$fn_key, out_hash, out_data)) {
        return(FALSE)
    }
    
    new_row <- list(
        fn_key = self$fn_key,
        in_hash = in_hash,
        out_hash = out_hash,
        time_stamp = now_utc()
    )
    self$state[index, ] <- new_row
    
    # the main reason to update is to add out_hash ==> args no longer needed
    has_pending_args <-
        !is.na(out_hash) &&
        base::exists(in_hash, envir = self$state_env, inherits = FALSE)
    if (has_pending_args) {
        base::rm(list = in_hash, envir = self$state_env)
    }
    
    if (make_current) self$state_index <- index
    
    self$which_state(in_hash) == index
}, overwrite = TRUE)


# forget_state ----
R6Flow$set("public", "forget_state", function(index) {
    # Forget the output of the state at `index`: the state row is kept (with
    # out_hash set to NA) while its cached output and element rows are
    # removed, unless another state still refers to the same output.
    # Aborts if `index` is not a good index.
    # Returns TRUE if the state can still be found at `index`.
    
    self$require_good_index(index)
    old_state <- self$state[index, , drop = FALSE]
    
    # update state
    self$state[index, ] <- list(
        fn_key = self$fn_key,
        in_hash = old_state$in_hash,
        out_hash = NA_character_,
        time_stamp = now_utc()
    )
    
    # delete data from cache
    # only if not present somewhere else (same output for a different input).
    # The lookup must use the out_hash column of $state (already updated
    # above): the object has no `out_hash` member, so `self$out_hash` was
    # NULL and the data was deleted even when still in use.
    delete_key <- old_state$out_hash %if_not_in% self$state$out_hash
    if (is_key(delete_key)) {
        self$eddy$delete_data(self$fn_key, delete_key)
        # element rows follow the fate of the main output: they are kept
        # when the output is shared, and nothing is done for a state that
        # was never computed (out_hash is NA)
        self$delete_state_output(delete_key)
    }
    
    # return TRUE if state can still be found
    self$which_state(old_state$in_hash) == index
}, overwrite = TRUE)


# delete_state ----
R6Flow$set("public", "delete_state", function(index) {
    # Remove the state at `index` together with its output (see
    # $forget_state()). Aborts if `index` is not a good index.
    # The current state is preserved when it is not the deleted one;
    # otherwise there is no current state afterwards (state_index is 0).
    # Returns TRUE if the deleted state cannot be found any longer.
    
    self$require_good_index(index)
    old_state <- self$state[index, , drop = FALSE]
    
    # remember the current state by its in_hash, since row numbers shift
    # after the deletion (same approach as in $check_all())
    if (self$is_current) {
        current_in_hash <- self$state$in_hash[self$state_index]
    } else {
        current_in_hash <- NA_character_
    }
    
    self$forget_state(index)
    # delete state
    self$state <- self$state[-index, , drop = FALSE]
    
    # release the arguments stored for lazy computation, if any
    old_in_hash <- old_state$in_hash
    if (!is.na(old_in_hash) && nzchar(old_in_hash) &&
        base::exists(old_in_hash, envir = self$state_env, inherits = FALSE)
    ) {
        base::rm(list = old_in_hash, envir = self$state_env)
    }
    
    # update index: 0 if the current state was deleted (or there was none)
    self$state_index <- self$which_state(current_in_hash)
    
    # return TRUE if state cannot be found
    self$which_state(old_state$in_hash) == 0L
}, overwrite = TRUE)


# add_state_output ----
R6Flow$set("public", "add_state_output", function(out_hash,
                                                  elem_name,
                                                  elem_hash,
                                                  elem_data) {
    # Cache `elem_data` under `elem_hash` and record the element in
    # $state_output; an existing row for the same (out_hash, elem_name)
    # is replaced. Returns NULL, invisibly.
    
    require_keys(out_hash, elem_name, elem_hash)
    
    # store elem data in cache
    self$eddy$add_data(self$fn_key, elem_hash, elem_data)
    
    output_rows <- self$state_output
    same_elem_idx <- which(
        output_rows$out_hash == out_hash &
        output_rows$elem_name == elem_name
    )
    len <- length(same_elem_idx)
    stopifnot(len <= 1L)
    if (len == 1L) {
        # drop the old row, the new one is appended below
        output_rows <- output_rows[-same_elem_idx, , drop = FALSE]
    }
    
    self$state_output <- tibble::add_row(
        output_rows,
        out_hash = out_hash,
        elem_name = elem_name,
        elem_hash = elem_hash
    )
    
    invisible(NULL)
}, overwrite = TRUE)


# delete_state_output ----
R6Flow$set("public", "delete_state_output", function(out_hash) {
    # Remove from $state_output all element rows belonging to `out_hash`
    # and delete their cached data, unless the same elem_hash is still used
    # by a remaining row. Warns if some keys cannot be deleted.
    # Missing (NA) or empty `out_hash` is a no-op. Returns NULL, invisibly.
    
    # NA cannot be an output key; comparing against it would produce an NA
    # mask and corrupt $state_output when subsetting
    out_hash <- out_hash[!is.na(out_hash)]
    if (length(out_hash) == 0L) return(invisible(NULL))
    
    # `%in%` never returns NA and also handles more than one out_hash
    drop_lgl <- self$state_output$out_hash %in% out_hash
    if (!any(drop_lgl)) return(invisible(NULL))
    old_state_output <- self$state_output[drop_lgl, , drop = FALSE]
    
    # delete from state_output
    self$state_output <- self$state_output[!drop_lgl, , drop = FALSE]
    # delete data from cache
    # only if not present somewhere else (same output for the same input)
    # unique(): elements with identical data share one elem_hash / cache key
    delete_keys <- 
        unique(old_state_output$elem_hash) %if_not_in% 
        self$state_output$elem_hash
    deleted_keys <- 
        delete_keys %>%
        rlang::set_names() %>%
        purrr::map_lgl(~ self$eddy$delete_data(self$fn_key, .))
    if (any(!deleted_keys)) {
        txt <- paste(names(deleted_keys[!deleted_keys]), collapse = ", ")
        rlang::warn(paste("flow", self$fn_key, "- cannot delete keys:", txt))
    }
    
    invisible(NULL)
}, overwrite = TRUE)


# get_out_hash ----
R6Flow$set("public", "get_out_hash", function(name = NULL) {
    # Hash of the current output, or of its element `name`:
    # - NULL          if there is no current state
    # - NA_character_ if the current state is not valid (e.g. not yet
    #                 computed) or if `name` is not one of its elements
    
    # no current state, cannot talk about hashes
    if (!self$is_current) return(NULL)
    # invalid, but not yet computed
    if (!self$is_valid) return(NA_character_)
    
    state_out_hash <- self$get_state()$out_hash
    # valid & computed - no element
    if (is.null(name)) return(state_out_hash)
    
    # valid & computed - element requested
    elem_idx <- which(
        self$state_output$out_hash == state_out_hash &
        self$state_output$elem_name == name
    )
    if (length(elem_idx) != 1L) {
        # invalid element
        return(NA_character_)
    }
    
    self$state_output$elem_hash[elem_idx]
}, overwrite = TRUE)


# get_element ----
R6Flow$set("public", "get_element", function(name = NULL) {
    # Build an "Element" object: a list pointing back to this flow (`self`),
    # with the element name, its hash and the status at the time of the call.
    # Works without a current state and before the state is computed.
    
    elem_hash <- self$get_out_hash(name = name)
    
    # NULL hash ==> no current state; NA hash ==> current but not valid
    is_current <- !is.null(elem_hash)
    is_valid <- FALSE
    if (is_current && !is.na(elem_hash)) {
        # main result or an element might be missing from cache
        is_valid <- self$eddy$has_key(self$fn_key, elem_hash)
    }
    
    # class does not inherit R6Flow since it has a different structure
    structure(
        list(
            self = self,
            is_current = is_current,
            is_valid = is_valid,
            elem_name = name,
            elem_hash = elem_hash
        ),
        class = c("Element", "list")
    )
}, overwrite = TRUE)


# compute ----
R6Flow$set("public", "compute", function() {
    # Compute the output of the current state, if not already available:
    # calls fn with the stored arguments, caches the result, updates the
    # state and, when enabled, splits the result into named elements.
    #
    # do not compute if already computed
    # return TRUE/FALSE and not the actual value since there might be elements
    # NOTE(review): after a computation the returned value is the result of
    # $save(), i.e. whether the state was saved - confirm this is intended
    
    if (self$is_valid) return(TRUE)
    if (!self$is_current) return(FALSE)
    state <- self$get_state()
    
    # the arguments were stored in $state_env by $add_state() / $rf_fn()
    if (!base::exists(
        state$in_hash, where = self$state_env, inherits = FALSE)
    ) {
        # cannot find input args ==> cannot compute
        return(FALSE)
    }
    elem_args <- self$state_env[[state$in_hash]]
    
    # need to collect output of Elements (if any)
    # (this triggers the computation of the upstream flows)
    data_args <-
        elem_args %>%
        purrr::map_if(
            .p = ~ inherits(., "Element"),
            .f = function(x) {
                x$self$collect(x$elem_name)
            }
        )
    
    # eval in .GlobalEnv to avoid name collisions
    # withVisible() returns list(value, visible); the whole list is cached
    out_data <- withVisible(do.call(
        what = self$fn, args = data_args, envir = globalenv()))
    # we store the out_hash to avoid (re)hashing for flow objects
    out_hash <- self$eddy$digest(out_data)
    
    # update the current state
    # (also stores out_data in the cache and drops the stored arguments)
    update_OK <- self$update_state(
        index = self$state_index, 
        in_hash = state$in_hash, 
        out_hash = out_hash,
        out_data = out_data
    )
    if (!update_OK) return(FALSE)
    
    # split into elements by function
    # priority: split_fn, then bare list, then data frame
    split_using_fn <- !is.null(self$split_fn)
    split_bare_list <- 
        self$split_bare_list && rlang::is_bare_list(out_data$value)
    split_dataframe <- self$split_dataframe && is.data.frame(out_data$value)
    if (split_using_fn || split_bare_list || split_dataframe) {
        # elements are created only from a list with unique, non-empty names
        abort_split <- FALSE
        if (split_using_fn) {
            out_lst <- self$split_fn(out_data$value)
            if (!rlang::is_dictionaryish(out_lst)) {
                # user supplied function misbehaves ==> warning
                rlang::warn(paste(
                    "Cannot create flow elements,",
                    "`split_fn` must return a list with unique names."))
                abort_split <- TRUE
            }
        } else if (split_bare_list) {
            out_lst <- out_data$value
            if (!rlang::is_dictionaryish(out_lst)) {
                rlang::inform(paste(
                    "Cannot create flow elements,",
                    "the returned list must have unique names."))
                abort_split <- TRUE
            }
        } else {
            # data frame: one element per column
            out_lst <- as.list(out_data$value)
            if (!rlang::is_dictionaryish(out_lst)) {
                rlang::inform(paste(
                    "Cannot create flow elements,",
                    "the returned data frame must have unique names."))
                abort_split <- TRUE
            }
        }
        if (!abort_split) {
            for (elem_name in names(out_lst)) {
                # reconstruct the withVisible list for each element
                vis_elem_lst <- list(
                    value = out_lst[[elem_name]],
                    visible = out_data$visible
                )
                elem_hash <- self$eddy$digest(vis_elem_lst)
                self$add_state_output(
                    out_hash, elem_name, elem_hash, vis_elem_lst)
            }
        }
    }
    
    # persist the updated states; its result is the return value
    self$save()
}, overwrite = TRUE)


# collect ----
R6Flow$set("public", "collect", function(name = NULL) {
    # Return the output of the current state (or its element `name`),
    # computing it first if needed. Aborts if there is no current state,
    # if the computation fails or if the cached output is missing.
    # An unknown element name gives NULL, as is the case with env and lists.
    
    # require valid state since cannot return NULL (NULL can be a valid result)
    self$require_good_index()
    
    # if not yet computed ==> trigger compute
    computed_ok <- self$compute()
    if (!computed_ok) {
        rlang::abort(paste0(
            "fn_key=", self$fn_key, ": cannot compute the current state."))
    }
    
    # cannot be NULL (require_good_index), but it may be invalid if wrong name
    out_hash <- self$get_out_hash(name = name)
    if (is.na(out_hash)) {
        vis_out_lst <- list(
            value = NULL,
            visible = TRUE
        )
    } else if (self$eddy$has_key(self$fn_key, out_hash)) {
        vis_out_lst <- self$eddy$get_data(self$fn_key, out_hash)
    } else {
        rlang::abort(paste0(
            "fn_key=", self$fn_key, 
            ": cached output is missing for out_hash ", out_hash))
    }
    
    # preserve the output visibility of the original result
    if (!vis_out_lst$visible) {
        return(invisible(vis_out_lst$value))
    }
    vis_out_lst$value
}, overwrite = TRUE)


# check_all ----
R6Flow$set("public", "check_all", function() {
    # Reconcile $state and $state_output with the keys found in the cache:
    # - computed states whose output is missing from cache are forgotten
    # - element rows without cached data or without a state are deleted
    # - cached data not referred by any state / element is deleted
    # Returns TRUE if anything had to be changed, FALSE otherwise.
    # States that were never computed (out_hash is NA) are not a change.
    
    # save current index / in_hash
    if (self$is_current) {
        in_hash <- self$state$in_hash[self$state_index]
    } else {
        in_hash <- NA_character_
    }
    
    # file caching will also return state key(s), which we do not need
    keys <- self$eddy$list_keys(self$fn_key) %if_not_in% c(
        .STATE_KEY, .ROW_CACHE)
    
    # state: forget states that are missing from cache
    # (only states that claim to have an output can lose it)
    missing_lgl <-
        !is.na(self$state$out_hash) &
        !(self$state$out_hash %in% keys)
    self$state$out_hash[missing_lgl] <- NA_character_
    
    # output state: forget implies deleting rows
    keep_rows_lgl <- (self$state_output$elem_hash %in% keys) &
        (self$state_output$out_hash %in% self$state$out_hash)
    self$state_output <- self$state_output[keep_rows_lgl, , drop = FALSE]
    
    # delete cache of missing states
    delete_keys <- keys %if_not_in% c(
        self$state$out_hash, self$state_output$elem_hash)
    if (length(delete_keys) > 0L) {
        deleted_keys <- 
            delete_keys %>%
            rlang::set_names() %>%
            purrr::map_lgl(~ self$eddy$delete_data(self$fn_key, .))
        if (any(!deleted_keys)) {
            txt <- paste(names(deleted_keys[!deleted_keys]), collapse = ", ")
            rlang::warn(paste(
                "flow", self$fn_key, "- cannot delete keys:", txt))
        }
    }
    
    changed <-
        any(missing_lgl) ||
        any(!keep_rows_lgl) ||
        length(delete_keys) > 0L
    if (changed) {
        # update index
        self$state_index <- self$which_state(in_hash)
        # do not save: still in init & init incomplete (called with super$)
    }
    
    changed
}, overwrite = TRUE)


# forget_all ----
R6Flow$set("public", "forget_all", function() {
    # Remove all states of this flow, including the arguments stored for
    # lazy computation, then ask the eddy to forget the cached data.
    # Returns the result of the eddy's $forget_flow().
    
    # clear all states (keep the columns and their types)
    self$state <- self$state[0L, , drop = FALSE]
    self$state_index <- 0L
    self$state_output <- self$state_output[0L, , drop = FALSE]
    
    # the stored arguments belong to states that no longer exist;
    # release them, they may hold large objects
    if (is.environment(self$state_env)) {
        base::rm(
            list = ls(self$state_env, all.names = TRUE),
            envir = self$state_env
        )
    }
    
    # clear cache
    self$eddy$forget_flow(self$fn_key)
}, overwrite = TRUE)


# save ----
R6Flow$set("public", "save", function() {
    # Store $state and $state_output in the cache, under .STATE_KEY.
    # Returns the result of the eddy's $add_data(); warns if it is not TRUE.
    
    saved_ok <- self$eddy$add_data(
        self$fn_key,
        .STATE_KEY,
        list(
            fn_key = self$fn_key,
            state = self$state,
            state_output = self$state_output
        )
    )
    if (!saved_ok) rlang::warn("flow cannot save its own state")
    
    saved_ok
}, overwrite = TRUE)


# print ----
# nocov start
R6Flow$set("public", "print", function(n = NULL) {
    # Print a short summary of the flow followed by its states;
    # `n` is passed on to print() for the $state table.
    # Returns the flow object, invisibly.
    
    class_label <- paste0("<", crayon::italic(class(self)[[1L]]), ">")
    fn_label <- paste(crayon::bold(self$fn_name), "/", self$fn_id)
    state_tbl <- self$state
    
    cat(
        class_label, "for function", fn_label, "\n",
        " - number of states:", nrow(state_tbl), "\n",
        " - current state index:", self$state_index, "\n",
        " - is_current:", self$is_current, "\n",
        " - is_valid:", self$is_valid, "\n"
    )
    print(self$state, n = n)
    
    invisible(self)
}, overwrite = TRUE)


print.Element <- function(x, ...) {
    # S3 print method for "Element" objects (see R6Flow$get_element()).
    # Arguments other than `x` are ignored, with a warning.
    # The is_current / is_valid lines show the status of the parent flow.
    # Returns `x`, invisibly.
    
    extra_args <- list(...)
    if (length(extra_args) > 0L) warning("all other arguments ignored")
    
    flow <- x$self
    elem_label <- paste0("<", crayon::italic("Element"), ">")
    flow_label <- paste0("<", crayon::italic(class(flow)[[1L]]), ">")
    fn_label <- paste(crayon::bold(flow$fn_name), "/", flow$fn_id)
    shown_name <- x$elem_name %||% "<full result>"
    
    cat(
        elem_label, "of", flow_label, "for function", fn_label, "\n",
        " - elem_name:", shown_name, "\n",
        " - elem_hash:", x$elem_hash, "\n",
        " - is_current:", flow$is_current, "\n",
        " - is_valid:", flow$is_valid, "\n"
    )
    
    invisible(x)
}
# nocov end


# is_good_index ----
R6Flow$set("public", "is_good_index", function(index = NULL) {
    # TRUE if `index` (default: the current state index) is a single,
    # non-missing, finite whole number between 1 and the number of states.
    # Never errors: any other input gives FALSE.
    
    if (is.null(index)) index <- self$state_index
    
    # the scalar test must come first: `&&` needs length-one operands
    # (an error since R 4.3.0 otherwise), so `!is.na(index)` can only be
    # evaluated once `index` is known to have length one
    rlang::is_scalar_integerish(index) &&
            !is.na(index) && 
            is.finite(index) &&
            index >= 1L && 
            index <= nrow(self$state)
}, overwrite = TRUE)


# require_good_index ----
R6Flow$set("public", "require_good_index", function(index = NULL) {
    # Abort unless `index` (default: the current state index) is a good
    # index, see $is_good_index(). Returns NULL invisibly otherwise.
    
    if (is.null(index)) index <- self$state_index
    if (self$is_good_index(index)) {
        return(invisible(NULL))
    }
    
    # the message tells whether the current state is the culprit
    msg_prefix <- if (identical(index, self$state_index)) {
        "Invalid current state, index ="
    } else {
        "Invalid state, index ="
    }
    rlang::abort(paste(msg_prefix, index))
}, overwrite = TRUE)


# is_valid_at_index ----
R6Flow$set("public", "is_valid_at_index", function(index = NULL) {
    # TRUE if the state at `index` (default: the current state) has an
    # out_hash and the cache holds data for it.
    # Side effect: the out_hash of a state whose data is missing from cache
    # is reset to NA.
    
    if (is.null(index)) index <- self$state_index
    if (!self$is_good_index(index)) return(FALSE)
    
    state_out_hash <- self$state$out_hash[index]
    if (is.na(state_out_hash)) return(FALSE)
    
    if (self$eddy$has_key(self$fn_key, state_out_hash)) {
        return(TRUE)
    }
    
    # no value in cache ==> not valid
    self$state$out_hash[index] <- NA_character_
    FALSE
}, overwrite = TRUE)


# require_valid_at_index ----
R6Flow$set("public", "require_valid_at_index", function(index = NULL) {
    # Abort unless the state at `index` (default: the current state) is
    # valid, see $is_valid_at_index(). Returns NULL invisibly otherwise.
    
    if (is.null(index)) index <- self$state_index
    if (self$is_valid_at_index(index)) {
        return(invisible(NULL))
    }
    
    # the message tells whether the current state is the culprit
    msg_prefix <- if (identical(index, self$state_index)) {
        "Not-computed current state, index ="
    } else {
        "Not-computed state, index ="
    }
    rlang::abort(paste(msg_prefix, index))
}, overwrite = TRUE)


# is_current ----
R6Flow$set("active", "is_current", function() {
    # Active binding: TRUE if the flow has a current state.
    # so far, we look only at the index, but this might change
    
    current_idx <- self$state_index
    self$is_good_index(current_idx)
}, overwrite = TRUE)


# is_valid ----
R6Flow$set("active", "is_valid", function() {
    # Active binding: TRUE if the current state has its output in the cache,
    # see $is_valid_at_index().
    
    current_idx <- self$state_index
    self$is_valid_at_index(current_idx)
}, overwrite = TRUE)
# numeract/rflow documentation built on May 28, 2019, 3:39 p.m.