knitr::opts_chunk$set(collapse=TRUE, comment="#>")
options(dplyr.print_min=5L, dplyr.print_max=5L)

cc <- rxSetComputeContext("local")

This vignette goes into more detail on using dplyrXdf. Most of dplyrXdf works the same way as dplyr, so if you know how to use the latter, then you also (mostly) know how to use the former. However, there are some places where the two packages differ. These are described below.

As with the introductory vignette, we'll use the nycflights13 flights data to demonstrate the use of dplyrXdf.

Importing to Xdf

The easiest way to import data into an Xdf file is via the as_xdf function. This works with any RevoScaleR data source, as well as any object that can be converted into a data frame. For example, you can use this to import data from a SQL Server database table, a SAS or SPSS dataset, or a delimited text file. The syntax is very simple, as can be seen by importing the flights data:

library(dplyrXdf) # also loads dplyr
library(nycflights13)

flightsXdf <- as_xdf(flights)
tbl_vars(flightsXdf)

This will create an Xdf file whose name is taken from the input data, in this case "flights.xdf".

To demonstrate importing from something other than a data frame, let's write the flights data to a csv file, and then import it. As before, the name of the output Xdf file will be taken from the name of the input, with the extension changed to ".xdf". To avoid overwriting the Xdf we've just created, we should therefore call our csv file something other than "flights". However, as_xdf has a file argument to specify the name of the output Xdf, so let's use that instead:

# create a RevoScaleR text data source pointing to a csv file
write.csv(flights, "flights.csv", row.names=FALSE)
flightsCsv <- RxTextData("flights.csv")

flightsXdf2 <- as_xdf(flightsCsv, file="flights2.xdf")
tbl_vars(flightsXdf2)
file.remove("flights.csv")
delete_xdf(flightsXdf2)

Beyond delimited text, as_xdf will work with any RevoScaleR data source. In particular, the following code will import a table from a SQL Server database.

sqlSrc <- RxSqlServerData(table="mytable", server="host", databaseName="mydatabase", ...)

# this will create the file 'mytable.xdf'
xdfSrc <- as_xdf(sqlSrc)

However, rather than importing the data, dplyrXdf can also work directly with a database. If given an RxOdbcData, RxTeradata or RxSqlServerData source as the input to a pipeline, dplyrXdf will convert it to a dplyr tbl. For more information, see the vignette on SQL databases.

Tbls and file management

To facilitate the task of file management, dplyrXdf defines a new tbl_xdf class that extends the RxXdfData class. This is what allows it to keep track of which data sources should remain untouched, and which can be modified or overwritten as part of a pipeline. To the base RevoScaleR functions, a tbl_xdf object is just a normal Xdf data source; thus, existing code dealing with Xdfs should work with minimal modification. However, the verbs implemented in dplyrXdf will recognise when they are passed a tbl_xdf, as opposed to a normal Xdf file, in which case they will delete their input file after writing the output file. Thus there is always only one file that represents the latest stage of a pipeline.
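
For instance, the output of any verb is a tbl_xdf, which the base RevoScaleR functions treat as an ordinary Xdf data source (a quick sketch; the filter condition is arbitrary):

janFlights <- filter(flightsXdf, month == 1)
class(janFlights)

# base RevoScaleR functions see an ordinary Xdf data source
rxGetInfo(janFlights, getVarInfo=TRUE)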

In general, you never need to create a tbl_xdf manually. An exception is if you want to create a tbl_xdf from a non-Xdf data source. In that case you can do the following, which will transparently import the data into an Xdf file:

txt <- RxTextData("path/to/file.txt")
txtTbl <- as(txt, "tbl_xdf")

A side-effect of dplyrXdf managing files for you is that you should be careful when passing the result from an initial pipeline into subsequent pipelines. Consider the following example:

# pipeline 1
output1 <- flightsXdf %>%
    mutate(delay=(arr_delay + dep_delay)/2)

# use the output from pipeline 1
output2 <- output1 %>%
    group_by(carrier) %>%
    summarise(delay=mean(delay))

# reuse the output from pipeline 1 -- WRONG
output3 <- output1 %>%
    group_by(dest) %>%
    summarise(delay=mean(delay))

The bug in this code is that the second pipeline will overwrite or delete its input, so the third pipeline will fail. This is consistent with dplyrXdf's philosophy of only saving the most recent output of a pipeline, where a pipeline is defined as all operations starting from a raw Xdf file. However, in this case it isn't what we want.

Similarly, dplyrXdf creates its output files in a temporary directory, which is deleted when you quit R. This saves you having to delete files that are no longer in use, but it does mean that you must copy the results of your analyses to a permanent location if you want to keep them.

dplyrXdf gives you a number of ways to deal with these issues.

Specifying the output format

First, all dplyrXdf verbs support a special argument .outFile, which controls how the output data is handled. If you don't specify a value for this argument, the data will be saved to a tbl_xdf which will be managed by dplyrXdf. This supports the default behaviour of dplyrXdf, whereby data files are automatically created and deleted inside a pipeline. There are two other options for .outFile:

  1. If you specify .outFile = NULL, the verb returns its output in memory as a data frame.
  2. If .outFile is a filename, the output is saved to a persistent Xdf file at that location, and an Xdf data source pointing to it is returned.

To show how this works, we can modify pipeline 1 above to save its output to a persistent location. This resets the status of the pipeline, so that subsequent operations will know not to modify the data.

# pipeline 1 -- use .outFile to save the data
output1 <- flightsXdf %>%
    mutate(delay=(arr_delay + dep_delay)/2, .outFile="output1.xdf")

# use the output from pipeline 1
output2 <- output1 %>%
    group_by(carrier) %>%
    summarise(delay=mean(delay))

# reuse the output from pipeline 1 -- this works as expected
output3 <- output1 %>%
    group_by(dest) %>%
    summarise(delay=mean(delay))
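
The .outFile=NULL case works similarly, except that the result comes back in memory. A minimal sketch:

# return the result as a data frame rather than an Xdf file
delayDf <- flightsXdf %>%
    mutate(delay=(arr_delay + dep_delay)/2, .outFile=NULL)
class(delayDf)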

The persist verb

The second way of creating a persistent Xdf file is with the persist verb. This is a simple function that moves or copies its input to the specified location. Like the .outFile argument described above, it will also reset the status of the pipeline.

# pipeline 1 -- use persist to save the data
output1 <- flightsXdf %>%
    mutate(delay=(arr_delay + dep_delay)/2) %>% persist("output1.xdf")

# use the output from pipeline 1
output2 <- output1 %>%
    group_by(carrier) %>%
    summarise(delay=mean(delay))

# reuse the output from pipeline 1 -- this also works as expected
output3 <- output1 %>%
    group_by(dest) %>%
    summarise(delay=mean(delay))

In general, setting .outFile to save an Xdf file is preferred to using the persist verb. You would use persist if you have already run a pipeline, and want to save its output after the fact.

Using as_xdf

Another way to create an Xdf file from a tbl_xdf is via as_xdf, which was described previously. as_xdf always returns a raw Xdf data source, and when called on a tbl_xdf, it has the effect of removing the tbl information. Unlike persist, this does not modify the file at all. The advantage is that it's fast (since the data is not touched); the disadvantage is that the file remains in the dplyrXdf temporary directory. Because of this, you still have to make a new copy if you want the data to remain after you quit your R session.

outputXdf <- as_xdf(output3)

output3

# no longer a tbl_xdf
outputXdf

Copying, moving and deleting Xdf files

A number of functions are supplied for manipulating Xdf files as files:

  1. copy_xdf and move_xdf copy and move an Xdf file to a new location
  2. rename_xdf renames an Xdf file in place
  3. delete_xdf deletes the file backing an Xdf data source

All of these except delete_xdf return an Xdf data source object pointing to the new file's name/location. For example, we can use move_xdf to move the output of as_xdf, from above, to a more permanent location.

output3 <- move_xdf(output3, "d:/data/output3.xdf")
output3
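
copy_xdf works the same way as move_xdf but leaves the original file in place. A short sketch, with a hypothetical backup location:

# make a backup copy; output3 is untouched
backupXdf <- copy_xdf(output3, "d:/backup/output3.xdf")
backupXdf
delete_xdf(backupXdf)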

Setting the tbl directory

By default, dplyrXdf saves the files it creates into the R working directory. On some systems, this may be located on a filesystem that is relatively small; this is rarely an issue with open source R, which keeps all its objects in memory, but can be a problem with large Xdf files on disk. You can view the location of the current Xdf tbl directory with get_dplyrxdf_dir:

get_dplyrxdf_dir()

Similarly, you can change the location of the Xdf tbl directory with the set_dplyrxdf_dir function:

# set the tbl directory to a network drive (on Windows)
set_dplyrxdf_dir("n:/Rtemp")

For best results, you should avoid setting the Xdf tbl directory to a remote location/network drive unless you have a fast network connection.

Standard and composite Xdf files

RevoScaleR supports two kinds of Xdf files: standard (what we've been using so far) and composite. A composite Xdf is actually a directory that contains multiple data and metadata files; the RevoScaleR functions know how to treat it as a single data source. Similarly, the dplyrXdf verbs can work transparently with both kinds of Xdf, with the output of a verb being of the same kind as the input.

Xdf files in the native filesystem can be either standard or composite, but those in HDFS must be composite to work properly. By default, dplyrXdf will create a composite Xdf if the input dataset is stored in HDFS, and a standard Xdf otherwise. There are also as_standard_xdf and as_composite_xdf functions if you want to create a specific type directly.

If required, you can also use these functions to switch an Xdf file (including a tbl_xdf) between standard and composite. Calling as_standard_xdf on a composite Xdf file will convert it to standard format, and vice-versa for calling as_composite_xdf on a standard Xdf.

compositeOutput3 <- as_composite_xdf(output3)
compositeOutput3
delete_xdf(output3)
delete_xdf(compositeOutput3)

New and enhanced verbs

In addition to the persist verb described above, dplyrXdf adds new functionality for existing verbs, as well as some new single-table verbs.

The subset verb

In dplyr, subsetting data is handled by two verbs: filter for subsetting by rows, and select for subsetting by columns. This is fine for data frames, where everything runs in memory, and for SQL databases, where the hard work is done by the database. For Xdf files, however, this is suboptimal, as each verb translates into a separate I/O step where the data is read from disk, subsetted, then written out again. This can waste a lot of time with large datasets.

As it turns out, base R has a subset generic which (as the name says) performs subsetting on both rows and columns. You've probably used it with data frames:

subset(flights, month <= 6 & day == 1, c(dep_time, dep_delay, carrier))

dplyrXdf implements a method for subset that works for Xdf files. The code is exactly the same as for a data frame, except that it creates another Xdf file. This produces the same result as a filter followed by a select, but requires only half the amount of I/O.

flightsXdfSub <- subset(flightsXdf, month <= 6 & day == 1, c(dep_time, dep_delay, carrier))
class(flightsXdfSub)
head(flightsXdfSub)

Passing RevoScaleR arguments with .rxArgs

The RevoScaleR functions typically have several arguments beyond those used by dplyrXdf verbs. While usually you don't need to touch these, it can sometimes be useful to do so. For example, when using mutate or transmute, you could specify more complicated transformations via a transformFunc. Similarly, rather than chaining together a mutate and a summarise --- which would involve creating an intermediate file --- you could incorporate the variable transformation into the summarise itself. More low-level uses of such arguments include setting the block size for an Xdf file, changing the compression level, limiting the number of rows, and so on.

Most of the dplyrXdf verbs accept a .rxArgs argument as a way of passing these extra arguments to the underlying RevoScaleR code. This should be a named list specifying the names and values of the arguments to be passed. The exact arguments will vary depending on the RevoScaleR function that is called, which in turn depends on the verb. The main pairings are:

  1. subset, filter, select, mutate, transmute and distinct: rxDataStep
  2. summarise: rxCube or rxSummary, depending on the method chosen
  3. arrange: rxSort
  4. the joins (left_join, right_join and so on): rxMerge
  5. factorise: rxFactors

Here are some examples to illustrate the use of .rxArgs:

# subset, transform and summarise in the one step
flightsSubsetSmry <- flightsXdf %>% group_by(day) %>%
    summarise(delay=mean(delay), n=n(),
        .rxArgs=list(
            transforms=list(delay=(dep_delay + arr_delay)/2),
            rowSelection=carrier == "UA"
        )
    )
head(flightsSubsetSmry)

# a complex transformation involving a transformFunc
flightsTrans <- transmute(flightsXdf, 
    .rxArgs=list(
        transformFunc=function(varlist) with(varlist, {
            delay <- (dep_delay + arr_delay)/2
            date <- as.Date(sprintf("%d-%02d-%02d", year, month, day))
            weekday <- weekdays(date)
            weekendDelay <- ifelse(weekday %in% c("Saturday", "Sunday"),
                                   delay, NA)
            list(delay=delay, weekday=weekday, weekendDelay=weekendDelay)
        })
    )
)
head(flightsTrans)

# fit a model using open source R, and then score the training dataset
# we pass the model object via transformObjects, and the package to load
# via transformPackages
library(rpart)
flightsModel <- rpart(arr_delay ~ dep_delay + carrier + hour, data=flights)

flightsScores <- transmute(flightsXdf,
    pred=predict(model, data.frame(dep_delay, carrier, hour)),
    .rxArgs=list(
        transformObjects=list(model=flightsModel),
        transformPackages="rpart"
    )
)
head(flightsScores)

You should use .rxArgs with caution, as some verbs modify the data as part of their normal functioning, so the results you get back may not be as expected. It's also easy to write convoluted code that makes your pipelines hard to read. Nevertheless, if you are working with very large datasets and speed is important, this is one way to improve the efficiency of your code.

Setting the summary method for summarise

For best performance, when using summarise you should request only those summary statistics supported by rxCube and/or rxSummary: sum, mean, min, max, sd, var and n (the count of observations). If you request something else, dplyrXdf will split the dataset into multiple data frames, one per group, and call dplyr::summarise on each data frame; this will generally work as intended, but may be slow.

The dplyrXdf version of summarise can choose from a number of methods for computing the summary statistics. While it's usually smart enough to choose the best method, you can set this manually with the .method argument, which takes a number from 1 to 5:

  1. Use rxCube
  2. Use rxSummary
  3. Use rxSummary but create the groups by concatenating the grouping variables together; this is to work around a limitation in the RevoScaleR functions on the maximum number of cells in a cube
  4. Split the dataset into multiple data frames, call dplyr::summarise on each
  5. Split the dataset into multiple Xdf files, call rxSummary on each

Only methods 1, 2 and 3 support the use of .rxArgs.
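
As an illustration, here is a sketch of forcing the rxCube method; this is rarely necessary, since the default choice is usually correct:

# force summarise to use rxCube (method 1)
carrierSmry <- flightsXdf %>%
    group_by(carrier) %>%
    summarise(delay=mean(arr_delay), n=n(), .method=1)
head(carrierSmry)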

In addition, dplyrXdf summarise doesn't support expressions as summary statistics. For example, something like summarise(datasrc, weightedMean=sum(x*wt)/sum(wt)) works when datasrc is a data frame, but not when it is an Xdf. To get the desired result, one workaround would be to use three verbs in a pipeline:

datasrc %>%
    mutate(xwt=x*wt) %>%
    summarise(xwt=sum(xwt), wt=sum(wt)) %>%
    mutate(weightedMean=xwt/wt)

In this particular case though, you could also use rxCube's built-in pweight argument to compute the weighted mean:

datasrc %>%
    summarise(weightedMean=mean(x), .rxArgs=list(pweight="wt"))

Creating factors with factorise

Many RevoScaleR functions are optimised to work best with factors, or require factors as input. dplyrXdf provides a simple wrapper around the rxFactors function to convert non-factor variables to factors. The syntax is as follows:

factorise(data, x1, x2, ...)

where x1, x2, ... are the variables to convert. Note that the generated factor variables will overwrite the originals. For performance reasons, the levels of the generated factors are not sorted in alphabetical order. You can also specify the levels for the factor(s) in question, using the standard name=value syntax:

factorise(data, x1=c("a", "b", "c"))

This will convert the variable x1 into a factor with levels a, b and c. Any values that don't match the specified levels will be turned into NAs.

The verbs in dplyrXdf will usually create factors on the fly as needed, so you shouldn't need to call factorise very often. However, should you need it, factorise provides an explicit way to create factors within the framework of dplyrXdf and pipelines.

There are a number of ways to specify the variables to convert, in addition to naming them explicitly. The functions all_character(), all_numeric() and all_integer() will convert all the variables falling into these categories. A logical variable counts as integer for this purpose. You can also use the helper functions accepted by dplyr::select_vars to choose variables based on their names.

By default, if no variables are specified in the factorise call, then all character variables will be converted to factors. As with select, renaming variables as part of the factor conversion is not supported.
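
For instance, a short sketch using one of these helpers:

# convert all character columns to factors
# (the same as calling factorise with no variables specified)
flightsFac <- factorise(flightsXdf, all_character())
head(flightsFac)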

Executing code with do and do_xdf

The do verb is an exception to the rule that dplyrXdf verbs write their output as Xdf files. This is because do executes arbitrary R code, and can return arbitrary R objects; while a data frame is capable of storing these objects, an Xdf file can only store columns of atomic types.

The do_xdf verb is similar to do, but where do splits its input into one data frame per group, do_xdf splits it into one Xdf file per group. This allows do-like functionality with grouped data, where each group can be arbitrarily large. The syntax for the two verbs is essentially the same, although the code passed to do_xdf must know how to handle Xdfs.

# fit a regression model by carrier, using rxLinMod
flightsMods <- flightsXdf %>%
    group_by(carrier) %>%
    do_xdf(model=rxLinMod(arr_delay ~ dep_delay + hour, data=.))

flightsMods$model[[1]]

Parallel processing of grouped data

dplyrXdf takes a fairly literal approach to implementing the split-apply-combine strategy for grouped data. Most verbs that have specific methods for groups will split the data into multiple Xdf files, and then process each file separately (the exception is summarise, because rxCube and rxSummary can deal with groups natively). This makes it easy to parallelise the processing of groups in dplyrXdf.

dplyrXdf will automatically use the RevoScaleR compute context that you set via rxSetComputeContext(). For example, if you set the compute context to RxLocalParallel(), dplyrXdf will create a cluster of background worker processes and send the data to them by group. The cluster is destroyed when the verb returns.
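
For instance, a minimal sketch of processing grouped data in parallel on the local machine:

# run grouped verbs in parallel using background processes
rxSetComputeContext(RxLocalParallel())

flightsModsPar <- flightsXdf %>%
    group_by(carrier) %>%
    do(model=lm(arr_delay ~ dep_delay + hour, data=.))

rxSetComputeContext("local")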

For more flexibility, you can manually create a cluster and set the compute context to RxForeachDoPar(). This has a number of benefits. First, the cluster is persistent, so no time is wasted creating and destroying it each time you work with groups. Second, and perhaps more importantly, the RxForeachDoPar compute context is agnostic as to the nature of the underlying cluster. In particular, you can create a cluster of multiple (virtual) machines, which is much more scalable than a cluster of multiple processes on a single machine. The main requirements are that you can register the cluster with the foreach package, and that all the machines have access to the same filesystem, such as a network drive or file share.

The code below gives an example of creating a cluster of VMs, and then running dplyrXdf code on it. This uses the doAzureParallel package, an easy-to-use way to create clusters of Azure Data Science VMs; however, you could use any other package to create the cluster, such as parallel (which comes with open source R), snow, or Rmpi.

library(doAzureParallel)

# doAzureParallel uses json files for cluster setup; see the documentation on GitHub for details
setCredentials("credentials.json")

# create and register a cluster of VMs in Azure
cl <- makeCluster("cluster.json")
registerDoAzureParallel(cl)

# tell dplyrXdf to use our cluster
rxSetComputeContext("dopar")

# set the dplyrXdf working directory to a cluster-accessible location
set_dplyrxdf_dir("n:/clusterdata")

# fit models in parallel across the cluster
flightsXdf %>%
    group_by(carrier) %>%
    do(model=lm(arr_delay ~ dep_delay + hour, data=.))

# shut down the cluster when we're done
stopCluster(cl)
rxSetComputeContext("local")

Note that the above assumes your data is stored on the native filesystem (either on your machine's local hard disk, or in a network share). If your data is stored in a Hadoop or Spark cluster, dplyrXdf will take advantage of the Hadoop and Spark compute contexts to process data in parallel on the worker nodes. See the HDFS vignette for more information on working with Xdf files in Spark.

Data frame methods

dplyrXdf includes a number of convenience functions for exporting data to data frames:

  1. an as.data.frame method for Xdf data sources
  2. methods for $ and [[, which extract a single column as a vector
  3. a method for dplyr's pull verb, which does the same as [[

These are simple wrappers around RevoScaleR's rxDataStep function that turn off the default size check. Make sure you have enough memory to hold the data before using them!
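
A sketch of the as.data.frame method; the nycflights13 data is small enough to fit comfortably in memory:

# read the entire Xdf file into a data frame
flightsDf <- as.data.frame(flightsXdf)
dim(flightsDf)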

head(flightsXdf$arr_delay)

head(pull(flightsXdf, arr_delay))  # same as above
unlink(c("flights.xdf", "output1.xdf"))
clean_dplyrxdf_dir("native")
rxSetComputeContext(cc)

