    knitr::opts_chunk$set(collapse=TRUE, comment="#>")
    options(dplyr.print_min=5L, dplyr.print_max=5L)
    cc <- rxSetComputeContext("local")
This vignette goes into more detail on using dplyrXdf. Most of dplyrXdf works the same way as dplyr, so if you know how to use the latter, then you also (mostly) know how to use the former. However, there are some places in which the two packages differ. These are described below.
As with the introductory vignette, we'll use the nycflights13 flights data to demonstrate the use of dplyrXdf.
The easiest way to import data into an Xdf file is via the `as_xdf` function. This works with any RevoScaleR data source, as well as any object that can be converted into a data frame. For example, you can use it to import data from a SQL Server database table, a SAS or SPSS dataset, or a delimited text file. The syntax is very simple, as can be seen by importing the flights data:
    library(dplyrXdf)    # also loads dplyr
    library(nycflights13)

    flightsXdf <- as_xdf(flights)
    tbl_vars(flightsXdf)
This will create an Xdf file whose name is taken from the input data, in this case "flights.xdf".
To demonstrate importing from something other than a data frame, let's write the flights data to a csv file, and then import it. As before, the name of the output Xdf file will be taken from the name of the input, with the extension changed to ".xdf". To avoid overwriting the Xdf we've just created, we should therefore call our csv file something other than "flights". However, `as_xdf` has a `file` argument to specify the name of the output Xdf, so let's use that instead:
    # create a RevoScaleR text data source pointing to a csv file
    write.csv(flights, "flights.csv", row.names=FALSE)
    flightsCsv <- RxTextData("flights.csv")

    flightsXdf2 <- as_xdf(flightsCsv, file="flights2.xdf")
    tbl_vars(flightsXdf2)
    file.remove("flights.csv")
    delete_xdf(flightsXdf2)
As well as a csv file, `as_xdf` will work with any RevoScaleR data source, or with any R object that can be treated as a data frame. In particular, the following code will import a table from a SQL Server database:
    sqlSrc <- RxSqlServerData(table="mytable", server="host",
                              databaseName="mydatabase", ...)

    # this will create the file 'mytable.xdf'
    xdfSrc <- as_xdf(sqlSrc)
However, rather than importing the data, dplyrXdf can also work directly with a database. If given an `RxOdbcData`, `RxTeradata` or `RxSqlServerData` source as the input to a pipeline, dplyrXdf will convert it to a dplyr tbl. For more information, see the vignette on SQL databases.
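Since dplyr tbls on databases translate verbs into SQL, such a pipeline runs in the database rather than on Xdf files. Here is a minimal sketch; the table, server and column names (`mytable`, `host`, `x`, `g`) are hypothetical placeholders:

    # hypothetical SQL Server source: dplyrXdf treats it as a dplyr tbl,
    # so the verbs below are translated to SQL and run in the database
    sqlTbl <- RxSqlServerData(table="mytable", server="host",
                              databaseName="mydatabase")

    sqlTbl %>%
        filter(x > 0) %>%
        group_by(g) %>%
        summarise(meanX=mean(x))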
To facilitate the task of file management, dplyrXdf defines a new `tbl_xdf` class that extends the `RxXdfData` class. This is what allows it to keep track of which data sources should remain untouched, and which can be modified or overwritten as part of a pipeline. To the base RevoScaleR functions, a `tbl_xdf` object is just a normal Xdf data source; thus, existing code dealing with Xdfs should work with minimal modification. However, the verbs implemented in dplyrXdf will recognise when they are passed a `tbl_xdf`, as opposed to a normal Xdf file, in which case they will delete their input file after writing the output file. Thus there is always only one file that represents the latest stage of a pipeline.
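A quick way to see this machinery at work is to inspect the class of a verb's output. A minimal sketch, using the flights data imported above:

    # the output of a dplyrXdf verb is a tbl_xdf, which extends RxXdfData
    janFlights <- flightsXdf %>% filter(month == 1)
    class(janFlights)

    # base RevoScaleR functions treat it as an ordinary Xdf data source
    rxGetInfo(janFlights)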
In general, you never need to create a `tbl_xdf` manually. An exception is if you want to create a `tbl_xdf` from a non-Xdf data source. In that case you can do the following, which will transparently import the data into an Xdf file:
    txt <- RxTextData("path/to/file.txt")
    txtTbl <- as(txt, "tbl_xdf")
A side-effect of dplyrXdf managing files for you is that you should be careful when passing the result from an initial pipeline into subsequent pipelines. Consider the following example:
    # pipeline 1
    output1 <- flightsXdf %>%
        mutate(delay=(arr_delay + dep_delay)/2)

    # use the output from pipeline 1
    output2 <- output1 %>%
        group_by(carrier) %>%
        summarise(delay=mean(delay))

    # reuse the output from pipeline 1 -- WRONG
    output3 <- output1 %>%
        group_by(dest) %>%
        summarise(delay=mean(delay))
The bug in this code is that the second pipeline will overwrite or delete its input, so the third pipeline will fail. This is consistent with dplyrXdf's philosophy of only saving the most recent output of a pipeline, where a pipeline is defined as all operations starting from a raw Xdf file. However, in this case it isn't what we want.
Similarly, dplyrXdf creates its output files in a temporary directory, which will be deleted when you quit R. This saves you having to manually delete files that are no longer in use, but it does mean that you must copy the results of your analyses to a permanent location if you want to keep them around.
dplyrXdf gives you a number of ways to deal with these issues.
First, all dplyrXdf verbs support a special argument `.outFile`, which controls how the output data is handled. If you don't specify a value for this argument, the data will be saved to a `tbl_xdf` which will be managed by dplyrXdf. This supports the default behaviour of dplyrXdf, whereby data files are automatically created and deleted inside a pipeline. There are two other options for `.outFile`:
- If you specify `.outFile = NULL`, the data will be returned in memory as a data frame (see the sketch after this list).
- If `.outFile` is a character string giving a file name, the data will be saved to an Xdf file at that location, and a persistent Xdf data source will be returned.
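As a brief sketch of the first option, here is a summary returned directly to memory:

    # .outFile=NULL returns the result as a data frame rather than a tbl_xdf
    delayDf <- flightsXdf %>%
        group_by(carrier) %>%
        summarise(delay=mean(dep_delay), .outFile=NULL)
    class(delayDf)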
To show how this works, we can modify pipeline 1 above to save its output to a persistent location. This resets the status of the pipeline, so that subsequent operations will know not to modify the data.
    # pipeline 1 -- use .outFile to save the data
    output1 <- flightsXdf %>%
        mutate(delay=(arr_delay + dep_delay)/2, .outFile="output1.xdf")

    # use the output from pipeline 1
    output2 <- output1 %>%
        group_by(carrier) %>%
        summarise(delay=mean(delay))

    # reuse the output from pipeline 1 -- this works as expected
    output3 <- output1 %>%
        group_by(dest) %>%
        summarise(delay=mean(delay))
## The `persist` verb

The second way of creating a persistent Xdf file is with the `persist` verb. This is a simple function that moves or copies its input to the specified location. Like the `.outFile` argument described above, it will also reset the status of the pipeline.
    # pipeline 1 -- use persist to save the data
    output1 <- flightsXdf %>%
        mutate(delay=(arr_delay + dep_delay)/2) %>%
        persist("output1.xdf")

    # use the output from pipeline 1
    output2 <- output1 %>%
        group_by(carrier) %>%
        summarise(delay=mean(delay))

    # reuse the output from pipeline 1 -- this also works as expected
    output3 <- output1 %>%
        group_by(dest) %>%
        summarise(delay=mean(delay))
In general, setting `.outFile` to save an Xdf file is preferred to using the `persist` verb. You would use `persist` if you have already run a pipeline, and want to save its output after the fact.
## `as_xdf`

Another way to create an Xdf file from a `tbl_xdf` is via `as_xdf`, which was described previously. `as_xdf` always creates a raw Xdf data source, and when called on a `tbl_xdf`, has the effect of removing the tbl information. This does not modify the file at all, unlike `persist`. The advantage of this is that it's fast (since the data is not touched); the disadvantage is that the file remains in the dplyrXdf temporary directory. Because of this, you still have to make a new copy if you want the data to remain after you quit your R session.
    outputXdf <- as_xdf(output3)

    # no longer a tbl_xdf
    output3
    outputXdf
A number of functions are supplied for manipulating Xdf files as files:
- `copy_xdf` and `move_xdf` copy and move an Xdf file, optionally renaming it as well.
- `rename_xdf` does a strict rename, ie without changing the file's location.
- `delete_xdf` deletes the Xdf file.

All of these except `delete_xdf` return an Xdf data source object pointing to the new file's name/location. For example, we can use `move_xdf` to move the output of `as_xdf`, from above, to a more permanent location.
    output3 <- move_xdf(output3, "d:/data/output3.xdf")
    output3
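Similarly, a short sketch of `copy_xdf`, assuming it takes a destination path in the same way as `move_xdf` (the path here is arbitrary):

    # copy the file, leaving the original in place, then delete the copy
    output3copy <- copy_xdf(output3, "d:/data/output3_copy.xdf")
    delete_xdf(output3copy)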
By default, dplyrXdf will save the files it creates into the R temporary directory. On some systems, this may be located on a filesystem that is relatively small; this is rarely an issue with open source R, as all its objects are loaded into memory, but it can be problematic with large Xdf files. You can view the location of the current Xdf tbl directory with `get_dplyrxdf_dir`:
    get_dplyrxdf_dir()
Similarly, you can change the location of the Xdf tbl directory with the `set_dplyrxdf_dir` function:
    # set the tbl directory to a network drive (on Windows)
    set_dplyrxdf_dir("n:/Rtemp")
For best results, you should avoid setting the Xdf tbl directory to a remote location/network drive unless you have a fast network connection.
RevoScaleR actually supports two kinds of Xdf files: standard (what we've been using so far) and composite. A composite Xdf is actually a directory that contains multiple data and metadata files; the RevoScaleR functions know how to treat this as a single data source. Similarly, the dplyrXdf verbs can work transparently with both kinds of Xdfs, with the output of a verb being of the same kind as the input.
Xdf files in the native filesystem can be either standard or composite, but those in HDFS must be composite to work properly. By default, dplyrXdf will create a composite Xdf if the input dataset is stored in HDFS, and a standard Xdf otherwise. There are also `as_standard_xdf` and `as_composite_xdf` functions if you want to create a specific type directly.
If required, you can also use these functions to switch an Xdf file (including a `tbl_xdf`) between standard and composite formats. Calling `as_standard_xdf` on a composite Xdf file will convert it to standard format, and vice-versa for calling `as_composite_xdf` on a standard Xdf.
    compositeOutput3 <- as_composite_xdf(output3)
    compositeOutput3
    delete_xdf(output3)
    delete_xdf(compositeOutput3)
In addition to the `persist` verb described above, dplyrXdf adds new functionality to existing verbs, as well as some new single-table verbs.
## The `subset` verb

In dplyr, subsetting data is handled by two verbs: `filter` for subsetting by rows, and `select` for subsetting by columns. This is fine for data frames, where everything runs in memory, and for SQL databases, where the hard work is done by the database. For Xdf files, however, this is suboptimal, as each verb translates into a separate I/O step where the data is read from disk, subsetted, then written out again. This can waste a lot of time with large datasets.
As it turns out, base R has a `subset` generic which (as the name says) performs subsetting on both rows and columns. You've probably used it with data frames:
    subset(flights, month <= 6 & day == 1, c(dep_time, dep_delay, carrier))
dplyrXdf implements a method for `subset` that works for Xdf files. The code is exactly the same as for a data frame, except that it creates another Xdf file. This produces the same result as a `filter` followed by a `select`, but requires only half the amount of I/O.
    flightsXdfSub <- subset(flightsXdf, month <= 6 & day == 1,
                            c(dep_time, dep_delay, carrier))
    class(flightsXdfSub)
    head(flightsXdfSub)
## The `.rxArgs` argument

The RevoScaleR functions typically have several arguments beyond those used by dplyrXdf verbs. While usually you don't need to touch these, it can sometimes be useful to do so. For example, when using `mutate` or `transmute`, you could specify more complicated transformations via a `transformFunc`. Similarly, rather than chaining together a `mutate` and a `summarise` --- which would involve creating an intermediate file --- you could incorporate the variable transformation into the `summarise` itself. More low-level uses of such arguments include setting the block size for an Xdf file, changing the compression level, limiting the number of rows, and so on.
Most of the dplyrXdf verbs accept a `.rxArgs` argument as a way of transmitting these extra arguments to the underlying RevoScaleR code. This should be a named list specifying the names and values of the arguments to be passed. The exact arguments will vary depending on the RevoScaleR function that is called, which will in turn depend on the verb. Here is a list of the verbs and the RevoScaleR functions that they call:
- `summarise`: depending on the method chosen, `rxCube` or `rxSummary`
- `arrange`: `rxSort`
- `factorise`: depending on the data source, `rxFactors` (for an Xdf) or `rxImport` (for a non-Xdf file source)
- the joins (`left_join`, `right_join`, et al): `rxMerge`
- most other verbs, including `subset`, `mutate` and `transmute`: `rxDataStep`
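As a quick sketch of the low-level uses mentioned earlier, the following passes `rxDataStep`'s `rowsPerRead` and `xdfCompressionLevel` arguments via `.rxArgs` to control the output file's block size and compression level (the values chosen here are arbitrary):

    # assumes mutate calls rxDataStep, so rxDataStep arguments apply
    flightsBlocked <- flightsXdf %>%
        mutate(longDelay=arr_delay > 60,
               .rxArgs=list(rowsPerRead=250000, xdfCompressionLevel=9))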
Here are some examples to illustrate the use of `.rxArgs`:
    # subset, transform and summarise in the one step
    flightsSubsetSmry <- flightsXdf %>%
        group_by(day) %>%
        summarise(delay=mean(delay), n=n(),
            .rxArgs=list(
                transforms=list(delay=(dep_delay + arr_delay)/2),
                rowSelection=carrier == "UA"
            )
        )
    head(flightsSubsetSmry)

    # a complex transformation involving a transformFunc
    flightsTrans <- transmute(flightsXdf,
        .rxArgs=list(
            transformFunc=function(varlist) with(varlist, {
                delay <- (dep_delay + arr_delay)/2
                date <- as.Date(sprintf("%d-%02d-%02d", year, month, day))
                weekday <- weekdays(date)
                weekendDelay <- ifelse(weekday %in% c("Saturday", "Sunday"),
                                       delay, NA)
                list(delay=delay, weekday=weekday, weekendDelay=weekendDelay)
            })
        )
    )
    head(flightsTrans)

    # fit a model using open source R, and then score the training dataset
    # we pass the model object via transformObjects, and the package to load
    # via transformPackages
    library(rpart)
    flightsModel <- rpart(arr_delay ~ dep_delay + carrier + hour, data=flights)

    flightsScores <- transmute(flightsXdf,
        pred=predict(model, data.frame(dep_delay, carrier, hour)),
        .rxArgs=list(
            transformObjects=list(model=flightsModel),
            transformPackages="rpart"
        )
    )
    head(flightsScores)
You should use `.rxArgs` with caution, as some verbs modify the data as part of their normal functioning, so the results you get back may not be as expected. It's also easy to write convoluted code that makes your pipelines hard to read. Nevertheless, if you are working with very large datasets and speed is important, this is one way to improve the efficiency of your code.
## `summarise`

For best performance, when using `summarise` you should request only those summary statistics supported by `rxCube` and/or `rxSummary`: sum, mean, min, max, sd, var and n (the count of observations). If you request something else, dplyrXdf will split the dataset into multiple data frames, one per group, and call `dplyr::summarise` on each data frame; this will generally work as intended, but may be slow.
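For instance, `median` is not in the list above, so a call like the following sketch triggers the split-and-summarise fallback, and will generally be slower than a natively supported statistic:

    # median is not supported by rxCube/rxSummary, so dplyrXdf splits the
    # data by carrier and calls dplyr::summarise on each piece
    flightsXdf %>%
        group_by(carrier) %>%
        summarise(medDelay=median(dep_delay, na.rm=TRUE))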
The dplyrXdf version of `summarise` can choose from a number of methods for computing the summary statistics. While it's usually smart enough to choose the best method, you can set this manually with the `.method` argument, which takes a number from 1 to 5:

1. Use `rxCube`
2. Use `rxSummary`
3. Use `rxSummary`, but create the groups by concatenating the grouping variables together; this works around a limitation in the RevoScaleR functions on the maximum number of cells in a cube
4. Split the data into multiple data frames, one per group, and call `dplyr::summarise` on each
5. Split the data into multiple Xdf files, one per group, and call `rxSummary` on each

Only methods 1, 2 and 3 support the use of `.rxArgs`.
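For example, to force the `rxSummary`-based method:

    # .method=2 computes the summary with rxSummary
    flightsXdf %>%
        group_by(carrier) %>%
        summarise(delay=mean(dep_delay), .method=2)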
In addition, dplyrXdf `summarise` doesn't support expressions as summary statistics. For example, something like `summarise(datasrc, weightedMean=sum(x*wt)/sum(wt))` works when `datasrc` is a data frame, but not when it is an Xdf. To get the desired result, one workaround would be to use three verbs in a pipeline:
    # compute the weighted mean of x in three steps
    datasrc %>%
        mutate(xwt=x*wt) %>%
        summarise(xwt=sum(xwt), wt=sum(wt)) %>%
        mutate(weightedMean=xwt/wt)
In this particular case though, you could also use `rxCube`'s built-in `pweights` argument to compute the weighted mean:
    datasrc %>% summarise(weightedMean=mean(x), .rxArgs=list(pweights="wt"))
## `factorise`

Many RevoScaleR functions are optimised to work best with factors, or require factors as input. dplyrXdf provides a simple shell to the `rxFactors` function to convert non-factor variables to factors. The syntax is as follows:
    factorise(data, x1, x2, ...)
where `x1`, `x2`, ... are the variables to convert. Note that the generated factor variables will overwrite the originals. For performance reasons, the levels of the generated factors are not sorted in alphabetical order. You can also specify the levels for the factor(s) in question, using the standard name=value syntax:
    factorise(data, x1=c("a", "b", "c"))
This will convert the variable `x1` into a factor with levels `a`, `b` and `c`. Any values that don't match the specified levels will be turned into NAs.
The verbs in dplyrXdf will usually create factors on the fly as needed, so you shouldn't need to call `factorise` very often. However, should you need it, `factorise` provides an explicit way to create factors within the framework of dplyrXdf and pipelines.
There are a number of ways to specify the variables to convert, in addition to naming them explicitly. The functions `all_character()`, `all_numeric()` and `all_integer()` will convert all the variables falling under these categories. A logical variable counts as integer for this purpose. You can also use the helper functions available to `dplyr::select_vars` to choose variables based on their names.
By default, if no variables are specified in the `factorise` call, then all character variables will be converted to factors. As with `select`, renaming variables as part of the factor conversion is not supported.
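Here is a short sketch of these selection methods on the flights data; `starts_with` is one of the dplyr selection helpers:

    # convert all character variables (also the default with no arguments)
    flightsFac <- factorise(flightsXdf, all_character())

    # use a dplyr selection helper to choose variables by name
    flightsFac2 <- factorise(flightsXdf, starts_with("dep"))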
## `do` and `do_xdf`

The `do` verb is an exception to the rule that dplyrXdf verbs write their output as Xdf files. This is because `do` executes arbitrary R code, and can return arbitrary R objects; while a data frame is capable of storing these objects, an Xdf file is limited to character and numeric vectors only.
The `do_xdf` verb is similar to `do`, but where `do` splits its input into one data frame per group, `do_xdf` splits it into one Xdf file per group. This allows `do`-like functionality with grouped data, where each group can be arbitrarily large. The syntax for the two functions is essentially the same, although the code passed to `do_xdf` must obviously know how to handle Xdfs.
    # fit a regression model by carrier, using rxLinMod
    flightsMods <- flightsXdf %>%
        group_by(carrier) %>%
        do_xdf(model=rxLinMod(arr_delay ~ dep_delay + hour, data=.))

    flightsMods$model[[1]]
dplyrXdf takes a fairly literal approach to implementing the split-apply-combine strategy for grouped data. Most verbs that have specific methods for groups will split the data into multiple Xdf files, and then process each file separately (the exception is `summarise`, because `rxCube` and `rxSummary` can deal with groups natively). This makes it easy to parallelise the processing of groups in dplyrXdf.
dplyrXdf will automatically use the RevoScaleR compute context that you set via `rxSetComputeContext()`. For example, if you set the compute context to `RxLocalParallel()`, dplyrXdf will create a cluster of slave nodes in the background and send the data to the nodes by group. The cluster is destroyed when the verb returns.
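A minimal sketch, assuming your machine has spare cores to use:

    # run grouped operations on local parallel workers
    rxSetComputeContext(RxLocalParallel())

    flightsMods <- flightsXdf %>%
        group_by(carrier) %>%
        do(model=lm(arr_delay ~ dep_delay + hour, data=.))

    rxSetComputeContext("local")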
For more flexibility, you can manually create a cluster and set the compute context to `RxForeachDoPar()`. This has a number of benefits. First, the cluster is persistent, so no time is wasted in creating and then destroying it each time you want to work with groups. Second, and perhaps more importantly, the `RxForeachDoPar` compute context is agnostic as to the nature of the underlying cluster. In particular, you can create a cluster of multiple (virtual) machines, which will be much more scalable than a cluster of multiple processes on a single machine. The main requirements are that you must be able to register the cluster with the foreach package, and that all the machines have access to the same filesystem, such as a network drive or file share.
The code below gives an example of creating a cluster of VMs, and then running dplyrXdf code on it. This uses the doAzureParallel package, which is an easy-to-use way to create clusters of Azure Data Science VMs. However, you could use any other package to create the cluster, such as the parallel package that comes with open source R, snow, or Rmpi.
    library(doAzureParallel)

    # doAzureParallel uses json files for cluster setup; see the
    # documentation on GitHub for details
    setCredentials("credentials.json")

    # create and register a cluster of VMs in Azure
    cl <- makeCluster("cluster.json")
    registerDoAzureParallel(cl)

    # tell dplyrXdf to use our cluster
    rxSetComputeContext("dopar")

    # set the dplyrXdf working directory to a cluster-accessible location
    set_dplyrxdf_dir("n:/clusterdata")

    # fit models in parallel across the cluster
    flightsXdf %>%
        group_by(carrier) %>%
        do(model=lm(arr_delay ~ dep_delay + hour, data=.))

    # shut down the cluster when we're done
    stopCluster(cl)
    rxSetComputeContext("local")
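The same pattern works with a purely local cluster of processes, for example via the parallel and doParallel packages. A sketch, assuming four worker processes suit your machine:

    library(doParallel)

    # create and register a cluster of 4 local worker processes
    cl <- makePSOCKcluster(4)
    registerDoParallel(cl)
    rxSetComputeContext("dopar")

    # grouped dplyrXdf operations now run on the local workers
    flightsMods <- flightsXdf %>%
        group_by(carrier) %>%
        do(model=lm(arr_delay ~ dep_delay + hour, data=.))

    stopCluster(cl)
    rxSetComputeContext("local")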
Note that the above assumes your data is stored on the native filesystem (either on your machine's local hard disk, or in a network share). If your data is stored in a Hadoop or Spark cluster, dplyrXdf will take advantage of the Hadoop and Spark compute contexts to process data in parallel on the worker nodes. See the HDFS vignette for more information on working with Xdf files in Spark.
dplyrXdf includes a number of convenience functions for exporting data to data frames:
- An `as.data.frame` method for Xdf files (and other RevoScaleR data sources).
- Methods for `pull`, `$` and `[[` that will extract a given column as a vector in memory.
- `collect` and `compute` methods, which, while meant for downloading data sources from HDFS, can also be used to export to data frames.

These are simple wrappers around RevoScaleR's `rxDataStep` function that turn off the default size check. Make sure you have enough memory to hold the data before using them!
    head(flightsXdf$arr_delay)
    head(pull(flightsXdf, arr_delay))    # same as above
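And to bring an entire table into memory (again, check that it fits first):

    # convert the whole Xdf file to a data frame in memory
    flightsDf <- as.data.frame(flightsXdf)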
    unlink(c("flights.xdf", "output1.xdf"))
    clean_dplyrxdf_dir("native")
    rxSetComputeContext(cc)