High-level R function for using Hadoop Streaming.
mapper
a function which is executed on each worker node. The mapper typically maps input key/value pairs to a set of intermediate key/value pairs.
reducer
a function which is executed on each worker node. The reducer reduces a set of intermediate values which share a key to a smaller set of values. If no reducer is used, leave this argument empty.
input
specifies the directory holding the data in the DFS.
output
specifies the output directory in the DFS that contains the results after the streaming job has finished.
henv
Hadoop local environment.
mapper_args
additional arguments to the mapper.
reducer_args
additional arguments to the reducer.
cmdenv_arg
additional arguments passed as environment variables to distributed tasks.
streaming_args
additional arguments passed to the Hadoop Streaming utility. By default, only the number of reducers will be set using
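Since cmdenv_arg is described as passing environment variables to the distributed tasks, a mapper or reducer would presumably read such a value with Sys.getenv(). The following is a minimal sketch of the reading side only. The variable name MIN_TERM_LENGTH and the helper get_min_term_length() are invented for illustration and are not part of hive, and the exact format expected by cmdenv_arg is not shown here.

```r
# Hypothetical helper (not part of hive): read a tuning value from the
# task environment and fall back to a default when it is not set.
# MIN_TERM_LENGTH is an invented variable name used only for illustration.
get_min_term_length <- function(default = 2L) {
  raw <- Sys.getenv("MIN_TERM_LENGTH", unset = NA_character_)
  if (is.na(raw) || !nzchar(raw)) return(default)
  as.integer(raw)
}

# Local stand-in for what a streaming job would set on the worker.
Sys.setenv(MIN_TERM_LENGTH = "3")
min_len <- get_min_term_length()

# Without the variable, the default applies.
Sys.unsetenv("MIN_TERM_LENGTH")
fallback <- get_min_term_length()
```

Falling back to a default keeps the function usable when it is tested locally, outside a streaming job.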
The function hive_stream() starts a MapReduce job on the given data located on the HDFS.
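To make the data flow of such a job concrete, here is a minimal local sketch that imitates the map, sort, and reduce steps in plain R, without Hadoop. The helpers map_lines(), reduce_lines(), and simulate_stream() are hypothetical names introduced for illustration. They are not part of hive, and they work on character vectors rather than on stdin.

```r
# Hypothetical helpers (not part of the hive package): an in-memory
# imitation of the map -> sort -> reduce flow of a streaming job.

# Map step: emit one "term<TAB>1" record per space-separated term,
# keeping only terms longer than one character.
map_lines <- function(lines) {
  terms <- unlist(strsplit(lines, " ", fixed = TRUE))
  terms <- terms[nchar(terms) > 1]
  if (length(terms) == 0L) return(character(0))
  paste(terms, 1, sep = "\t")
}

# Reduce step: sum the counts of all records that share a key.
reduce_lines <- function(lines) {
  if (length(lines) == 0L) return(character(0))
  kv <- strsplit(lines, "\t", fixed = TRUE)
  keys <- vapply(kv, `[`, character(1), 1L)
  values <- as.integer(vapply(kv, `[`, character(1), 2L))
  totals <- tapply(values, keys, sum)
  paste(names(totals), as.vector(totals), sep = "\t")
}

# Sorting the intermediate records stands in for the shuffle phase,
# which groups records by key before they reach the reducer.
simulate_stream <- function(lines) {
  intermediate <- sort(map_lines(lines))
  reduce_lines(intermediate)
}

result <- simulate_stream(c("to be or not to be", "be quick"))
print(result)
```

Running the logic locally like this can help check mapper and reducer behaviour on a few lines of text before submitting a job to the cluster.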
Stefan Theussl
Apache Hadoop Streaming (https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html).
## A simple word count example
## Put some xml files on the HDFS:
## Not run: DFS_put( system.file("defaults/core/", package = "hive"),
"/tmp/input" )
## End(Not run)
## Not run: DFS_put( system.file("defaults/hdfs/hdfs-default.xml", package = "hive"),
"/tmp/input" )
## End(Not run)
## Not run: DFS_put( system.file("defaults/mapred/mapred-default.xml", package = "hive"),
"/tmp/input" )
## End(Not run)
## Define the mapper and reducer function to be applied:
## Note that a Hadoop map or reduce job retrieves data line by line from stdin.
## Not run:
mapper <- function(x){
    ## Read stdin line by line and emit one "term<TAB>1" record per term
    con <- file( "stdin", open = "r" )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        terms <- unlist(strsplit(line, " "))
        ## keep only terms longer than one character
        terms <- terms[nchar(terms) > 1 ]
        if( length(terms) )
            ## writeLines() terminates every record with a newline
            writeLines( paste(terms, 1, sep = "\t") )
    }
    close(con)
}
reducer <- function(x){
    ## Accumulate the count per term in a hashed environment
    env <- new.env( hash = TRUE )
    con <- file( "stdin", open = "r" )
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        keyvalue <- unlist( strsplit(line, "\t") )
        if( exists(keyvalue[1], envir = env, inherits = FALSE) ){
            assign( keyvalue[1], get(keyvalue[1], envir = env) + as.integer(keyvalue[2]),
                    envir = env )
        } else {
            assign( keyvalue[1], as.integer(keyvalue[2]), envir = env )
        }
    }
    close(con)
    ## Emit one "term<TAB>count" record per term
    env <- as.list(env)
    for( term in names(env) )
        writeLines( paste(c(term, env[[term]]), collapse ="\t") )
}
hive_set_nreducer(1)
hive_stream( mapper = mapper, reducer = reducer, input = "/tmp/input", output = "/tmp/output" )
DFS_list("/tmp/output")
head( DFS_read_lines("/tmp/output/part-00000") )
## End(Not run)
## Don't forget to clean up the file system
## Not run: DFS_dir_remove("/tmp/input")
## Not run: DFS_dir_remove("/tmp/output")