Execute a MapReduce Job

Share:

Description

Execute a MapReduce job

Usage

1
2
3
mrExec(data, setup = NULL, map = NULL, reduce = NULL, output = NULL,
  overwrite = FALSE, control = NULL, params = NULL, packages = NULL,
  verbose = TRUE)

Arguments

data

a ddo/ddf object, or list of ddo/ddf objects

setup

an expression of R code (created using the R command expression) to be run before map and reduce

map

an R expression that is evaluated during the map stage. For each task, this expression is executed multiple times (see details).

reduce

a vector of R expressions with names pre, reduce, and post that is evaluated during the reduce stage. For example reduce = expression(pre = {...}, reduce = {...}, post = {...}). reduce is optional, and if not specified the map output key-value pairs will be the result. If it is not specified, then a default identity reduce is performed. Setting it to 0 will skip the reduce altogether.

output

a "kvConnection" object indicating where the output data should reside (see localDiskConn, hdfsConn). If NULL (default), output will be an in-memory "ddo" object. If a character string, it will be treated as a path to be passed to the same type of connection as data - relative paths will be relative to the working directory of that back end.

overwrite

logical; should existing output location be overwritten? (also can specify overwrite = "backup" to move the existing output to _bak)

control

parameters specifying how the backend should handle things (most-likely parameters to rhwatch in RHIPE) - see rhipeControl and localDiskControl

params

a named list of objects external to the input data that are needed in the map or reduce phases

packages

a vector of R package names that contain functions used in fn (most should be taken care of automatically such that this is rarely necessary to specify)

verbose

logical - print messages about what is being done

Value

"ddo" object - to keep it simple. It is up to the user to update or cast as "ddf" if that is the desired result.

Author(s)

Ryan Hafen

Examples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# compute min and max Sepal Length by species for iris data
# using a random partitioning of it as input
d <- divide(iris, by = rrDiv(20))

mapExp <- expression({
  lapply(map.values, function(r) {
    by(r, r$Species, function(x) {
      collect(
        as.character(x$Species[1]),
        range(x$Sepal.Length, na.rm = TRUE)
      )
    })
  })
})

reduceExp <- expression(
  pre = {
    rng <- c(Inf, -Inf)
  }, reduce = {
    rx <- unlist(reduce.values)
    rng <- c(min(rng[1], rx, na.rm = TRUE), max(rng[2], rx, na.rm = TRUE))
  }, post = {
    collect(reduce.key, rng)
})

res <- mrExec(d, map = mapExp, reduce = reduceExp)
as.list(res)

Want to suggest features or report bugs for rdrr.io? Use the GitHub issue tracker.