Analysis pipelines - Core functionality and working with R data frames and functions

An overview of the package

A typical data science workflow involves multiple steps: data aggregation, cleaning, exploratory analysis, modeling and so on. As the data science community matures, we are seeing a variety of languages that provide better capabilities for specific steps in the workflow. R is typically used for data transformations, statistical models, and visualizations, while Python provides more robust functions for machine learning. In addition, Spark provides an environment to process high-volume data, both as one-time/batch jobs and as streams.

The job of today's data scientist is changing from one where they are married to a specific tool or language, to one where they use all of these tools for their specialized purposes. The key problem then becomes one of translation between these tools for seamless analysis. Additionally, data scientists often need to perform the same task repeatedly, as well as put certain analysis flows (or pipelines) into production to work on new data periodically, or on streaming data.

Recently, interfaces for using these various tools have been published. In terms of R packages, the reticulate package provides an interface to Python, and the SparkR and sparklyr packages provide an interface to Spark.

The analysisPipelines package uses these interfaces to enable interoperable pipelines, i.e. the ability to compose and execute a reusable data science pipeline which can contain functions to be executed in an R environment, a Python environment or a Spark environment. These pipelines can be saved and loaded to enable batch operation as datasets get updated with new data.

The goal of the analysisPipelines package is to make the job of the data scientist easier and help them compose pipelines of analysis consisting of data manipulation, exploratory analysis & reporting, as well as modeling steps. The idea is for data scientists to use tools of their choice through an R interface provided by this package.

Types of pipelines

This package supports both batch/repeated pipelines and streaming pipelines.

For batch pipelines, the vision is to enable interoperable pipelines which execute efficiently with functions in R, Spark and Python.

For streaming pipelines, the package allows for streaming analyses through Apache Spark Structured Streaming.

Classes and implementation

The analysisPipelines package uses S4 classes and methods to implement all the core functionality. The fundamental class exposed in this package is the BaseAnalysisPipeline class on which most of the core functions are implemented. The user, however, interacts with the AnalysisPipeline and StreamingAnalysisPipeline classes for batch and streaming analysis respectively. In this vignette, we work with the AnalysisPipeline class, with functions solely in R.

Pipelining semantics

The package stays true to the tidyverse pipelining style, which also fits nicely with the idea of creating pipelines. The core mechanism in the package is to instantiate a pipeline with data and then pipe the required functions onto the object itself.

The package allows both the use of magrittr pipe (%>%) or the pipeR pipe (%>>%).
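
For instance, the same chain can be written with either operator. The snippet below is a minimal sketch; the pipelineObj name is purely illustrative, and creating pipeline objects is covered in the Usage section later in this vignette.

library(analysisPipelines)
library(magrittr)
library(pipeR)

pipelineObj <- AnalysisPipeline(input = iris)   # object creation, detailed later
pipelineObj %>% getInput() %>% head()           # magrittr pipe
pipelineObj %>>% getInput %>>% head             # pipeR pipe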

Supported engines

As of this version, the package supports functions executed in R, or in Spark through the SparkR interface, for batch pipelines. It also supports Spark Structured Streaming pipelines for streaming analyses. Python will be supported in subsequent releases.

Available vignettes

This package contains 5 vignettes; this one covers the core functionality, working with R data frames and functions.

Usage

Loading the package

library(analysisPipelines)
knitr::opts_chunk$set(warning = FALSE)

Creating an analysisPipeline object

An object of class AnalysisPipeline can be created like so:

obj <- AnalysisPipeline(input = iris)

class(obj)

While initializing the object, an input dataframe can be provided on which the pipeline should work, either by providing the path to a .csv file through the filePath argument, or by providing an R dataframe available in the session through the input argument.
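
For instance, initializing the object from a file on disk might look like the sketch below (the path is only a placeholder):

# Initializing the pipeline from a .csv file instead of an in-memory dataframe
# (the file path below is a placeholder)
objFromFile <- AnalysisPipeline(filePath = "path/to/data.csv")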

The AnalysisPipeline object has a set of getters, for retrieving various slots containing data and metadata required for pipeline execution. The most basic of them is the getInput method which retrieves the input dataframe with which the object has been initialized. If not initialized with a dataframe, the setInput method can be used to do so.

obj %>>% getInput %>>% str
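
If the object was created without an input dataframe, a dataset can be attached later through setInput. A minimal sketch, assuming setInput accepts the pipeline object and a dataframe:

# Attaching (or replacing) the input dataset on an existing pipeline object
# (sketch; assumes the signature setInput(object, input))
obj <- obj %>>% setInput(input = iris)
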
getRegistry()

The getRegistry function retrieves the set of functions and their metadata available for pipelining. Any AnalysisPipeline object comes with a set of pre-registered functions which can be used out-of-the-box. Of course, the user can register her own functions, to be used in the pipeline. We will look at this later on in the vignette.

There are two types of functions which can be pipelined: data functions, whose first argument is the dataset the pipeline operates on, and non-data functions, which do not operate directly on the dataset.

Both pre-registered and user-defined functions work with the AnalysisPipeline object in the same way i.e. regardless of who writes the function, they follow the same semantics.

Creating a simple pipeline

We'll now take a look at creating a simple pipeline with some of the pre-registered functions available in the registry. We pipeline the univarCatDistPlots function (a pre-registered utility function which generates a chart showing the distribution of a categorical variable in a dataset) by simply using the pipe or double pipe operator, providing the required additional parameters apart from the data on which it needs to operate, as we have already initialized the AnalysisPipeline object with the data.

Note that unless assigned to the same or another object, the pipeline does not get stored.

We can access the details of the pipeline as a tibble through the getPipeline method.

# Running univariate categorical distribution plot on the constructed object
# ?analysisPipelines::univarCatDistPlots
obj1 <- obj %>>% univarCatDistPlots(uniCol = "Species", priColor = "blue", optionalPlots = 0, storeOutput = T)
obj1 %>>% getPipeline
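
As noted above, since the result of the pipe was assigned to obj1, the original object remains unchanged; its pipeline is still empty.

# The original object's pipeline is still empty, as the piped result was assigned to obj1
obj %>>% getPipeline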

We now construct a pipeline with a couple of functions, one to generate a categorical distribution plot, and another one for outliers.

# Running univariate categorical distribution plot and then 
# outlier detection on the constructed object

obj %>>% univarCatDistPlots(uniCol = "Species", priColor = "green", optionalPlots = 0) %>>% 
         outlierPlot(method = "iqr", columnName = "Sepal.Length", 
              cutoffValue = 0.01, priColor = "blue", optionalPlots = 0, storeOutput = T) -> obj2
obj2 %>>% getPipeline

Apart from the parameters required by the function itself, a couple of additional parameters can be passed to registered functions when adding them to the pipeline: outAsIn, which passes the output of the previous function in the pipeline as the data input to this one, and storeOutput, which specifies whether the function's output should be stored in the pipeline object after execution.

Lazy evaluation

It is important to note that the functions in the pipeline are not actually called when the pipeline is defined. This can be seen by looking at the output slot of the pipeline we defined above.

length(obj1@output)

The pipeline can be executed by explicitly calling the generateOutput method and assigning it to a variable.

obj1Output <- obj1 %>>% generateOutput
length(obj1Output@output)

Observe that the length of the output list of the first object, obj1, remains 0, since the generated output was assigned to a new object.

length(obj1@output)

A specific output can be viewed by providing the ID of the function in the pipeline to the getOutputById method. The ID can be obtained from the id column of the pipeline tibble, corresponding to the function whose output is desired.

# The ID can range from "1" to the number of functions in the pipeline
obj1Output %>>% getOutputById("1")

User-defined functions

Registering your own function

You can register your own data or non-data functions by calling registerFunction. This adds the user-defined function to the registry. The registry is maintained by the package and, once registered, functions can be used across pipeline objects. The registry can be viewed by calling the getRegistry function.

# Currently registered functions
getRegistry()

In order to register a function, it must first be defined in the global environment before calling registerFunction.

bivariatePlots <- function(dataset, select_var_name_1, select_var_name_2, 
                       priColor = "blue", secColor='black') {
  x=dataset[, select_var_name_1]
  y=dataset[, select_var_name_2]
  bivarPlot <- ggplot2::ggplot(dataset, ggplot2::aes(x,y)) +
    ggplot2::geom_point(color=priColor,alpha=0.7) +
    ggplot2::geom_smooth(method = lm,color=secColor) +
    ggplot2::xlab(select_var_name_1) +
    ggplot2::ylab(select_var_name_2) + 
    ggplot2::theme_bw() +
    ggplot2::ggtitle(paste('Bivariate plot for', select_var_name_1, 
                           'and', select_var_name_2, sep=' ')) +
    ggplot2::theme(plot.title = ggplot2::element_text(hjust = 0.5, size = 10), 
                   axis.text = ggplot2::element_text(size=10),
                   axis.title=ggplot2::element_text(size=10))
  return(bivarPlot)
}

registerFunction(functionName = "bivariatePlots", heading = "Bivariate Analysis")

Adding the newly registered function to a pipeline

Now the newly registered user-defined function can be used as part of the pipeline, exactly as described before. For example, we add it to a pipeline which already contains some functions. The function then gets added to the end of the pipeline.

# Chaining the user-defined function to the object's pipeline where it was registered
obj2 <- obj2 %>>% 
  bivariatePlots(select_var_name_1 = 'Sepal.Length', select_var_name_2 = 'Sepal.Width', 
                 priColor = "blue", secColor = "black")

# Printing the updated pipeline
obj2 %>>% getPipeline 

The newly added function is also lazily evaluated, executing only when output generation is triggered.

obj2Output <- obj2 %>>% generateOutput()
obj2Output %>>% getOutputById("1")
obj2Output %>>% getPipeline()

Complex pipelines and formula semantics

In addition to the simple pipelines described above, more complex pipelines can also be defined. There are cases where the outputs of previous functions in the pipeline need to be passed as inputs to arbitrary parameters of subsequent functions. This is apart from the data, which is passed through in a more native manner through the outAsIn argument to subsequent functions in the pipeline.

Using formulae in pipelines

The package defines certain formula semantics to accomplish this. To illustrate how this works, we take the example of two simple user-defined functions, one which simply returns the color of the graph and another which returns the column on which the graph should be plotted.

getColor <- function(color){
  return(color)
}

getColumnName <-function(columnName){
  return(columnName)
}

registerFunction(functionName = "getColor", isDataFunction = F, firstArgClass = "character")
registerFunction(functionName = "getColumnName", isDataFunction = F, firstArgClass = "character")

getRegistry()

In the following pipeline, we pass the color and column name we want to the non-data functions we defined, and use their outputs in subsequent functions. This can be done simply by specifying a formula of the form '~fid' against the argument to which we want to pass the output, where id is the ID of the function in the pipeline. For example, to pass the output of the function with ID '1' as an argument to a parameter of a subsequent function, the formula '~f1' is passed to that argument.

obj %>>% getColor(color = "blue") %>>% getColumnName(columnName = "Petal.Length") %>>%
      univarCatDistPlots(uniCol = "Species", priColor = ~f1, optionalPlots = 0, storeOutput = T) %>>%
      outlierPlot(method = "iqr", columnName = ~f2, cutoffValue = 0.01, priColor = ~f1 , optionalPlots = 0) -> complexPipeline

complexPipeline %>>% getPipeline
complexPipeline %>>% generateOutput -> op
op %>>% getOutputById("3")

Differences between outAsIn and formulae - When to use what

The package provides 2 mechanisms to pass outputs from previous functions to subsequent ones. The first is the outAsIn parameter. It applies only to transformations of the data that the pipeline is instantiated with, and only the output of the immediately preceding function can be passed on. It is provided as an easy-to-use, intuitive interface for the common use case of performing a set of sequential data transformations on the input data before performing some kind of analysis. Therefore, it should be sufficient for simple, or linear, pipelines.

Formula semantics are provided to implement more complex pipelines, and are not limited to data parameters. Any type of object which is an output of a previous function can be used in a subsequent function. The typical use of these formulae is to provide parameters to certain functions in the pipeline which are the results of previous functions. Formula semantics can be used for the data parameter of data functions as well; this uses the output of the specified function instead of the input data the object was instantiated with.

In essence, you can implement any kind of complex pipeline with formula semantics. The outAsIn parameter is provided as an easy-to-use shortcut for simpler linear pipelines. When a formula is specified for the first argument, the outAsIn parameter is rendered irrelevant.

exampleObj <- AnalysisPipeline(input = iris)

filterData <- function(dataset, conditionVar, val){
  condition <- paste0(conditionVar, '== "', val, '"')
  dataSub <- dataset %>>% dplyr::filter_(condition)
  return(dataSub)
}

summarizeData <- function(dataset, conditionVar){
  dataGrouped <-  dataset %>>% dplyr::group_by_(conditionVar) %>>% dplyr::summarise_all(dplyr::funs(avg = mean))
  return(dataGrouped)
}

plotLine <- function(dataset, y, x){
    p <- ggplot2::ggplot(data = dataset, ggplot2::aes_string(y = y, x = x)) + 
         ggplot2::geom_line(color = "blue")
  return(p)
}

plotSummary <- function(d, summaryVar, groupingVar){
  p <- ggplot2::ggplot(data = d, ggplot2::aes_string(y = paste0(summaryVar, "_avg"), x = groupingVar)) + 
         ggplot2::geom_col(fill = "blue")
  return(p)
}

registerFunction("filterData")
registerFunction("summarizeData")
registerFunction("plotLine")
registerFunction("plotSummary")



exampleObj %>>%  summarizeData(conditionVar = "Species") %>>% 
                filterData(conditionVar = "Species", val = "setosa", outAsIn = F, storeOutput = F) %>>%
                plotLine(y = "Sepal.Length", x = "Sepal.Width", outAsIn = T, storeOutput = T) %>>%
                plotSummary(d = ~f1, summaryVar = "Sepal.Length", groupingVar = "Species", storeOutput = T) ->
  exampleObj

#exampleObj %>>% visualizePipeline
#exampleObj %>>% generateOutput %>>% getOutputById("4")

Visualizing pipelines

These pipelines can be visualized by calling the visualizePipeline method. This generates the whole pipeline as a network, showing the engines on which each function is run, and which outputs are stored.

complexPipeline %>>% visualizePipeline

Generating reports

The generated output can easily be rendered into a beautiful, formatted report by calling the generateReport method. The report is an HTML file which contains the stored outputs, as well as a peek into the data and the pipeline that was executed.

obj2Output %>>% generateReport(path = '~/Desktop')
objRep <- obj %>>% bivariatePlots(select_var_name_1 = 'Petal.Length',select_var_name_2 =  'Petal.Width', 
                           priColor = "blue", secColor = "black")

objRep %>>% generateReport(path = '~/Desktop')

Saving and loading pipelines

Pipelines can also be saved and loaded for re-use and execution on new data. A pipeline can be saved by calling the savePipeline function. On save, both the pipeline and the existing state of the registry are saved (including both predefined as well as user-defined functions).

obj2 %>>% savePipeline(path = 'pipeline.RDS')

This pipeline can be loaded and, while loading, instantiated with a new dataset of the same schema as the one used in the original pipeline (in order to prevent errors). This can be used for a variety of purposes, such as periodically re-executing the pipeline as the dataset is updated with new data.

library(analysisPipelines)
obj3 <- loadPipeline(path = 'pipeline.RDS', input = iris) 
getRegistry()
obj3 %>>% getPipeline

Details of execution

This section provides an overview of how pipelines are executed, along with additional features such as logging and exception handling.

Execution mechanism

The analysisPipelines package internally converts the pipeline defined by the user into a directed graph which captures the dependencies of each function in the pipeline on the data, other arguments, as well as the outputs of other functions.

Topological sort and ordering

When output needs to be generated, the pipeline is first prepped by performing a topological sort of the directed graph, identifying sets (or batches) of independent functions and a sequence of batches for execution. A later release of the package will allow for parallel execution of these independent functions.
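
As a purely conceptual sketch (not the package's internal code), batching can be thought of as repeatedly collecting the functions whose dependencies have all been scheduled. The dependency structure below mirrors the formula example earlier, where functions 3 and 4 consume the outputs of functions 1 and 2:

# Conceptual sketch of batch identification via a topological sort
# (illustrative only - not the package's internal implementation)
deps <- list(
  f1 = character(0),        # no dependencies
  f2 = character(0),        # no dependencies
  f3 = c("f1"),             # uses the output of function 1
  f4 = c("f1", "f2")        # uses the outputs of functions 1 and 2
)

topologicalBatches <- function(deps) {
  batches <- list()
  remaining <- deps
  while (length(remaining) > 0) {
    # Functions whose remaining dependencies have all been scheduled form the next batch
    ready <- names(remaining)[sapply(remaining, function(d) all(!(d %in% names(remaining))))]
    if (length(ready) == 0) stop("Cyclic dependency detected")
    batches[[length(batches) + 1]] <- ready
    remaining <- remaining[setdiff(names(remaining), ready)]
  }
  batches
}

topologicalBatches(deps)
# Expected: batch 1 -> "f1", "f2" (independent); batch 2 -> "f3", "f4"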

Memory management & garbage cleaning

Memory is managed efficiently by only storing outputs which the user has explicitly specified, and temporarily storing intermediate outputs required by subsequent functions only until those functions have been executed. Garbage cleaning is performed after the execution of each batch in order to manage memory effectively.

Type conversions

In the case of interoperable pipelines executing across multiple engines such as R, Spark and Python, type conversions between the data types of the different engines are minimized by identifying the optimal number of type conversions before execution starts.

Logging & Execution times

The package provides logging capabilities for execution of pipelines, as you might have noted when the output was generated in sections above. By default, logs are written to the console, but alternatively the user can specify an output file to which the logs need to be written through the setLoggerDetails function.

obj2 %>>% setLoggerDetails(target = "file") -> obj2

Logs capture errors, as well as provide information on the steps being performed, execution times and so on.
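
Logging can also be pointed back at the console (described above as the default target) using the same function; a sketch, assuming 'console' is the accepted value for console logging:

# Sketch: directing logs back to the console
# (assumes 'console' is the value for the default console target)
obj2 %>>% setLoggerDetails(target = "console") -> obj2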

Custom exception-handling

By default, when a function is registered, a generic exception-handling function, which captures the R error message in case of an error, is registered against it in the registry. The user can define a custom exception-handling function by defining it and providing it at the time of registration. The function should take one argument, which is the error object.

sampleFunction <- function(text){
  return(text)
}

sampleException <- function(error){
  stop("There was an error with the provided input")
}

registerFunction(functionName = 'sampleFunction', "Sample", exceptionFunction = "sampleException")

getRegistry()

Saving and loading registries

Registries can be saved and loaded like pipelines. A characteristic of the registry is that it is shared across all pipelines in a particular session. Therefore, when a registry is loaded, it overwrites the existing registry in the session.

getRegistry()
saveRegistry("./registry.RDS")
##.rs.restartR() # Run this on console to restart the R session
rm(list=ls(all=TRUE)) # Remove all objects in the R environment
getRegistry()

loadRegistry("./registry.RDS")
getRegistry()

