Nothing
#' @title Transformations in `drake_plan()`. `r lifecycle::badge("stable")`
#' @name transformations
#' @aliases map split cross combine group
#' @description In [drake_plan()], you can define whole batches
#' of targets with transformations such as
#' `map()`, `split()`, `cross()`, and `combine()`.
#' @details For details, see
#' `https://books.ropensci.org/drake/plans.html#large-plans`.
#' @section Transformations:
#' `drake` has special syntax for generating large plans.
#' Your code will look something like
#' `drake_plan(y = target(f(x), transform = map(x = c(1, 2, 3)))`
#' You can read about this interface at
#' `https://books.ropensci.org/drake/plans.html#large-plans`. # nolint
#' @section Static branching:
#' In static branching, you define batches of targets
#' based on information you know in advance.
#' Overall usage looks like
#' `drake_plan(<x> = target(<...>, transform = <call>)`,
#' where
#' - `<x>` is the name of the target or group of targets.
#' - `<...>` is optional arguments to [target()].
#' - `<call>` is a call to one of the transformation functions.
#'
#' Transformation function usage:
#' - `map(..., .data, .names, .id, .tag_in, .tag_out)`
#' - `split(..., slices, margin = 1L, drop = FALSE, .names, .tag_in, .tag_out)` # nolint
#' - `cross(..., .data, .names, .id, .tag_in, .tag_out)`
#' - `combine(..., .by, .names, .id, .tag_in, .tag_out)`
#' @section Dynamic branching:
#' - `map(..., .trace)`
#' - `cross(..., .trace)`
#' - `group(..., .by, .trace)`
#'
#' `map()` and `cross()` create dynamic sub-targets from the variables
#' supplied to the dots. As with static branching, the variables
#' supplied to `map()` must all have equal length.
#' `group(f(data), .by = x)` makes new dynamic
#' sub-targets from `data`. Here, `data` can be either static or dynamic.
#' If `data` is dynamic, `group()` aggregates existing sub-targets.
#' If `data` is static, `group()` splits `data` into multiple
#' subsets based on the groupings from `.by`.
#'
#' Differences from static branching:
#' - `...` must contain *unnamed* symbols with no values supplied,
#' and they must be the names of targets.
#' - Arguments `.id`, `.tag_in`, and `.tag_out` no longer apply.
#'
#' @param ... Grouping variables. New grouping variables must be
#' supplied with their names and values, existing grouping variables
#' can be given as symbols without any values assigned.
#' For dynamic branching, the entries in `...` must be unnamed symbols
#' with no values supplied, and they must be the names of targets.
#' @param .data A data frame of new grouping variables with
#' grouping variable names as column names and values as elements.
#' @param .names Literal character vector of names for the targets.
#' Must be the same length as the targets generated.
#' @param .id Symbol or vector of symbols naming grouping variables
#' to incorporate into target names. Useful for creating short target
#' names. Set `.id = FALSE` to use integer indices as target name suffixes.
#' @param .tag_in A symbol or vector of symbols. Tags assign targets
#' to grouping variables. Use `.tag_in` to assign *untransformed*
#' targets to grouping variables.
#' @param .tag_out Just like `.tag_in`, except that `.tag_out`
#' assigns *transformed* targets to grouping variables.
#' @param slice Number of slices into which `split()` partitions the data.
#' @param margin Which margin to take the slices in `split()`. Same meaning
#' as the `MARGIN` argument of `apply()`.
#' @param drop Logical, whether to drop a dimension if its length is 1.
#' Same meaning as `mtcars[, 1L, drop = TRUE]` versus
#' `mtcars[, 1L, drop = TRUE]`.
#' @param .by Symbol or vector of symbols of grouping variables.
#' `combine()` aggregates/groups targets by the grouping variables in `.by`.
#' For dynamic branching, `.by` can only take one variable at a time,
#' and that variable must be a vector. Ideally, it should take
#' little space in memory.
#' @param .trace Symbol or vector of symbols for the dynamic trace.
#' The dynamic trace allows you to keep track of the values of
#' dynamic dependencies are associated with individual sub-targets.
#' For `combine()`, `.trace` must either be empty or the same as the
#' variable given for `.by`.
#' See [get_trace()] and [read_trace()] for examples and other details.
#' @examples
#' # Static branching
#' models <- c("glm", "hierarchical")
#' plan <- drake_plan(
#' data = target(
#' get_data(x),
#' transform = map(x = c("simulated", "survey"))
#' ),
#' analysis = target(
#' analyze_data(data, model),
#' transform = cross(data, model = !!models, .id = c(x, model))
#' ),
#' summary = target(
#' summarize_analysis(analysis),
#' transform = map(analysis, .id = c(x, model))
#' ),
#' results = target(
#' bind_rows(summary),
#' transform = combine(summary, .by = data)
#' )
#' )
#' plan
#' if (requireNamespace("styler")) {
#' print(drake_plan_source(plan))
#' }
#' # Static splitting
#' plan <- drake_plan(
#' analysis = target(
#' analyze(data),
#' transform = split(data, slices = 3L, margin = 1L, drop = FALSE)
#' )
#' )
#' print(plan)
#' if (requireNamespace("styler", quietly = TRUE)) {
#' print(drake_plan_source(plan))
#' }
#' # Static tags:
#' drake_plan(
#' x = target(
#' command,
#' transform = map(y = c(1, 2), .tag_in = from, .tag_out = c(to, out))
#' ),
#' trace = TRUE
#' )
#' plan <- drake_plan(
#' survey = target(
#' survey_data(x),
#' transform = map(x = c(1, 2), .tag_in = source, .tag_out = dataset)
#' ),
#' download = target(
#' download_data(),
#' transform = map(y = c(5, 6), .tag_in = source, .tag_out = dataset)
#' ),
#' analysis = target(
#' analyze(dataset),
#' transform = map(dataset)
#' ),
#' results = target(
#' bind_rows(analysis),
#' transform = combine(analysis, .by = source)
#' )
#' )
#' plan
#' if (requireNamespace("styler", quietly = TRUE)) {
#' print(drake_plan_source(plan))
#' }
NULL
#' @title Transform a plan
#' `r lifecycle::badge("stable")`
#' @description Evaluate the `map()`, `cross()`, `split()` and
#' `combine()` operations in the `transform` column of a
#' `drake` plan.
#' @details `https://books.ropensci.org/drake/plans.html#large-plans` # nolint
#' @export
#' @seealso drake_plan, map, split, cross, combine
#' @param plan A `drake` plan with a `transform` column
#' @param envir Environment for tidy evaluation.
#' @param trace Logical, whether to add columns to show
#' what happens during target transformations.
#' @param max_expand Positive integer, optional.
#' `max_expand` is the maximum number of targets to generate in each
#' `map()`, `split()`, or `cross()` transform.
#' Useful if you have a massive plan and you want to
#' test and visualize a strategic subset of targets
#' before scaling up.
#' Note: the `max_expand` argument of `drake_plan()` and
#' `transform_plan()` is for static branching only.
#' The dynamic branching `max_expand`
#' is an argument of `make()` and `drake_config()`.
#' @param tidy_eval Logical, whether to use tidy evaluation
#' (e.g. unquoting/`!!`) when resolving commands.
#' Tidy evaluation in transformations is always turned on
#' regardless of the value you supply to this argument.
#' @examples
#' plan1 <- drake_plan(
#' y = target(
#' f(x),
#' transform = map(x = c(1, 2))
#' ),
#' transform = FALSE
#' )
#' plan2 <- drake_plan(
#' z = target(
#' g(y),
#' transform = map(y, .id = x)
#' ),
#' transform = FALSE
#' )
#' plan <- bind_plans(plan1, plan2)
#' transform_plan(plan)
#' models <- c("glm", "hierarchical")
#' plan <- drake_plan(
#' data = target(
#' get_data(x),
#' transform = map(x = c("simulated", "survey"))
#' ),
#' analysis = target(
#' analyze_data(data, model),
#' transform = cross(data, model = !!models, .id = c(x, model))
#' ),
#' summary = target(
#' summarize_analysis(analysis),
#' transform = map(analysis, .id = c(x, model))
#' ),
#' results = target(
#' bind_rows(summary),
#' transform = combine(summary, .by = data)
#' )
#' )
#' plan
#' if (requireNamespace("styler", quietly = TRUE)) {
#' print(drake_plan_source(plan))
#' }
#' # Tags:
#' drake_plan(
#' x = target(
#' command,
#' transform = map(y = c(1, 2), .tag_in = from, .tag_out = c(to, out))
#' ),
#' trace = TRUE
#' )
#' plan <- drake_plan(
#' survey = target(
#' survey_data(x),
#' transform = map(x = c(1, 2), .tag_in = source, .tag_out = dataset)
#' ),
#' download = target(
#' download_data(),
#' transform = map(y = c(5, 6), .tag_in = source, .tag_out = dataset)
#' ),
#' analysis = target(
#' analyze(dataset),
#' transform = map(dataset)
#' ),
#' results = target(
#' bind_rows(analysis),
#' transform = combine(analysis, .by = source)
#' )
#' )
#' plan
#' if (requireNamespace("styler", quietly = TRUE)) {
#' print(drake_plan_source(plan))
#' }
transform_plan <- function(
plan,
envir = parent.frame(),
trace = FALSE,
max_expand = NULL,
tidy_eval = TRUE
) {
force(envir)
transform_plan_(
plan = plan,
envir = envir,
trace = trace,
max_expand = max_expand,
tidy_eval = tidy_eval,
sanitize = TRUE
)
}
transform_plan_ <- function(
plan,
envir,
trace,
max_expand,
tidy_eval,
sanitize
) {
if (!("transform" %in% names(plan))) {
return(plan)
}
old_cols(plan) <- old_cols <- colnames(plan)
plan$transform <- tidyeval_exprs(plan$transform, envir = envir)
plan <- convert_splits_to_maps(plan)
plan$transform <- lapply(plan$transform, parse_transform)
graph <- dsl_graph(plan)
order <- igraph::topo_sort(graph)$name
subplans <- split(plan, f = plan$target)
for (target in order) {
upstream_plan <- dsl_upstream_plan(target, graph, subplans)
index <- which(target == upstream_plan$target)
old_cols(upstream_plan) <- old_cols
subplans[[target]] <- transform_row(index, upstream_plan, graph, max_expand)
}
plan <- drake_bind_rows(subplans)
old_cols(plan) <- old_cols
plan <- dsl_trace(plan = plan, trace = trace)
old_cols(plan) <- plan$transform <- NULL
plan <- dsl_tidy_eval(plan = plan, tidy_eval = tidy_eval, envir = envir)
plan <- dsl_sanitize(plan = plan, sanitize = sanitize, envir = envir)
plan
}
dsl_upstream_plan <- function(target, graph, subplans) {
upstream_targets <- upstream_nodes(graph, target)
drake_bind_rows(subplans[upstream_targets])
}
dsl_trace <- function(plan, trace) {
if (!trace) {
keep <- as.character(intersect(colnames(plan), old_cols(plan)))
plan <- plan[, intersect(colnames(plan), old_cols(plan)), drop = FALSE]
}
plan
}
dsl_tidy_eval <- function(plan, tidy_eval, envir) {
if (tidy_eval) {
for (col in setdiff(colnames(plan), c("target", "transform"))) {
plan[[col]] <- tidyeval_exprs(plan[[col]], envir = envir)
}
}
plan
}
dsl_sanitize <- function(plan, sanitize, envir) {
if (sanitize) {
plan <- sanitize_plan(plan, envir = envir)
}
plan
}
convert_splits_to_maps <- function(plan) {
fields <- c("target", "command", "transform")
for (i in seq_len(nrow(plan))) {
skip <- safe_is_na(plan$transform[[i]]) ||
plan$transform[[i]][[1]] != quote(split)
if (skip) {
next
}
out <- convert_split_to_map(
target = plan$target[[i]],
command = plan$command[[i]],
transform = plan$transform[[i]]
)
plan$command[[i]] <- out$command
plan$transform[[i]] <- out$transform
}
plan
}
convert_split_to_map <- function(target, command, transform) {
slice <- transform
slice[[1]] <- quote(drake_slice)
index <- paste0(target, "_index")
slice$index <- as.symbol(index)
args <- setdiff(names(transform), names(formals(drake_slice)))
args <- args[nzchar(args)]
for (arg in args) {
slice[[arg]] <- NULL
}
arglist <- as.list(transform)[args]
slice <- match.call(drake_slice, slice)
sub <- list(slice = slice)
names(sub) <- safe_deparse(slice$data, backtick = TRUE)
command <- eval(call("substitute", command, sub), envir = baseenv())
transform <- as.call(c(quote(map), slice$data))
transform[[index]] <- as.numeric(seq_len(slice$slices))
for (arg in args) {
transform[[arg]] <- arglist[[arg]]
}
list(command = command, transform = transform)
}
#' @title Take a strategic subset of a dataset.
#' `r lifecycle::badge("stable")`
#' @description `drake_slice()` is similar to `split()`.
#' Both functions partition data into disjoint subsets,
#' but whereas `split()` returns *all* the subsets, `drake_slice()`
#' returns just *one*. In other words, `drake_slice(..., index = i)`
#' returns `split(...)[[i]]`.
#' Other features:
#' 1. `drake_slice()` works on vectors, data frames,
#' matrices, lists, and arbitrary arrays.
#' 2. Like `parallel::splitIndices()`, `drake_slice()` tries to
#' distribute the data uniformly across subsets.
#' See the examples to learn why splitting is useful in `drake`.
#' @export
#' @return A subset of `data`.
#' @param data A list, vector, data frame, matrix, or arbitrary array.
#' Anything with a `length()` or `dim()`.
#' @param slices Integer of length 1, number of slices (i.e. pieces)
#' of the whole dataset. Remember, `drake_slice(index = i)` returns
#' only slice number `i`.
#' @param index Integer of length 1, which piece of the partition to return.
#' @param margin Integer of length 1, margin over which to split the data.
#' For example, for a data frame or matrix,
#' use `margin = 1` to split over rows and `margin = 2`
#' to split over columns. Similar to `MARGIN` in `apply()`.
#' @param drop Logical, for matrices and arrays.
#' If `TRUE`,` the result is coerced to the lowest possible dimension.
#' See ?`[` for details.
#' @examples
#' # Simple usage
#' x <- matrix(seq_len(20), nrow = 5)
#' x
#' drake_slice(x, slices = 3, index = 1)
#' drake_slice(x, slices = 3, index = 2)
#' drake_slice(x, slices = 3, index = 3)
#' drake_slice(x, slices = 3, margin = 2, index = 1)
#' # In drake, you can split a large dataset over multiple targets.
#' \dontrun{
#' isolate_example("contain side effects", {
#' plan <- drake_plan(
#' large_data = mtcars,
#' data_split = target(
#' drake_slice(large_data, slices = 32, index = i),
#' transform = map(i = !!seq_len(32))
#' )
#' )
#' plan
#' cache <- storr::storr_environment()
#' make(plan, cache = cache, session_info = FALSE, verbose = FALSE)
#' readd(data_split_1L, cache = cache)
#' readd(data_split_2L, cache = cache)
#' })
#' }
drake_slice <- function(data, slices, index, margin = 1L, drop = FALSE) {
check_drake_slice_args(margin, slices, index)
args <- list(data)
dim <- dim(data) %|||% length(data)
margin <- ifelse(is.null(dim(data)), 1L, margin)
for (m in seq_along(dim)) {
if (m == margin) {
args[[m + 1]] <- slice_indices(dim[m], slices, index)
} else {
args[[m + 1]] <- substitute()
}
}
args$drop <- drop
do.call(`[`, args)
}
check_drake_slice_args <- function(slices, index, margin) {
sclr <- length(slices) == 1L && length(index) && length(margin) == 1L
if (sclr) {
return()
}
stop0(
"In drake_slice(), arguments margin, slices, ",
"and index must each have length 1."
)
}
slice_indices <- function(length, slices, index) {
if (length < 1L || slices < 1L || index < 1L || index > slices) {
return(integer(0))
}
inc <- as.integer(length / slices)
mod <- length %% slices
n <- inc + as.integer(index <= mod)
from <- 1L + inc * (index - 1L) + min(index - 1L, mod)
seq(from = from, length.out = n)
}
dsl_graph <- function(plan) {
edges <- lapply(seq_len(nrow(plan)), function(index) {
dsl_target_edges(plan$transform[[index]], plan$target[[index]])
})
edges <- do.call(rbind, edges)
if (!length(edges) || !nrow(edges)) {
# Not run but better to keep:
return(igraph::make_empty_graph()) # nocov
}
keep <- !vapply(
plan$transform,
safe_is_na,
FUN.VALUE = logical(1)
)
names(keep) <- plan$target
keep <- names(which(keep, useNames = TRUE))
edges <- trim_vs_protect_cons(edges, keep)
graph <- igraph::graph_from_data_frame(edges)
graph <- igraph::simplify(graph)
stopifnot(igraph::is_dag(graph))
transforms <- plan$transform
names(transforms) <- plan$target
transforms <- transforms[igraph::V(graph)$name]
igraph::set_vertex_attr(
graph,
name = "transform",
index = igraph::V(graph)$name,
value = transforms
)
}
trim_vs_protect_cons <- function(edges, keep) {
delete <- unique(setdiff(c(edges$from, edges$to), keep))
for (v in delete) {
edges <- delete_v_protect_con(edges, v)
}
edges
}
delete_v_protect_con <- function(edges, v) {
from <- edges$from[edges$to == v]
to <- edges$to[edges$from == v]
edges <- edges[edges$from != v & edges$to != v, ]
if (!length(from) || !length(to)) {
return(edges)
}
nbhd_edges <- expand.grid(from = from, to = to, stringsAsFactors = FALSE)
rbind(edges, nbhd_edges)
}
dsl_target_edges <- function(transform, target) {
if (safe_is_na(transform)) {
return(NULL)
}
from <- dsl_deps(transform)
to <- dsl_revdeps(transform)
edges <- data.frame(
from = target, to = target, stringsAsFactors = FALSE
)
if (length(from)) {
edges <- rbind(
edges,
data.frame(from = from, to = target, stringsAsFactors = FALSE)
)
}
if (length(to)) {
edges <- rbind(
edges,
data.frame(from = target, to = to, stringsAsFactors = FALSE)
)
}
edges
}
parse_transform <- function(transform, target) {
if (safe_is_na(transform)) {
return(NA)
}
text <- direct_deparse(
transform[[1]],
control = deparse_control_default,
backtick = TRUE
)
transform <- structure(
as.expression(transform),
class = unique(c(text, "transform", class(transform)))
)
assert_good_transform(transform)
transform <- structure(
transform,
id = dsl_id(transform),
.names = dsl_names(transform),
tag_in = tag_in(transform),
tag_out = tag_out(transform)
)
transform <- interpret_transform(transform)
structure(
transform,
deps = dsl_deps(transform),
revdeps = dsl_revdeps(transform)
)
}
assert_good_transform <- function(transform, target) UseMethod("assert_good_transform")
assert_good_transform.map <-
assert_good_transform.cross <-
assert_good_transform.combine <- function(transform, target) NULL
assert_good_transform.default <- function(transform, target) {
stop0(
"invalid transform: ", lang(transform),
". Expected: one of map(), cross(), or combine()."
)
}
interpret_transform <- function(transform) UseMethod("interpret_transform")
interpret_transform.map <- function(transform) {
structure(
transform,
new_groupings = new_groupings(transform)
)
}
interpret_transform.cross <- interpret_transform.map
interpret_transform.combine <- function(transform) {
structure(
transform,
combine = dsl_combine(transform),
by = dsl_by(transform)
)
}
transform_row <- function(index, plan, graph, max_expand) {
row <- plan[index,, drop = FALSE] # nolint
target <- row$target
transform <- set_old_groupings(plan$transform[[index]], plan)
new_cols <- c(
target,
tag_in(transform),
tag_out(transform),
group_names(transform)
)
check_group_names(new_cols, old_cols(plan))
out <- dsl_transform(transform, target, row, plan, graph, max_expand)
out[[target]] <- out$target
for (col in tag_in(transform)) {
out[[col]] <- target
}
for (col in tag_out(transform)) {
out[[col]] <- out$target
}
out
}
check_group_names <- function(groups, protect) {
groups <- intersect(groups, protect)
if (length(groups)) {
stop0(
"grouping variables cannot also be custom column names in the plan:\n",
multiline_message(groups)
)
}
}
dsl_transform <- function(
transform,
target,
row,
plan,
graph,
max_expand,
...
) {
UseMethod("dsl_transform")
}
dsl_transform.map <- dsl_transform.cross <- function(
transform,
target,
row,
plan,
graph,
max_expand,
...
) {
groupings <- groupings(transform)
grid <- dsl_grid(transform, groupings)
if (any(dim(grid) < 1L)) {
error_nonempty_transform(target)
}
grid <- dsl_map_join_plan(
grid = grid,
plan = plan,
graph = graph,
target = target,
transform = transform
)
dsl_map_new_targets(
transform = transform,
target = target,
row = row,
grid = grid,
plan = plan,
max_expand = max_expand
)
}
dsl_map_join_plan <- function(grid, plan, graph, target, transform) {
cols <- upstream_trace_vars(target, plan, graph)
gridlist <- lapply(grid, as.data.frame, stringsAsFactors = FALSE)
for (i in seq_len(ncol(grid))) {
colnames(gridlist[[i]]) <- colnames(grid)[i]
gridlist[[i]] <- dsl_left_outer_join(
gridlist[[i]],
plan[, cols, drop = FALSE]
)
}
grid <- do.call(cbind, unname(gridlist))
grid_cols <- c(names(groupings), setdiff(colnames(grid), names(groupings)))
grid[, grid_cols, drop = FALSE]
}
dsl_map_new_targets <- function(
transform,
target,
row,
grid,
plan,
max_expand
) {
sub_cols <- intersect(colnames(grid), group_names(transform))
new_target_names <- new_target_names(
target,
grid,
cols = sub_cols,
id = dsl_id(transform),
names = dsl_names(transform)
)
if (length(new_target_names) != nrow(grid)) {
stop0("target names must be the same length as the number of new targets.")
}
out <- data.frame(target = new_target_names, stringsAsFactors = FALSE)
grid$.id_chr <- sprintf("\"%s\"", new_target_names)
for (col in setdiff(old_cols(plan), c("target", "transform"))) {
if (is.language(row[[col]][[1]])) {
out[[col]] <- grid_subs(row[[col]][[1]], grid)
} else {
out[[col]] <- row[[col]][[1]]
}
}
grid$.id_chr <- NULL
out <- cbind(out, grid)
df_max_expand(out, max_expand)
}
group_names <- function(transform) {
as.character(names(groupings(transform)))
}
dsl_left_outer_join <- function(x, y) {
by <- intersect(colnames(x), colnames(y))
if (!length(by)) {
return(x)
}
# The output must have the same number of rows as x,
# and the ID variables must all be nonmissing.
rows_keep <- complete_cases(y[, by, drop = FALSE])
y <- y[rows_keep,, drop = FALSE] # nolint
# Drop the columns that only partially tag along.
# These grouping variables never truly belonged to the
# ID variables in x.
cols_keep <- !vapply(y, anyNA, FUN.VALUE = logical(1))
y <- y[, cols_keep, drop = FALSE]
# Just a precaution. We should actually be okay by now.
y <- y[!duplicated(y[, by, drop = FALSE]),, drop = FALSE] # nolint
# Need to recover the original row order
key <- basename(tempfile(fileext = "drake_transform_plan_col"))
x[[key]] <- seq_len(nrow(x))
out <- merge(x = x, y = y, by = by, all.x = TRUE, sort = FALSE)
out <- out[, !duplicated(colnames(out)), drop = FALSE]
is_na_col <- vlapply(out, function(x) {
all(is.na(x))
})
out <- out[, !is_na_col, drop = FALSE]
out <- out[order(out[[key]]),, drop = FALSE] # nolint
out[[key]] <- NULL
out
}
upstream_trace_vars <- function(target, plan, graph) {
targets <- igraph::subcomponent(graph, v = target, mode = "in")$name
targets <- setdiff(targets, target)
if (!length(targets)) {
return(character(0))
}
transforms <- igraph::vertex_attr(graph, "transform", index = targets)
revdeps <- unique(unlist(lapply(transforms, function(x) dsl_revdeps(x))))
out <- c(targets, revdeps)
intersect(out, colnames(plan))
}
df_max_expand <- function(df, max_expand) {
if (is.null(max_expand)) {
return(df)
}
rows <- seq_max_expand(nrow(df), max_expand)
df[rows,, drop = FALSE] # nolint
}
seq_max_expand <- function(n, max_expand) {
max_expand <- min(n, max_expand)
i <- seq(from = 1L, to = n, length.out = max_expand)
i <- floor(i)
unique(i)
}
dsl_transform.combine <- function(transform, target, row, plan, graph, ...) {
plan <- valid_splitting_plan(plan, transform)
if (!nrow(plan)) {
error_nonempty_transform(target)
}
out <- dsl_commands_combine(transform = transform, row = row, plan = plan)
if (!nrow(out)) {
error_nonempty_transform(target)
}
out$target <- new_target_names(
target,
out,
cols = dsl_by(transform),
id = dsl_id(transform),
names = dsl_names(transform)
)
out <- id_chr_sub(plan = out, cols = old_cols(plan), .id_chr = out$target)
out
}
dsl_commands_combine <- function(transform, row, plan) {
map_by(
.x = plan,
.by = dsl_by(transform),
.f = combine_step,
row = row,
transform = transform,
old_cols = old_cols(plan)
)
}
error_nonempty_transform <- function(target) {
stop0(
"A grouping variable for target ", target,
" is either undefined or improperly invoked. Details: ",
"https://books.ropensci.org/drake/static.html#grouping-variables"
)
}
valid_splitting_plan <- function(plan, transform) {
cols <- dsl_by(transform)
if (!length(cols)) {
return(plan)
}
rows_keep <- complete_cases(plan[, cols, drop = FALSE])
old_cols <- old_cols(plan)
out <- plan[rows_keep,, drop = FALSE] # nolint
old_cols(out) <- old_cols
out
}
map_by <- function(.x, .by, .f, ...) {
splits <- split_by(.x, .by = .by)
out <- lapply(
X = splits,
FUN = function(split) {
out <- .f(split, ...)
if (nrow(out)) {
out[, .by] <- split[replicate(nrow(out), 1), .by]
}
out
}
)
do.call(what = drake_bind_rows, args = out)
}
split_by <- function(.x, .by = character(0)) {
if (!length(.by)) {
return(list(.x))
}
fact <- lapply(.x[, .by, drop = FALSE], factor, exclude = c())
splits <- base::split(x = .x, f = fact)
Filter(x = splits, f = nrow)
}
id_chr_sub <- function(plan, cols, .id_chr) {
grid <- data.frame(.id_chr = .id_chr, stringsAsFactors = FALSE)
for (col in setdiff(cols, c("target", "transform"))) {
if (is.language(plan[[col]][[1]])) {
plan[[col]] <- lapply(
seq_len(nrow(plan)),
function(index) {
grid_sub(index, expr = plan[[col]][[index]], grid = grid)
}
)
}
}
plan
}
grid_subs <- function(expr, grid) {
keep <- intersect(all.vars(expr, functions = TRUE), colnames(grid))
grid <- grid[, keep, drop = FALSE]
for (i in seq_along(grid)) {
grid[[i]] <- dsl_syms(grid[[i]])
}
lapply(
seq_len(nrow(grid)),
grid_sub,
expr = expr,
grid = grid
)
}
grid_sub <- function(index, expr, grid) {
sub <- lapply(grid, `[[`, index)
eval(call("substitute", expr, sub), envir = baseenv())
}
dsl_syms <- function(x) {
out <- lapply(as.character(x), dsl_sym)
}
dsl_sym <- function(x) {
tryCatch(parse(text = x)[[1]], error = function(e) as.symbol(x))
}
new_target_names <- function(target, grid, cols, id, names) {
if (!is.null(names)) {
return(names)
}
if (is.character(id)) {
cols <- intersect(id, colnames(grid))
}
grid <- grid[, cols, drop = FALSE]
if (identical(id, FALSE) || any(dim(grid) < 1L)) {
out <- rep(target, nrow(grid))
return(make_unique(make.names(out, unique = FALSE, allow_ = TRUE)))
}
suffixes <- apply(grid, 1, paste, collapse = "_")
suffixes <- gsub("\"", "", suffixes, fixed = TRUE)
out <- paste0(target, "_", suffixes)
make_unique(make.names(out, unique = FALSE, allow_ = TRUE))
}
make_unique <- function(x) {
if (!length(x)) {
return(character(0))
}
ord <- order(x)
y <- x[ord]
dup <- duplicated(y)
if (!any(dup)) {
return(x)
}
suffix <- as.integer(
do.call(c, tapply(dup, y, FUN = cumsum, simplify = FALSE))
)
i <- suffix > 0L
suffix <- suffix + i
y[i] <- paste(y[i], suffix[i], sep = "_")
y[order(ord)]
}
combine_step <- function(plan, row, transform, old_cols) {
args <- args_combine(plan, transform)
any_empty <- any(!vapply(args, length, FUN.VALUE = integer(1)))
if (!length(args) || any_empty) {
return(data.frame(stringsAsFactors = FALSE))
}
out <- data.frame(command = NA, stringsAsFactors = FALSE)
for (col in setdiff(old_cols, c("target", "transform"))) {
if (is.language(row[[col]][[1]])) {
out[[col]] <- list(splice_args(row[[col]][[1]], args))
} else {
out[[col]] <- row[[col]]
}
}
groupings <- dsl_combine_join_plan(plan, transform, old_cols)
if (nrow(groupings) == 1L) {
out <- cbind(out, groupings)
}
out
}
old_cols <- function(plan) {
attr(plan, "old_cols")
}
`old_cols<-` <- function(plan, value) {
attr(plan, "old_cols") <- value
plan
}
args_combine <- function(plan, transform) {
out <- lapply(
dsl_combine(transform),
args_combine_entry,
plan = plan,
transform = transform
)
names(out) <- dsl_combine(transform)
out
}
args_combine_entry <- function(name, transform, plan) {
lapply(as.character(na_omit(unique(plan[[name]]))), dsl_sym)
}
splice_args <- function(x, replacements) {
out <- splice_inner(x, replacements)
# Avoid edge cases like #715
out <- parse(text = safe_deparse(out, backtick = TRUE))
if (length(out)) {
out <- out[[1]]
}
out
}
# From https://stackoverflow.com/a/54623901/3704549
splice_inner <- function(x, replacements) {
if (is.call(x)) {
splice_call(x, replacements)
} else if (is.name(x)) {
splice_name(x, replacements)
} else {
list(x)
}
}
splice_call <- function(x, replacements) {
args <- lapply(as.list(x), splice_inner, replacements = replacements)
use_names <- args$use.names
recursive <- args$recursive
args <- do.call("c", args, quote = TRUE)
args$use.names <- unlist(use_names)
args$recursive <- unlist(recursive)
as.call(args)
}
splice_name <- function(x, replacements) {
nm <- direct_deparse(
x,
control = deparse_control_default,
backtick = FALSE
)
if (nm %in% names(replacements)) {
replacements[[nm]]
} else {
list(x)
}
}
dsl_combine_join_plan <- function(plan, transform, old_cols) {
combined_plan <- plan[, dsl_combine(transform), drop = FALSE]
out <- plan[complete_cases(combined_plan),, drop = FALSE] # nolint
drop <- c(old_cols, dsl_combine(transform), dsl_by(transform))
keep <- setdiff(colnames(out), drop)
out <- out[, keep, drop = FALSE]
keep <- !vapply(out, anyNA, FUN.VALUE = logical(1))
out <- out[, keep, drop = FALSE]
keep <- viapply(out, function(x) {
length(unique(x))
}) == 1L
utils::head(out[, keep, drop = FALSE], n = 1)
}
dsl_deps <- function(transform) UseMethod("dsl_deps")
dsl_deps.map <- function(transform) {
attr(transform, "deps") %|||% c(
as.character(unnamed(as.list(transform[[1]][-1]))),
unname(unlist(new_groupings(transform)))
)
}
dsl_deps.cross <- dsl_deps.map
dsl_deps.combine <- function(transform) {
attr(transform, "deps") %|||% c(
dsl_combine(transform),
dsl_by(transform)
)
}
dsl_revdeps <- function(transform) UseMethod("dsl_revdeps")
dsl_revdeps.map <- function(transform) {
attr(transform, "revdeps") %|||% c(
names(new_groupings(transform)),
tag_in(transform),
tag_out(transform)
)
}
dsl_revdeps.cross <- dsl_revdeps.map
dsl_revdeps.combine <- function(transform) {
attr(transform, "revdeps") %|||% c(
tag_in(transform),
tag_out(transform)
)
}
dsl_grid <- function(transform, groupings) UseMethod("dsl_grid")
dsl_grid.map <- function(transform, groupings) {
tryCatch(
as.data.frame(groupings, stringsAsFactors = FALSE),
error = function(e) {
map_grid_error(transform, groupings)
}
)
}
map_grid_error <- function(transform, groupings) {
stop0(
"Failed to make a grid of grouping variables for map().\n",
"Grouping variables in map() must have suitable lengths ",
"for coercion to a data frame.\n",
"Possibly uneven groupings detected in ", char(transform), ":\n",
multiline_message(groupings)
)
}
dsl_grid.cross <- function(transform, groupings) {
do.call(expand.grid, c(groupings, stringsAsFactors = FALSE))
}
groupings <- function(transform) {
UseMethod("groupings")
}
groupings.map <- groupings.cross <- function(transform) {
c(new_groupings(transform), old_groupings(transform))
}
groupings.combine <- function(transform) character(0)
old_groupings <- function(transform, plan = NULL) UseMethod("old_groupings")
old_groupings.map <- old_groupings.cross <- function(transform, plan = NULL) {
attr(transform, "old_groupings") %|||%
find_old_groupings(transform, plan)
}
set_old_groupings <- function(transform, plan) {
attr(transform, "old_groupings") <- find_old_groupings(transform, plan)
transform
}
find_old_groupings <- function(transform, plan) UseMethod("find_old_groupings")
find_old_groupings.map <- function(transform, plan) {
group_names <- as.character(unnamed(lang(transform))[-1])
group_names <- intersect(group_names, names(plan))
subplan <- plan[, group_names, drop = FALSE]
keep <- apply(subplan, 1, function(x) {
!all(is.na(x))
})
subplan <- subplan[keep,, drop = FALSE] # nolint
if (any(dim(subplan) < 1L)) {
return(list())
}
# Look for blocks of nested grouping variables.
blocks <- column_components(subplan)
blocks <- lapply(blocks, function(x) {
as.list(x[complete_cases(x),, drop = FALSE]) # nolint
})
names(blocks) <- NULL
out <- do.call(c, blocks)
out <- select_nonempty(lapply(out, na_omit))
min_length <- min(vapply(out, length, FUN.VALUE = integer(1)))
out <- as.data.frame(
lapply(out, head, n = min_length),
stringsAsFactors = FALSE
)
as.list(out[!duplicated(out),, drop = FALSE]) # nolint
}
column_components <- function(x) {
adj <- crossprod(!is.na(x)) > 0L
graph <- igraph::graph_from_adjacency_matrix(adj)
membership <- sort(igraph::components(graph)$membership)
tapply(
X = names(membership),
INDEX = membership,
function(cols) {
x[, cols, drop = FALSE]
},
simplify = FALSE
)
}
find_old_groupings.cross <- function(transform, plan) {
group_names <- as.character(unnamed(lang(transform))[-1])
group_names <- intersect(group_names, names(plan))
lapply(plan[, group_names, drop = FALSE], function(x) {
unique(na_omit(x))
})
}
na_omit <- function(x) {
x[!is.na(x)]
}
find_old_groupings.combine <- function(transform, plan) NULL
new_groupings <- function(transform) UseMethod("new_groupings")
new_groupings.map <- function(transform) {
attr <- attr(transform, "new_groupings")
if (!is.null(attr)) {
return(attr)
}
transform <- lang(transform)
explicit <- explicit_new_groupings(
transform,
exclude = c(".data", dsl_all_special)
)
data_arg <- data_arg_groupings(transform[[".data"]])
c(explicit, data_arg)
}
data_arg_groupings <- function(data_arg) {
if (is.null(data_arg)) {
return(list())
}
lapply(data_arg, function(x) {
x <- factor_to_character(x)
vapply(x, long_deparse, FUN.VALUE = character(1))
})
}
dsl_all_special <- c(".id", ".names", ".tag_in", ".tag_out")
new_groupings.cross <- new_groupings.map
explicit_new_groupings <- function(code, exclude = character(0)) {
list <- named(as.list(code), exclude)
lapply(list, function(x) {
if (is.call(x)) {
x <- x[-1]
}
as.character(lapply(as.list(x), long_deparse))
})
}
long_deparse <- function(x, collapse = "\n") {
out <- direct_deparse(x, control = deparse_control_default, backtick = TRUE)
if (length(out) > 1L) {
out <- paste(out, collapse = collapse)
}
out
}
dsl_combine <- function(transform) UseMethod("dsl_combine")
dsl_combine.combine <- function(transform) {
attr(transform, "combine") %|||%
as.character(unnamed(lang(transform))[-1])
}
dsl_by <- function(transform) UseMethod("dsl_by")
dsl_by.combine <- function(transform) {
attr(transform, "by") %|||%
all.vars(lang(transform)[[".by"]], functions = FALSE)
}
dsl_id <- function(transform) UseMethod("dsl_id")
dsl_id.transform <- function(transform) {
if (!is.null(attr(transform, "id"))) {
return(attr(transform, "id"))
}
out <- lang(transform)[[".id"]]
if (all(is.logical(out))) {
return(out)
}
all.vars(out, functions = FALSE) %||% TRUE
}
dsl_names <- function(transform) UseMethod("dsl_names")
dsl_names.transform <- function(transform) {
if (!is.null(attr(transform, ".names"))) {
return(attr(transform, ".names"))
}
eval(lang(transform)[[".names"]])
}
tag_in <- function(transform) UseMethod("tag_in")
tag_in.transform <- function(transform) {
attr(transform, "tag_in") %|||%
all.vars(lang(transform)[[".tag_in"]], functions = FALSE)
}
tag_out <- function(transform) UseMethod("tag_out")
tag_out.transform <- function(transform) {
attr(transform, "tag_out") %|||%
all.vars(lang(transform)[[".tag_out"]], functions = FALSE)
}
lang <- function(x) UseMethod("lang")
lang.command <- lang.transform <- function(x) x[[1]]
char <- function(x) UseMethod("char")
char.transform <- function(x) safe_deparse(lang(x), backtick = TRUE)
named <- function(x, exclude = character(0)) {
if (is.null(names(x))) return(NULL)
x[!(names(x) %in% c("", exclude))]
}
unnamed <- function(x) {
if (is.null(names(x))) return(x)
x[!nzchar(names(x))]
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.