Microsoft R Server includes the ability to work with Hadoop and Spark clusters. As part of this, it also lets you work with datasets, in particular Xdf files, that are stored in HDFS.
As of version 1.0, dplyrXdf also supports Xdf files in HDFS. Most verbs will work exactly the same as with data in the native filesystem, and there are also a number of utility functions to ease working with Hadoop and Spark. You will need Microsoft R Server version 9.0 or later to use dplyrXdf with HDFS, and ideally version 9.1 for full functionality.
If you're used to dplyr and how it handles databases, Microsoft R Server and dplyrXdf work a little differently. What dplyr calls a "src" (a remote data source) is handled by two related concepts in MRS: a compute context and a filesystem. The compute context determines where computations take place: on a local machine, in a database like SQL Server, or in a Hadoop or Spark cluster. The filesystem is where the data files are stored. Note that not all data sources have a filesystem: e.g., a SQL Server table is part of a database, not a filesystem as such.
In general, there are only two filesystems you should have to deal with: the native filesystem, meaning the hard disk of the machine where your R session is running, and HDFS. To MRS and dplyrXdf, an Xdf file is much the same regardless of which filesystem it's in (there are some limitations for HDFS files, but they are the exception, not the rule). You can use the same code and pipelines without having to worry about filesystem details.
The compute context is important because it's how you benefit from the parallelism provided by Hadoop and Spark. Note that it doesn't change the way you write your code -- only how it's executed.
- If you are in the `RxHadoopMR` or `RxSpark` compute context, and your data is in HDFS, then the RevoScaleR functions will execute in parallel on the cluster worker nodes. You can do this in an R session either on a remote client, or on the cluster edge node.
- If you are in the local compute context, and your R session is running on the cluster edge node, you can still access data in HDFS. In this case, the data is streamed to the edge node for processing -- essentially, RevoScaleR will treat HDFS as simply a big local hard disk. (This will not work if you are on a remote client; in this case, you must be in the Hadoop or Spark compute context to access the cluster.)
- If you are in the `RxHadoopMR` or `RxSpark` compute context and your data is not in HDFS, then the RevoScaleR functions will (usually) throw a warning or error.
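To make this concrete, here is a minimal sketch of switching compute contexts. The HDFS path, object names and connection defaults below are assumptions for illustration, not values taken from this vignette:

```r
library(RevoScaleR)

# an Xdf data source in HDFS (the name and path are illustrative)
hd <- RxHdfsFileSystem()
mydataHd <- RxXdfData("/user/me/mydata", fileSystem=hd)

# switch to the Spark compute context: RevoScaleR functions on mydataHd now run in the cluster
cc <- rxSparkConnect()

# switch back to the local compute context: HDFS data is streamed to the edge node instead
rxSetComputeContext("local")
```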
For more information about working with HDFS, Hadoop and Spark, see the documentation at Microsoft Docs, or the Microsoft Learn Analytics course materials on Spark.
To use dplyrXdf on a cluster, you'll have to install it, along with dplyr, on the machine where you'll be running your R sessions. This can be either the cluster edge node or, if you're connecting remotely, your remote machine. For full functionality, you should also install dplyr on the cluster worker nodes. You don't need to install dplyrXdf on the worker nodes, though.
Your dplyrXdf code should work the same regardless of whether you're running on the edge node or a remote client. The package abstracts away the underlying differences between the two scenarios, so that from your point of view, you're simply connected to a cluster somewhere in the cloud. Similarly, dplyrXdf should work with all flavours of Spark and Hadoop that MRS supports (HDInsight, Cloudera, Hortonworks and MapR).
There are a number of ways to get your data into and out of HDFS. Note that there are several data formats in use in Hadoop and Spark environments; dplyrXdf focuses mainly on Xdf files.
To copy a dataset from the native filesystem into HDFS, use `copy_to`. This is the standard dplyr verb for copying data into a remote src. In this case, the src is a filesystem, which RevoScaleR represents via an `RxHdfsFileSystem` object:
```r
library(dplyrXdf)
library(nycflights13)

hd <- RxHdfsFileSystem()

# copy a data frame into an Xdf file in HDFS
flightsHd <- copy_to(hd, flights)

flightsHd

as_data_frame(flightsHd)
```
This will copy the flights table into HDFS, returning an Xdf data source object pointing to the file. If the path isn't specified, the data is saved as an Xdf file in your HDFS home directory, normally `/user/<username>`.
You can also use `copy_to_hdfs`, which is a shortcut that saves having to create an explicit filesystem object:
```r
# same as above
flightsHd <- copy_to_hdfs(flights)
```
To download an Xdf file from HDFS to the native filesystem, use `collect` and `compute`. Again, these are the standard dplyr verbs for copying data from a remote src. For dplyrXdf, the difference between `collect` and `compute` is that the former by default will return a data frame, whereas the latter will save the copied data in an Xdf file and return a `tbl_xdf` object pointing to that file.
```r
flightsLocal <- compute(flightsHd)

flightsLocal

as_data_frame(flightsLocal)
```
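By contrast, `collect` brings the data back as a local data frame by default. A minimal sketch, continuing from the objects above (the `flightsDf` name is just for illustration):

```r
# download the HDFS data as a data frame in local memory
flightsDf <- collect(flightsHd)
class(flightsDf)
```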
If you want to look at the first few rows of an Xdf file, it may be faster to use `compute` to copy the entire file off HDFS and then run `head`, than to run `head` on the original file. This is due to quirks in how RevoScaleR works in Spark and Hadoop.
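For example (assuming `flightsHd` from earlier), the pattern described above looks like this:

```r
# copy the whole file off HDFS first, then look at the first few rows locally
flightsHd %>%
    compute() %>%
    head()

# reading directly from HDFS can be slower
head(flightsHd)
```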
`copy_to`, `collect` and `compute` are for copying datasets (R objects, including data frames and Xdf files) to and from HDFS. To transfer arbitrary files and directories, dplyrXdf also provides the `hdfs_upload` and `hdfs_download` functions. If you've used the base R function `download.file` or a command-line file transfer utility like `ftp` or `pscp`, the syntax should be familiar: `hdfs_download(src, dest)` downloads the file at `src` to the location `dest`, while `hdfs_upload(src, dest)` uploads `src` to `dest`.
```r
# create a csv file and upload it
write.csv(flights, "flights.csv", row.names=FALSE)
hdfs_upload("flights.csv", "/tmp")
```
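Downloading works the same way in the other direction; a minimal sketch (the destination filename is illustrative):

```r
# copy the csv file back from HDFS to the native filesystem
hdfs_download("/tmp/flights.csv", "flights_copy.csv")
```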
If you uploaded a non-Xdf data source (such as a csv file), you can then import it into Xdf format with `as_xdf`:
```r
flightsCsv <- RxTextData("/tmp/flights.csv", fileSystem=RxHdfsFileSystem())
flightsHd2 <- as_xdf(flightsCsv, "flights2")

as_data_frame(flightsHd2)
```
Note that there are two kinds of Xdf files: standard and composite. A composite Xdf file is actually a directory containing multiple data and metadata files. The RevoScaleR functions can treat a composite Xdf as a single dataset, and so can dplyrXdf. Xdf files in HDFS must be composite in order to work properly; by default, `copy_to` will convert an existing Xdf file into composite format if it isn't already.
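As a small illustration of this (the file and object names below are examples, not taken from the vignette):

```r
# create a standard, single-file Xdf in the native filesystem
flightsXdf <- rxDataStep(flights, "flights_standard.xdf", overwrite=TRUE)

# copy_to_hdfs converts it to a composite Xdf as part of the upload
flightsHdC <- copy_to_hdfs(flightsXdf)
```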
If your R session is on the edge node, RevoScaleR provides the ability to share a Spark connection with the sparklyr package from RStudio. You do this via the `interop` argument to `rxSparkConnect`:
```r
# establishes a Spark connection that can be shared with sparklyr
rxSparkConnect(..., interop="sparklyr")
```
dplyrXdf can take advantage of this interoperability when working with Hive tables. To see how this works, let's use the `hivesampletable` table which is provided as part of every Azure HDInsight cluster:
```r
cc <- rxSparkConnect(interop="sparklyr")

sampleHiv <- RxHiveData(table="hivesampletable")

head(sampleHiv, 5)
#>   clientid querytime market deviceplatform devicemake devicemodel        state
#> 1        8  18:54:20  en-US        Android    Samsung    SCH-i500   California
#> 2       23  19:19:44  en-US        Android        HTC  Incredible Pennsylvania
#> 3       23  19:19:46  en-US        Android        HTC  Incredible Pennsylvania
#> 4       23  19:19:47  en-US        Android        HTC  Incredible Pennsylvania
#> 5       28  01:37:50  en-US        Android   Motorola     Droid X     Colorado
#>         country querydwelltime sessionid sessionpagevieworder
#> 1 United States      13.920401         0                    0
#> 2 United States             NA         0                    0
#> 3 United States       1.475742         0                    1
#> 4 United States       0.245968         0                    2
#> 5 United States      20.309534         1                    1

sampleHiv %>%
    filter(deviceplatform == "Android") %>%
    group_by(devicemake) %>%
    summarise(n=n()) %>%
    head()
#> # Source:   lazy query [?? x 2]
#> # Database: spark_connection
#>     devicemake     n
#>          <chr> <dbl>
#> 1        Casio   996
#> 2     Motorola  1524
#> 3       Huawei   230
#> 4 SonyEricsson   139
#> 5      Kyocera   117
#> 6      Unknown  2133
```
In this example, dplyrXdf has converted the `RxHiveData` source into a sparklyr tbl, and the subsequent pipeline will be run by sparklyr (not by dplyrXdf) in the cluster.
If you don't want to use sparklyr -- for example, if you want to take advantage of MRS features like a `transformFunc` to execute arbitrary R code -- you can import the Hive table to an Xdf file in HDFS. The pipeline will then execute according to your compute context. This will probably still be in-cluster, since you must be in the Spark compute context to access Hive tables. However, executing the pipeline via sparklyr is likely to be more efficient, since it removes the need to import the data.
```r
# this will create the composite Xdf 'samplehivetable' in the HDFS user directory
sampleXdf <- as_xdf(sampleHiv)

sampleXdf %>%
    filter(deviceplatform == "Android") %>%
    group_by(devicemake) %>%
    summarise(n=n(), .method=4) %>%
    head()
#>     devicemake     n
#> 1       Archos     1
#> 2      Samsung 16244
#> 3      Unknown  2133
#> 4      Kyocera   117
#> 5          HTC  2242
#> 6 SonyEricsson   139
```
dplyrXdf provides the following functions to let you manipulate files and directories in HDFS. By and large they wrap similar functions provided by the RevoScaleR package, which in turn call various Hadoop filesystem commands.
- `hdfs_file_copy(src, dest)` copies the file or directory given by `src` to the location `dest`. It is vectorised in both `src` and `dest`, meaning `src1` will be copied to `dest1`, `src2` to `dest2`, and so on. It is analogous to base R's `file.copy` for the native filesystem.
- `hdfs_file_move(src, dest)` is similar, but moves files/directories. It is analogous to base R's `file.rename`.
- `hdfs_file_remove` deletes the path. Its counterpart in base R is `file.remove`.
- `hdfs_dir_create` and `hdfs_dir_remove` create and delete directories. They are analogous to `dir.create` and `unlink(recursive=TRUE)`.
- `hdfs_file_exists` and `hdfs_dir_exists` test for the existence of a file or directory, like base R's `file.exists` and `dir.exists`.
- `hdfs_dir` lists files in an HDFS directory, returning a vector of file names. It has a number of options for recursively listing subdirectories, returning subdirectories only (omitting files), and so on.

```r
# create a new directory
hdfs_dir_create("/tmp/mydata")

# check that it exists
hdfs_dir_exists("/tmp/mydata")

# copy files into the new directory
hdfs_file_copy("flights", "/tmp/mydata")

# create a new data source
flightsHd3 <- RxXdfData("/tmp/mydata/flights", fileSystem=RxHdfsFileSystem())

# read the data
names(flightsHd3)
```
As with RevoScaleR, dplyrXdf supports HDFS data located in attached Azure Data Lake Store (ADLS) storage. When working with Xdf files, you will normally specify the HDFS filesystem, including the ADLS host, via the `fileSystem` argument of the data source. The standard dplyr/dplyrXdf verbs can both read and write to ADLS, with the location of the file being essentially transparent to the user.
```r
adlFs <- RxHdfsFileSystem(hostName="adl://mycluster.azuredatalakestore.net")
adlXdf <- RxXdfData("/path/to/file", fileSystem=adlFs)

# adlXdf %>% verb1 %>% verb2 %>% ...
```
The `hdfs_*` functions include a `host` argument to let you specify which HDFS filesystem to access: either the "native" filesystem, or ADLS. The default will be whichever filesystem is currently being used. For example, the following code uses `hdfs_dir` to list the files in the root directory, first for the native HDFS filesystem, and then for an attached ADLS filesystem.
hdfs_dir("/", host="default") #> Directory listing of / #> [1] "HdiNotebooks" "HdiSamples" #> [3] "ams" "amshbase" #> [5] "app-logs" "apps" #> [7] "atshistory" "cluster-info" #> [9] "custom-scriptaction-logs" "example" #> [11] "hbase" "hdp" #> [13] "hive" "mapred" #> [15] "mr-history" "tmp" #> [17] "user" hdfs_dir("/", host="adl://mycluster.azuredatalakestore.net") #> Directory listing of adl://mycluster.azuredatalakestore.net/ #> [1] "clusters" "file31bc3ee52d32" "file31bc79a3de7" "file5d301ef514e5" #> [5] "folder1" "tmp" "user"
You can also provide a full URI as a path:
hdfs_dir("adl://mycluster.azuredatalakestore.net/") #> Directory listing of adl://mycluster.azuredatalakestore.net/ #> [1] "clusters" "file31bc3ee52d32" "file31bc79a3de7" "file5d301ef514e5" #> [5] "folder1" "tmp" "user"
The `hdfs_host` function returns the name of the currently active HDFS filesystem host or, if an Xdf data source is supplied, the host for that data source:
```r
hdfs_host()
#> [1] "adl://mycluster.azuredatalakestore.net"

xdf <- RxXdfData("file", fileSystem=RxHdfsFileSystem(hostName="default"))
hdfs_host(xdf)
#> [1] "default"
```
The `in_hdfs` function returns whether a given data source is stored in HDFS.
```r
in_hdfs(flightsHd)

# also works with non-Revo data sources, like data frames
in_hdfs(iris)
in_hdfs(flights)
```
The `local_exec` function runs an expression in the local compute context. This can be useful if you need to work with local datasets while connected to a remote cluster. By default, the RevoScaleR functions will throw an error if you provide a local data source as an input while you are in the Hadoop or Spark compute context. `local_exec` temporarily changes to the local compute context, runs your code, and then changes back to the original context.
```r
# try to access a local Xdf file
names(flightsLocal)

local_exec(names(flightsLocal))
```
Most of the single-table dplyr verbs supported by dplyrXdf will work for datasets in HDFS. The main exceptions are:
- `arrange`, `distinct`, `sample_n` and `sample_frac` do not support HDFS data.
- `cbind` and `rbind` require the compute context to be local.
- `mutate` and `transmute` also require the compute context to be local when the data is grouped. Consider whether you really need to group before transforming; many transformations do not require grouping information. If your data fits into memory, you can also use `do` or `do_xdf`.
- `summarise` on HDFS data will always send the output to the edge node/remote client before writing it back to the cluster. This is a consequence of the way in which `rxCube` and `rxSummary` work, by creating an in-memory data structure.

Support for two-table verbs is similarly more limited for datasets in HDFS than in the native filesystem. First, only the Spark compute context supports joining (not Hadoop), and only for Xdf data sources and Spark data sources (`RxHiveData`, `RxOrcData` and `RxParquetData`). Only the "standard" joins (`left_join`, `right_join`, `inner_join` and `full_join`) are supported.
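As an illustration, here is a minimal sketch of a supported join, assuming the Spark compute context and using the nycflights13 `airlines` table (the object names are illustrative):

```r
# copy the airlines lookup table into HDFS alongside the flights data
airlinesHd <- copy_to_hdfs(nycflights13::airlines)

# standard joins between Xdf data sources in HDFS run in the Spark compute context
flightsJoined <- left_join(flightsHd, airlinesHd, by="carrier")
```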
The RevoScaleR functions do not recognise `tbl_xdf` objects, which are automatically generated by dplyrXdf, when in the Spark or Hadoop compute context. To remedy this, wrap any such object inside `as_xdf()` before calling a RevoScaleR function:
```r
rxLinMod(y ~ x1 + x2 + ..., data=as_xdf(tbl))
```
Finally, the Xdf data sources created by dplyrXdf will always contain absolute HDFS paths (that is, of the form `"/path/to/file"` rather than `"file"`). This is because of limitations in RevoScaleR's support for relative paths in HDFS. The data source inputs to dplyrXdf verbs can contain relative paths; these will be translated to absolute paths internally.
hdfs_dir_remove(c("flights", "flights2", "/tmp/mydata", "/tmp/flights.csv")) clean_dplyrxdf_dir("hdfs") clean_dplyrxdf_dir("native")