Nothing
#' @title Pipeline Class
#'
#' @description This class implements an analysis pipeline. A pipeline consists
#' of a sequence of analysis steps, which can be added one by one. Each added
#' step may or may not depend on one or more previous steps. The pipeline
#' keeps track of the dependencies among these steps and will ensure that
#' all dependencies are met on creation of the pipeline, that is, before the
#' pipeline is run. Once the pipeline is run, the output is
#' stored in the pipeline along with each step and can be accessed later.
#' Different pipelines can be bound together while preserving all dependencies
#' within each pipeline.
#' @field name `string` name of the pipeline
#' @field pipeline `data.table` the pipeline where each row represents one step.
#' @author Roman Pahl
#' @docType class
#' @importFrom R6 R6Class
#' @importFrom stats setNames
#' @importFrom utils tail
#' @import data.table
#' @export
Pipeline = R6::R6Class("Pipeline", #nolint
public = list(
name = NULL,
pipeline = NULL,
#' @description Create a new, empty pipeline that only contains the
#' initial `data` step.
#' @param name `string` non-empty name of the pipeline.
#' @param data optional data returned by the initial `data` step. The
#' data can also be set later using the `set_data` function.
#' @param logger custom logger to be used for logging. If no logger
#' is provided, the default logger is used, which should be sufficient
#' for most use cases.
#' If you do want to use your own custom log function, you need to
#' provide a function that obeys the following form:
#'
#' `function(level, msg, ...) {
#' your custom logging code here
#' }`
#'
#' The `level` argument is a string and will be one of `info`, `warn`,
#' or `error`. The `msg` argument is a string containing the message
#' to be logged. The `...` argument is a list of named parameters,
#' which can be used to add additional information to the log message.
#' Currently, this is only used to add the context in case of a step
#' giving a warning or error.
#'
#' Note that with the default logger, the log layout can be altered
#' any time via [set_log_layout()].
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("myPipe", data = data.frame(x = 1:8))
#' p
#'
#' # Passing custom logger
#' my_logger <- function(level, msg, ...) {
#' cat(level, msg, "\n")
#' }
#' p <- Pipeline$new("myPipe", logger = my_logger)
initialize = function(
  name,
  data = NULL,
  logger = NULL
) {
  if (!is_string(name)) {
    stop_no_call("name must be a string")
  }
  if (!nzchar(name)) {
    stop_no_call("name must not be empty")
  }
  stopifnot(is.null(logger) || is.function(logger))

  private$.lg <- if (is.null(logger)) {
    # No custom logger given: use the package-wide lgr logger
    lgr::get_logger(name = .this_package_name())$log
  } else {
    # A custom logger must accept exactly (level, msg, ...)
    requiredArgs <- c("level", "msg", "...")
    if (!setequal(names(formals(logger)), requiredArgs)) {
      stop_no_call(
        "logger function must have the following signature: ",
        "function(", paste(requiredArgs, collapse = ", "),
        ")"
      )
    }
    logger
  }

  self$name <- name
  # Empty step table; column types mirror the rows appended by `add`
  self$pipeline <- data.table::data.table(
    step = character(0),
    fun = list(),
    funcName = character(0),
    params = list(),
    depends = list(),
    out = list(),
    keepOut = logical(),
    group = character(0),
    description = character(0),
    time = structure(numeric(0), class = c("POSIXct", "POSIXt")),
    state = character(0)
  )
  # Every pipeline starts with a "data" step that just returns `data`
  self$add("data", function() data, keepOut = FALSE)
  invisible(self)
},
#' @description Append a new step to the end of the pipeline.
#' @param step `string` the name of the step. Each step name must
#' be unique.
#' @param fun `function` or name of the function to be applied at
#' the step. Both existing and anonymous/lambda functions can be used.
#' All function parameters must have default values. If a parameter
#' is missing a default value in the function signature, alternatively,
#' it can be set via the `params` argument (see Examples section with
#' [mean()] function).
#'
#' @param params `list` list of parameters to set or overwrite
#' parameters of the passed function.
#' @param description `string` optional description of the step
#' @param group `string` output collected after pipeline execution
#' (see function `collect_out`) is grouped by the defined group
#' names. By default, this is the name of the step, which comes in
#' handy when the pipeline is copy-appended multiple times to keep
#' the results of the same function/step grouped at one place.
#' @param keepOut `logical` if `FALSE` (default) the output of the
#' step is not collected when calling `collect_out` after the pipeline
#' run. This option is used to only keep the results that matter
#' and skip intermediate results that are not needed. See also
#' function `collect_out` for more details.
#' @return returns the `Pipeline` object invisibly
#' @examples
#' # Add steps with lambda functions
#' p <- Pipeline$new("myPipe", data = 1)
#' p$add("s1", \(x = ~data) 2*x) # use input data
#' p$add("s2", \(x = ~data, y = ~s1) x * y)
#' try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already
#' try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found
#' p
#'
#' # Add step with existing function
#' p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4))
#' p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
#' p$run()$get_out("calc_mean")
#'
#' # Step description
#' p <- Pipeline$new("myPipe", data = 1:10)
#' p$add("s1", \(x = ~data) 2*x, description = "multiply by 2")
#' print(p)
#' print(p, verbose = TRUE) # print all columns
#'
#' # Group output
#' p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5))
#' p$add("prep_x", \(data = ~data) data$x, group = "prep")
#' p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep")
#' p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y)
#' p$run()$collect_out(all = TRUE)
add = function(
  step,
  fun,
  params = list(),
  description = "",
  group = step,
  keepOut = FALSE
) {
  private$.verify_step_does_not_exist(step)
  stopifnot(
    is.function(fun) || is_string(fun),
    is.list(params),
    is_string(description),
    is_string(group) && nzchar(group),
    is.logical(keepOut)
  )

  # Resolve `fun` into a callable plus a display name. `substitute()`
  # must run in this frame so it sees the caller's expression.
  if (!is.function(fun)) {
    funcName <- fun
    fun <- get(fun, mode = "function")
  } else {
    funcName <- as.character(substitute(fun))[[1]]
  }
  params <- private$.prepare_and_verify_params(fun, funcName, params)

  # Every referenced step must already exist upstream
  deps <- private$.derive_dependencies(params = params, step = step)
  for (dep in deps) {
    private$.verify_dependency(dep, step = step)
  }

  newRow <- list(
    step = step,
    fun = list(fun),
    funcName = funcName,
    params = list(params),
    depends = list(deps),
    out = list(NULL),
    keepOut = keepOut,
    group = group,
    description = description,
    time = Sys.time(),
    state = "New"
  )
  self$pipeline <- rbind(self$pipeline, newRow)
  invisible(self)
},
#' @description Append another pipeline.
#' When appending, `pipeflow` takes care of potential name clashes with
#' respect to step names and dependencies, that is, if needed, it will
#' automatically adapt step names and dependencies to make sure they
#' are unique in the merged pipeline. Neither of the two input
#' pipelines is modified.
#' @param p `Pipeline` object to be appended.
#' @param outAsIn `logical` if `TRUE`, output of first pipeline is used
#' as input for the second pipeline.
#' @param tryAutofixNames `logical` if `TRUE`, name clashes are tried
#' to be automatically resolved by appending the 2nd pipeline's name.
#' Only set to `FALSE`, if you know what you are doing.
#' @param sep `string` separator used when auto-resolving step names
#' and when building the name of the combined pipeline.
#' @return returns new combined `Pipeline`.
#' @examples
#' # Append pipeline
#' p1 <- Pipeline$new("pipe1")
#' p1$add("step1", \(x = 1) x)
#' p2 <- Pipeline$new("pipe2")
#' p2$add("step2", \(y = 1) y)
#' p1$append(p2)
#'
#' # Append pipeline with potential name clashes
#' p3 <- Pipeline$new("pipe3")
#' p3$add("step1", \(z = 1) z)
#' p1$append(p2)$append(p3)
#'
#' # Use output of first pipeline as input for second pipeline
#' p1 <- Pipeline$new("pipe1", data = 8)
#' p2 <- Pipeline$new("pipe2")
#' p1$add("square", \(x = ~data) x^2)
#' p2$add("log2", \(x = ~data) log2(x))
#'
#' p12 <- p1$append(p2, outAsIn = TRUE)
#' p12$run()$get_out("log2")
#' p12
#'
#' # Custom name separator
#' p1$append(p2, sep = "___")
append = function(
  p,
  outAsIn = FALSE,
  tryAutofixNames = TRUE,
  sep = "."
) {
  stopifnot(
    inherits(p, "Pipeline"),
    is.logical(outAsIn),
    is.logical(tryAutofixNames),
    is_string(sep)
  )
  # Work on clones so that neither input pipeline is modified
  p1 <- self$clone()
  p2 <- p$clone()

  # Handle step-name clashes between the two pipelines
  stepNames <- c(p1$get_step_names(), p2$get_step_names())
  duplicatedNames <- stepNames[duplicated(stepNames)]
  if (length(duplicatedNames) > 0) {
    if (!tryAutofixNames) {
      stop_no_call(
        "combined pipeline would have duplicated step names: ",
        paste0("'", duplicatedNames, "'", collapse = ", ")
      )
    }
    for (name in duplicatedNames) {
      newName <- paste(name, p2$name, sep = sep)
      if (self$has_step(newName)) {
        stop_no_call(
          "Cannot auto-fix name clash for step '",
          name, "' in pipeline '",
          p2$name, "'. Step '", newName,
          "' already exists in pipeline '", self$name, "'."
        )
      }
      # rename_step also rewrites all dependencies on the old name
      p2$rename_step(from = name, to = newName)
    }
  }

  # Build combined pipeline
  combinedPipe <- Pipeline$new(paste0(p1$name, sep, p2$name))
  combinedPipe$pipeline <- rbind(p1$pipeline, p2$pipeline)
  if (outAsIn) {
    # Replace the first step of p2 (its data step) by a step that
    # forwards the output of the preceding step, i.e. the last of p1
    firstStep2 <- combinedPipe$get_step_names()[p1$length() + 1]
    combinedPipe$replace_step(
      step = firstStep2,
      fun = function(data = ~-1) data,
      description = "output of last step of first pipeline",
      keepOut = p2$pipeline[["keepOut"]][[1]]
    )
  }
  combinedPipe
},
#' @description Appends string to all step names and takes care
#' of updating step dependencies accordingly.
#' @param postfix `string` to be appended to each step name.
#' @param sep `string` separator between step name and postfix.
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe")
#' p$add("step1", \(x = 1) x)
#' p$add("step2", \(y = 1) y)
#' p$append_to_step_names("new")
#' p
#' p$append_to_step_names("foo", sep = "__")
#' p
append_to_step_names = function(postfix, sep = ".") {
  stopifnot(is_string(postfix), is_string(sep))
  # Append the postfix to each element while keeping element names
  # (dependency vectors are named by parameter)
  add_postfix <- function(x) {
    if (length(x) == 0) {
      return(x)
    }
    stats::setNames(paste(x, postfix, sep = sep), names(x))
  }
  # Rename steps and update dependencies consistently
  steps <- self$get_step_names()
  deps <- self$get_depends()
  self$pipeline[["step"]] <- add_postfix(steps)
  self$pipeline[["depends"]] <- lapply(deps, add_postfix)
  invisible(self)
},
#' @description Collect output after a pipeline run, by default, from all
#' steps for which `keepOut` was set to `TRUE`. The output is grouped
#' by the group names (see `group` parameter in function `add`),
#' which by default are set identical to the step names.
#' @param groupBy `string` column of pipeline by which to group the
#' output. The column must exist and be of type `character`.
#' @param all `logical` if `TRUE` all output is collected
#' regardless of the `keepOut` flag. This can be useful for debugging.
#' @return `list` containing the output, named after the groups, which,
#' by default, are the steps. A group holding a single step maps
#' directly to that step's output; a group shared by several steps
#' maps to a `list` of their outputs named by step. If no step is
#' selected, an empty `list` is returned. Groups appear in the order
#' in which they first occur in the pipeline.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("step1", \(x = ~data) x + 2)
#' p$add("step2", \(x = ~step1) x + 2, keepOut = TRUE)
#' p$run()
#' p$collect_out()
#' p$collect_out(all = TRUE) |> str()
#'
#' # Grouped output
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("step1", \(x = ~data) x + 2, group = "add")
#' p$add("step2", \(x = ~step1, y = 2) x + y, group = "add")
#' p$add("step3", \(x = ~data) x * 3, group = "mult")
#' p$add("step4", \(x = ~data, y = 2) x * y, group = "mult")
#' p
#' p$run()
#' p$collect_out(all = TRUE) |> str()
#'
#' # Grouped by state
#' p$set_params(list(y = 5))
#' p
#' p$collect_out(groupBy = "state", all = TRUE) |> str()
collect_out = function(groupBy = "group", all = FALSE)
{
# nolint start
stopifnot(
"groupBy must be a single string" = is_string(groupBy),
"groupBy column does not exist" = groupBy %in% colnames(self$pipeline),
"groupBy column must be character" = is.character(self$pipeline[[groupBy]])
)
# nolint end
# Logical row selector: every step if `all`, else the keepOut flags
keepOut <- if (all) {
rep(TRUE, self$length())
} else {
self$pipeline[["keepOut"]]
}
if (!any(keepOut)) {
return(list())
}
# Turn a (sub-)pipeline into a list of step outputs named by step
collect_results <- function(pipeline) {
results = as.list(pipeline[["out"]])
names(results) = pipeline[["step"]]
results
}
# NOTE(review): `keepOut` here is a single symbol; data.table documents
# that such an `i` is looked up in the calling scope, i.e. this uses the
# local selector above rather than the column of the same name.
pipeOut <- self$pipeline[keepOut, ]
result <- list()
groupLabels <- pipeOut[[groupBy]]
# Group output if at least two steps have the same group label
groupings <- table(groupLabels) |>
as.list() |>
Filter(f = function(x) x > 1) |>
unlist()
hasGroupings <- length(groupings) > 0
if (hasGroupings) {
indices <- groupLabels %in% names(groupings)
# One nested list per shared label, each named by step
groupedRes <- pipeOut[indices, ] |>
split(f = groupLabels[indices]) |>
lapply(collect_results)
result <- append(result, groupedRes)
}
# Ungrouped result: singleton labels map directly to the step output
isUngrouped <- !(groupLabels %in% names(groupings))
ungroupedRes <- pipeOut[isUngrouped, ] |>
collect_results() |>
stats::setNames(groupLabels[isUngrouped])
result <- append(result, ungroupedRes)
# Keep original order of groups as they were defined
orderedGroupLabels <- intersect(groupLabels, names(result))
result |> .subset(orderedGroupLabels)
},
#' @description Remove every step whose name matches `pattern`.
#' @param pattern `string` containing a regular expression (or
#' character string for `fixed = TRUE`) to be matched against the
#' step names.
#' @param fixed `logical` If `TRUE`, `pattern` is a string to
#' be matched as is. Overrides all conflicting arguments.
#' @param recursive `logical` if `TRUE` each matched step is removed
#' together with all its downstream dependencies.
#' @param ... further arguments passed to [grep()].
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(x = ~data) x + 1)
#' p$add("add2", \(x = ~add1) x + 2)
#' p$add("mult3", \(x = ~add1) x * 3)
#' p$add("mult4", \(x = ~add2) x * 4)
#' p
#'
#' p$discard_steps("mult")
#' p
#'
#' # Re-add steps
#' p$add("mult3", \(x = ~add1) x * 3)
#' p$add("mult4", \(x = ~add2) x * 4)
#' p
#' # Discarding 'add1' does not work ...
#' try(p$discard_steps("add1"))
#'
#' # ... unless we enforce to remove its downstream dependencies as well
#' p$discard_steps("add1", recursive = TRUE) # this works
#' p
#'
#' # Trying to discard non-existent steps is just ignored
#' p$discard_steps("non-existent")
discard_steps = function(
  pattern,
  recursive = FALSE,
  fixed = TRUE,
  ...
) {
  matchedSteps <- grep(
    pattern = pattern,
    x = self$get_step_names(),
    fixed = fixed,
    value = TRUE,
    ...
  )
  # Walk backwards so downstream steps go before the steps they use
  for (matched in rev(matchedSteps)) {
    self$remove_step(matched, recursive = recursive)
    message(gettextf("step '%s' was removed", matched))
  }
  invisible(self)
},
#' @description Return the data of the pipeline.
#' @return the value produced by the `data` step, which by default is
#' the first step of the pipeline. An error is raised if there is no
#' step named `data`.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$get_data()
#' p$set_data(3:4)
#' p$get_data()
get_data = function()
{
  dataPos <- match("data", self$get_step_names())
  if (is.na(dataPos)) {
    stop_no_call("no data step defined")
  }
  # The data step stores a closure that returns the data
  dataFun <- self$pipeline[["fun"]][[dataPos]]
  dataFun()
},
#' @description List the dependencies of every step in the pipeline.
#' @return `list` of dependencies, named by step
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(x = ~data) x + 1)
#' p$add("add2", \(x = ~data, y = ~add1) x + y)
#' p$get_depends()
get_depends = function()
{
  allDepends <- self$pipeline[["depends"]]
  names(allDepends) <- self$get_step_names()
  allDepends
},
#' @description Get all steps that depend on the given step, by
#' default descending recursively.
#' @param step `string` name of step
#' @param recursive `logical` if `TRUE`, dependencies of dependencies
#' are also returned.
#' @return `character` vector of downstream dependencies, ordered as
#' the steps appear in the pipeline.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(x = ~data) x + 1)
#' p$add("add2", \(x = ~data, y = ~add1) x + y)
#' p$add("mult3", \(x = ~add1) x * 3)
#' p$add("mult4", \(x = ~add2) x * 4)
#' p$get_depends_down("add1")
#' p$get_depends_down("add1", recursive = FALSE)
get_depends_down = function(
  step,
  recursive = TRUE
) {
  private$.verify_step_exists(step)
  allDepends <- self$get_depends()
  downstream <- private$.get_downstream_depends(
    step = step,
    depends = allDepends,
    recursive = recursive
  )
  # Report in pipeline order rather than discovery order
  intersect(self$get_step_names(), downstream)
},
#' @description Get all steps the given step depends on, by default
#' ascending recursively.
#' @param step `string` name of step
#' @param recursive `logical` if `TRUE`, dependencies of dependencies
#' are also returned.
#' @return `character` vector of upstream dependencies, ordered as
#' the steps appear in the pipeline.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(x = ~data) x + 1)
#' p$add("add2", \(x = ~data, y = ~add1) x + y)
#' p$add("mult3", \(x = ~add1) x * 3)
#' p$add("mult4", \(x = ~add2) x * 4)
#' p$get_depends_up("mult4")
#' p$get_depends_up("mult4", recursive = FALSE)
get_depends_up = function(
  step,
  recursive = TRUE
) {
  private$.verify_step_exists(step)
  allDepends <- self$get_depends()
  upstream <- private$.get_upstream_depends(
    step = step,
    depends = allDepends,
    recursive = recursive
  )
  # Report in pipeline order rather than discovery order
  intersect(self$get_step_names(), upstream)
},
#' @description Build a graph representation (nodes and edges) of the
#' pipeline, e.g. for visualization.
#' @param groups `character` if not `NULL`, only steps belonging to the
#' given groups are considered.
#' @return `list` with two data frames, `nodes` and `edges`, ready to be
#' used with the `visNetwork` package.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = ~add1) x + y)
#' p$add("mult1", \(x = ~add1, y = ~add2) x * y)
#' graph <- pipe_get_graph(p)
#' graph
#'
#' if (require("visNetwork", quietly = TRUE)) {
#' do.call(visNetwork, args = p$get_graph())
#' }
get_graph = function(
  groups = NULL
) {
  list(
    nodes = private$.create_node_table(groups = groups),
    edges = private$.create_edge_table(groups = groups)
  )
},
#' @description Fetch the stored output of a step.
#' @param step `string` name of step
#' @return the output stored at the given step (`NULL` if it has none).
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(x = ~data) x + 1)
#' p$add("add2", \(x = ~data, y = ~add1) x + y)
#' p$run()
#' p$get_out("add1")
#' p$get_out("add2")
get_out = function(step)
{
  stepRow <- self$get_step(step)
  stepRow[["out"]][[1]]
},
#' @description Get unbound function parameters defined in
#' the pipeline, where 'unbound' means parameters that are not linked
#' to other steps.
#' @param ignoreHidden `logical` if TRUE, hidden parameters (i.e. all
#' names starting with a dot) are ignored and thus not returned.
#' @return `list` of parameters, sorted and named by step. Steps with
#' no parameters are filtered out.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
#' p$add("add3", \() 1 + 2)
#' p$get_params() |> str()
#' p$get_params(ignoreHidden = FALSE) |> str()
get_params = function(ignoreHidden = TRUE)
{
  stepNames <- self$pipeline[["step"]]
  paramsPerStep <- lapply(
    stepNames,
    FUN = self$get_params_at_step,
    ignoreHidden = ignoreHidden
  )
  names(paramsPerStep) <- stepNames
  # Drop steps that have no unbound parameters
  Filter(function(x) length(x) > 0, paramsPerStep)
},
#' @description Get all unbound (i.e. not referring to other steps)
#' parameters at the given step.
#' @param step `string` name of step
#' @param ignoreHidden `logical` if TRUE, hidden parameters (i.e. all
#' names starting with a dot) are ignored and thus not returned.
#' @return `list` of parameters defined at given step.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
#' p$add("add3", \() 1 + 2)
#' p$get_params_at_step("add2")
#' p$get_params_at_step("add2", ignoreHidden = FALSE)
#' p$get_params_at_step("add3")
get_params_at_step = function(step, ignoreHidden = TRUE)
{
  # Plain values are unbound; symbols and calls (which include `~step`
  # formulas referring to other steps) are not
  is_unbound <- function(x) !(is.name(x) || is.call(x))
  keep_desired <- function(x) {
    if (length(x) == 0) {
      return(x)
    }
    unbound <- Filter(is_unbound, x)
    if (!ignoreHidden) {
      return(unbound)
    }
    unbound[!startsWith(names(unbound), ".")]
  }
  stepRow <- self$get_step(step)
  as.list(keep_desired(unlist1(stepRow[["params"]])))
},
#' @description Get all unbound (i.e. not referring to other steps)
#' parameters defined in the pipeline,
#' but only list each parameter once. The values of the parameters
#' will be the values of the first step where the parameter was defined.
#' This is particularly useful after the parameters were set using
#' the `set_params` function, which will set the same value
#' for all steps.
#' @param ignoreHidden `logical` if TRUE, hidden parameters (i.e. all
#' names starting with a dot) are ignored and thus not returned.
#' @return `list` of unique parameters
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
#' p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
#' p$get_params_unique()
#' p$get_params_unique(ignoreHidden = FALSE)
get_params_unique = function(ignoreHidden = TRUE)
{
  params <- self$get_params(ignoreHidden)
  if (length(params) == 0) {
    return(params)
  }
  # Flatten parameter names step by step. lapply + unlist is
  # type-stable, unlike sapply, which returns a matrix when all steps
  # have the same number of parameters.
  paramNames <- unlist(lapply(params, names), use.names = FALSE)
  paramValues <- stats::setNames(unlist1(params), paramNames)
  # The first step defining a parameter wins
  paramValues[!duplicated(names(paramValues))]
},
#' @description Export all unique function parameters as JSON.
#' @param ignoreHidden `logical` if TRUE, hidden parameters (i.e. all
#' names starting with a dot) are ignored and thus not returned.
#' @return `json` flat unnamed JSON list of unique function parameters.
#' Each entry carries the parameter `name`; `Param` objects contribute
#' their attributes, other values are stored under `value`.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
#' p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
#' p$get_params_unique_json()
#' p$get_params_unique_json(ignoreHidden = FALSE)
get_params_unique_json = function(ignoreHidden = TRUE)
{
  params <- self$get_params_unique(ignoreHidden)
  as_json_entry <- function(value, name) {
    if (!methods::is(value, "Param")) {
      return(list(name = name, value = value))
    }
    entry <- as.list(attributes(eval(value)))
    entry[["name"]] <- name
    entry
  }
  entries <- Map(as_json_entry, params, names(params))
  jsonlite::toJSON(unname(entries), auto_unbox = TRUE, pretty = TRUE)
},
#' @description Get step of pipeline
#' @param step `string` name of step
#' @return `data.table` row containing the step. An error is raised if
#' the step does not exist.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z)
#' p$run()
#' add1 <- p$get_step("add1")
#' print(add1)
#' add1[["params"]]
#' add1[["fun"]]
#' try(p$get_step("foo")) # error: step 'foo' does not exist
get_step = function(step)
{
  # Existence is verified up front, so `match` always finds the step
  private$.verify_step_exists(step)
  pos <- match(step, self$get_step_names())
  self$pipeline[pos, ]
},
#' @description Names of all steps, in pipeline order.
#' @return `character` vector of step names
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$get_step_names()
get_step_names = function()
{
  getElement(self$pipeline, "step")
},
#' @description Position of a step within the pipeline.
#' @param step `string` name of step
#' @return the step number in the pipeline
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$get_step_number("f2")
get_step_number = function(step)
{
  private$.verify_step_exists(step)
  allSteps <- self$get_step_names()
  match(step, allSteps)
},
#' @description Test whether the pipeline contains a step of the given
#' name.
#' @param step `string` non-empty name of step
#' @return `logical` whether step exists
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$has_step("f2")
#' p$has_step("foo")
has_step = function(step)
{
  stopifnot(is_string(step), nzchar(step))
  is.element(step, self$get_step_names())
},
#' @description Insert a new step directly after an existing step.
#' @param afterStep `string` name of step after which to insert
#' @param step `string` name of step to insert
#' @param ... further arguments passed to `add` method of the pipeline
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(x = ~f1) x)
#' p$insert_after("f1", "f3", \(x = ~f1) x)
#' p
insert_after = function(
  afterStep,
  step,
  ...
) {
  private$.verify_step_does_not_exist(step)
  if (!self$has_step(afterStep)) {
    stop_no_call("step '", afterStep, "' does not exist")
  }
  pos <- match(afterStep, self$get_step_names())
  frontIdx <- seq_len(pos)
  # Add the step to a temporary pipeline holding only the steps up to
  # `afterStep`, so its dependencies are validated against those only
  tmp <- Pipeline$new(name = self$name)
  tmp$pipeline <- self$pipeline[frontIdx, ]
  tmp$add(step = step, ...)
  merged <- tmp$pipeline
  if (pos < self$length()) {
    merged <- rbind(merged, self$pipeline[-frontIdx, ])
  }
  self$pipeline <- merged
  invisible(self)
},
#' @description Insert a new step directly before an existing step.
#' Insertion before the first step is not allowed.
#' @param beforeStep `string` name of step before which to insert
#' @param step `string` name of step to insert
#' @param ... further arguments passed to `add` method of the pipeline
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(x = ~f1) x)
#' p$insert_before("f2", "f3", \(x = ~f1) x)
#' p
insert_before = function(
  beforeStep,
  step,
  ...
) {
  private$.verify_step_does_not_exist(step)
  if (!self$has_step(beforeStep)) {
    stop_no_call("step '", beforeStep, "' does not exist")
  }
  # Number of steps that stay in front of the new step
  nFront <- match(beforeStep, self$get_step_names()) - 1
  if (nFront == 0) {
    stop_no_call("cannot insert before first step")
  }
  frontIdx <- seq_len(nFront)
  # Validate the new step's dependencies against the front part only
  tmp <- Pipeline$new(name = self$name)
  tmp$pipeline <- self$pipeline[frontIdx, ]
  tmp$add(step = step, ...)
  self$pipeline <- rbind(tmp$pipeline, self$pipeline[-frontIdx, ])
  invisible(self)
},
#' @description Number of steps in the pipeline.
#' @return `integer` length of pipeline.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$length()
length = function() {
  nrow(self$pipeline)
},
#' @description Lock a step by setting its state to "Locked". A locked
#' step keeps both its parameters and its output (given it has output),
#' such that neither setting new pipeline parameters nor future
#' pipeline runs can change them.
#' @param step `string` name of step
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = 1, data = ~data) x + data)
#' p$add("add2", \(x = 1, data = ~data) x + data)
#' p$run()
#' p$get_out("add1")
#' p$get_out("add2")
#' p$lock_step("add1")
#'
#' p$set_data(3)
#' p$set_params(list(x = 3))
#' p$run()
#' p$get_out("add1")
#' p$get_out("add2")
lock_step = function(step) {
  lockedState <- "Locked"
  private$.set_at_step(step, "state", lockedState)
  invisible(self)
},
#' @description Remove the last step of the pipeline.
#' @return `string` the name of the step that was removed
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p
#' p$pop_step() # "f2"
#' p
pop_step = function() {
  lastPos <- self$length()
  removedName <- self$get_step_names()[lastPos]
  self$pipeline <- self$pipeline[-lastPos, ]
  removedName
},
#' @description Remove every step that comes after the given step.
#' @param step `string` name of step
#' @return `character` vector of steps that were removed (empty if the
#' given step is the last one).
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$add("f3", \(z = 1) z)
#' p$pop_steps_after("f1") # "f2", "f3"
#' p
pop_steps_after = function(step) {
  pos <- self$get_step_number(step)
  total <- self$length()
  if (pos == total) {
    return(character(0)) # nothing comes after the last step
  }
  dropIdx <- (pos + 1):total
  removedNames <- self$get_step_names()[dropIdx]
  self$pipeline <- self$pipeline[-dropIdx, ]
  removedNames
},
#' @description Remove the given step and every step after it.
#' @param step `string` name of step
#' @return `character` vector of steps that were removed.
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$add("f3", \(z = 1) z)
#' p$pop_steps_from("f2") # "f2", "f3"
#' p
pop_steps_from = function(step) {
  pos <- self$get_step_number(step)
  dropIdx <- pos:self$length()
  removedNames <- self$get_step_names()[dropIdx]
  self$pipeline <- self$pipeline[-dropIdx, ]
  removedNames
},
#' @description Print the pipeline as a table.
#' @param verbose `logical` if `TRUE`, print all columns of the
#' pipeline; otherwise only step, depends, out, keepOut, group and
#' state are displayed.
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$print()
print = function(verbose = FALSE) {
  shown <- self$pipeline
  if (!verbose) {
    keyColumns <- c("step", "depends", "out", "keepOut", "group", "state")
    shown <- shown[, keyColumns, with = FALSE]
  }
  print(shown)
  invisible(self)
},
#' @description Remove a step from the pipeline.
#' If other steps depend on the step to be removed, an error is
#' raised and the removal is blocked, unless `recursive` is `TRUE`.
#' @param step `string` the name of the step to be removed.
#' @param recursive `logical` if `TRUE` the step is removed together
#' with all its downstream dependencies.
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = ~add1) x + y)
#' p$add("mult1", \(x = 1, y = ~add2) x * y)
#' p$remove_step("mult1")
#' p
#' try(p$remove_step("add1")) # fails because "add2" depends on "add1"
#' p$remove_step("add1", recursive = TRUE) # removes "add1" and "add2"
#' p
remove_step = function(
  step,
  recursive = FALSE
) {
  private$.verify_step_exists(step)
  downstream <- self$get_depends_down(step, recursive)
  if (length(downstream) > 0) {
    stepsString <- paste0("'", downstream, "'", collapse = ", ")
    if (!recursive) {
      stop_no_call(
        "cannot remove step '", step, "' because the ",
        "following steps depend on it: ", stepsString
      )
    }
    message(
      "Removing step '", step, "' and its downstream ",
      "dependencies: ", stepsString
    )
    # Last-to-first, so every dependent is dependency-free when removed
    for (dependent in rev(downstream)) {
      self$remove_step(dependent)
    }
  }
  # Keep the row index in a local: `step` is also a data.table column
  # name and must not appear inside the `i` expression
  rowToDrop <- self$get_step_number(step)
  self$pipeline <- self$pipeline[-rowToDrop, ]
  invisible(self)
},
#' @description Safely rename a step in the pipeline, updating the
#' dependencies that refer to it. If the new step name would result
#' in a name clash, an error is raised.
#' @param from `string` the name of the step to be renamed.
#' @param to `string` the new name of the step.
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("add1", \(data = ~data, x = 1) x + data)
#' p$add("add2", \(x = 1, y = ~add1) x + y)
#' p
#' try(p$rename_step("add1", "add2")) # fails because "add2" exists
#' p$rename_step("add1", "first_add") # Ok
#' p
rename_step = function(
  from,
  to
) {
  private$.verify_step_exists(from)
  private$.verify_step_does_not_exist(to)
  swap_name <- function(x) {
    pipeflow_replace_string(x, target = from, replacement = to)
  }
  self$pipeline[["step"]] <- swap_name(self$pipeline[["step"]])
  self$pipeline[["depends"]] <- lapply(self$pipeline[["depends"]], swap_name)
  invisible(self)
},
#' @description Replaces an existing pipeline step.
#' @param step `string` the name of the step to be replaced. Step must
#' exist.
#' @param fun `string` or `function` operation to be applied at the
#' step. Both existing and lambda/anonymous functions can be used.
#' @param params `list` list of parameters to overwrite default
#' parameters of existing functions.
#' @param description `string` optional description of the step
#' @param group `string` grouping information (by default the same as
#' the name of the step). Any output collected later (see function
#' `collect_out`) is, by default, put together by these group names.
#' This, for example, comes in handy when the pipeline is copy-appended
#' multiple times to keep the results of the same function/step in one
#' place.
#' @param keepOut `logical` if `FALSE` the output of the function will
#' be cleaned at the end of the whole pipeline execution. This option
#' is used to only keep the results that matter.
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 1) x + y)
#' p$add("add2", \(x = ~data, y = 2) x + y)
#' p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE)
#' p$run()$collect_out()
#' p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
#' p$run()$collect_out()
#' try(p$replace_step("foo", \(x = 1) x)) # step 'foo' does not exist
replace_step = function(
    step,
    fun,
    params = list(),
    description = "",
    group = step,
    keepOut = FALSE
) {
    private$.verify_step_exists(step)
    stopifnot(
        is.function(fun) || is_string(fun),
        is.list(params),
        is_string(description),
        is_string(group) && nzchar(group),
        is.logical(keepOut)
    )
    # Resolve `fun` to a function object and record a display name: a
    # string is looked up as a function of that name, otherwise the name
    # is taken from the argument expression.
    if (!is.function(fun)) {
        funcName <- fun
        fun <- get(fun, mode = "function")
    } else {
        funcName <- as.character(substitute(fun))[[1]]
    }
    params <- private$.prepare_and_verify_params(fun, funcName, params)

    # Dependencies may only refer to steps located before the replaced one
    stepNumber <- self$get_step_number(step)
    precedingStep <- self$get_step_names()[stepNumber - 1]
    deps <- private$.derive_dependencies(
        params = params,
        step = step,
        toStep = precedingStep
    )
    for (dep in deps) {
        private$.verify_dependency(dep, step = step, toStep = precedingStep)
    }

    self$pipeline[stepNumber, ] <- list(
        step = step,
        fun = list(fun),
        funcName = funcName,
        params = list(params),
        depends = list(deps),
        out = list(NULL),
        keepOut = keepOut,
        group = group,
        description = description,
        time = Sys.time(),
        state = "New"
    )
    # Everything computed from the replaced step is no longer valid
    private$.update_states_downstream(step, state = "Outdated")
    invisible(self)
},
#' @description Resets the pipeline to the state before it was run.
#' This means that all output is removed and the state of all steps
#' is reset to 'New'.
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1:2)
#' p$add("f1", \(x = 1) x)
#' p$add("f2", \(y = 1) y)
#' p$run()
#' p
#' p$reset()
#' p
reset = function() {
    # Work on the table, then store it back: clear every output slot and
    # mark every step as not yet run.
    pip <- self$pipeline
    pip[["out"]] <- list(NULL)
    pip[["state"]] <- "New"
    self$pipeline <- pip
    invisible(self)
},
#' @description Run all new and/or outdated pipeline steps.
#' @param force `logical` if `TRUE` all steps are run regardless of
#' whether they are outdated or not.
#' @param recursive `logical` if `TRUE` and a step returns a new
#' pipeline, the run of the current pipeline is aborted and the
#' new pipeline is run recursively.
#' @param cleanUnkept `logical` if `TRUE` all output that was not
#' marked to be kept is removed after the pipeline run. This option
#' can be useful if temporary results require a lot of memory.
#' @param progress `function` this parameter can be used to provide a
#' custom progress function of the form `function(value, detail)`,
#' which will show the progress of the pipeline run for each step,
#' where `value` is the current step number and `detail` is the name
#' of the step.
#' @param showLog `logical` should the steps be logged during the
#' pipeline run?
#' @return returns the `Pipeline` object invisibly
#' @examples
#' # Simple pipeline
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 1) x + y)
#' p$add("add2", \(x = ~add1, z = 2) x + z)
#' p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
#' p$run()$collect_out()
#' p$set_params(list(z = 4)) # outdates steps add2 and final
#' p
#' p$run()$collect_out()
#' p$run(cleanUnkept = TRUE) # clean up temporary results
#' p
#'
#' # Recursive pipeline
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 1) x + y)
#' p$add("new_pipe", \(x = ~add1) {
#' pp <- Pipeline$new("new_pipe", data = x)
#' pp$add("add1", \(x = ~data) x + 1)
#' pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE)
#' }
#' )
#' p$run(recursive = TRUE)$collect_out()
#'
#' # Run pipeline with progress bar
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("first step", \() Sys.sleep(1))
#' p$add("second step", \() Sys.sleep(1))
#' p$add("last step", \() Sys.sleep(1))
#' pb <- txtProgressBar(min = 1, max = p$length(), style = 3)
#' fprogress <- function(value, detail) {
#' setTxtProgressBar(pb, value)
#' }
#' p$run(progress = fprogress, showLog = FALSE)
run = function(
    force = FALSE,
    recursive = TRUE,
    cleanUnkept = FALSE,
    progress = NULL,
    showLog = TRUE
) {
    stopifnot(
        is.logical(force),
        is.logical(recursive),
        is.logical(cleanUnkept),
        is.logical(showLog)
    )
    # Log at info level, but only if logging was requested
    log_info <- function(msg, ...) {
        if (showLog) {
            private$.lg(level = "info", msg = msg, ...)
        }
    }
    gettextf("Start run of '%s' pipeline:", self$name) |> log_info()
    nSteps <- self$length()
    # seq_len() yields no iterations for an empty pipeline, whereas
    # seq(1, 0) would iterate over c(1, 0).
    for (i in seq_len(nSteps)) {
        step <- self$pipeline[["step"]][i]
        if (is.function(progress)) {
            progress(value = i, detail = step)
        }
        state <- self$get_step(step)[["state"]]
        info <- gettextf("Step %i/%i %s", i, nSteps, step)
        # Locked steps are never re-run; finished steps only if forced
        if (state == "Locked" || (state == "Done" && !force)) {
            paste0(info, " - skip '", tolower(state), "' step") |>
                log_info()
            next
        }
        log_info(info)
        res <- private$.run_step(step)
        if (inherits(res, "Pipeline") && recursive) {
            # The step produced a new pipeline: abandon the current run
            # and run the new pipeline with the same settings (including
            # `cleanUnkept`, which previously was not passed on).
            log_info("Abort pipeline execution and restart on new.")
            res$run(
                force = force,
                recursive = TRUE,
                cleanUnkept = cleanUnkept,
                progress = progress,
                showLog = showLog
            )
            return(invisible(res))
        }
    }
    log_info("Finished execution of steps.")
    if (cleanUnkept) {
        log_info("Clean temporary results.")
        private$.clean_out_not_kept()
    }
    log_info("Done.")
    invisible(self)
},
#' @description Run given pipeline step possibly together with
#' upstream and downstream dependencies.
#' @param step `string` name of step
#' @param upstream `logical` if `TRUE`, run all dependent upstream
#' steps first.
#' @param downstream `logical` if `TRUE`, run all dependent
#' downstream steps afterwards.
#' @param cleanUnkept `logical` if `TRUE` all output that was not
#' marked to be kept is removed after the pipeline run. This option
#' can be useful if temporary results require a lot of memory.
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 1) x + y)
#' p$add("add2", \(x = ~add1, z = 2) x + z)
#' p$add("mult", \(x = ~add1, y = ~add2) x * y)
#' p$run_step("add2")
#' p$run_step("add2", downstream = TRUE)
#' p$run_step("mult", upstream = TRUE)
run_step = function(
    step,
    upstream = TRUE,
    downstream = FALSE,
    cleanUnkept = FALSE
) {
    private$.verify_step_exists(step)
    stopifnot(
        is.logical(upstream),
        is.logical(downstream),
        is.logical(cleanUnkept)
    )
    upstreamSteps <- character(0)
    downstreamSteps <- character(0)
    if (upstream) {
        upstreamSteps <- self$get_depends_up(
            step = step,
            recursive = TRUE
        )
    }
    if (downstream) {
        downstreamSteps <- self$get_depends_down(
            step = step,
            recursive = TRUE
        )
    }
    # Run the collected steps in pipeline order. The recursive dependency
    # lookup lists direct dependencies before their own dependencies, so
    # its order cannot be relied upon; but a step must only run after the
    # steps it depends on have been (re)computed.
    steps <- c(upstreamSteps, step, downstreamSteps)
    steps <- steps[order(match(steps, self$get_step_names()))]
    nStep <- length(steps)
    log_info <- function(msg, ...) {
        private$.lg(level = "info", msg = msg, ...)
    }
    gettextf("Start step run of '%s' pipeline:", self$name) |>
        log_info()
    for (i in seq_along(steps)) {
        step <- steps[i]
        state <- self$get_step(step)[["state"]]
        stream <- ""
        if (step %in% upstreamSteps) {
            stream <- " (upstream)"
        }
        if (step %in% downstreamSteps) {
            stream <- " (downstream)"
        }
        info <- gettextf("Step %i/%i %s%s", i, nStep, step, stream)
        if (state == "Locked") {
            paste0(info, " - skip '", tolower(state), "' step") |>
                log_info()
            next
        }
        log_info(info)
        private$.run_step(step)
    }
    log_info("Finished execution of steps.")
    if (cleanUnkept) {
        log_info("Clean temporary results.")
        private$.clean_out_not_kept()
    }
    log_info("Done.")
    invisible(self)
},
#' @description Set data in first step of pipeline.
#' @param data initial data set
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
#' p$run()$collect_out()
#' p$set_data(3)
#' p$run()$collect_out()
set_data = function(data)
{
    # Evaluate `data` now. Otherwise it remains an unevaluated promise
    # captured by the closure below, which is only evaluated when the
    # pipeline runs and would then pick up any later change made to the
    # caller's variable in the meantime.
    force(data)
    # The first step of the pipeline holds the input data
    step <- self$get_step_names()[1]
    self$replace_step(
        step = step,
        fun = function() data,
        keepOut = FALSE
    )
},
#' @description This function can be used to apply the pipeline
#' repeatedly to various data sets. For this, the pipeline split-copies
#' itself by the list of given data sets. Each sub-pipeline will have
#' one of the data sets set as input data.
#' The step names of the sub-pipelines will be the original
#' step names plus the name of the data set.
#' @param dataList `list` of data sets
#' @param toStep `string` step name marking an optional subset of
#' the pipeline to which the data split should be applied.
#' @param groupBySplit `logical` whether to set step groups according
#' to data split.
#' @param sep `string` separator to be used between step name and
#' data set name when creating the new step names.
#' @return new combined `Pipeline` with each sub-pipeline having set
#' one of the data sets.
#' @examples
#' # Split by three data sets
#' dataList <- list(a = 1, b = 2, c = 3)
#' p <- Pipeline$new("pipe")
#' p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
#' p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
#' p$set_data_split(dataList)
#' p
#' p$run()$collect_out() |> str()
#'
#' # Don't group output by split
#' p <- Pipeline$new("pipe")
#' p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
#' p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
#' p$set_data_split(dataList, groupBySplit = FALSE)
#' p
#' p$run()$collect_out() |> str()
#'
#' # Split up to certain step
#' p <- Pipeline$new("pipe")
#' p$add("add1", \(x = ~data) x + 1)
#' p$add("mult", \(x = ~data, y = ~add1) x * y)
#' p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
#' p
#' p$get_depends()[["average_result"]]
#'
#' p$set_data_split(dataList, toStep = "mult")
#' p
#' p$get_depends()[["average_result"]]
#'
#' p$run()$collect_out()
set_data_split = function(
    dataList,
    toStep = character(),
    groupBySplit = TRUE,
    sep = "."
) {
    if (length(toStep) == 0) {
        toStep <- utils::tail(self$get_step_names(), 1)
    }
    stopifnot(
        is.list(dataList),
        is_string(toStep),
        is.logical(groupBySplit),
        is_string(sep)
    )
    private$.verify_step_exists(toStep)
    to <- self$get_step_number(toStep)
    upperSteps <- self$get_step_names()[seq_len(to)]
    dataNames <- names(dataList)
    if (is.null(dataNames)) {
        dataNames <- as.character(seq_along(dataList))
    }
    init_new_pipeline_with_data <- function(data) {
        self$clone(deep = TRUE)$set_data(data)
    }
    # Create a deep copy of the pipeline for each data set, cut down to the
    # steps up to `toStep`, with the data set name appended to step names
    pipes <- lapply(dataList, init_new_pipeline_with_data)
    for (i in seq_along(pipes)) {
        name <- dataNames[[i]]
        pipes[[i]]$name <- name
        pipes[[i]]$pipeline <- pipes[[i]]$pipeline[seq_len(to), ]
        pipes[[i]]$append_to_step_names(pipes[[i]]$name, sep = sep)
        newGroups <- name
        if (!groupBySplit) {
            oldGroups <- pipes[[i]]$pipeline[["group"]]
            newGroups <- paste0(oldGroups, sep, name)
        }
        pipes[[i]]$pipeline[["group"]] <- newGroups
    }
    # Combine pipelines
    pipeNames <- vapply(pipes, function(x) x$name, FUN.VALUE = character(1))
    combinedName <- paste(self$name, "split")
    pip <- Pipeline$new(combinedName)
    combined <- Reduce(
        c(pip, pipes),
        f = function(x, y) x$append(y, sep = sep)
    )
    # Drop the "data" step, presumably the one created by Pipeline$new()
    # above (the split pipelines' data steps carry a suffix)
    combined$remove_step("data")
    # If subset was used for split, append the remaining steps and
    # update all of the (now changed) upstream dependencies.
    if (to < self$length()) {
        remainingPipe <- self$pipeline[(to + 1):self$length(), ]
        remainingDeps <- remainingPipe[["depends"]]
        # A dependency on a split step now points to all of its copies
        update_if_needed <- function(x) {
            needsUpdate <- x %in% upperSteps
            if (needsUpdate) paste0(x, sep, pipeNames) else x
        }
        updatedDeps <- lapply(
            remainingDeps,
            function(deps) {
                res <- lapply(deps, update_if_needed)
                if (all(lengths(res) == 1)) {
                    res <- unlist(res) # flatten single deps
                }
                res
            }
        )
        remainingPipe[["depends"]] <- updatedDeps
        combined$pipeline <- rbind(combined$pipeline, remainingPipe)
    }
    self$pipeline <- combined$pipeline
    invisible(self)
},
#' @description Change the `keepOut` flag at a given pipeline step,
#' which determines whether the output of that step is collected
#' when calling `collect_out()` after the pipeline was run.
#' @param step `string` name of step
#' @param keepOut `logical` whether to keep output of step
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
#' p$add("add2", \(x = ~data, y = 2) x + y)
#' p$add("mult", \(x = ~add1, y = ~add2) x * y)
#' p$run()$collect_out()
#' p$set_keep_out("add1", keepOut = FALSE)
#' p$set_keep_out("mult", keepOut = TRUE)
#' p$collect_out()
set_keep_out = function(step, keepOut = TRUE)
{
    stopifnot(
        is_string(step),
        is.logical(keepOut),
        length(keepOut) == 1
    )
    # Fail loudly for unknown steps: writing at an NA row index would
    # otherwise silently do nothing.
    private$.verify_step_exists(step)
    private$.set_at_step(step, "keepOut", value = keepOut)
    invisible(self)
},
#' @description Set parameters in the pipeline. If a parameter occurs
#' in several steps, the parameter is set commonly in all steps.
#' Trying to set parameters that don't exist in the pipeline is ignored,
#' by default, with a warning.
#' @param params `list` of parameters to be set
#' @param warnUndefined `logical` whether to give a warning when trying
#' to set a parameter that is not defined in the pipeline.
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 2) x + y)
#' p$add("add2", \(x = ~data, y = 3) x + y)
#' p$add("mult", \(x = 4, z = 5) x * z)
#' p$get_params()
#' p$set_params(list(x = 3, y = 3))
#' p$get_params()
#' p$set_params(list(x = 5, z = 3))
#' p$get_params()
#' suppressWarnings(
#' p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined
#' )
#' p$set_params(list(foo = 3), warnUndefined = FALSE)
set_params = function(params, warnUndefined = TRUE)
{
    if (!is.list(params)) {
        stop_no_call("params must be a list")
    }
    definedParams <- self$get_params_unique(ignoreHidden = FALSE)
    extra <- setdiff(names(params), names(definedParams))
    if (warnUndefined && length(extra) > 0) {
        warning(
            "Trying to set parameters not defined in the pipeline: ",
            toString(extra)
        )
    }
    for (step in self$get_step_names()) {
        definedAtStep <- names(
            self$get_params_at_step(step, ignoreHidden = FALSE)
        )
        overlap <- intersect(names(params), definedAtStep)
        if (length(overlap) > 0) {
            # Pass only the parameters actually being set at this step, so
            # the step's other parameters are left untouched and the
            # message for locked steps names only the relevant ones.
            self$set_params_at_step(step, params[overlap])
        }
    }
    invisible(self)
},
#' @description Set unbound function parameters defined at given
#' pipeline step where 'unbound' means parameters that are not
#' linked to other steps.
#' @param step `string` the name of the step
#' @param params `list` of parameters to be set
#' @return returns the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = ~data, y = 2, z = 3) x + y)
#' p$set_params_at_step("add1", list(y = 5, z = 6))
#' p$get_params()
#' try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined
set_params_at_step = function(
    step,
    params
) {
    stopifnot(
        is_string(step),
        is.list(params)
    )
    # Locked steps keep their parameters; just report what was skipped
    if (self$get_step(step)[["state"]] == "Locked") {
        message(
            "skipping setting parameters ",
            paste0(names(params), collapse = ", "),
            " at locked step '", step, "'"
        )
        return(invisible(self))
    }
    definedNames <- names(self$get_params_at_step(step, ignoreHidden = FALSE))
    unknownNames <- setdiff(names(params), definedNames)
    if (length(unknownNames) > 0) {
        stop_no_call(
            "Unable to set parameter(s) ", toString(unknownNames),
            " at step ", step, " - candidates are ",
            toString(definedNames)
        )
    }
    toUpdate <- intersect(names(params), definedNames)
    if (length(toUpdate) == 0) {
        return(invisible(self))
    }
    # A plain value replaces only the value of an existing Param wrapper;
    # anything else replaces the stored parameter entirely.
    merge_param <- function(old, new) {
        if (methods::is(old, "Param") && !methods::is(new, "Param")) {
            old@value <- new
            old
        } else {
            new
        }
    }
    stored <- unlist1(self$get_step(step)[["params"]])
    stored[toUpdate] <- Map(merge_param, stored[toUpdate], params[toUpdate])
    private$.set_at_step(step, "params", value = stored)
    # A finished step (and everything downstream) is now out of date
    if (self$get_step(step)[["state"]] == "Done") {
        private$.set_at_step(step, "state", "Outdated")
        private$.update_states_downstream(step, "Outdated")
    }
    invisible(self)
},
#' @description Splits pipeline into its independent parts.
#' @return list of `Pipeline` objects
#' @examples
#' # Example for two independent calculation paths
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("f1", \(x = ~data) x)
#' p$add("f2", \(x = 1) x)
#' p$add("f3", \(x = ~f1) x)
#' p$add("f4", \(x = ~f2) x)
#' p$split()
#'
#' # Example of split by three data sets
#' dataList <- list(a = 1, b = 2, c = 3)
#' p <- Pipeline$new("pipe")
#' p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
#' p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
#' pipes <- p$set_data_split(dataList)$split()
#' pipes
split = function() {
    groups <- private$.get_depends_grouped()
    # One new pipeline per group, named by the original name plus a counter
    newNames <- paste0(self$name, seq_along(groups))
    pipes <- lapply(newNames, function(newName) Pipeline$new(newName))
    allSteps <- self$pipeline[["step"]]
    for (k in seq_along(groups)) {
        rows <- which(allSteps %in% groups[[k]])
        pipes[[k]]$pipeline <- self$pipeline[rows, ]
    }
    pipes
},
#' @description Unlock previously locked step. If step was not locked,
#' the command is ignored.
#' @param step `string` name of step
#' @return the `Pipeline` object invisibly
#' @examples
#' p <- Pipeline$new("pipe", data = 1)
#' p$add("add1", \(x = 1, data = ~data) x + data)
#' p$add("add2", \(x = 1, data = ~data) x + data)
#' p$lock_step("add1")
#' p$set_params(list(x = 3))
#' p$get_params()
#' p$unlock_step("add1")
#' p$set_params(list(x = 3))
#' p$get_params()
unlock_step = function(step) {
    currentState <- self$get_step(step)[["state"]]
    # Steps that are not locked are left as they are
    if (currentState != "Locked") {
        return(invisible(self))
    }
    private$.set_at_step(step, "state", "Unlocked")
    invisible(self)
}
),
private = list(
.lg = NULL, # the logger function
deep_clone = function(name, value) {
    # Only the pipeline table gets an explicit copy. A data.table can be
    # modified by reference (e.g. via data.table::set), so sharing one
    # table between clones would let changes in one leak into the other.
    if (name != "pipeline") {
        return(value)
    }
    data.table::copy(value)
},
.clean_out_not_kept = function() {
    # Rows of all steps whose output was not marked to be kept
    unkeptRows <- which(!self$pipeline[["keepOut"]])
    # Drop their output in place and mark them as outdated, so that a
    # later run recomputes them.
    data.table::set(
        self$pipeline,
        i = unkeptRows,
        j = "out",
        value = list(list(NULL))
    )
    data.table::set(
        self$pipeline,
        i = unkeptRows,
        j = "state",
        value = "Outdated"
    )
},
.create_edge_table = function(
    groups = NULL
) {
    deps <- self$get_depends()
    if (!is.null(groups)) {
        # Only use dependencies defined by the given groups
        stopifnot(
            is.character(groups),
            all(groups %in% self$pipeline[["group"]])
        )
        deps <- deps[self$pipeline[["group"]] %in% groups]
    }
    deps <- Filter(deps, f = function(x) length(x) > 0)
    if (length(deps) == 0) {
        return(
            data.frame(
                from = numeric(0),
                to = numeric(0),
                arrows = character()
            )
        )
    }
    # One edge (from dependency to step, by step number) per dependency
    create_edges_for_step <- function(step, depsAtStep) {
        if (is.list(depsAtStep)) {
            # Handle nested dependencies, for example, when combining
            # after data split
            depsAtStep <- unlist(depsAtStep)
        }
        data.frame(
            from = sapply(depsAtStep, FUN = self$get_step_number),
            to = self$get_step_number(step)
        )
    }
    # Spell out `values` (the argument of replace()) instead of relying
    # on partial matching of `value`.
    edges <- mapply(
        step = names(deps),
        depsAtStep = deps,
        FUN = create_edges_for_step,
        SIMPLIFY = FALSE
    ) |>
        do.call(what = rbind, args = _) |>
        replace("arrows", values = "to")
    edges[order(edges[["from"]], edges[["to"]]), ]
},
.create_node_table = function(
    groups = NULL
) {
    pip <- self$pipeline
    # Node color by step state; any other state is shown in grey
    stateColors <- c(
        New = "lightblue",
        Done = "lightgreen",
        Outdated = "orange",
        Failed = "red",
        Locked = "grey"
    )
    nodeColors <- unname(stateColors[pip[["state"]]])
    nodeColors[is.na(nodeColors)] <- "grey"
    nodes <- data.frame(
        "id" = seq_len(self$length()),
        "label" = self$get_step_names(),
        "group" = pip[["group"]],
        "shape" = "box",
        "color" = nodeColors,
        "title" = paste0("<p>", pip[["description"]], "</p>")
    )
    # Steps whose name starts with "data" are drawn as databases ...
    isDataNode <- startsWith(nodes[["label"]], "data")
    if (any(isDataNode)) {
        nodes[isDataNode, "shape"] <- "database"
    }
    # ... and steps whose output is kept as circles
    nodes[["shape"]][pip[["keepOut"]]] <- "circle"
    if (is.null(groups)) {
        return(nodes)
    }
    stopifnot(
        is.character(groups),
        all(groups %in% pip[["group"]])
    )
    nodes[pip[["group"]] %in% groups, ]
},
.derive_dependencies = function(
    params,
    step,
    toStep = private$.get_last_step()
) {
    # Without any step to depend on, no dependencies are derived
    if (length(toStep) == 0) {
        return(character())
    }
    stopifnot(
        is_string(step),
        is_string(toStep)
    )
    private$.verify_step_exists(toStep)
    deps <- private$.extract_depends_from_param_list(params)
    # Handle any relative dependencies (e.g. ~-1): they are counted
    # backwards from the position right after `toStep`, so -1 refers to
    # `toStep` itself, and are replaced by the absolute step name.
    allSteps <- self$get_step_names()
    toIndex <- self$get_step_number(toStep)
    consideredSteps <- allSteps[seq_len(toIndex)]
    # vapply keeps the result numeric even if there are no relative
    # dependencies (sapply would return an empty list in that case).
    relativeDeps <- deps |>
        Filter(f = function(x) startsWith(x, "-")) |>
        vapply(FUN = as.numeric, FUN.VALUE = numeric(1))
    stepIndices <- mapply(
        relative_dep = relativeDeps,
        dependencyName = names(relativeDeps),
        FUN = private$.relative_dependency_to_index,
        MoreArgs = list(
            startIndex = toIndex + 1,
            step = step
        )
    ) |> as.integer()
    deps[names(relativeDeps)] <- consideredSteps[stepIndices]
    deps
},
.extract_dependent_out = function(
    depends,
    out
) {
    stopifnot(
        is.character(depends) || is.list(depends),
        is.list(out),
        all(unlist(depends) %in% names(out))
    )
    if (length(depends) == 0) {
        return(NULL)
    }
    # A single dependency yields that step's output itself; several
    # dependencies (e.g. after a data split) yield a named list of outputs.
    lapply(depends, FUN = function(stepNames) {
        if (length(stepNames) != 1) {
            return(out[stepNames])
        }
        out[[stepNames]]
    })
},
.extract_depends_from_param_list = function(
    params
) {
    if (length(params) == 0) {
        return(character())
    }
    stopifnot(is.list(params))
    # Only formula parameters declare dependencies: ~x names step "x",
    # while ~-1 yields the relative reference "-1".
    formulaParams <- Filter(function(p) methods::is(p, "formula"), params)
    deps <- sapply(formulaParams, FUN = function(f) deparse(f[[2]]))
    # Return an empty character vector rather than an empty named list
    if (length(deps) > 0) deps else character()
},
.get_depends_grouped = function() {
    allSteps <- self$get_step_names()
    groups <- list()
    # Walk backwards: each step not yet seen seeds a new group made of that
    # step plus all its upstream dependencies (in pipeline order). Groups
    # may share upstream steps, since only the seed step is checked.
    for (s in rev(allSteps)) {
        if (s %in% unlist(groups)) {
            next
        }
        members <- intersect(allSteps, c(s, self$get_depends_up(s)))
        groups <- c(list(members), groups)
    }
    groups
},
.get_downstream_depends = function(
    step,
    depends,
    recursive = TRUE
) {
    stopifnot(
        is_string(step),
        is.character(depends) || is.list(depends),
        is.logical(recursive)
    )
    # Steps that list `step` among their dependencies
    children <- names(Filter(function(d) step %in% unlist(d), depends))
    if (!recursive) {
        return(as.character(unique(unlist(children))))
    }
    # Followed by everything that depends on those steps, transitively
    descendants <- lapply(
        children,
        FUN = private$.get_downstream_depends,
        depends = depends,
        recursive = TRUE
    )
    as.character(unique(unlist(c(children, descendants))))
},
.get_last_step = function() {
    # Name of the final step (character(0) if there are no steps)
    utils::tail(self$get_step_names(), n = 1)
},
.get_upstream_depends = function(
    step,
    depends,
    recursive = TRUE
) {
    stopifnot(
        is_string(step),
        is.character(depends) || is.list(depends),
        is.logical(recursive)
    )
    if (length(depends) == 0) {
        return(character())
    }
    # Steps that `step` directly depends on
    parents <- as.character(unlist(depends[[step]]))
    if (!recursive) {
        return(as.character(unique(unlist(parents))))
    }
    # Followed by their own dependencies, transitively
    ancestors <- lapply(
        parents,
        FUN = private$.get_upstream_depends,
        depends = depends,
        recursive = TRUE
    )
    as.character(unique(unlist(c(parents, ancestors))))
},
.prepare_and_verify_params = function(
    fun,
    funcName,
    params = list()
) {
    stopifnot(
        is.function(fun),
        is_string(funcName),
        is.list(params)
    )
    # Start from the function's defaults, overridden by the given params,
    # and leave out any `...` formal.
    merged <- replace(formals(fun), names(params), params)
    merged <- merged[!(names(merged) %in% "...")]
    private$.verify_fun_params(fun, funcName, as.list(merged))
    # Evaluate the default expressions to obtain the actual values
    lapply(merged, eval)
},
.relative_dependency_to_index = function(
    relative_dep,
    dependencyName,
    startIndex,
    step # required for error message
) {
    stopifnot(
        is_number(relative_dep),
        relative_dep < 0,
        is_string(dependencyName),
        is_number(startIndex),
        startIndex > 0,
        is_string(step)
    )
    # Count backwards from the start index; must stay inside the pipeline
    absIndex <- relative_dep + startIndex
    if (absIndex >= 1) {
        return(absIndex)
    }
    stop_no_call(
        "step '", step, "': relative dependency ",
        paste0(dependencyName, "=", relative_dep),
        " points to outside the pipeline"
    )
},
.run_step = function(step) {
    private$.verify_step_exists(step)
    pip <- self$pipeline
    succeeded <- FALSE
    # If this function is left before reaching its end (e.g. because the
    # step function raised an error), mark the step as failed.
    on.exit(
        if (!succeeded) {
            private$.set_at_step(step, "state", value = "Failed")
        },
        add = TRUE
    )
    stepRow <- unlist1(self$get_step(step))
    stepFun <- stepRow[["fun"]]
    args <- stepRow[["params"]]
    deps <- stepRow[["depends"]]
    # Arguments bound to earlier steps are filled with those steps' output
    if (length(deps) > 0) {
        outByStep <- stats::setNames(pip[["out"]], pip[["step"]])
        dependentOut <- private$.extract_dependent_out(deps, outByStep)
        args[names(dependentOut)] <- dependentOut
    }
    # Arguments wrapped in a Param object are passed on as their value
    unwrap_param <- function(arg) {
        if (methods::is(arg, "Param")) arg@value else arg
    }
    args <- lapply(args, FUN = unwrap_param)
    context <- gettextf("Step %i ('%s')", self$get_step_number(step), step)
    # Log errors and warnings with the step context; errors are re-raised
    res <- withCallingHandlers(
        do.call(stepFun, args = args),
        error = function(e) {
            private$.lg(level = "error", msg = e$message, context = context)
            stop_no_call(e$message)
        },
        warning = function(w) {
            private$.lg(level = "warn", msg = w$message, context = context)
        }
    )
    # Store the result, unless the step no longer exists
    if (self$has_step(step)) {
        private$.set_at_step(step, "time", value = Sys.time())
        private$.set_at_step(step, "out", value = res)
        private$.set_at_step(step, "state", value = "Done")
        private$.update_states_downstream(step, "Outdated")
    }
    succeeded <- TRUE
    invisible(res)
},
.set_at_step = function(
    step,
    field,
    value
) {
    rowIndex <- self$get_step_number(step)
    stopifnot(
        is_string(field),
        field %in% names(self$pipeline)
    )
    columnClass <- data.class(self$pipeline[[field]])
    if (columnClass != "list") {
        # Atomic columns only accept a value of the same data class
        stopifnot(data.class(value) == columnClass)
        self$pipeline[[field]][rowIndex] <- value
    } else {
        # List columns store the value as a single list element
        self$pipeline[[field]][rowIndex] <- list(value)
    }
},
.update_states_downstream = function(step, state) {
    private$.verify_step_exists(step)
    stopifnot(is_string(state))
    # Set `state` at all (transitive) downstream steps, except locked ones
    for (downstreamStep in self$get_depends_down(step, recursive = TRUE)) {
        if (self$get_step(downstreamStep)[["state"]] == "Locked") {
            next
        }
        private$.set_at_step(downstreamStep, "state", value = state)
    }
},
.verify_dependency = function(
    dep,
    step, # required for error message
    toStep = private$.get_last_step()
) {
    stopifnot(
        is_string(dep),
        is_string(step),
        is_string(toStep)
    )
    private$.verify_step_exists(toStep)
    allSteps <- self$get_step_names()
    toIndex <- match(toStep, allSteps)
    # The dependency must be one of the steps up to (and including) toStep
    if (dep %in% allSteps[seq_len(toIndex)]) {
        return(invisible(TRUE))
    }
    msg <- paste0("step '", step, "': dependency '", dep, "' not found")
    if (toStep != private$.get_last_step()) {
        msg <- paste0(msg, " up to step '", allSteps[toIndex], "'")
    }
    stop_no_call(msg)
},
.verify_from_to = function(from, to) {
    # `from`..`to` must describe a valid, non-empty range of step numbers
    stopifnot(
        is_number(from),
        is_number(to),
        from > 0,
        from <= to
    )
    if (to <= self$length()) {
        return(invisible(TRUE))
    }
    stop_no_call("'to' must not be larger than pipeline length")
},
.verify_fun_params = function(
    fun,
    funcName,
    params = as.list(formals(fun))
) {
    stopifnot(
        is.function(fun),
        is_string(funcName),
        is.list(params)
    )
    fargs <- formals(fun)
    hasDots <- "..." %in% names(fargs)
    # Unless the function accepts dots, all parameters must appear in the
    # function definition. (The former dots branch only recomputed
    # `fargs`, which was never used afterwards, so it has been dropped.)
    if (!hasDots) {
        unknownParams <- setdiff(names(params), names(fargs))
        if (length(unknownParams) > 0) {
            stop_no_call(
                paste0("'", unknownParams, "'", collapse = ", "),
                " are no function parameters of '", funcName, "'"
            )
        }
    }
    # Signal undefined parameters, e.g. things like function(x, y = 1)
    isUndefined <- function(x) {
        is.name(x) && toString(x) == ""
    }
    undefinedParams <- Filter(params, f = isUndefined)
    if (length(undefinedParams) > 0) {
        stop_no_call(
            paste0("'", names(undefinedParams), "'", collapse = ", "),
            " parameter(s) must have default values"
        )
    }
    invisible(TRUE)
},
.verify_step_does_not_exist = function(step) {
    # Guard against name clashes when adding or renaming steps
    alreadyExists <- self$has_step(step)
    if (alreadyExists) {
        stop_no_call("step '", step, "' already exists")
    }
},
.verify_step_exists = function(step) {
    # Raise an error for references to unknown steps
    isKnown <- self$has_step(step)
    if (!isKnown) {
        stop_no_call("step '", step, "' does not exist")
    }
}
)
)
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.