require(pipeflow)
knitr::opts_chunk$set(
    comment = "#",
    prompt = FALSE,
    tidy = FALSE,
    cache = FALSE,
    collapse = TRUE
)
old <- options(width = 100L)
A common scenario is to split a data set into subsets and then apply the same
analysis to each part. In the context of pipelines, this means that we would like
to apply the same pipeline to each data subset. In addition,
we may then want to combine parts of the individual outputs. As we will see,
pipeflow provides a built-in function to handle this scenario.
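Stripped of any pipeline machinery, the split-apply-combine pattern looks as follows in base R. This small sketch computes the mean sepal length per species. It only illustrates the three stages and does not use pipeflow.

```r
# Split-apply-combine in plain base R (no pipeflow involved)
parts    <- split(iris$Sepal.Length, iris$Species)  # split: one vector per species
means    <- lapply(parts, mean)                     # apply: same analysis to each part
combined <- unlist(means)                           # combine: one named vector again
combined
```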
Let's first define our pipeline, which, to keep matters simple, just fits a linear model and outputs the model coefficients.
library(pipeflow)

pip <- pipe_new("my-pipeline") |>
    pipe_add(
        "fit",
        function(data = ~data, xVar = "x", yVar = "y") {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>
    pipe_add(
        "coefs",
        function(fit = ~fit) {
            coefficients(fit)
        },
        keepOut = TRUE
    )
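In this pipeline, a default of the form ~name (such as ~data or ~fit) marks a parameter that takes the output of another step. The sketch below makes that idea concrete with run_toy_pipeline, a hypothetical toy runner. It is not pipeflow's implementation. It runs the steps in order and fills each ~name argument with the output of the step of that name. Otherwise it uses a matching entry of params if one is given.

```r
# Hypothetical toy runner, NOT pipeflow's implementation.
# Assumes every step argument has a default and steps are listed in run order.
run_toy_pipeline <- function(steps, data, params = list()) {
    out <- list(data = data)  # the input data acts as the first "step"
    for (name in names(steps)) {
        f <- steps[[name]]
        args <- as.list(formals(f))
        for (arg in names(args)) {
            default <- args[[arg]]
            if (is.call(default) && identical(default[[1L]], as.name("~"))) {
                # "~fit" means: take the output of the earlier step named "fit"
                args[[arg]] <- out[[as.character(default[[2L]])]]
            } else if (!is.null(params[[arg]])) {
                # otherwise a matching entry in params overrides the default
                args[[arg]] <- params[[arg]]
            }
        }
        out[[name]] <- do.call(f, args)
    }
    out
}

# The same two steps as in the pipeline above, written as plain functions
steps <- list(
    fit = function(data = ~data, xVar = "x", yVar = "y") {
        lm(paste(yVar, "~", xVar), data = data)
    },
    coefs = function(fit = ~fit) coefficients(fit)
)
res <- run_toy_pipeline(steps, iris, list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
res$coefs
```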
So our pipeline looks like this:
pip
Or graphically:
library(visNetwork)
do.call(visNetwork, args = c(pip$get_graph(), list(height = 100))) |>
    visHierarchicalLayout(direction = "LR")
We use the iris data set as our working example.
head(iris)
First, we apply the pipeline to the whole data set.
pip$set_data(iris)
pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
pip$collect_out()
Next, we want to apply the pipeline to each species separately.
One way to do this is to use R's split function to split the data set
by the Species column and then run the pipeline for each subset. For example:
run_pipeline <- function(data) {
    pip$set_data(data)
    pip$run()
    pip$collect_out()
}
results <- lapply(split(iris, iris$Species), FUN = run_pipeline)
results
Unfortunately, this approach required additional code that had to be run outside the pipeline framework. Let's now see how to handle this scenario within the pipeline framework.
As a reminder, our pipeline looks like this:
pip
To deal with split data sets, we use the built-in function set_data_split.
splitData <- split(iris, iris$Species)
pip$set_data_split(splitData)
This function actually transforms the pipeline:
pip
As we can see, the pipeline has now replicated itself for each data subset.
do.call(visNetwork, args = pip$get_graph()) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")
Note that set_data_split accepts any list of data frames, not just the output of split.
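The subsets do not even have to partition the data. The sketch below builds a named list of three bootstrap resamples of iris. The names boot1 to boot3 are arbitrary. Passing this list to set_data_split would then, as described above, replicate the pipeline once per resample. That call is left commented out so the snippet runs with base R alone.

```r
set.seed(1)  # reproducible resamples

# Three bootstrap resamples of iris: overlapping subsets, each with 150 rows
dataList <- lapply(
    setNames(1:3, paste0("boot", 1:3)),
    function(i) iris[sample(nrow(iris), replace = TRUE), ]
)
sapply(dataList, nrow)  # rows per resample

# pip$set_data_split(dataList)  # replicate the pipeline once per resample
```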
Now let's run this pipeline.
pip$run()
pip$collect_out()
As we can see, we obtain the same coefficients as before, but this time without writing any additional code outside the pipeline framework.
Finally, as a side note, there is another built-in function named split,
which splits the pipeline into its independent parts. While this works for any pipeline,
in our working example it naturally splits the pipeline into the parts
defined by the data split.
pip$split()
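One way to picture an "independent part" is as a weakly connected component of the dependency graph. Under this view, two steps belong to the same part if a chain of dependencies links them, ignoring direction. The function independent_parts below is a hypothetical sketch of that idea using simple label propagation. It is not pipeflow's implementation, and step names such as fit.setosa are made up for illustration.

```r
# Hypothetical sketch, NOT pipeflow's implementation.
# deps maps each step name to the names of the steps it depends on.
independent_parts <- function(deps) {
    steps <- names(deps)
    comp <- setNames(seq_along(steps), steps)  # each step starts in its own part
    changed <- TRUE
    while (changed) {
        changed <- FALSE
        for (s in steps) {
            for (d in deps[[s]]) {
                # merge the parts of a step and its dependency under the smaller label
                m <- min(comp[[s]], comp[[d]])
                if (comp[[s]] != m || comp[[d]] != m) {
                    comp[comp == comp[[s]] | comp == comp[[d]]] <- m
                    changed <- TRUE
                }
            }
        }
    }
    unname(split(steps, comp))  # one character vector of step names per part
}

deps <- list(
    data.setosa     = character(0),
    fit.setosa      = "data.setosa",
    coefs.setosa    = "fit.setosa",
    data.virginica  = character(0),
    fit.virginica   = "data.virginica",
    coefs.virginica = "fit.virginica"
)
pipeline_parts <- independent_parts(deps)
pipeline_parts
```

In this sketch, a step that depends on both coefs.setosa and coefs.virginica would merge everything into a single part, because the steps would no longer be independent.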
Splitting the pipeline is especially useful if you want to run its independent parts in parallel.
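The objects returned by pip$split() are not examined here. The sketch below therefore parallelizes the equivalent base-R work instead: one linear model per species, fitted with mclapply from R's parallel package. Running each pipeline part would presumably follow the same pattern, but that is an assumption.

```r
library(parallel)

parts <- split(iris, iris$Species)
fit_coefs <- function(d) coefficients(lm(Sepal.Width ~ Sepal.Length, data = d))

# mclapply forks worker processes; forking is unavailable on Windows,
# so fall back to a single core there.
cores <- if (.Platform$OS.type == "windows") 1L else 2L
res <- mclapply(parts, fit_coefs, mc.cores = cores)
do.call(rbind, res)
```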
In some cases, we may want to (re-)combine the output of the pipeline parts. For example, we may want to combine the coefficients of the linear models into one table.
To this end, let's extend our pipeline by one final step.
pip <- pipe_new("my-pipeline") |>
    pipe_add(
        "fit",
        function(data = ~data, xVar = "x", yVar = "y") {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>
    pipe_add(
        "coefs",
        function(fit = ~fit) {
            coefficients(fit)
        }
    ) |>
    pipe_add(
        "combine_coefs",
        function(coefs = ~coefs) {
            coefs |>
                do.call(rbind, args = _) |>
                as.data.frame()
        },
        keepOut = TRUE
    )
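The body of combine_coefs relies on the native pipe placeholder _, which requires R 4.2.0 or later. The sketch below applies the same combination step outside pipeflow to a named list of per-species coefficient vectors. Here rbind stacks the vectors as rows and takes the row names from the list names. Then as.data.frame turns the resulting matrix into a table.

```r
# One coefficient vector per species, as the replicated "coefs" steps would produce
coefs <- lapply(
    split(iris, iris$Species),
    function(d) coefficients(lm(Sepal.Width ~ Sepal.Length, data = d))
)

# Same expression as in combine_coefs: rows are species, columns are coefficients
tab <- coefs |>
    do.call(rbind, args = _) |>
    as.data.frame()
tab
```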
Here is what the pipeline looks like so far.
pip
Next, we again apply the set_data_split function, but this time we need to
make sure that the pipeline is split up everywhere except for the last step, which combines
everything. We achieve this with the toStep parameter, which tells
the pipeline to split only up to and including a certain step.
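For a purely linear pipeline like ours, the effect of toStep can be pictured as cutting the ordered list of steps in two. The helper split_plan below is hypothetical and not part of pipeflow. It only sketches that idea and ignores the general case where steps form a graph rather than a chain.

```r
# Hypothetical helper, NOT part of pipeflow: for a pipeline whose steps form
# a simple chain, report which steps would be replicated per data subset
# (everything up to and including toStep) and which would stay shared.
split_plan <- function(steps, toStep) {
    k <- match(toStep, steps)
    if (is.na(k)) stop("unknown step: ", toStep)
    list(
        replicated = steps[seq_len(k)],
        shared     = steps[-seq_len(k)]
    )
}

plan <- split_plan(c("data", "fit", "coefs", "combine_coefs"), toStep = "coefs")
plan$replicated
plan$shared
```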
pip$set_data_split(split(iris, iris$Species), toStep = "coefs")
pip
We see that the last step is not replicated for each data subset; instead, it now has a list of dependencies, namely:
pip$get_depends()[["combine_coefs"]]
Graphically, this becomes even clearer:
do.call(visNetwork, args = pip$get_graph()) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")
Finally, let's see how this plays out when we run the pipeline.
pip$set_params(list(xVar = "Sepal.Length", yVar = "Sepal.Width"))
pip$run()
pip$collect_out()
options(old)