knitr::opts_chunk$set(collapse=TRUE, comment="#>")
options(dplyr.print_min=5L, dplyr.print_max=5L)

cc <- rxSetComputeContext("local")

Introduction

The dplyrXdf package is a suite of tools to facilitate working with Microsoft R Server (MRS). Its features include:

Note that dplyrXdf is a layer on top of the existing functions provided by Microsoft R Server, which is a commercial distribution of R. You must have MRS installed to make use of dplyrXdf. In particular, Microsoft R Open, the free distribution of R from Microsoft, does not include support for Xdf files.

A sample dplyrXdf pipeline

For this example we'll use the flights dataset from the nycflights13 package. This is one of the datasets used in the dplyr vignettes, and crops up in many other places besides.

library(dplyrXdf)  # also loads dplyr
library(nycflights13)

# write the data as an xdf file
flightsXdf <- rxDataStep(flights, "flights.xdf", overwrite=TRUE)

Consider a simple task: get the average delay and total distance covered (in kilometers) in the first half of 2013, by carrier, sorted by descending delay. This isn't very complicated, conceptually speaking: we want to do a row selection, then some basic transformations, followed by a summary, and then order the output by one of the columns.

This translates into the following sequence of RevoScaleR function calls:

# select the rows
flights_rx1 <- rxDataStep(flightsXdf, outFile="flights_rx1.xdf",
                          rowSelection=month <= 6 & year == 2013,
                          overwrite=TRUE)

# variable transformations
flights_rx2 <- rxDataStep(flights_rx1, outFile="flights_rx2.xdf",
                          transforms=list(dist_km=distance*1.6093,
                                          delay=(arr_delay + dep_delay)/2),
                          overwrite=TRUE)

# convert carrier into a factor variable (or rxSummary will complain)
flights_rx3 <- rxFactors(flights_rx2, factorInfo="carrier",
                         outFile="flights_rx3.xdf", overwrite=TRUE)

# use rxSummary to get the summary table(s) (could also use rxCube twice)
flights_rx4 <- rxSummary(~delay:carrier + dist_km:carrier, data=flights_rx3,
                         summaryStats=c("mean", "sum"))

# extract the desired tables from the rxSummary output
flights_rx4_1 <- flights_rx4$categorical[[1]][c("carrier", "Means")]
names(flights_rx4_1)[2] <- "mean_delay"

flights_rx4_2 <- flights_rx4$categorical[[2]][c("carrier", "Sum")]
names(flights_rx4_2)[2] <- "sum_dist"

# merge the tables together
flights_rx5 <- merge(flights_rx4_1, flights_rx4_2, by="carrier", all=TRUE)

# sort the results
flights_rx5 <- flights_rx5[order(flights_rx5$mean_delay, decreasing=TRUE), ]

head(flights_rx5)

The equivalent in dplyrXdf would be the following pipeline:

flightsSmry <- flightsXdf %>%
    filter(month <= 6, year == 2013) %>%
    mutate(dist_km=distance*1.6093, delay=(arr_delay + dep_delay)/2) %>%
    group_by(carrier) %>%
    summarise(mean_delay=mean(delay), sum_dist=sum(dist_km)) %>%
    arrange(desc(mean_delay))

head(flightsSmry)

Even with this relatively straightforward example, dplyrXdf hides the complexity of calling RevoScaleR functions while retaining their power. In particular, note the following:

Here are a couple of additional features that may not be immediately apparent from the code above:

The output from a dplyrXdf pipeline can be used directly by other RevoScaleR and MicrosoftML functions, such as those for statistical model fitting and machine learning. For example, we could fit a simple linear regression to the output from the above pipeline, using rxLinMod:

rxLinMod(mean_delay ~ sum_dist + carrier, data=flightsSmry)

Basic functionality

Single-table verbs

dplyrXdf supports all the basic dplyr single-table verbs: filter, select, distinct, mutate, transmute, arrange, summarise and group_by.

Under the hood, they work by translating your pipeline into calls to the base RevoScaleR functions for working with Xdf files: for example, mutate calls rxDataStep to compute the transformations, arrange calls rxSort, and so on.

These verbs work exactly as they do in dplyr. Thus if you know how to use dplyr, then you also know how to use the bulk of dplyrXdf.
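As a short sketch of this, the verbs can also be used individually outside a full pipeline, on the flightsXdf file created earlier (each step writes its result to a new Xdf file behind the scenes):

# select a few columns, drop rows with missing delays, and add a combined delay
delayedFlights <- flightsXdf %>%
    select(carrier, dep_delay, arr_delay) %>%
    filter(!is.na(dep_delay), !is.na(arr_delay)) %>%
    mutate(delay=(dep_delay + arr_delay)/2)

head(delayedFlights)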

Two-table verbs

dplyrXdf supports all the table-join verbs from dplyr (left_join, right_join, inner_join, full_join, semi_join and anti_join), but not the set operations intersect, setdiff and setequal.

The syntax is again the same as for the dplyr versions, including joining on non-matching column names. The underlying implementation uses rxMerge with the appropriate arguments for each type of join.

For example, one of the joins in the dplyr two-table verbs vignette joins the flights table with the airports table, using the columns dest (in flights) and faa (in airports). The same code in dplyr also works in dplyrXdf:

airportsXdf <- rxDataStep(airports, "airports.xdf", overwrite=TRUE)
flightsJoin <- left_join(
    flightsXdf %>% select(year:day, hour, origin, dest, tailnum, carrier),
    airportsXdf,
    by=c("dest"="faa"))
head(flightsJoin)

Utility functions

In addition to the above verbs, dplyrXdf provides a number of functions to ease working with Xdf data sources, both in the native filesystem and in HDFS.
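As an illustration, here is a hedged sketch of converting between data frames and Xdf files; it assumes the as_xdf converter and the as.data.frame method exported by the package, along with the airlines table from nycflights13:

# convert a data frame to an Xdf file, and read it back into memory
# (as_xdf and as.data.frame are assumed from dplyrXdf's utility functions)
airlinesXdf <- as_xdf(airlines, file="airlines.xdf", overwrite=TRUE)
airlinesDf  <- as.data.frame(airlinesXdf)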

Working with HDFS, Hadoop and Spark

If you are working on a Hadoop or Spark cluster, dplyrXdf supports data stored both in the native filesystem and in HDFS. Most verbs work transparently with datasets in either filesystem, and in general the output of a verb is created in the same filesystem as its input. Similarly, dplyrXdf works whether your R session is running on the cluster edge node or on a remote client, with no major change in functionality (the exception is sparklyr interoperability; see below).

The following functions are available to ease the task of file management on HDFS:

In addition, dplyrXdf has the ability to call the sparklyr package to run Spark-native pipelines on Hive tables, without having to import the data to Xdf format. This can provide efficiency gains, since it reduces the amount of I/O. However, this is only supported if your R session is taking place on the edge node (not a remote session).
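To sketch the HDFS workflow, the snippet below copies the flights Xdf file into HDFS and then runs the same verbs on the HDFS-backed data; the function names copy_to_hdfs, hdfs_dir and in_hdfs are assumptions based on dplyrXdf's HDFS utilities, and the /tmp path is a placeholder:

# copy the Xdf file into HDFS and check it arrived (function names assumed)
flightsHd <- copy_to_hdfs(flightsXdf, "/tmp")
hdfs_dir("/tmp")
in_hdfs(flightsHd)

# the same pipeline verbs then run against the HDFS-backed dataset
flightsHd %>%
    group_by(carrier) %>%
    summarise(mean_delay=mean(arr_delay, na.rm=TRUE))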

Working with SQL Server

One of the key strengths of dplyr is its ability to interoperate with SQL databases. Given a database table as input, dplyr can translate the verbs in a pipeline into a SQL query, which is then executed in the database. For large tables, this can be much more efficient than importing the data and processing it locally.

If the input to a pipeline is an RxOdbcData, RxTeradata or RxSqlServerData source, dplyrXdf will convert it to a dplyr tbl, and the subsequent pipeline will then be run in the database, rather than the data being imported to the local R session. This does require a compatible dplyr backend for the database in question; at the time of writing (September 2017) the dbplyr package includes such a backend for SQL Server, and one for Teradata is in development.
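A minimal sketch of this, assuming a flights table already loaded into a SQL Server database (the server, database and connection string below are placeholders):

# point to a table in SQL Server; dplyrXdf hands the pipeline to dbplyr,
# which generates a SQL query that runs inside the database
flightsSql <- RxSqlServerData(table="flights",
    connectionString="Driver=SQL Server;Server=myserver;Database=mydb;Trusted_Connection=yes")

flightsSql %>%
    group_by(carrier) %>%
    summarise(mean_delay=mean(arr_delay)) %>%
    arrange(desc(mean_delay))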

Conclusion

This article has been a quick executive-summary introduction to dplyrXdf. If you have any suggestions on features to add (including bits of dplyr that have been left out) or bugs that need fixing, please file an issue on the repo, or contact me at hongooi@microsoft.com.

unlink(c("airports.xdf", "flights.xdf", "flights_rx1.xdf", "flights_rx2.xdf", "flights_rx3.xdf"))
clean_dplyrxdf_dir("native")
rxSetComputeContext(cc)


RevolutionAnalytics/dplyrXdf documentation built on June 3, 2019, 9:08 p.m.