```r
knitr::opts_chunk$set(collapse=TRUE, comment="#>")
options(dplyr.print_min=5L, dplyr.print_max=5L)
cc <- rxSetComputeContext("local")
```
The dplyrXdf package is a suite of tools to facilitate working with Microsoft R Server (MRS). Its features include:
- A backend to the popular dplyr package for the Xdf file format. Xdf files are a technology provided by MRS to break R's memory barrier: instead of being kept in memory in data frames, the data is saved on disk. It is then processed in chunks, so that you only need enough memory to handle each chunk.
- Interfaces to Microsoft SQL Server and HDInsight Hadoop and Spark clusters. dplyrXdf, in conjunction with dplyr, provides the ability to execute pipelines natively in-database and in-cluster, which for large datasets can be much more efficient than executing them locally.
- Several functions to ease working with Xdf files, including functions for file management and for transferring data to and from remote backends.
- Workarounds for various glitches and unexpected behaviour in MRS.
Note that dplyrXdf is a shell on top of the existing functions provided by Microsoft R Server, which is a commercial distribution of R. You must have MRS installed to make use of dplyrXdf. In particular, Microsoft R Open, the free distribution of R from Microsoft, does not include support for Xdf files.
For this example we'll use the flights dataset from the nycflights13 package. This is one of the datasets used in the dplyr vignettes, and crops up in many other places besides.
```r
library(dplyrXdf)     # also loads dplyr
library(nycflights13)

# write the data as an xdf file
flightsXdf <- rxDataStep(flights, "flights.xdf", overwrite=TRUE)
```
Consider a simple task: get the average delay and total distance covered (in kilometers) in the first half of 2013, by carrier, sorted by descending delay. This isn't very complicated, conceptually speaking: we want to do a row selection, then some basic transformations, followed by a summary, and then order the output by one of the columns.
This translates into the following sequence of RevoScaleR function calls:
```r
# select the rows
flights_rx1 <- rxDataStep(flightsXdf, outFile="flights_rx1.xdf",
    rowSelection=month <= 6 & year == 2013,
    overwrite=TRUE)

# variable transformations
flights_rx2 <- rxDataStep(flights_rx1, outFile="flights_rx2.xdf",
    transforms=list(dist_km=distance*1.6093,
                    delay=(arr_delay + dep_delay)/2),
    overwrite=TRUE)

# convert carrier into a factor variable (or rxSummary will complain)
flights_rx3 <- rxFactors(flights_rx2, factorInfo="carrier",
    outFile="flights_rx3.xdf", overwrite=TRUE)

# use rxSummary to get the summary table(s) (could also use rxCube twice)
flights_rx4 <- rxSummary(~delay:carrier + dist_km:carrier, data=flights_rx3,
    summaryStats=c("mean", "sum"))

# extract the desired tables from the rxSummary output
flights_rx4_1 <- flights_rx4$categorical[[1]][c("carrier", "Means")]
names(flights_rx4_1)[2] <- "mean_delay"
flights_rx4_2 <- flights_rx4$categorical[[2]][c("carrier", "Sum")]
names(flights_rx4_2)[2] <- "sum_dist"

# merge the tables together
flights_rx5 <- merge(flights_rx4_1, flights_rx4_2, by="carrier", all=TRUE)

# sort the results
flights_rx5 <- flights_rx5[order(flights_rx5$mean_delay, decreasing=TRUE), ]

head(flights_rx5)
```
The equivalent in dplyrXdf would be the following pipeline:
```r
flightsSmry <- flightsXdf %>%
    filter(month <= 6, year == 2013) %>%
    mutate(dist_km=distance*1.6093, delay=(arr_delay + dep_delay)/2) %>%
    group_by(carrier) %>%
    summarise(mean_delay=mean(delay), sum_dist=sum(dist_km)) %>%
    arrange(desc(mean_delay))

head(flightsSmry)
```
Even with this relatively straightforward example, dplyrXdf hides the complexity of calling RevoScaleR functions while retaining their power. In particular, note the following:
The base RevoScaleR functions require you to specify the location for the output xdf file. It's very common to end up with many different versions of your data scattered around the filesystem, causing reproducibility problems and making it hard to keep track of changes. By contrast, the dplyrXdf pipeline has no filenames. This is because dplyrXdf handles the task of file management, so that you can focus on the data itself.
The summarise verb is much simpler to work with than the RevoScaleR rxSummary function. It doesn't require scanning through a list of output objects to find the information you're after, and it accepts grouping variables of any type (numeric, character or factor). (The alternative summarisation tool, rxCube, is simpler but has its own limitations: for example, it supports a much narrower range of summary statistics, and can only compute one type of statistic at a time.)
The pipeline operator, along with the design of the verbs, makes the sequence of operations clear at a glance. This is one of the major benefits of dplyr, and can now also be enjoyed with Xdf files.
Here are a couple of additional features that may not be immediately apparent from the code above:
The verbs in dplyrXdf all read from xdf files and write to xdf files. This preserves the ability to handle large datasets without running out of memory.
The source xdf file to a dplyrXdf pipeline is never modified. This provides a measure of security, so that even if there are bugs in your code, your original data is safe.
The output from a dplyrXdf pipeline can be used directly by other RevoScaleR and MicrosoftML functions, such as those for statistical model fitting and machine learning. For example, we could fit a simple linear regression to the output from the above pipeline, using rxLinMod:
```r
rxLinMod(mean_delay ~ sum_dist + carrier, data=flightsSmry)
```
dplyrXdf supports all the basic dplyr single-table verbs:
- filter and select to choose rows and columns
- mutate and transmute to do data transformation
- group_by to define groups
- summarise and do to carry out computations on grouped data
- arrange to sort by variables
- rename to rename columns
- distinct to drop duplicates

Under the hood, they work by translating your pipeline into calls to the base RevoScaleR functions for working with xdf files: for example, mutate calls rxDataStep to compute transformations; arrange calls rxSort; and so on.
These verbs work exactly as they do in dplyr. Thus if you know how to use dplyr, then you also know how to use the bulk of dplyrXdf.
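For instance, the verbs not used in the pipeline above follow the same dplyr syntax. This is a sketch only, reusing the flightsXdf file created earlier (the delay_hrs column is an illustrative variable, not part of the original data):

```r
# select a subset of columns, then drop duplicate combinations
flightsRoutes <- flightsXdf %>%
    select(carrier, origin, dest) %>%
    distinct(carrier, origin, dest)

# transmute keeps only the newly created columns;
# rename changes a column name without touching the data
flightsDelay <- flightsXdf %>%
    transmute(delay_hrs=(arr_delay + dep_delay)/120) %>%
    rename(avg_delay_hrs=delay_hrs)

head(flightsRoutes)
```

As with the main pipeline, no output filenames are needed: dplyrXdf manages the intermediate and final Xdf files for you.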
dplyrXdf supports all the table-join verbs from dplyr, except for the set operations intersect, setdiff and setequal:

- left_join, right_join, inner_join and full_join
- anti_join and semi_join
- union and union_all

The syntax is again the same as for the dplyr versions, including joining on non-matching column names. The underlying implementation uses rxMerge with the appropriate arguments for each type of join.
For example, one of the joins in the dplyr two-table verbs vignette joins the flights table with the airports table, using the columns dest (in flights) and faa (in airports). The same code in dplyr also works in dplyrXdf:
```r
airportsXdf <- rxDataStep(airports, "airports.xdf", overwrite=TRUE)

flightsJoin <- left_join(
    flightsXdf %>% select(year:day, hour, origin, dest, tailnum, carrier),
    airportsXdf,
    by=c("dest"="faa"))

head(flightsJoin)
```
In addition to the above verbs, dplyrXdf provides a number of functions to ease working with Xdf data sources, both in the native filesystem and in HDFS.
- cbind and rbind combine Xdf files
- as.data.frame converts an Xdf file into a data frame
- [[, $ and pull extract a column into memory
- sample_frac and sample_n do random sampling
- copy_xdf, move_xdf, rename_xdf and delete_xdf copy, move, rename and delete an Xdf file
- is_xdf and is_composite_xdf return whether an object is a (composite) Xdf
- as_xdf imports a data source or data frame into an Xdf file, optionally as composite; as_composite_xdf and as_standard_xdf are shortcuts for creating a composite and standard Xdf file respectively

If you are working on a Hadoop or Spark cluster, dplyrXdf supports data stored both in the native filesystem and in HDFS. Most verbs will work transparently with datasets stored in either filesystem; in general, the output of a verb will be created in the same filesystem as the input. Similarly, dplyrXdf can handle R sessions both on the cluster edge node and from a remote client, without any major change in functionality (the exception is sparklyr interoperability; see below).
The following functions are available to ease the task of file management on HDFS:
- copy_to uploads a dataset to HDFS, saving it as Xdf
- collect and compute do the opposite, downloading an Xdf file from HDFS
- hdfs_upload and hdfs_download transfer arbitrary files to and from HDFS
- hdfs_dir lists files in a HDFS directory
- hdfs_dir_exists and hdfs_file_exists test for directory/file existence
- hdfs_dir_create and hdfs_dir_remove create and delete directories
- hdfs_file_copy, hdfs_file_move, hdfs_file_rename and hdfs_file_remove copy, move, rename and delete files
- hdfs_expunge empties the HDFS trash
- in_hdfs returns whether a dataset is in HDFS
- local_exec runs an expression in the local compute context

In addition, dplyrXdf has the ability to call the sparklyr package to run Spark-native pipelines on Hive tables, without having to import the data to Xdf format. This can provide efficiency gains, since it reduces the amount of I/O. However, this is only supported if your R session is taking place on the edge node (not a remote session).
One of the key strengths of dplyr is its ability to interoperate with SQL databases. Given a database table as input, dplyr can translate the verbs in a pipeline into a SQL query which is then executed in the database. For large tables, this can often be much more efficient than importing the data and running the pipeline locally.
If the input to a pipeline is an RxOdbcData, RxTeradata or RxSqlServerData source, dplyrXdf will convert it to a dplyr tbl, and the subsequent pipeline will then be run in the database, rather than the data being imported to the local R session. This does require a compatible dplyr backend for the database in question; at the time of writing (September 2017) the dbplyr package includes such a backend for SQL Server, and one for Teradata is in development.
This article has been a quick executive-summary introduction to dplyrXdf. If you have any suggestions on features to add (including bits of dplyr that have been left out) or bugs that need fixing, please file an issue on the repo, or contact me at hongooi@microsoft.com.
```r
unlink(c("airports.xdf", "flights.xdf", "flights_rx1.xdf",
         "flights_rx2.xdf", "flights_rx3.xdf"))
clean_dplyrxdf_dir("native")
rxSetComputeContext(cc)
```