Get started with pipeflow

old <- options(width = 100L)

A simple example to get started

In this example, we'll use base R's airquality dataset.

head(airquality)

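Our goal is to create an analysis pipeline that performs the following steps:

1. add a new data column Temp.Celsius containing the temperature in degrees Celsius
2. fit a linear model to the data
3. plot the data and the model fit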

In the following, we'll show how to define and run the pipeline, how to inspect the output of specific steps, and finally how to re-run the pipeline with different parameter settings, which is one of the main selling points of such a pipeline.

Define the pipeline

For easier understanding, we go step by step. First, we create a new pipeline with the name "my-pipeline" and the airquality dataset as input data.

library(pipeflow)
library(ggplot2)

pip <- Pipeline$new("my-pipeline", data = airquality)

Let's view the pipeline in its table form after initialization.

pip

Here, each step is represented by one row in the table where the step column lists the name of the step and the state column shows the current state of the step. For now, don't worry about the other columns - they are explained later.

As you can see, the data is always the first step in the pipeline and initially it is in state New, which means the pipeline has not been run yet.

Next, we add a step called data_prep, which consists of a function that takes the output of the data step as its first argument, adds a new column, and returns the modified data as its output. To refer to the output of an earlier pipeline step, we write the name of that step preceded by the tilde (~) operator.

pip$add(
    "data_prep",
    function(data = ~data) {
        replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
    }
)
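To see what the step function computes on its own, here is a minimal stand-alone sketch in plain R, without any pipeline. The name data_prep_fun is only used for this illustration. It relies on the base R behavior that replace() on a data frame appends the column if it does not exist yet.

```r
# Stand-alone version of the step function (illustration only).
data_prep_fun <- function(data) {
    # replace() performs data["Temp.Celsius"] <- values and returns the
    # modified copy, so a missing column is simply appended.
    replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
}

prepped <- data_prep_fun(airquality)

# Compare the original Fahrenheit values with the new Celsius column.
head(prepped[, c("Temp", "Temp.Celsius")], 3)
```

The input data frame itself is not modified, which fits the pipeline idea that each step returns a new output.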

Again, let's inspect the pipeline.

pip

A second step called data_prep was added. It depends on the data step, as shown in the depends column, which lists all dependencies of a step.
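The tilde notation works because a default such as ~data is an ordinary one-sided R formula stored, unevaluated, in the function's argument list. The following sketch shows how such defaults could be read with base R. The helper find_dependencies is hypothetical and is not claimed to be how pipeflow resolves dependencies internally. For simplicity, it assumes that every argument has a default value.

```r
# A step function in the style used above.
step_fun <- function(data = ~data_prep, xVar = "Temp.Celsius") NULL

# Hypothetical helper (not part of pipeflow): return the step names that
# a function's formula defaults point to.
find_dependencies <- function(fun) {
    defaults <- as.list(formals(fun))
    # A default written as ~name is stored as a call to `~`.
    is_formula <- vapply(
        defaults,
        function(x) is.call(x) && identical(x[[1]], as.name("~")),
        logical(1)
    )
    # The second element of the call is the symbol after the tilde.
    vapply(
        defaults[is_formula],
        function(x) as.character(x[[2]]),
        character(1)
    )
}

find_dependencies(step_fun)
```

The result is a character vector named after the arguments, here mapping the argument data to the step data_prep, while the plain parameter xVar is ignored.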

Next, we add a step called model_fit, which fits a linear model to the data. The function takes the output of the data_prep step and defines a parameter xVar, which specifies the variable used as the predictor in the linear model.

pip$add(
    "model_fit",
    function(
        data = ~data_prep,
        xVar = "Temp.Celsius"
    ) {
        lm(paste("Ozone ~", xVar), data = data)
    }
)
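The model formula in this step is built as a character string. That works because lm() coerces a string to a formula, as the following small sketch in plain R illustrates. It uses the existing Temp column so that it runs without the pipeline.

```r
xVar <- "Temp"

# Formula given as a string, as in the model_fit step.
fit_from_string <- lm(paste("Ozone ~", xVar), data = airquality)

# The same model with a literal formula.
fit_from_formula <- lm(Ozone ~ Temp, data = airquality)

# Both fits yield the same coefficients.
all.equal(coef(fit_from_string), coef(fit_from_formula))
```

Building the formula from a string is what allows the predictor to be swapped later by changing a single parameter.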

Lastly, we add a step called model_plot, which plots the data and the linear model fit. The function uses the output from both the model_fit and the data_prep step. It also defines the xVar parameter and a parameter title, which is used as the title of the plot.

pip$add(
    "model_plot",
    function(
        model = ~model_fit,
        data = ~data_prep,
        xVar = "Temp.Celsius",
        title = "Linear model fit"
    ) {
        coeffs <- coefficients(model)
        ggplot(data) +
            geom_point(aes(.data[[xVar]], .data[["Ozone"]])) +
            geom_abline(intercept = coeffs[1], slope = coeffs[2]) +
            labs(title = title)
    }
)

This completes our pipeline, which now looks as follows:

pip

In the last row, we see that the model_plot step depends on both the model_fit and data_prep steps. To visualize a pipeline, we can use the get_graph() method, which returns a list of arguments that can be passed to the visNetwork function from the visNetwork package.

library(visNetwork)
do.call(visNetwork, args = pip$get_graph()) |>
    visHierarchicalLayout(direction = "LR")

Pipeline integrity

A key feature of pipeflow is that the integrity of a pipeline is verified at definition time. To see this, let's try to add another step that refers to a step that does not exist:

pip$add(
    "another_step",
    function(data = ~foo) {
        data
    }
)

pipeflow immediately signals an error and the pipeline remains unchanged:

pip
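In a script, such an error would stop execution. As a minimal sketch, the error can be caught with base R's tryCatch() so that the message can be inspected while the script continues. The sketch uses a fresh pipeline to stay self-contained and makes no assumption about the exact wording of the error message.

```r
library(pipeflow)

pip <- Pipeline$new("my-pipeline", data = airquality)

# The step name `foo` is deliberately undefined.
err_msg <- tryCatch(
    {
        pip$add("another_step", function(data = ~foo) data)
        NA_character_  # only reached if no error was signalled
    },
    error = function(e) conditionMessage(e)
)

err_msg

# The pipeline still contains only the data step.
pip
```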

Run the pipeline and inspect the output

To run the pipeline, we simply call the run() method on the pipeline, which will produce the following output:

pip$run()

Let's inspect the pipeline again.

pip
do.call(visNetwork, args = c(pip$get_graph(), list(height = 100))) |>
    visHierarchicalLayout(direction = "LR")

We can see that the state of all steps has changed from New to Done, which is represented graphically by the color change from blue to green.

In addition, the output was added in the out column. We can inspect the output of a specific step by calling the get_out() method on the pipeline.

pip$get_out("model_fit")
pip$get_out("model_plot")

Change parameters of the pipeline

Any parameter of the pipeline that is not bound to a specific step (i.e. is not defined with the tilde (~) operator) can be changed by calling the set_params() method on the pipeline. Let's first list all unbound parameters:

pip$get_params()

Now let's assume we want to change the xVar in the entire pipeline to "Solar.R". We can do this simply as follows:

pip$set_params(list(xVar = "Solar.R"))
pip$get_params()

Changing a parameter affects at least all steps that use the changed parameter.

pip
do.call(visNetwork, args = c(pip$get_graph(), list(height = 100))) |>
    visHierarchicalLayout(direction = "LR")

We can see that the model_fit and model_plot steps are now in state Outdated (graphically indicated by the orange color), which means that they need to be rerun. We can do this by calling the run() method on the pipeline again.

pip$run()

The outdated steps were re-run as expected and the output was updated accordingly.

pip$get_out("model_plot")
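The effect of a parameter change can also be checked numerically rather than visually. The following sketch uses a reduced pipeline with only a model step, so it does not need ggplot2, and compares the coefficient names of the fitted model before and after the change. It uses only the methods shown above. The reduced pipeline and the names before and after are made up for this illustration.

```r
library(pipeflow)

# Reduced pipeline for illustration: data plus a single model step.
pip <- Pipeline$new("my-pipeline", data = airquality)
pip$add(
    "model_fit",
    function(data = ~data, xVar = "Temp") {
        lm(paste("Ozone ~", xVar), data = data)
    }
)

pip$run()
before <- names(coef(pip$get_out("model_fit")))

# Change the predictor and re-run the outdated step.
pip$set_params(list(xVar = "Solar.R"))
pip$run()
after <- names(coef(pip$get_out("model_fit")))

before
after
```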

To change only the title of the plot, only the model_plot step needs to be rerun.

pip$set_params(list(title = "Some new title"))
pip
pip$run()$get_out("model_plot")

If instead we were to change the input data, since all steps depend on it, we expect all steps to be rerun.

pip$set_data(airquality[1:10, ])
pip
pip$run()$get_out("model_plot")

Try setting non-existing parameters

Finally, let's see what happens if we try to set parameters that don't exist in the pipeline, which usually happens because of a typo.

pip$set_params(list(titel = "misspelled variable name", foo = "my foo"))

As you can see, a warning is given to the user hinting at the respective parameter names, which makes fixing any misspellings straightforward.
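If such typos should be caught before the parameters are passed on, the names can be compared up front. The following base R sketch illustrates the idea. The list known_params is a hypothetical stand-in for the unbound parameters of the pipeline, and the check is not claimed to be how pipeflow detects unknown names.

```r
# Hypothetical stand-in for the pipeline's unbound parameters.
known_params <- list(xVar = "Temp.Celsius", title = "Linear model fit")

# Parameters we intend to set, including a typo and an unknown name.
new_params <- list(titel = "misspelled variable name", foo = "my foo")

# Names that are requested but not defined.
unknown <- setdiff(names(new_params), names(known_params))

if (length(unknown) > 0) {
    message("Not defined in the pipeline: ", paste(unknown, collapse = ", "))
}
```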

Next, let's see how to modify the pipeline.

options(old)



pipeflow documentation built on April 3, 2025, 10:50 p.m.