# node --------------------------------------------------------------------
#' @title node class
#' @description Node is a basic building block of an rflow DAG.
#' @docType class
#' @importFrom R6 R6Class
#' @export
#' @keywords node
#' @return Object of \code{\link{R6Class}} with basic functionality for rflows.
#' @format \code{\link{R6Class}} object.
#' @field id character; unique id
#' @field env character; name of an environment (container or group of objects)
#' @field name character; object's name
#' @field desc character; description
#' @field tags character; vector of tags
#' @field depends character; vector of upstream nodes (other nodes this node is depending on)
#'
#' @details
#' Nodes may represent different types of data targets (R objects, files, database tables), or jobs.
#' Interconnected nodes together function as basic building elements of a directed acyclic graph (DAG).
#'
#' Nodes are implemented using R6 classes. There are various flavors of the basic node class (r_node, db_node, file_node) that have their specific requirements and features.
#'
#'
#' Constructing nodes
#' * from stored state
#' * from lists (coming from TOML, YAML...)
#'
#' @usage
#' node$new(id = "node")
#' @section Methods:
#' \describe{
#' \item{\code{$make()} or \code{make(node)}}{This method is used to build/make targets. It recursively solves dependencies by building all targets for nodes declared or detected as a dependency.}
#' \item{\code{$eval()}}{Compared to `make()`, `eval()` only runs the code/job associated with the node alone. It does not try to ensure all the dependencies are ready. }
#' \item{\code{$process()}}{`process()` is usually used internally by `eval()` and should not be called directly}
#' \item{\code{$value}}{Returns target's value. Targets such as database tables or csv files are automatically converted into R values.}
#' \item{\code{$exists()}}{Checks whether the target exists or not.}
#' \item{\code{$time_to_eval()}}{Returns a record(s) of time to evaluate.}
#' }
node <- R6::R6Class(
  classname = "node",

  private = list(
    .last_evaluated = NULL, # datetime when target value was last evaluated/made
    .last_changed = NULL, # datetime when target value changed the last time (it might be evaluated without any change)
    .time_to_eval = list(), # one duration appended per eval() call
    .trigger_defchange = FALSE, # set to TRUE by update_definition() when dependencies or validators change
    .vis_params_default = list(
      shape = "diamond"
    )
  ),

  public = list(
    id = NULL, # unique within an rflow
    env = NULL, # name of container (real such as R env or database schema, or abstract such as group of files)
    name = NULL, # name is unique within its env
    desc = NULL, # brief description of the object
    tags = NULL, # character vector of tags
    depends = NULL, # character vector of dependencies (upstream nodes)
    upstream = NULL, # vector of references to dependencies
    downstream = NULL, # vector of references to dependants
    definition_hash = NULL,
    persistence = NULL,
    trigger_manual = FALSE,
    trigger_condition = NULL,
    logging = NULL,
    loggers = list(),
    validators = NULL,
    vis_params = NULL,

    # merge user supplied visualisation params with the class defaults
    vis_params_process = function(params) {
      union.list(
        private$.vis_params_default,
        params
      )
    },

    # derive object's ID from given identificators
    # either 'id' or both 'name' + 'env' have to be provided
    set_id = function(id, name, env) {
      # scalar short-circuit operators: this is a control-flow condition, not a mask
      if (is.null(id) && (is.null(name) || is.null(env))) stop("Missing id or (env + name)!")
      self$id <- if (is.null(id)) paste0(env, ".", name) else id
    },

    # set up persistency
    # enabled only when `persistence` is a list with a non-empty `path`
    # that points to an existing directory; a missing directory is an error
    set_persistence = function(persistence) {
      self$persistence <-
        if (is.list(persistence) && length(persistence$path)) {
          if (dir.exists(persistence$path)) {
            list(
              enabled = TRUE,
              path = persistence$path,
              file = filename_from_id(self$id)
            )
          } else {
            stop(persistence$path, " does not exist.")
          }
        } else {
          list(
            enabled = FALSE
          )
        }
      invisible(TRUE)
    },

    # console summary of the node: identity, dependencies, timestamps and triggers
    print = function(...) {
      cat("<", class(self)[1], "> ", crayon::red(self$id), ": ", self$name, "\n", sep = "")
      if (length(self$desc)) cat(" desc: ", crayon::italic(self$desc), "\n", sep = "")
      if (length(self$tags)) cat(" tags: ", paste0(self$tags, collapse = " "), "\n", sep = "")
      cat(" depends: ", paste0(self$depends, collapse = ", "), "\n", sep = "")
      cat(" evaluated: ", as.character(self$last_evaluated), "\n",
          " changed: ", as.character(self$last_changed), "\n", sep = "")
      cat(" triggers: ", "\n", sep = "")
      cat(" definition: ", self$trigger_defchange, "\n", sep = "")
      cat(" manual: ", self$trigger_manual, "\n", sep = "")
      if (length(self$trigger_condition)) cat(" condition: ", self$check_trigger_condition(), "\n", sep = "")
    },

    # two-line label: env on the first line, name on the second
    label = function() {
      label <- paste0(self$env, "\n", self$name)
      return(label)
    },

    # HTML title: id + class, optional wrapped description, datetime of last evaluation
    title = function() {
      html <- paste0("<b>", self$id, "</b>", " <", class(self)[1], ">", "<br>")

      # insert description
      if (length(self$desc) && !is.na(self$desc)) {
        html <- paste0(
          html,
          "<em>",
          stringr::str_replace_all(
            stringr::str_wrap(self$desc, width = 80), stringr::fixed("\n"), "<br>"),
          "</em><br>"
        )
      }

      # insert date of last evaluation
      html <- paste0(
        html,
        "<font size=\"-2\">",
        "evaluated: <em>", self$last_evaluated, "</em><br>",
        # "changed: <em>", self$last_changed, "</em><br>",
        "</font>"
      )
      return(html)
    },

    initialize =
      function(
        id = NULL,
        name = NULL,
        env = NULL,
        desc = NULL,
        tags = NULL,
        depends = NULL,
        trigger_defchange = NULL,
        trigger_condition = NULL,
        definition_hash = NULL,
        persistence = list(enabled = FALSE),
        .last_evaluated = NULL,
        .last_changed = NULL,
        .trigger_defchange = NULL,
        .time_to_eval = NULL,
        cache = NULL, # this will be ignored, it's here to catch the arguments if the subclass nodes do not use it
        mode = NULL, # this will be ignored, it's here to catch the arguments if the subclass nodes do not use it
        vis_params = NULL,
        validators = NULL,
        logging = FALSE,
        loggers = NULL,
        store = TRUE,
        ...
      ) {
        self$set_id(id = id, name = name, env = env)
        self$name <- name
        self$env <- env
        self$desc <- desc
        self$logging <- isTRUE(logging)
        add_loggers(self, loggers)
        log_record(self, self$id, paste0("Starting initialization for ", paste0(class(self), collapse = " - ")))
        log_record(self, self$id, "Generic node class initialization")
        self$definition_hash <- definition_hash
        self$set_persistence(persistence)
        self$tags <- as.character(tags)
        self$validators <- validators
        self$vis_params <- self$vis_params_process(vis_params)

        # dependencies may come as a character vector or as a named list
        depends_char <- if (is.character(depends)) depends else names(depends)
        self$depends <- depends_char

        # the dotted argument (restored private state) wins over the legacy one
        if (length(.trigger_defchange)) {
          private$.trigger_defchange <- as.logical(.trigger_defchange)
        } else if (length(trigger_defchange)) { # TODO: drop this when all projects are migrated
          private$.trigger_defchange <- as.logical(trigger_defchange)
        }

        if (!is.null(trigger_condition)) {
          self$trigger_condition <- as_r_expr(trigger_condition)
        }

        private$.last_evaluated <- if (length(.last_evaluated)) .last_evaluated else as.POSIXct(NA)
        private$.last_changed <- if (length(.last_changed)) .last_changed else as.POSIXct(NA)
        # keep the field a list (its declared default) when no history is supplied
        private$.time_to_eval <- if (length(.time_to_eval)) .time_to_eval else list()

        other_args <- list(...)
        if (length(other_args)) warning("These unknown properties are ignored: ", paste(names(other_args), collapse = ", "), "\n")

        # this should be always at the very end
        # storing is not necessary when e.g. update_definition() is called from ancestor's method
        if (self$persistence$enabled && store) self$store_state()
        return(invisible(TRUE))
      },

    # save function for state persistency
    # currently only file-based store backend is supported
    # `public_fields` / `private_fields` let callers add fields to the default sets
    store_state = function(
      public_fields = NULL,
      private_fields = NULL
    ) {
      log_record(self, self$id, "Writing to persistence storage")
      public_fields <-
        unique(
          c(
            "id", "name", "env",
            "desc", "tags",
            "depends",
            "logging",
            "validators",
            "definition_hash",
            "trigger_condition",
            "vis_params",
            public_fields
          )
        )
      private_fields <-
        unique(
          c(
            ".last_evaluated", ".last_changed",
            ".time_to_eval",
            ".trigger_defchange",
            private_fields
          )
        )
      saveRDS(
        object = list(
          self = c(type = class(self)[1],
                   if (length(public_fields)) mget(public_fields, envir = self) else NULL),
          private = if (length(private_fields)) mget(private_fields, envir = private) else NULL
        ),
        file = file.path(self$persistence$path, self$persistence$file)
      )
    },

    # allows to change properties of existing nodes (after they are initialized, e.g. from persistence)
    # changes in dependencies or validators raise the definition-change trigger
    update_definition =
      function(
        id = NULL,
        type = NULL,
        desc = NULL,
        tags = NULL,
        depends = NULL,
        definition_hash = NULL,
        trigger_condition = NULL,
        vis_params = NULL,
        validators = NULL,
        ...,
        store = TRUE,
        logging = FALSE,
        loggers = NULL,
        verbose = TRUE
      ) {
        self$logging <- isTRUE(logging)
        add_loggers(self, loggers)
        log_record(self, self$id, "Updating definition")
        self$definition_hash <- definition_hash

        # graphics params need to be processed before checking
        vis_params <- self$vis_params_process(vis_params)
        if (!identical(vis_params, self$vis_params)) {
          self$vis_params <- vis_params
        }

        # changes in dependencies
        depends_char <- if (is.character(depends)) depends else names(depends) # TODO: remove this? the idea was that the argument can be a named list() where every entry/dependency with specified with some specific criterias
        if (!identical(self$depends, depends_char)) {
          if (verbose) notify_update(self$id, "dependencies")
          self$depends <- depends_char
          private$.trigger_defchange <- TRUE
        }

        # changes in description
        if (!identical(as.character(self$desc), as.character(desc))) {
          if (verbose) notify_update(self$id, "desc")
          self$desc <- as.character(desc)
        }

        # changes in tags
        if (!identical(as.character(self$tags), as.character(tags))) {
          if (verbose) notify_update(self$id, "tags")
          self$tags <- as.character(tags)
        }

        # changes in validators
        if (!identical(self$validators, validators)) {
          if (verbose) notify_update(self$id, "validators")
          self$validators <- validators
          private$.trigger_defchange <- TRUE
        }

        # changes in user defined trigger
        trigger_condition <- suppressWarnings(as_r_expr(trigger_condition))
        if (!identical(as.character(self$trigger_condition), as.character(trigger_condition))) {
          if (verbose) notify_update(self$id, "trigger condition")
          self$trigger_condition <- trigger_condition
        }

        # currently all the remaining arguments are ignored, TODO ?
        # other_args <- list(...)
        # if (length(other_args)) warning("Not updating ", paste(names(other_args), collapse = ", "), " properties.\n")

        # this should be always at the very end
        # storing is not necessary when e.g. update_definition() is called from ancestor's method
        if (self$persistence$enabled && store) self$store_state()
        return(invisible(TRUE))
      },

    # register `obj` as an upstream node of this node (no duplicates)
    set_upstream = function(obj) {
      if (!in.R6(obj, self$upstream)) {
        self$upstream <- c(self$upstream, obj)
      }
    },

    # register this node as a downstream node of `obj` (no duplicates)
    set_downstream = function(obj) {
      if (!in.R6(self, obj$downstream)) {
        obj$downstream <- c(obj$downstream, self)
      }
    },

    # make the references to both upstream and downstream
    # the node with the given id is looked up in parent.env(self)
    # returns TRUE on success, FALSE (with a warning) when the node is not found
    connect_to = function(id) {
      if (exists(id, where = parent.env(self))) {
        obj <- get(x = id, envir = parent.env(self))
        self$set_upstream(obj)
        self$set_downstream(obj)
      } else {
        warning("Node ", crayon::red(id), " does not exists...?")
        return(FALSE)
      }
      return(TRUE)
    },

    # make the references to all relevant upstream objects
    connect = function(verbose = TRUE) {
      log_record(self, "Connecting to upstream nodes")
      if (verbose) cat("Connecting ", crayon::red(self$id), " to: ", crayon::red(self$depends, collapse = " "), "\n", sep = "")
      sapply(self$depends, self$connect_to)
    },

    # placeholder; this base class has no target value to return
    get = function() {
      log_record(self, self$id, "Call to an empty get() method.")
      warning("This is just empty method...")
    },

    # placeholder; this base class has no target to remove
    remove = function() {
      log_record(self, self$id, "Call to an empty remove() method.")
      warning("This is just empty method...")
    },

    # evaluate the user defined trigger condition with the node as evaluation environment
    # anything but a single TRUE counts as "not triggered"
    check_trigger_condition = function() {
      isTRUE(eval(self$trigger_condition, envir = self))
    },

    # run checks that triggers evaluation
    # note that the order of checking the triggers is important (short circuit)
    check_triggers = function(verbose = TRUE, verbose_prefix = "") {
      log_record(self, self$id, "Checking triggers")
      if (!isFALSE(self$trigger_defchange)) {
        if (verbose) notify_trigger(self$id, "change in eval. expression", verbose_prefix = paste0(verbose_prefix, "\u2514 "))
        return(TRUE)
      }
      if (!isFALSE(self$trigger_manual)) {
        if (verbose) notify_trigger(self$id, "manual trigger", verbose_prefix = paste0(verbose_prefix, "\u2514 "))
        return(TRUE)
      }
      if (!isFALSE(self$check_trigger_condition())) {
        if (verbose) notify_trigger(self$id, "custom trigger condition", verbose_prefix = paste0(verbose_prefix, "\u2514 "))
        return(TRUE)
      }
      if (!length(self$last_evaluated) || is.na(self$last_evaluated)) {
        if (verbose) notify_trigger(self$id, "unknown datetime of last eval", verbose_prefix = paste0(verbose_prefix, "\u2514 "))
        return(TRUE)
      }
      return(FALSE)
    },

    # to reset triggers (e.g. after successful evaluation)
    reset_triggers = function() {
      log_record(self, self$id, "Resetting triggers")
      private$.trigger_defchange <- FALSE
      self$trigger_manual <- FALSE

      # make changes persistent
      if (self$persistence$enabled) self$store_state()
      return(invisible(TRUE))
    },

    # main evaluation function
    # runs process(), records the evaluation time and duration, resets triggers
    # and returns the result of changed()
    eval = function(verbose = TRUE, verbose_prefix = "") {
      log_record(self, self$id, "Evaluation started")
      time_start <- Sys.time()
      self$process(verbose = verbose, verbose_prefix = verbose_prefix)
      private$.last_evaluated <- Sys.time()
      private$.time_to_eval[[length(private$.time_to_eval) + 1]] <- private$.last_evaluated - time_start
      log_record(self, self$id, "Evaluation finished")

      haschanged <- self$changed(verbose = verbose, verbose_prefix = verbose_prefix)

      # all triggers should be reset now
      self$reset_triggers()

      # make changes persistent
      if (self$persistence$enabled) self$store_state()

      # by default return no change (but descendant classes with some actual logic in eval() should implement detection of changes or return TRUE by default)
      return(haschanged)
    },

    # placeholder for the actual job; called by eval()
    process = function(verbose = TRUE, verbose_prefix = "") {
      log_record(self, self$id, "Call to an empty process() method.")
    },

    changed = function(verbose = TRUE, verbose_prefix = "") {
      FALSE # by default signal no change
    },

    # generic make function that recursively solves all dependencies
    # it is the workhorse of the Rflow framework
    # in most cases, this is not supposed to be overloaded in custom subclasses
    # `.visited` is an environment shared across the recursion; it collects ids
    # of nodes that were already solved so they are not made twice
    # returns (invisibly) whether dependants should be triggered
    make = function(force = FALSE, verbose = TRUE, verbose_prefix = "", .visited = as.environment(list(ids = character()))) {
      log_record(self, "Solving make")
      if (verbose) {
        cat(verbose_prefix, "\u250C Solving node ", crayon::red(crayon::bold(self$id)), "\n", sep = "")
      }

      # resolve/make upstream nodes ------------------
      if (length(self$upstream) != length(self$depends)) {
        log_record(self, "Connections to upstream nodes does not match dependencies.")
        self$connect() # check that it's connected to upstream
      }

      # do not evaluate unless triggered
      trigger_from_upstream <- FALSE

      # collect triggers from upstream
      log_record(self, "Resolving upstream nodes")
      upstream_changed_ids <- character()
      for (y in self[["upstream"]]) {
        # skip nodes that were already visited
        if (!isTRUE(y$id %in% .visited$ids)) {
          # upstream can trigger me if it has changed since my last eval
          y_changed <- y$make(force = force, verbose = verbose, verbose_prefix = paste0(verbose_prefix, "\u2502 "), .visited = .visited) # TODO: this might be redundant if do the next line anyway
        } else {
          if (verbose) {
            cat(verbose_prefix, "\u251C ", crayon::red(crayon::bold(y$id)), ": skipping", "\n", sep = "")
          }
          y_changed <- NULL
        }

        # an NA comparison (unknown timestamps) is treated as "newer"
        y_newer <- isTRUE(y_changed) || !isFALSE(y$last_changed > self$last_evaluated)

        # increment list of changed nodes in upstream
        # there is another trigger for cases when self$last_evaluated in NULL/NA, therefore isTRUE() should be safe and is better for logging
        if (!isFALSE(y_newer) || isTRUE(y_changed)) {
          upstream_changed_ids <- c(upstream_changed_ids, y$id)
        }
        trigger_from_upstream <- trigger_from_upstream || y_newer # TODO: can trigger_upstream_newer ever be different from trigger_upstream_changed?
      }

      triggered <-
        if (isTRUE(force)) { # forced evaluation
          log_record(self, "Triggered by manual trigger")
          if (verbose) notify_trigger(self$id, trigger = "manual trigger", verbose_prefix = paste0(verbose_prefix, "\u251C "))
          TRUE
        } else if (!length(self$last_evaluated) || is.na(self$last_evaluated)) {
          log_record(self, "Triggered by missing datetime of last evaluation")
          if (verbose) notify_trigger(self$id, trigger = "unknown datetime of last eval", verbose_prefix = paste0(verbose_prefix, "\u251C "))
          TRUE
        } else if (!isFALSE(trigger_from_upstream)) {
          log_record(self, paste0("Triggered by changes of upstream nodes' values(", paste0(upstream_changed_ids, collapse = ", "), ")"))
          if (verbose) notify_trigger(self$id, trigger = paste0("changes in upstream nodes (", paste0(upstream_changed_ids, collapse = ", "), ")"), verbose_prefix = paste0(verbose_prefix, "\u251C "))
          TRUE
        } else if (!isFALSE(self$check_triggers(verbose = verbose, verbose_prefix = verbose_prefix))) {
          TRUE
        } else {
          log_record(self, "Not triggered")
          if (verbose) cat(verbose_prefix, "\u2514 ", crayon::silver(self$id, " not triggered.", sep = ""), "\n", sep = "")
          .visited$ids <- unique(c(.visited$ids, self$id))
          return(invisible(FALSE))
        }

      # then make the object itself
      # eval should return if the object was evaluated/changed etc...
      trigger_downstream <- !isFALSE(self$eval(verbose = verbose, verbose_prefix = paste0(verbose_prefix, "\u2502 ")))

      # validate the result
      validations <- self$validate(verbose = TRUE, verbose_prefix = "")
      if (length(validations)) {
        results <- as.data.table(validations)[(!passed)]
        if (nrow(results)) {
          notify_invalid(
            self$id,
            validator_names = results[, validator],
            validator_signals = results[, signal],
            verbose_prefix = paste0(verbose_prefix, "\u2502 ")
          )
        }
        if (nrow(results[signal == "stop"])) {
          log_record(self, "stopping the make() proccess due to invalid builds.")
          stop("Stopping the make() process!")
        }
      }

      # all triggers should be reset now
      self$reset_triggers()
      if (verbose) cat(verbose_prefix, "\u2514 ", crayon::silver(self$id, " finished.", sep = ""), "\n", sep = "")
      .visited$ids <- unique(c(.visited$ids, self$id))

      # return whether dependants should be triggered or not
      return(invisible(trigger_downstream))
    },

    # run the validators (if any) against the node's value
    validate = function(verbose = TRUE, verbose_prefix = "") {
      if (length(self$validators)) {
        confront(self$validators, self$get())
      }
    },

    # recorded evaluation durations
    # what: "LAST" (most recent), "ALL" (all records) or "MEAN" (average);
    # NULL is treated as "LAST"; any other value yields NULL
    time_to_eval = function(what = getOption("RFLOW_TIME_TO_EVAL", "LAST")) {
      # switch() cannot dispatch on an actual NULL, so map it to the default first
      if (is.null(what)) what <- "LAST"
      switch(
        what,
        "LAST" = ,
        "NULL" = unlist(tail(private$.time_to_eval, 1)),
        "ALL" = unlist(private$.time_to_eval),
        "MEAN" = mean(unlist(private$.time_to_eval))
      )
    }
  ),

  active = list(
    # read-only access to the target's value
    value = function(value) {
      if (missing(value)) {
        return(self$get())
      } else {
        stop("Can't set `$value this way. Use make() or eval() instead.")
      }
    },

    # datetime of last evaluation
    # assigning a value sets both the evaluation and the change timestamps
    last_evaluated = function(value) {
      if (missing(value)) {
        return(private$.last_evaluated)
      } else {
        private$.last_evaluated <- value
        private$.last_changed <- value
      }
    },

    # datetime of last change (does not have to equal to last evaluation unless the target value really changes)
    # NOTE(review): within this class .last_changed is only written by initialize()
    # and by the last_evaluated setter; presumably subclasses update it when
    # they detect a change - confirm
    last_changed = function(value) {
      if (missing(value)) {
        return(private$.last_changed)
      } else {
        stop("Can't set `$last_changed")
      }
    },

    # definition-change trigger; direct assignment works but raises a warning
    trigger_defchange = function(value) {
      if (missing(value)) {
        return(private$.trigger_defchange)
      } else {
        warning("Setting `$trigger_defchange = TRUE` in a non-standard way")
        private$.trigger_defchange <- value
      }
    }
  )
)
# tools -------------------------------------------------------------------
# construct/initiate a node object
#' Instantiate a node
#'
#' S3 generic; the method is selected by the class of `x`.
#'
#' @param x object to be turned into a node (e.g. a list with a node definition)
#' @param ... further arguments passed on to methods
#' @export
as_node <- function(x, ...) UseMethod("as_node")
# Builds a node from a list definition: the generator object named by `type`
# is looked up with get() and its $new() is called with the list entries
# merged with any extra arguments.
#' @export
as_node.list <- function(
  x,
  type = if (!is.null(x$type)) x$type else "r_node", # if not given type is r_node by default
  ... # other arguments to the initialize() constructor
) {
  generator <- get(type)
  init_args <- union.list(x, list(...))
  do.call(generator$new, args = init_args)
}
# A node is already a node: return it unchanged.
# `...` is accepted (and ignored) so that the signature is consistent with
# the as_node() generic and extra arguments do not raise an error.
#' @export
as_node.node <- function(x, ...) {
  x
}
#' Compose an id value from env and name
#'
#' @param x object definition; its `env` and `name` elements are used
#'
#' @return Node's id as a character value constructed from env + name fields
#'   joined by a dot.
#' @export
compose_id <- function(x) {
  env_part <- x$env
  name_part <- x$name
  paste0(env_part, ".", name_part)
}
#' Get node's id
#'
#' S3 generic; the method is selected by the class of `x`.
#'
#' @param x object to extract the id(s) from
#'
#' @return Node's id as a character scalar
#' @export
get_id <- function(x) UseMethod("get_id")
# For a list definition: use its `id` element when it is non-empty,
# otherwise fall back to compose_id().
#' @export
get_id.list <- function(x) {
  id <- x$id
  if (length(id) == 0L) {
    return(compose_id(x))
  }
  id
}
# For a node object: return its `id` field.
#' @export
get_id.node <- function(x) x$id
# For an rflow: the ids are the names of the objects listed by ls().
#' @export
get_id.rflow <- function(x) {
  ids <- ls(x)
  ids
}
# For a list of nodes: collect the id of every element.
# vapply() guarantees a character vector (also for an empty list, where
# sapply() would return list()) and fails loudly when an element does not
# yield exactly one character id.
#' @export
get_id.node_list <- function(x) {
  vapply(x, get_id, character(1))
}
#' Returns a list with all node objects
#'
#' S3 generic; the method is selected by the class of `x`.
#'
#' @param x an rflow object
#'
#' @return
#' List of all node objects from given rflow
#' @export
nodes <- function(x) UseMethod("nodes")
# For an rflow: fetch every object whose name is returned by get_id(),
# as a named list.
#' @export
nodes.rflow <- function(x) {
  ids <- get_id(x)
  mget(ids, envir = x)
}
#' Obtain value/object represented by a node
#'
#' S3 generic; the method is selected by the class of `x`.
#'
#' @param x a node or node's id
#' @param rflow rflow object
#' @param ... further arguments passed on to methods
#'
#' @return Value/object represented by given node.
#' @export
get_value <- function(x, ...) UseMethod("get_value")
# Looks up the node with id `x` in `rflow` and returns the result of its
# get() method.
# `...` is accepted (and ignored) so that the signature is consistent with
# the get_value() generic.
#' @method get_value character
#' @export
get_value.character <- function(x, rflow, ...) {
  rflow[[x]]$get()
}
# Returns the result of the node's get() method.
# `...` is accepted (and ignored) so that the signature is consistent with
# the get_value() generic.
#' @method get_value node
#' @export
get_value.node <- function(x, ...) {
  x$get()
}
#' @title Evaluates a node
#'
#' @description Evaluates a node assuming all its requirements are met
#'
#' @param x a node or node's id
#' @param ... args passed to node's eval() method
#'
#' @details
#' As opposed to `make()`, this function is not intended for frequent use by a user.
#'
#' @export
eval_node <- function(x, ...) UseMethod("eval_node")
# Calls the node's eval() method, forwarding all extra arguments.
#' @export
eval_node.node <- function(x, ...) x$eval(...)
# Looks up the node with id `x` in `rflow` and calls its eval() method.
# Extra arguments are forwarded to eval(), as documented for the generic
# (previously they could not be passed at all).
#' @export
eval_node.character <- function(x, rflow, ...) {
  rflow[[x]]$eval(...)
}
# Split node ids into their env and name parts.
#
# An id has to be a syntactically valid R name (it must survive
# base::make.names() unchanged), otherwise an error is raised.
# The pattern captures
#   * an optional env prefix: letters, digits and underscores followed by a dot
#   * the name: starts with a letter or digit, may continue with letters,
#     digits, dots and underscores
#
# Returns a data.table with columns `id` (the whole match), `env` (without
# the trailing dot) and `name`; stringr::str_match() yields NA for parts
# that did not match.
env_name_from_id <- function(id) {
  if (!identical(base::make.names(id), id)) {
    stop(paste(id, collapse = ", "), " is not a valid id!")
  }
  strm <- stringr::str_match(string = id, pattern = "^([0-9a-zA-Z_]+?\\.)?([0-9a-zA-Z]+?[0-9a-zA-Z._]*?)$")
  # drop the separator dot captured together with the env prefix
  strm[, 2] <- stringr::str_replace(strm[, 2], "\\.$", "")
  data.table(
    id = strm[, 1],
    env = strm[, 2],
    name = strm[, 3]
  )
}
#' Ensure all identifiers are set up correctly
#'
#' Every definition has to be identifiable by at least one of: the name of
#' its entry in `obj_defs`, an `id` element, or `env` + `name` elements.
#' Missing `env`/`name` elements are derived from the id; existing ones have
#' to agree with it. The entries of the returned list are named by the
#' resolved ids.
#'
#' @param obj_defs list of nodes' definitions (list of lists)
#'
#' @return `obj_defs` with `env` and `name` filled in and with names set to
#'   the nodes' ids.
#' @export
process_obj_defs <- function(obj_defs) {
  for (obj_ind in seq_along(obj_defs)) {
    obj_name <- names(obj_defs[obj_ind])
    # an unnamed entry has either no names at all or an empty/NA name
    has_id_outside <- length(obj_name) > 0 && !is.na(obj_name) && obj_name != ""
    has_env_inside <- length(obj_defs[[obj_ind]]$env) > 0
    has_name_inside <- length(obj_defs[[obj_ind]]$name) > 0
    has_id_inside <- length(obj_defs[[obj_ind]]$id) > 0

    if (!(has_id_outside || (has_env_inside && has_name_inside) || has_id_inside)) {
      stop("Either named list or id or env + name have to be specified")
    }

    if (has_id_outside || has_id_inside) {
      # "inside" is the id element of the definition,
      # "outside" is the name of the entry in obj_defs
      obj_id_inside <- obj_defs[[obj_ind]][["id"]]
      obj_id_outside <- if (has_id_outside) obj_name else NULL
      obj_id <- unique(c(obj_id_inside, obj_id_outside))
      if (length(obj_id) > 1) {
        warning(stringr::str_interp("id '${obj_id_inside}' does not match with object/file name '${obj_id_outside}'. Choosing '${obj_id_inside}' as an id."))
        obj_id <- obj_id_inside
      }

      # parse the id only once
      id_parts <- env_name_from_id(obj_id)
      node_env <- id_parts[["env"]][1]
      node_name <- id_parts[["name"]][1]

      # isTRUE() keeps the checks safe when the id carries no env part (NA)
      if (!has_env_inside) {
        obj_defs[[obj_ind]]$env <- node_env
      } else if (!isTRUE(obj_defs[[obj_ind]]$env == node_env)) {
        stop(stringr::str_interp("${obj_id}: env '${obj_defs[[obj_ind]]$env}' does not match"))
      }

      # the name is checked on its own presence, independently of env
      if (!has_name_inside) {
        obj_defs[[obj_ind]]$name <- node_name
      } else if (!isTRUE(obj_defs[[obj_ind]]$name == node_name)) {
        stop(stringr::str_interp("${obj_id}: name '${obj_defs[[obj_ind]]$name}' does not match"))
      }
    } else {
      obj_id <- compose_id(obj_defs[[obj_ind]])
    }

    full_names <- names(obj_defs)
    full_names[obj_ind] <- obj_id
    names(obj_defs) <- full_names
  }
  obj_defs
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.