Collecting output

# Attach the package being documented. library() stops with an error if the
# package is missing, whereas require() would only return FALSE and let a
# later chunk fail instead.
library(pipeflow)

# Chunk defaults for the rendered output.
knitr::opts_chunk$set(
  comment = "#",
  prompt = FALSE,
  tidy = FALSE,
  cache = FALSE,
  collapse = TRUE
)

# Widen the console output; the previous option values are kept in `old` so
# they can be restored with options(old) at the end.
old <- options(width = 100L)
library(ggplot2)

Generally, one should keep pipeline steps as simple as possible, basically following the principle "one step, one task". This means that usually a lot of pipeline steps are used to calculate intermediate results and only a few steps contain the final results that we are interested in. This vignette shows how to conveniently collect and possibly group the output of those final steps.

Flag output steps

Output steps are flagged by setting the keepOut argument to TRUE when adding a step to the pipeline. In the following example, we will want to keep the output of the steps data_summary, model_summary, and model_plot.

library(pipeflow)
library(ggplot2)

# Build the example pipeline on the airquality data. Steps flagged with
# keepOut = TRUE are the ones whose output is collected later.
pip <- pipe_new(
        "my-pipeline",
        data = airquality
    ) |>

    # Add a "Temp.Celsius" column computed from "Temp" via (Temp - 32) * 5/9,
    # i.e. the Fahrenheit-to-Celsius conversion.
    pipe_add(
        "data_prep",
        function(data = ~data) {
            replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
        }
    ) |>

    # Keep only the two columns used for modelling.
    pipe_add(
        "data_summary",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            data[, c(xVar, yVar)]
        },
        keepOut = TRUE              # <- keep this
    ) |>

    # Fit the linear model yVar ~ xVar (intermediate result, not kept).
    pipe_add(
        "model_fit",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    # Summarise the fitted model.
    pipe_add(
        "model_summary",
        function(
            fit = ~model_fit
        ) {
            summary(fit)
        },
        keepOut = TRUE              # <- keep this
    ) |>

    # Scatter plot of yVar against xVar with the fitted regression line.
    pipe_add(
        "model_plot",
        function(
            model = ~model_fit,
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone",
            title = "Linear model fit"
        ) {
            coeffs <- coefficients(model)
            ggplot(data) +
                # Use yVar rather than a hard-coded "Ozone" so that the plot
                # follows the parameter, consistent with the other steps.
                geom_point(aes(.data[[xVar]], .data[[yVar]])) +
                geom_abline(intercept = coeffs[1], slope = coeffs[2]) +
                # Label the axes explicitly with the column names so they do
                # not depend on how ggplot2 derives labels from .data[[var]].
                labs(title = title, x = xVar, y = yVar)
        },
        keepOut = TRUE              # <- keep this
    )

Looking at the pipeline, we see that the steps data_summary, model_summary, and model_plot have been flagged accordingly (see column keepOut).

# Auto-print the pipeline to inspect its steps and the keepOut column.
pip

Graphically, steps flagged with keepOut = TRUE are displayed with a circle shape while "normal" steps are shown as rectangle boxes.

library(visNetwork)

# Combine the pipeline's graph data with the widget height, then pass the
# whole list as arguments to visNetwork() and lay the graph out left to right.
graph_args <- c(pip$get_graph(), list(height = 300))
do.call(visNetwork, args = graph_args) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")

Now let's run and collect the output of the flagged steps using the collect_out method, which returns a list with the output of the flagged steps.

# Execute the pipeline steps.
pip$run()

# Gather the output of the steps flagged with keepOut = TRUE into a list.
out <- pip$collect_out()

# Show the names of the collected output elements.
names(out)

As expected, the output list contains the output of the flagged steps.

# Show only the top level of the collected output list.
str(out, max.level = 1)

Grouping output steps

Often certain output steps are related and should be grouped together. This can be achieved conveniently by setting the group argument when adding a step to the pipeline. Let's illustrate this by slightly modifying the previous example.

# Same example pipeline as before, but the kept steps are now assigned to
# output groups via the `group` argument.
pip <- Pipeline$new("my-pipeline", data = airquality) |>

    # Add a "Temp.Celsius" column computed from "Temp" via (Temp - 32) * 5/9,
    # i.e. the Fahrenheit-to-Celsius conversion.
    pipe_add(
        "data_prep",
        function(data = ~data) {
            replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
        }
    ) |>

    # Keep only the two columns used for modelling.
    pipe_add(
        "used_data",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            data[, c(xVar, yVar)]
        },
        keepOut = TRUE,
        group = "Data"                 # <- define 'Data' group here
    ) |>

    # Fit the linear model yVar ~ xVar (intermediate result, not kept).
    pipe_add(
        "model_fit",
        function(
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    # Summarise the fitted model.
    pipe_add(
        "model_summary",
        function(
            fit = ~model_fit
        ) {
            summary(fit)
        },
        keepOut = TRUE,
        group = "Model"                # <- define 'Model' group here
    ) |>

    # Scatter plot of yVar against xVar with the fitted regression line.
    pipe_add(
        "model_plot",
        function(
            model = ~model_fit,
            data = ~data_prep,
            xVar = "Temp.Celsius",
            yVar = "Ozone",
            title = "Linear model fit"
        ) {
            coeffs <- coefficients(model)
            ggplot(data) +
                # Use yVar rather than a hard-coded "Ozone" so that the plot
                # follows the parameter, consistent with the other steps.
                geom_point(aes(.data[[xVar]], .data[[yVar]])) +
                geom_abline(intercept = coeffs[1], slope = coeffs[2]) +
                # Label the axes explicitly with the column names so they do
                # not depend on how ggplot2 derives labels from .data[[var]].
                labs(title = title, x = xVar, y = yVar)
        },
        keepOut = TRUE,
        group = "Model"                # <- define 'Model' group here
    )

Looking at the pipeline, the defined groups are shown in the group column.

# Auto-print the pipeline; the assigned groups appear in the group column.
pip

As you can see, if no group is set explicitly, the group defaults to the step name, that is, each such step represents the trivial case of a one-sized group. Again, we run the pipeline and collect the output.

# Execute the pipeline steps.
pip$run()

# Gather the flagged output again; it is now organised by group.
out <- pip$collect_out()

# Show the top-level names of the collected output.
names(out)

As we can see, the output related to the modelling has been grouped into one sublist named Model.

# Show two levels so the contents of each group are visible.
str(out, max.level = 2)
# Restore the options saved in `old` when the output width was changed.
options(old)


Try the pipeflow package in your browser

Any scripts or data that you put into this service are public.

pipeflow documentation built on April 3, 2025, 10:50 p.m.