A Brief Introduction to bdpar

Abstract

*Bdpar* is a tool for easily building customized data flows to pre-process large volumes of information from different sources. To this end, *bdpar* allows users to (i) easily use and create new functionalities and (ii) develop new data source extractors according to their needs. Additionally, the package provides a predefined data flow by default to extract and preprocess the most relevant information (tokens, dates, ...) from some textual sources (SMS, email, tweets, YouTube comments).

Introduction and basics

The package has been developed using R6 classes to implement, for example, the extraction of input data and the 18 Pipes that the application provides by default. The following sections explain the different tools that make up the application and how the features it offers can be extended.

For more specific questions, consult the package help through help(package = "bdpar").

Instance

To manage the information obtained from the input data, the package uses a structure called *Instance*, which stores the **properties** extracted by the different *Pipes*. The fields comprising the *Instance* class are described below.

- **source**: The input data without modifications.
- **date**: The date on which the **source** was generated or sent.
- **data**: The input data with modifications.
- **properties**: Contains a list of properties extracted from the **data** that is being processed.
- **path**: Identifier of the input data.
- **isValid**: Indicates if the *Instance* is valid or not.
- **flowPipes**: The list of *Pipes* that the *Instance* has passed through.
- **banPipes**: The list of *Pipes* that cannot be executed from that moment on.

Types of input data available by default

**bdpar** is able to load data from multiple sources. By default, it is fully compatible with four of them: Email (eml), Twitter (twtid), SMS (tsms) and YouTube (ytbid). However, new data loaders can be easily added by (i) creating a new class that implements the *obtainSource* and *obtainDate* abstract methods of the *Instance* class and (ii) registering this new class in the *ExtractorFactory* class through its *registerExtractor* method. It is important to take into account that the type of *Instance* used is deduced by default from the file extension. However, this behaviour can be easily adapted to user needs.

How to create a customized input data extractor

This example shows how a new type of *Instance* (named *ExtractorTytb*) is created. In this particular case, *ExtractorTytb* is responsible for extracting textual comments from YouTube files (extension .tytb).

```{R, echo = TRUE, results = "hide"}
library(R6)
library(bdpar)  # provides the Instance superclass

ExtractorTytb <- R6Class(
  classname = "ExtractorTytb",
  inherit = Instance,
  public = list(
    initialize = function(path) {
      if (!"character" %in% class(path)) {
        stop("[ExtractorTytb][initialize][Error] ",
             "Checking the type of the variable: path ",
             class(path))
      }
      super$initialize(path)
    },
    # Use the ctime reported by file.info() as the date of the source
    obtainDate = function() {
      super$setDate(file.info(super$getPath())[["ctime"]])
    },
    # Read the file content and use it as both source and data
    obtainSource = function() {
      super$setSource(readLines(super$getPath(), warn = FALSE))
      super$setData(super$getSource())
    }
  )
)
```

### Enabling a new Instance.

<div style = "text-align: justify">

In order to automatically execute the new *Instance* class (*ExtractorTytb*), it must be registered through the *registerExtractor* method of the *ExtractorFactory* class. The example below shows how this extractor is registered.

</div>

```{R, echo = TRUE, results = "hide",ExtractorTytb}
library(bdpar)
extractors <- ExtractorFactory$new()
extractors$registerExtractor("tytb", ExtractorTytb)
```

Pipe

A pipe consists of a simple task responsible for generating a new output by applying some transformations to the input data. A set of sequentially interconnected pipes that achieves a required result is called a pipeline. Pipes in bdpar are represented by the *GenericPipe* class, while the pipelining process is defined through the *GenericPipeline* abstract class.

Dependencies

An important feature that has been added to the Pipe concept is the control of the functionalities that go before and after a Pipe, that is, a control of the preprocessing flow. On the one hand, this control ensures that if a Pipe needs another one to run before it, that Pipe has already been executed. On the other hand, it prevents a Pipe from being executed later when it would interfere with the functionality of a previous Pipe. This functionality can be customized in each of the pipes that the user uses and/or develops, making it possible to decide what to do when the dependencies are not respected. Internally, the operator always checks whether the Instance satisfies the dependencies before entering the following pipe.
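The dependency control described above can be pictured with a small base R model. This is only a sketch under stated assumptions: the helpers `can_enter` and `enter`, the pipe names and the list-based state are hypothetical and do not reproduce bdpar's internal implementation. It simply tracks which pipes have already run (the role of **flowPipes**) and which are banned from running later (the role of **banPipes**).

```R
# Toy model of dependency checking; NOT the bdpar implementation.
# state$flow  : names of the pipes already executed
# state$banned: names of the pipes that may no longer be executed
can_enter <- function(state, pipe) {
  deps_met   <- all(pipe$alwaysBefore %in% state$flow)
  not_banned <- !(pipe$name %in% state$banned)
  deps_met && not_banned
}

enter <- function(state, pipe) {
  if (!can_enter(state, pipe)) {
    stop("Dependencies not satisfied for ", pipe$name)
  }
  state$flow   <- c(state$flow, pipe$name)            # record the executed pipe
  state$banned <- union(state$banned, pipe$notAfter)  # ban pipes that must not follow
  state
}

# Hypothetical pipes: PipeB requires PipeA first; PipeC must not run after PipeA
pipe_a <- list(name = "PipeA", alwaysBefore = character(0), notAfter = "PipeC")
pipe_b <- list(name = "PipeB", alwaysBefore = "PipeA", notAfter = character(0))
pipe_c <- list(name = "PipeC", alwaysBefore = character(0), notAfter = character(0))

initial <- list(flow = character(0), banned = character(0))
b_before_a <- can_enter(initial, pipe_b)  # PipeA has not run yet

state <- enter(initial, pipe_a)
b_after_a <- can_enter(state, pipe_b)     # dependency now satisfied
c_after_a <- can_enter(state, pipe_c)     # banned by PipeA
```

In this model a failed check raises an error, which is just one possible reaction; as noted above, bdpar lets each pipe decide what to do when its dependencies are not respected.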

Pipes available by default

The framework provides 18 different pipes (inherited from GenericPipe). Each pipe falls into one of two categories: (i) basic-functionality pipes and (ii) external file access pipes.

### (i) Pipes of basic functionality

#### **File2Pipe**

Obtains the **source** using the *obtainSource* method implemented by the subclass of the *Instance* superclass. By default, the implemented subclasses are *ExtractorEml*, *ExtractorSms*, *ExtractorTwtid* and *ExtractorYtbid*.

#### **FindEmojiPipe**

Creates a new **emoji** property where the emojis found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by removing the emojis that are found.

#### **FindEmoticonPipe**

Creates a new **emoticon** property where the emoticons found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by removing the emoticons that are found.

#### **FindHashtagPipe**

Creates a new **hashtag** property where the hashtags found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by removing the hashtags that are found.

#### **FindUrlPipe**

Creates a new **URLs** property where the URLs found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by removing the URLs that are found.

#### **FindUserNamePipe**

Creates a new **userName** property where the user names found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by removing the user names that are found.

#### **GuessDatePipe**

Obtains the **date** using the *obtainDate* method implemented by the subclass of the *Instance* superclass. By default, the implemented subclasses are *ExtractorEml*, *ExtractorSms*, *ExtractorTwtid* and *ExtractorYtbid*.

#### **GuessLanguagePipe**

Guesses the language using the language detector of the cld2 library. Creates the **language** property, which indicates the language of the text. Optionally, it is possible to choose the language provided by Twitter.

#### **MeasureLengthPipe**

Creates the **length** property, which indicates the length of the text. The property's name can be customized through the class constructor.

#### **StoreFileExtPipe**

Creates the **extension** property, which indicates the file's extension.

#### **TargetAssigningPipe**

Identifies the class of the *Instance* (**target** attribute) from the path of the file.

#### **TeeCSVPipe**

Generates a CSV from the **properties** of the *Instance*.

#### **ToLowerCasePipe**

Converts the data of an *Instance* to lower case.

### (ii) Pipes that access external files

#### **AbbreviationPipe**

Creates a new **abbreviation** property where the abbreviations found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by replacing the abbreviations found with their extended versions. The abbreviations and their substitutions have to be stored in JSON files, and the file associated with the language of the text is chosen. The loading and handling of these files is done through the *ResourceHandler* class.

#### **ContractionPipe**

Creates a new **contraction** property where the contractions found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by replacing the contractions found with their extended versions. The contractions and their substitutions have to be stored in JSON files, and the file associated with the language of the text is chosen. The loading and handling of these files is done through the *ResourceHandler* class.

#### **InterjectionPipe**

Creates a new **interjection** property where the interjections found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by removing the interjections. The interjections have to be stored in JSON files, and the file associated with the language of the text is chosen. The loading and handling of these files is done through the *ResourceHandler* class.

#### **SlangPipe**

Creates a new **slang** property where the slang words found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by replacing the slang words with their extended versions. The slang words and their substitutions have to be stored in JSON files, and the file associated with the language of the text is chosen. The loading and handling of these files is done through the *ResourceHandler* class.

#### **StopWordPipe**

Creates a new **stopwords** property where the *empty words* found in the **data** attribute are stored. In addition, you can decide whether to modify the **data** property of the *Instance* by removing the *empty words*. The *empty words* have to be stored in JSON files, and the file associated with the language of the text is chosen. The loading and handling of these files is done through the *ResourceHandler* class.

How to create your customized Pipe

Additionally, in order to improve the customization capabilities, bdpar makes it easy to design and develop new personalized pipes by implementing the *pipe* method included in the *GenericPipe* class (inheritance relation). The code included below exemplifies the creation of a simple pipe in charge of removing leading and trailing whitespace from a string.

```{R, echo = TRUE, results = "hide"}
library(R6)
library(bdpar)  # provides the GenericPipe superclass

RemovesWhiteSpaces <- R6Class(
  "RemovesWhiteSpaces",
  inherit = GenericPipe,
  public = list(
    initialize = function(propertyName = "",
                          alwaysBeforeDeps = list(),
                          notAfterDeps = list()) {
      if (!"character" %in% class(propertyName)) {
        stop("[RemovesWhiteSpaces][initialize][Error] ",
             "Checking the type of the 'propertyName' variable: ",
             class(propertyName))
      }
      if (!"list" %in% class(alwaysBeforeDeps)) {
        stop("[RemovesWhiteSpaces][initialize][Error] ",
             "Checking the type of the 'alwaysBeforeDeps' variable: ",
             class(alwaysBeforeDeps))
      }
      if (!"list" %in% class(notAfterDeps)) {
        stop("[RemovesWhiteSpaces][initialize][Error] ",
             "Checking the type of the 'notAfterDeps' variable: ",
             class(notAfterDeps))
      }
      super$initialize(propertyName, alwaysBeforeDeps, notAfterDeps)
    },
    pipe = function(instance) {
      if (!"Instance" %in% class(instance)) {
        stop("[RemovesWhiteSpaces][pipe][Error] ",
             "Checking the type of the 'instance' variable: ",
             class(instance))
      }
      # Strip leading and trailing whitespace from the data
      instance$setData(trimws(x = instance$getData()))

      # An Instance left without data is invalidated
      if (length(instance$getData()) == 0) {
        instance$invalidate()
      }

      return(instance)
    }
  )
)
```

## Flow of Pipes (pipelining process)

<div style = "text-align: justify">

A flow of pipes is the set of pipes that comprises the whole preprocessing process. 
By default, bdpar provides a pipelining process (implemented in *DefaultPipeline*) comprising all 18 available pipes. 

</div>

### Flow of Pipes available by default

The code included below shows a pipelining example comprising the 18 pipes (*MeasureLengthPipe* is applied twice, before and after cleaning the text):

```R
  instance %>|%
    TargetAssigningPipe$new() %>|%
    StoreFileExtPipe$new() %>|%
    GuessDatePipe$new() %>|%
    File2Pipe$new() %>|%
    MeasureLengthPipe$new(propertyName = "length_before_cleaning_text") %>|%
    FindUserNamePipe$new() %>|%
    FindHashtagPipe$new() %>|%
    FindUrlPipe$new() %>|%
    FindEmoticonPipe$new() %>|%
    FindEmojiPipe$new() %>|%
    GuessLanguagePipe$new() %>|%
    ContractionPipe$new() %>|%
    AbbreviationPipe$new() %>|%
    SlangPipe$new() %>|%
    ToLowerCasePipe$new() %>|%
    InterjectionPipe$new() %>|%
    StopWordPipe$new() %>|%
    MeasureLengthPipe$new(propertyName = "length_after_cleaning_text") %>|%
    TeeCSVPipe$new()
```

Create your own flow of Pipes

Additionally, in order to build a flexible framework, bdpar allows users to define their own customized flow of pipes. To accomplish this task, it is necessary to create a new class that inherits from *GenericPipeline* and implements the *execute()* method. The example below shows how a new pipeline (called *TestPipeline*) is created:

```{R, echo = TRUE, results = "hide", RemovesWhiteSpaces}
library(R6)
library(bdpar)

TestPipeline <- R6Class(
  "TestPipeline",
  inherit = GenericPipeline,
  public = list(
    initialize = function() { },
    execute = function(instance) {
      if (!"Instance" %in% class(instance)) {
        stop("[TestPipeline][execute][Error] ",
             "Checking the type of the 'instance' variable: ",
             class(instance))
      }
      message("[TestPipeline][execute][Info] ", instance$getPath())
      tryCatch(
        instance %>|%
          TargetAssigningPipe$new() %>|%
          StoreFileExtPipe$new() %>|%
          File2Pipe$new() %>|%
          RemovesWhiteSpaces$new() %>|%
          TeeCSVPipe$new(),
        error = function(e) {
          message("[TestPipeline][execute][Error]", instance$getPath(), " :", paste(e))
          instance$invalidate()
        }
      )
      return(instance)
    }
  )
)
```

<div style = "text-align: justify">

Alternatively, the pre-processing flow can be dynamically constructed through the *DynamicPipeline* class. An example of its use is shown below. To see the rest of the options offered by this class, access the package's help through *help(package = "bdpar")*.

</div>

```{R, echo = TRUE, results = "hide"}
library(bdpar)
pipeline <- DynamicPipeline$new()
pipeline$add(list(TargetAssigningPipe$new(), StoreFileExtPipe$new(), File2Pipe$new()), pos = NULL)
pipeline$add(list(TeeCSVPipe$new()), pos = NULL)
```
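Once a pipeline object exists, it would typically be handed to the *runPipeline* function mentioned elsewhere in this document. The following is a hedged sketch rather than a definitive call: the argument names `path`, `extractors` and `pipeline` are assumptions about the function's signature (only `cache` and `verbose` are named in this text), and `"path/to/input/files"` is a placeholder for a folder containing supported files.

```R
library(bdpar)

# Build the flow dynamically, as in the example above
pipeline <- DynamicPipeline$new()
pipeline$add(list(TargetAssigningPipe$new(), StoreFileExtPipe$new(), File2Pipe$new()), pos = NULL)
pipeline$add(list(TeeCSVPipe$new()), pos = NULL)

# Assumed signature; "path/to/input/files" is a placeholder
runPipeline(path = "path/to/input/files",
            extractors = ExtractorFactory$new(),
            pipeline = pipeline,
            cache = FALSE,
            verbose = FALSE)
```

Check *help(package = "bdpar")* for the exact arguments before relying on this call.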

Operator

To manage the flow of pipes, there is a new operator called **%>\|%**. This operator checks whether the *Instance* was invalidated in the latest *Pipe*. If the *Instance* is invalid, the flow of *Pipes* stops and the preprocessing passes to the next *Instance*. Otherwise, the *Instance* continues through the current flow of *Pipes*. The operator also simplifies the workflow between pipes because it automatically invokes the *pipe* function of the objects that pass through it. Finally, it should be noted that it also checks whether an *Instance* complies with the requirements to enter the following pipe.
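The short-circuit behaviour described above can be sketched in base R with a toy operator. Everything in this block is hypothetical: `%then%`, `make_instance`, `trim` and `to_lower` are illustrative names, the instance is a plain list rather than an R6 object, and the steps are plain functions instead of objects with a *pipe* method. It is not how **%>\|%** is implemented in bdpar, and it omits the dependency check.

```R
# Toy operator: apply the next step only while the instance is still valid.
`%then%` <- function(instance, step) {
  if (!isTRUE(instance$isValid)) {
    return(instance)  # invalid instances skip the remaining steps
  }
  step(instance)
}

# Hypothetical stand-in for an Instance
make_instance <- function(text) {
  list(data = text, isValid = TRUE, flow = character(0))
}

# Hypothetical steps; each one records its name in `flow`
trim <- function(instance) {
  instance$data <- trimws(instance$data)
  instance$flow <- c(instance$flow, "trim")
  if (!nzchar(instance$data)) {
    instance$isValid <- FALSE  # nothing left to process
  }
  instance
}

to_lower <- function(instance) {
  instance$data <- tolower(instance$data)
  instance$flow <- c(instance$flow, "to_lower")
  instance
}

ok    <- make_instance("  Hello World ") %then% trim %then% to_lower
empty <- make_instance("   ") %then% trim %then% to_lower
```

Here `ok` passes through both steps, while `empty` is invalidated by `trim` and never reaches `to_lower`, which mirrors how an invalid *Instance* leaves the flow.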

bdpar.Options

The *bdpar.Options* object stores the configuration parameters of the pipes used in the preprocessing. For example, it holds the keys for the APIs that require them (such as YouTube or Twitter), as well as various parameters that customize the behavior of the application, such as which text format to use for multipart emails (plain text or HTML). Parameters that are not needed can be omitted. The options that exist initially, and how their values are set, are shown below.
```R
# [eml]
bdpar.Options$set("extractorEML.mpaPartSelected", <<PartSelectedOnMPAlternative>>)

# [resources]
bdpar.Options$set("resources.abbreviations.path", <<abbreviation.path>>)
bdpar.Options$set("resources.contractions.path", <<contractions.path>>)
bdpar.Options$set("resources.interjections.path", <<interjections.path>>)
bdpar.Options$set("resources.slangs.path", <<slangs.path>>)
bdpar.Options$set("resources.stopwords.path", <<stopwords.path>>)

# [twitter]
bdpar.Options$set("twitter.consumer.key", <<consumer_key>>)
bdpar.Options$set("twitter.consumer.secret", <<consumer_secret>>)
bdpar.Options$set("twitter.access.token", <<access_token>>)
bdpar.Options$set("twitter.access.token.secret", <<access_token_secret>>)
bdpar.Options$set("cache.twitter.path", <<cache.path>>)

# [teeCSVPipe]
bdpar.Options$set("teeCSVPipe.output.path", <<output.path>>)

# [youtube]
bdpar.Options$set("youtube.app.id", <<app_id>>)
bdpar.Options$set("youtube.app.password", <<app_password>>)
bdpar.Options$set("cache.youtube.path", <<cache.path>>)

# [cache]
bdpar.Options$set("cache", <<status_cache>>)
bdpar.Options$set("cache.folder", <<cache.path>>)

# [parallel]
bdpar.Options$set("numCores", <<num_cores>>)

# [verbose]
bdpar.Options$set("verbose", <<status_verbose>>)
```

Logging on bdpar

The bdpar log is configured through the *configureLog* function. This system manages both where the messages are displayed and the priority level of each message, showing only the messages with a higher level than the one indicated in the threshold variable. The log can be output to the console, to a file, or to both.

The type of message changes according to the level indicated:

- The **DEBUG**, **INFO** and **ERROR** levels return a text using the *message* function.
- The **WARN** level returns a text using the *warning* function.
- The **FATAL** level returns a text using the *stop* function.

To deactivate the bdpar log completely, use the *disableLog* method of the *bdpar.Options* object. It is important to note that messages with **WARN** or **FATAL** level will still be displayed in the console.

To see the status of the instances at the output of each pipe, the **DEBUG** logging mode must be activated.

If the *"verbose"* option is enabled, for example in the *runPipeline* function, only messages with **WARN** or **FATAL** levels will be displayed in the console.
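As a rough illustration of the logging setup described above, the calls might look like the sketch below. Two things are assumptions here: that *configureLog* is invoked as a method of the *bdpar.Options* object (as *disableLog* is), and that it accepts arguments named `console`, `threshold` and `file`. Verify both against the package help before use.

```R
library(bdpar)

# Assumed call and argument names: log to the console only,
# showing messages from the DEBUG level upwards
bdpar.Options$configureLog(console = TRUE, threshold = "DEBUG", file = NULL)

# Deactivate the log again; WARN and FATAL messages are still shown
bdpar.Options$disableLog()
```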

Cache functionality

If the bdpar cache is configured through the *"cache"* and *"cache.folder"* options, the status of the instances will be stored after each pipe. This avoids repeating previously executed tasks, provided that the order and configuration of the pipes and the pipeline are the same as what is stored in the cache. When the *cache* parameter is used, for example in the *runPipeline* function, the option is set to the value passed in. If you want to remove the cache, the *cleanCache* method does this task.
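A minimal sketch of the cache configuration, using the two option keys named above. The folder name `".cache"` is only an illustrative value, and calling *cleanCache* on the *bdpar.Options* object is an assumption about where that method lives.

```R
library(bdpar)

# Enable the cache and choose where instance states are stored
bdpar.Options$set("cache", TRUE)
bdpar.Options$set("cache.folder", ".cache")  # illustrative folder name

# Remove the stored cache (assumed to be a method of bdpar.Options)
bdpar.Options$cleanCache()
```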

Parallelization

The bdpar package allows the parallelization of the processing of Instances in order to save computational time. This functionality is configured through the "numCores" option (included in bdpar.Options) which will indicate the number of cores to be used in the execution.

```R
bdpar.Options$set("numCores", 2)
```

It should be mentioned that in the case of parallelization, the output of the cores' log will only be available in file mode.

Development

A development version of the bdpar package is also available on its GitHub page: github.com/miferreiro/bdpar




bdpar documentation built on Aug. 22, 2022, 5:08 p.m.