Advanced: How to use pipeflow with split data sets

require(pipeflow)

knitr::opts_chunk$set(
  comment = "#",
  prompt = FALSE,
  tidy = FALSE,
  cache = FALSE,
  collapse = TRUE
)

old <- options(width = 100L)

Motivation

A common scenario is to split a data set into subsets and then apply the same analysis to each part. In the context of pipelines, this means that we would like to apply the same pipeline once to each data subset. In addition, we may then want to combine parts of the individual outputs. As we will see, pipeflow provides a built-in function to handle this scenario.

Define pipeline

Let's first define our pipeline, which, to keep matters simple, just fits a linear model and outputs the model coefficients.

library(pipeflow)

pip <- pipe_new(
        "my-pipeline"
    ) |>

    pipe_add(
        "fit",
        function(
            data = ~data,
            xVar = "x",
            yVar = "y"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    pipe_add(
        "coefs",
        function(
            fit = ~fit
        ) {
            coefficients(fit)
        },
        keepOut = TRUE
    )

So our pipeline looks like this:

pip

Or graphically:

library(visNetwork)
do.call(visNetwork, args = c(pip$get_graph(), list(height = 100))) |>
    visHierarchicalLayout(direction = "LR")

We use the iris data set as our working example.

head(iris)

First, we apply the pipeline to the whole data set.

pip$set_data(iris)
pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
pip$collect_out()
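
As a cross-check outside of pipeflow, the same coefficients can be obtained with a direct lm call. The sketch below is plain base R and assumes only the parameter values set above. The reformulate variant is shown as a more explicit alternative to pasting a formula string; it is not something the pipeline itself uses.

```r
# Parameter values as passed to pip$set_params() above
xVar <- "Sepal.Length"
yVar <- "Sepal.Width"

# Same call as in the "fit" step: lm() accepts the formula as a string
fit_string <- lm(paste(yVar, "~", xVar), data = iris)

# Alternative (not used by the pipeline): build a formula object explicitly
fit_formula <- lm(reformulate(xVar, response = yVar), data = iris)

coef_string <- coefficients(fit_string)
coef_formula <- coefficients(fit_formula)
coef_string
```

Both variants fit the same model, so the two coefficient vectors agree.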

Split data set

Next, we want to apply the pipeline to each species separately. One way to do this is to use R's split function: we split the data by the Species column and then run the pipeline for each subset. For example:

run_pipeline <- function(data) {
    pip$set_data(data)
    pip$run()
    pip$collect_out()
}

results <- lapply(split(iris, iris$Species), FUN = run_pipeline)
results
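
For reference, split returns a named list with one data frame per factor level, which is what the lapply call above iterates over. A quick base R look at that structure, using only the iris data (50 rows per species):

```r
parts <- split(iris, iris$Species)

# One list entry per level of Species, named after the level
part_names <- names(parts)

# Number of rows in each subset
part_rows <- vapply(parts, nrow, integer(1))

part_names
part_rows
```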

Unfortunately, this approach requires additional code that runs outside the pipeline framework. Let's now see how to handle this scenario within the pipeline framework.

As a reminder, our pipeline looks like this:

pip

To deal with split data sets, we use the built-in function set_data_split.

splitData <- split(iris, iris$Species)
pip$set_data_split(splitData)

This function actually transforms the pipeline:

pip

As we can see, the pipeline has now replicated itself for each data subset.

do.call(visNetwork, args = pip$get_graph()) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")

Note that set_data_split accepts any list of data frames, not just the output of split. Now let's run this pipeline.

pip$run()
pip$collect_out()

As we can see, the output is the same as before, but it was obtained without writing additional code outside the pipeline framework.

Finally, as a side note, there is another built-in function named split, which splits the pipeline into its independent parts. This works for any pipeline; in our working example, it naturally splits the pipeline into the parts defined by the data split.

pip$split()

This function is especially useful if you want to separate parts of the pipeline in order to run them in parallel.
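
To illustrate why independent parts lend themselves to parallel execution, here is a hedged base R sketch using the parallel package. It does not use pipeflow or the return value of pip$split(); the function fit_coefs is a hypothetical stand-in for one independent pipeline part, and the choice of two workers is arbitrary.

```r
library(parallel)

# Hypothetical stand-in for one independent pipeline part
fit_coefs <- function(data) {
    stats::coefficients(stats::lm(Sepal.Width ~ Sepal.Length, data = data))
}

parts <- split(iris, iris$Species)

# Two local worker processes; adjust to the machine at hand
cl <- makeCluster(2L)
par_results <- tryCatch(
    parLapply(cl, parts, fit_coefs),
    finally = stopCluster(cl)
)

# Sequential reference run for comparison
seq_results <- lapply(parts, fit_coefs)
```

Because the subsets do not depend on each other, the parallel and the sequential run yield the same list of coefficient vectors.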

Combine output

In some cases, we may want to (re-)combine the output of the pipeline parts. For example, we may want to combine the coefficients of the linear models into one table.

To this end, let's extend our pipeline by one step at the end.

pip <- pipe_new(
        "my-pipeline"
    ) |>

    pipe_add(
        "fit",
        function(
            data = ~data,
            xVar = "x",
            yVar = "y"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>

    pipe_add(
        "coefs",
        function(
            fit = ~fit
        ) {
            coefficients(fit)
        }
    )  |>

    pipe_add(
        "combine_coefs",
        function(
            coefs = ~coefs
        ) {
            coefs |> do.call(rbind, args = _) |> as.data.frame()
        },
        keepOut = TRUE
    )

Here is how the pipeline looks for now.

pip
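
The combine_coefs step relies on do.call(rbind, ...) to stack a list of coefficient vectors into one table. The following base R sketch shows that mechanism in isolation. It builds the list by hand with lapply; how pipeflow names the entries of the list it passes to the coefs argument is not shown here, so the row names in this sketch are an assumption.

```r
# One coefficient vector per species, built outside the pipeline
coef_list <- lapply(
    split(iris, iris$Species),
    function(data) coefficients(lm(Sepal.Width ~ Sepal.Length, data = data))
)

# rbind() stacks the named vectors row-wise into a matrix,
# which is then converted to a data frame
combined <- as.data.frame(do.call(rbind, args = coef_list))
combined
```

The result has one row per list entry and one column per model coefficient.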

Next, we again want to apply the set_data_split function, but this time we need to make sure that the pipeline is split up only until the last step, which combines everything. We achieve this with the toStep parameter, which tells the pipeline to split only up to the given step.

pip$set_data_split(split(iris, iris$Species), toStep = "coefs")
pip

We see that the last step is not replicated for each data subset and it now contains a list of dependencies, namely:

pip$get_depends()[["combine_coefs"]]

Graphically, it becomes even clearer:

do.call(visNetwork, args = pip$get_graph()) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")

Finally, let's see how this plays out when we run the pipeline.

pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
pip$collect_out()
options(old)


pipeflow documentation built on April 3, 2025, 10:50 p.m.