
Microsoft R Server includes the ability to work with Hadoop and Spark clusters. As part of this, it also lets you work with datasets, in particular Xdf files, that are stored in HDFS.

As of version 1.0, dplyrXdf also supports Xdf files in HDFS. Most verbs will work exactly the same as with data in the native filesystem, and there are also a number of utility functions to ease working with Hadoop and Spark. You will need to have Microsoft R Server version 9.0 to use dplyrXdf with HDFS, and ideally version 9.1 for full functionality.

Basics: Hadoop, Spark and HDFS

If you're used to how dplyr handles databases, note that Microsoft R Server and dplyrXdf work a little differently. What dplyr calls a "src" (a remote data source) is handled by two related concepts in MRS: a compute context and a filesystem. The compute context determines where computations take place: on a local machine, in a database like SQL Server, or in a Hadoop or Spark cluster. The filesystem is where the data files are stored. Note that not all data sources have a filesystem: e.g. a SQL Server table is part of a database, not a filesystem as such.

In general, there are only two filesystems you should have to deal with: the native filesystem, meaning the hard disk of the machine where your R session is running, and HDFS. To MRS and dplyrXdf, an Xdf file is much the same regardless of which filesystem it's in (there are some limitations for HDFS files, but they are the exception, not the rule). You can use the same code and pipelines without having to worry about filesystem details.

The compute context is important because it's how you benefit from the parallelism provided by Hadoop and Spark. Note that it doesn't change the way you write your code -- only how it's executed.
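As a minimal sketch of how the two concepts appear in code (assuming an R session on the cluster edge node; the file path is hypothetical):

cc <- rxSparkConnect()      # compute context: computations run in the Spark cluster
hd <- RxHdfsFileSystem()    # filesystem: data files are stored in HDFS
flightsXdf <- RxXdfData("/user/me/flights", fileSystem=hd)   # hypothetical path to an Xdf file in HDFS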

For more information about working with HDFS, Hadoop and Spark, see the documentation at Microsoft Docs, or the Microsoft Learn Analytics course materials on Spark.

Working with a cluster

To use dplyrXdf on a cluster, you'll have to install it, and dplyr, on the machine where you'll be running your R sessions. This can be either the cluster edge node, or if you're connecting remotely, on your remote machine. For full functionality, you should also install dplyr on the cluster worker nodes. You don't need to install dplyrXdf on the worker nodes, though.
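For example, one way to install both packages from within R, assuming you install dplyrXdf from its GitHub repository and have the devtools package available:

# install dplyr from CRAN, and dplyrXdf from GitHub
install.packages("dplyr")
devtools::install_github("RevolutionAnalytics/dplyrXdf")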

Your dplyrXdf code should work the same regardless of whether you're running on the edge node or a remote client. The package abstracts away the underlying differences between the two scenarios, so that from your point of view, you're simply connected to a cluster somewhere in the cloud. Similarly, dplyrXdf should work with all flavours of Spark and Hadoop that MRS supports (HDInsight, Cloudera, Hortonworks and MapR).

Uploading and downloading

There are a number of ways to get your data into and out of HDFS. Note that there are several data formats in use in Hadoop and Spark environments; dplyrXdf focuses mainly on Xdf files.

To copy a dataset from the native filesystem into HDFS, use copy_to. This is the standard dplyr verb for copying data into a remote src. In this case, the src is a filesystem, which RevoScaleR represents via an RxHdfsFileSystem object:

library(dplyrXdf)
library(nycflights13)

hd <- RxHdfsFileSystem()

# copy a data frame into an Xdf file in HDFS
flightsHd <- copy_to(hd, flights)

flightsHd

as_data_frame(flightsHd)

This will copy the flights table into HDFS, returning an Xdf data source object pointing to the file. If the path isn't specified, the data is saved as an Xdf file in your HDFS home directory, normally /user/<username>.

You can also use copy_to_hdfs, which is a shortcut that saves having to create an explicit filesystem object:

# same as above
flightsHd <- copy_to_hdfs(flights)

To download an Xdf file from HDFS to the native filesystem, use collect or compute. Again, these are the standard dplyr verbs for copying data from a remote src. In dplyrXdf, the difference between collect and compute is that the former by default returns a data frame, whereas the latter saves the copied data in an Xdf file and returns a tbl_xdf object pointing to that file.

flightsLocal <- compute(flightsHd)

flightsLocal

as_data_frame(flightsLocal)
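For comparison, collect brings the data back as a data frame by default:

# collect returns a data frame rather than an Xdf file
flightsDf <- collect(flightsHd)
class(flightsDf)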

If you want to look at the first few rows of an Xdf file in HDFS, it may be faster to use compute to copy the entire file off HDFS and then run head on the copy, than to run head on the original. This is due to quirks in how RevoScaleR works in Spark and Hadoop.

copy_to, collect and compute are for copying datasets (R objects, including data frames and Xdf files) to and from HDFS. To transfer arbitrary files and directories, dplyrXdf also provides the hdfs_upload and hdfs_download functions. If you've used the base R function download.file or a command-line file transfer utility like ftp or pscp, the syntax should be familiar: hdfs_download(src, dest) downloads the file at src to the location dest, while hdfs_upload(src, dest) uploads src to dest.

# create a csv file and upload it
write.csv(flights, "flights.csv", row.names=FALSE)
hdfs_upload("flights.csv", "/tmp")

If you uploaded a non-Xdf data source (like a csv file), you can then import it into Xdf format with as_xdf:

flightsCsv <- RxTextData("/tmp/flights.csv", fileSystem=RxHdfsFileSystem())
flightsHd2 <- as_xdf(flightsCsv, "flights2")

as_data_frame(flightsHd2)

Note on composite datasets

Note that there are two kinds of Xdf files: standard and composite. A composite Xdf file is actually a directory containing multiple data and metadata files. The RevoScaleR functions can treat a composite Xdf as a single dataset, and so can dplyrXdf. Xdf files in HDFS must be composite in order to work properly; by default, copy_to will convert an existing Xdf file into composite, if it's not already in that format.
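You can check which format a given Xdf file is in; a quick sketch, assuming the is_composite_xdf helper from dplyrXdf is available:

# flightsHd was created by copy_to, so it should be a composite Xdf
is_composite_xdf(flightsHd)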

Sparklyr interoperability and Hive tables

If your R session is on the edge node, RevoScaleR provides the ability to share a Spark connection with the sparklyr package from RStudio. You do this via the interop argument to rxSparkConnect:

# establishes a Spark connection that can be shared with sparklyr
rxSparkConnect(..., interop="sparklyr")

dplyrXdf can take advantage of this interoperability when working with Hive tables. To see how this works, let's use the hivesampletable table which is provided as part of every Azure HDInsight cluster:

cc <- rxSparkConnect(interop="sparklyr")

sampleHiv <- RxHiveData(table="hivesampletable")
head(sampleHiv, 5)
#>   clientid querytime market deviceplatform devicemake devicemodel        state
#> 1        8  18:54:20  en-US        Android    Samsung    SCH-i500   California
#> 2       23  19:19:44  en-US        Android        HTC  Incredible Pennsylvania
#> 3       23  19:19:46  en-US        Android        HTC  Incredible Pennsylvania
#> 4       23  19:19:47  en-US        Android        HTC  Incredible Pennsylvania
#> 5       28  01:37:50  en-US        Android   Motorola     Droid X     Colorado
#>         country querydwelltime sessionid sessionpagevieworder
#> 1 United States      13.920401         0                    0
#> 2 United States             NA         0                    0
#> 3 United States       1.475742         0                    1
#> 4 United States       0.245968         0                    2
#> 5 United States      20.309534         1                    1

sampleHiv %>%
    filter(deviceplatform == "Android") %>%
    group_by(devicemake) %>%
    summarise(n=n()) %>%
    head()
#> # Source:     lazy query [?? x 2]
#> # Database:   spark_connection
#>     devicemake     n
#>          <chr> <dbl>
#> 1        Casio   996
#> 2     Motorola  1524
#> 3       Huawei   230
#> 4 SonyEricsson   139
#> 5      Kyocera   117
#> 6      Unknown  2133

In this example, dplyrXdf has converted the RxHiveData source into a sparklyr tbl, and the subsequent pipeline will be run by sparklyr (not by dplyrXdf) in the cluster.

If you don't want to use sparklyr -- for example, if you want to take advantage of MRS features like a transformFunc to execute arbitrary R code -- you can import the Hive table to an Xdf file in HDFS. The pipeline will then execute according to your compute context. This will probably still be in-cluster, since you must be in the Spark compute context to access Hive tables. However, executing the pipeline via sparklyr is likely to be more efficient, since it removes the need to import the data.

# this will create the composite Xdf 'samplehivetable' in the HDFS user directory
sampleXdf <- as_xdf(sampleHiv)

sampleXdf %>%
    filter(deviceplatform == "Android") %>%
    group_by(devicemake) %>%
    summarise(n=n(), .method=4) %>%
    head()
#>     devicemake     n
#> 1       Archos     1
#> 2      Samsung 16244
#> 3      Unknown  2133
#> 4      Kyocera   117
#> 5          HTC  2242
#> 6 SonyEricsson   139

Working with files and directories

dplyrXdf provides a number of functions for manipulating files and directories in HDFS, including hdfs_dir, hdfs_dir_create, hdfs_dir_exists, hdfs_dir_remove and hdfs_file_copy. By and large these wrap similar functions provided by the RevoScaleR package, which in turn call the various Hadoop filesystem commands.

# create a new directory
hdfs_dir_create("/tmp/mydata")

# check that it exists
hdfs_dir_exists("/tmp/mydata")

# copy files into the new directory
hdfs_file_copy("flights", "/tmp/mydata")

# create a new data source
flightsHd3 <- RxXdfData("/tmp/mydata/flights", fileSystem=RxHdfsFileSystem())

# check the column names
names(flightsHd3)

Azure Data Lake Store support

As with RevoScaleR, dplyrXdf supports HDFS data located in attached ADLS storage. When working with Xdf files, you specify the ADLS account via the hostName argument to RxHdfsFileSystem, and pass the resulting filesystem object as the fileSystem argument to your data source. The standard dplyr/dplyrXdf verbs can both read and write to ADLS, with the location of the file being essentially transparent to the user.

adlFs <- RxHdfsFileSystem(hostName="adl://mycluster.azuredatalakestore.net")
adlXdf <- RxXdfData("/path/to/file", fileSystem=adlFs)

# adlXdf %>% verb1 %>% verb2 %>% ...

The hdfs_* functions include a host argument to let you specify which HDFS filesystem to access: either the "native" filesystem, or ADLS. The default will be whichever filesystem is currently being used. For example the following code uses hdfs_dir to list the files in the root directory, first for the native HDFS filesystem, and then for an attached ADLS filesystem.

hdfs_dir("/", host="default")
#> Directory listing of /
#>  [1] "HdiNotebooks"             "HdiSamples"
#>  [3] "ams"                      "amshbase"
#>  [5] "app-logs"                 "apps"
#>  [7] "atshistory"               "cluster-info"
#>  [9] "custom-scriptaction-logs" "example"
#> [11] "hbase"                    "hdp"
#> [13] "hive"                     "mapred"
#> [15] "mr-history"               "tmp"
#> [17] "user"

hdfs_dir("/", host="adl://mycluster.azuredatalakestore.net")
#> Directory listing of adl://mycluster.azuredatalakestore.net/
#> [1] "clusters"         "file31bc3ee52d32" "file31bc79a3de7"  "file5d301ef514e5"
#> [5] "folder1"          "tmp"              "user"

You can also provide a full URI as a path:

hdfs_dir("adl://mycluster.azuredatalakestore.net/")
#> Directory listing of adl://mycluster.azuredatalakestore.net/
#> [1] "clusters"         "file31bc3ee52d32" "file31bc79a3de7"  "file5d301ef514e5"
#> [5] "folder1"          "tmp"              "user"

The hdfs_host function returns the name of the currently active HDFS filesystem host, or if an Xdf data source is supplied, the host for that data source:

hdfs_host()
#> [1] "adl://mycluster.azuredatalakestore.net"

xdf <- RxXdfData("file", fileSystem=RxHdfsFileSystem(hostName="default"))
hdfs_host(xdf)
#> [1] "default"

Miscellaneous functions

The in_hdfs function returns whether a given data source is stored in HDFS.

in_hdfs(flightsHd)

# also works with non-Revo data sources, like data frames
in_hdfs(iris)

in_hdfs(flights)

The local_exec function runs an expression in the local compute context. This can be useful if you need to work with local datasets while connected to a remote cluster. By default, RevoScaleR functions will throw an error if you provide a local data source as an input when you are in the Hadoop or Spark compute context. local_exec temporarily changes to the local compute context, runs your code, and then changes back to the original context.

# trying to access a local Xdf file directly will fail in the Spark/Hadoop compute context
names(flightsLocal)

# local_exec temporarily switches to the local compute context, so this works
local_exec(names(flightsLocal))

Limitations and glitches

Most of the single-table dplyr verbs supported by dplyrXdf will work for datasets in HDFS, with only a few exceptions.

Support for two-table verbs is similarly more limited for datasets in HDFS than in the native filesystem. First, only the Spark compute context supports joining (not Hadoop), and only for Xdf data sources and Spark data sources (RxHiveData, RxOrcData and RxParquetData). Only the "standard" joins (left_join, right_join, inner_join and full_join) are supported.

When in the Spark or Hadoop compute context, the RevoScaleR functions do not recognise the tbl_xdf objects that dplyrXdf generates automatically. To remedy this, wrap any such object in as_xdf() before passing it to a RevoScaleR function:

rxLinMod(y ~ x1 + x2 + ..., data=as_xdf(tbl))
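As a concrete sketch using the flights data copied earlier (the model formula is purely illustrative):

# run a dplyrXdf pipeline, then pass the result to a RevoScaleR function via as_xdf
flightsTbl <- flightsHd %>%
    mutate(speed=distance/air_time)
rxLinMod(arr_delay ~ dep_delay + speed, data=as_xdf(flightsTbl))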

Finally, the Xdf data sources created by dplyrXdf will always contain absolute HDFS paths (that is, of the form "/path/to/file" rather than "file"). This is because of limitations in RevoScaleR's support for relative paths in HDFS. The data source inputs to dplyrXdf verbs can contain relative paths; these will be translated to absolute paths internally.

# clean up the files and directories created above
hdfs_dir_remove(c("flights", "flights2", "/tmp/mydata", "/tmp/flights.csv"))
clean_dplyrxdf_dir("hdfs")
clean_dplyrxdf_dir("native")

