hadoop: Experimental Hadoop chunk-processing code


Description

hmr runs a chunk-wise Hadoop job.

hpath and hinput define HDFS file path and input source.

Usage

hmr(input, output, map = identity, reduce = identity, job.name,
    aux, formatter, packages = loadedNamespaces(), reducers,
    wait=TRUE, hadoop.conf, hadoop.opt, R="R",
    verbose = TRUE, persistent = FALSE, overwrite = FALSE,
    use.kinit = !is.null(getOption("hmr.kerberos.realm")))
hpath(path)
hinput(path, formatter = .default.formatter)

Arguments

input

input data - see details

output

output path (optional)

map

chunk compute function (map is a misnomer)

reduce

chunk combine function

job.name

name of the job to pass to Hadoop

aux

either a character vector of symbol names or a named list of values to push to the compute nodes

formatter

formatter to use. It is optional in hmr if the input source already contains a formatter definition. See below for details on how to specify separate formatters.

packages

character vector of package names to attach on the compute nodes

reducers

optional integer specifying the number of parallel jobs in the combine step. It is only a hint: any value greater than one implies that the chunks are independent in the combine step. The default is to not assume independence.

wait

logical; if TRUE, the command returns only after the job has finished, otherwise it returns as soon as the job has been submitted

hadoop.conf

optional string, path to the hadoop configuration directory for submission

hadoop.opt

additional Java options to pass to the job. Named character vectors are passed as -D<name>=<value>; unnamed vectors are collapsed. Note: this is only a transitional interface to work around deficiencies in the job generation and should be used only as a last resort, since the semantics are implementation-specific and thus not portable across systems.

R

command to call to run R on the Hadoop cluster

verbose

logical, indicating whether output sent to standard error and standard output by Hadoop should be printed to the console.

persistent

logical, if TRUE then an ROctopus job is started and the mapper is executed in "hot" ROctopus instances instead of regular R. The results in that case are ROctopus URLs.

overwrite

logical, if TRUE then the output directory is first deleted before the job is started.

use.kinit

logical, if TRUE automatically invokes kinit(realm=getOption("hmr.kerberos.realm")) before running any Hadoop commands.

path

HDFS path

Details

hmr creates and runs a Hadoop job to perform chunk-wise compute + combine. The input is read using chunk.reader, processed using the formatter function and passed to the map function. The result is converted using as.output before going back to Hadoop. The chunk-wise results are combined using the reduce function; the flow is the same as in the map case. The result is then returned as an HDFS path. Either map or reduce can be identity (the default).

If the formatter is omitted, the format is taken from the input object (if it has one); otherwise the default formatter (mstrsplit with '\t' as the key separator and '|' as the column separator) is used. If formatter is a function, the same formatter is used for both the map and reduce steps. If separate formatters are required, formatter can be a list with entries map and/or reduce specifying the corresponding formatter function.
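As a sketch of the list form (the input path and column layout are hypothetical; this assumes a working Hadoop setup and the iotools package, which provides mstrsplit):

```r
## Not run: 
library(hmr)   # also loads iotools, which provides mstrsplit
## separate formatters for the map and reduce steps
r <- hmr(
  hinput("/data/example"),
  map = function(x) x,
  reduce = function(x) x,
  formatter = list(
    map    = function(x) mstrsplit(x, sep = "|", nsep = "\t"),
    reduce = function(x) mstrsplit(x, sep = "|", nsep = "\t")
  ))

## End(Not run)
```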

hpath tags a string as an HDFS path. Its sole purpose is to distinguish local and HDFS paths.

hinput creates a subclass of HDFSpath which also contains the definition of the formatter for that path. The default formatter honors default Hadoop settings of '\t' as the key/value separator and '|' as the field separator.
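A minimal sketch of the distinction (the paths are hypothetical; class checks follow from the Value section below):

```r
## Not run: 
library(hmr)
out <- hpath("/user/me/output")   # tagged character vector, class "HDFSpath"
in1 <- hinput("/user/me/input")   # subclass "hinput" carrying a "formatter" attribute
inherits(out, "HDFSpath")         # TRUE
inherits(in1, "hinput")           # TRUE

## End(Not run)
```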

Value

hmr returns the HDFS path to the result when finished.

hpath returns a character vector of class "HDFSpath"

hinput returns a subclass "hinput" of "HDFSpath" containing the additional "formatter" attribute.

Note

Requires a properly installed Hadoop client. The installation must either be in /usr/lib/hadoop, or one of the HADOOP_HOME or HADOOP_PREFIX environment variables must be set accordingly.

Author(s)

Simon Urbanek

Examples

## Not run: 
## map points to ZIP codes and count the number of points per ZIP
## uses Tiger/LINE 2010 census data shapefiles
## we can use ctapply because Hadoop guarantees contiguous input

## require(fastshp); require(tl2010)
r <- hmr(
  hinput("/data/points"),
  map = function(x)
    table(zcta2010.db()[
       inside(zcta2010.shp(), x[,4], x[,5]), 1]),
  reduce = function(x) ctapply(as.numeric(x), names(x), sum))

## End(Not run)

s-u/hmr documentation built on June 8, 2020, 3:55 p.m.