pip_add: Add a step

View source: R/pipeline.R

pip_addR Documentation

Add a step

Description

Adds a named step to the pipeline. Each step is a function whose parameters either hold constant defaults or reference the output of a prior step using formula notation (~step_name). Dependencies are validated when the step is added.

Usage

pip_add(x, step, fun, tags = character(0), after = length(x), exec = "auto")

Arguments

x

A pipeflow pipeline object.

step

Unique step name.

fun

Function to execute for the step. Each function parameter must have a default value. Default values that are simple constants are resolved immediately. Default values that are formulas like ~other_step are treated as dependencies to those steps and resolved to the respective output values at runtime once the step is executed.

tags

Optional character vector of tags belonging to the step. Can also be adjusted later using ⁠[pip_tag()]⁠.

after

Optional position after which the new step should be inserted (defaults to last position). Can be a step name or an integer index. If set to 0, the new step will be inserted at the beginning of the pipeline.

exec

Execution mode for this step. One of "auto", "split", "reduce" or "plain". Using execution mode exec = split, the output of the step is marked as partitioned output. In this mode, any step that depends on the split step (directly or indirectly) will have its output automatically mapped partition-wise during step execution. The reduce mode expects partitioned input and passes it through without mapping, while plain mode only accepts non-partitioned input and always intends to execute a single call. In summary:

  • auto: map if partitioned input appears, otherwise single call

  • split: single call, then mark output as partitioned

  • reduce: single call, but only valid with partitioned input

  • plain: single call, only valid with non-partitioned input

Details

If after was specified, the new step will be inserted after the given step or position. Be aware that in contrast to adding a step at the end, inserting a step in the middle is a rather expensive operation as it requires re-wiring parts of the internal pipeline structure, especially if the new step is inserted at an early position.

Value

The updated pipeline, invisibly.

Examples

# --- Tags, and view filtering ---
p <- pip_new("analysis") |>
  pip_add("load", \(n = 5) seq_len(n), tags = c("io", "raw")) |>
  pip_add("clean", \(x = ~load) x * 2, tags = c("io", "process")) |>
  pip_add("fit", \(x = ~clean) sum(x), tags = c("model", "core", "daily")) |>
  pip_add("report", \(x = ~fit) paste("result:", x), tags = "report")

pip_run(p)
p

# Filter by tag using pip_view — keeps steps with any matching tag
pip_view(p, tags = "daily")
pip_view(p, tags = "core")
pip_view(p, tags = c("raw", "report"))

# --- Split / reduce execution modes ---
q <- pip_new("split-demo") |>
  pip_add("data", \(x = iris) x) |>
  pip_add("split", \(x = ~data) split(x, x$Species),
    exec = "split"
  ) |>
  pip_add("stats", \(x = ~split) summary(x)) |>
  pip_add("combine", \(x = ~stats) do.call(rbind, x),
    exec = "reduce"
  )

pip_run(q)
q[["stats", "out"]]   # partitioned list — one summary per species
q[["combine", "out"]] # combined table

pipeflow documentation built on June 15, 2026, 9:10 a.m.