
utils::globalVariables(
  c("rhwrite", "rhread", "rhls", "rhIterator", "rhabsolute.hdfs.path",
    "rhmapfile", "rhoptions", "rhinit", "rhmkdir", "rhchmod", "rhload",
    "rhdel", "rhmv", "rhfmt", "rhwatch", "rhsave",
    "x", "curEnv", "k", "v", "setup", ".jnew", ".jcall", ".")
)

### mrExec for kvHDFS objects
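# mrExecInternal method for a list of HDFS-backed key-value inputs: builds the
# worker setup and map expressions, resolves the output connection, runs the
# job through RHIPE's rhwatch(), and returns the output connection and counters.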

#' @export
mrExecInternal.kvHDFSList <- function(data, setup = NULL, map = NULL, reduce = NULL, output = NULL, control = NULL, params = NULL) {

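  # worker-side setup: expose RHIPE's rhcollect() and rhcounter() as the
  # collect() and counter() helpers used in map / reduce expressions, and set
  # the library paths passed down from the driver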
  setup2 <- expression({
    collect <- rhcollect
    counter <- rhcounter
    .libPaths(libPaths)
  })
  setup <- appendExpression(setup2, setup)

  params$libPaths <- .libPaths()

  mkd <- utils::getFromNamespace("mkdHDFSTempFolder", "Rhipe")

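  # resolve the output connection: accept an hdfsConn, a character HDFS path,
  # or default to a temporary folder on HDFS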
  if(inherits(output, "hdfsConn")) {
    if(is.expression(reduce))
      output$type <- "map" # use a map file output when a reduce step is supplied
  } else if(is.character(output)) {
    output <- hdfsConn(output)
  } else {
    output <- hdfsConn(mkd(file = "tmp_output"), autoYes = TRUE)
  }
  outFile <- rhfmt(output$loc, type = output$type)

  # set write.job.info to TRUE
  wji <- rhoptions()$write.job.info
  rhoptions(write.job.info = TRUE)

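  # gather the HDFS connection behind each input; all inputs must currently
  # share the same storage type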
  conns <- lapply(data, function(x) getAttribute(x, "conn"))
  locs <- sapply(conns, function(x) x$loc)
  types <- sapply(conns, function(x) x$type)
  if(length(unique(types)) != 1)
    stop("Currently all inputs must have the same type ('map', 'seq', 'text')")

  # create a lookup from each input file being processed to its dataSourceName
  nms <- names(data)
  dataSourceName <- lapply(seq_along(data), function(i) nms[i])
  names(dataSourceName) <- locs
  params$dataSourceName <- dataSourceName
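  # prepend a map expression that inspects the mapred.input.file environment
  # variable to determine which named input the current task is reading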
  map2 <- expression({
    mif <- Sys.getenv("mapred.input.file")
    .ind <- which(sapply(names(dataSourceName), function(x) grepl(x, mif)))
    .dataSourceName <- dataSourceName[[.ind]]
  })
  map <- appendExpression(map2, map)

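  # temporarily turn off RHIPE's automatic copying of objects from the calling
  # environment; the original setting is restored after the job runs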
  co <- rhoptions()$copyObjects
  co2 <- co
  co2$auto <- FALSE
  rhoptions(copyObjects = co2)

  res <- rhwatch(
    setup = setup,
    map = map,
    reduce = reduce,
    input = rhfmt(locs, type = types[1]),
    output = outFile,
    mapred = control$mapred,
    # combiner = control$combiner,
    readback = FALSE,
    parameters = params
  )

  rhoptions(copyObjects = co)
  # set write.job.info back to what it was
  rhoptions(write.job.info = wji)

  # get counters into a list of lists format
  counters <- res[[1]]$counters
  for(i in seq_along(counters)) {
    nms <- rownames(counters[[i]])
    counters[[i]] <- as.list(counters[[i]])
    names(counters[[i]]) <- nms
  }

  list(data = output, counters = counters)
}

#' Specify Control Parameters for a RHIPE Job
#'
#' Specify control parameters for a RHIPE job.  See \code{rhwatch} in RHIPE for details about each of the parameters.
#'
#' @param mapred,setup,combiner,cleanup,orderby,shared,jarfiles,zips,jobname arguments passed to \code{rhwatch} in RHIPE
#' @return a list of RHIPE job parameters with class \code{"rhipeControl"}, suitable for the \code{control} argument of datadr functions
#' @examples
#' \dontrun{
#' # input data on HDFS
#' d <- ddf(hdfsConn("/path/to/big/data/on/hdfs"))
#'
#' # set RHIPE / Hadoop parameters
#' # buffer sizes control how many k/v pairs are sent to map / reduce tasks at a time
#' # mapred.reduce.tasks is a Hadoop config parameter that controls # of reduce tasks
#' rhctl <- rhipeControl(mapred = list(
#'   rhipe_map_buff_size = 10000,
#'   mapred.reduce.tasks = 72,
#'   rhipe_reduce_buff_size = 1))
#'
#' # divide input data using these control parameters
#' divide(d, by = "var", output = hdfsConn("/path/to/output"), control = rhctl)
#' }
#' @export
rhipeControl <- function(mapred = NULL, setup = NULL, combiner = FALSE, cleanup = NULL, orderby = "bytes", shared = NULL, jarfiles = NULL, zips = NULL, jobname = "") {
  res <- list(mapred = mapred, setup = setup, combiner = combiner, cleanup = cleanup, orderby = orderby, shared = shared, jarfiles = jarfiles, zips = zips, jobname = jobname)
  class(res) <- c("rhipeControl", "list")
  res
}

#' @export
defaultControl.kvHDFS <- function(x) {
  res <- getOption("defaultRhipeControl")
  if(inherits(res, "rhipeControl")) {
    return(res)
  } else {
    return(rhipeControl())
  }
}
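
# A session-wide default can be registered with options() so that the
# defaultControl method for kvHDFS data picks it up for subsequent jobs.
# A minimal sketch (the mapred settings below are illustrative only):
#
# options(defaultRhipeControl = rhipeControl(
#   mapred = list(mapred.reduce.tasks = 10, rhipe_map_buff_size = 10000)))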
