# suppress R CMD check "no visible binding" notes for RHIPE functions and
# other symbols that are only resolved at run time inside map/reduce/setup
# expressions
utils::globalVariables(
  c("rhwrite", "rhread", "rhls", "rhIterator", "rhabsolute.hdfs.path",
    "rhmapfile", "rhoptions", "rhinit", "rhmkdir", "rhchmod", "rhload",
    "rhdel", "rhmv", "rhfmt", "rhwatch", "rhsave",
    "x", "curEnv", "k", "v", "setup", ".jnew", ".jcall", ".")
)
### mrExec for kvHDFS objects
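# mrExecInternal.kvHDFSList runs a RHIPE MapReduce job over a list of
# HDFS-resident key-value inputs: it merges the supplied setup and map
# expressions with RHIPE-specific preambles, launches the job with rhwatch,
# and returns a list with two elements: "data", the hdfsConn pointing at the
# job output, and "counters", the job counters reshaped into a named list of
# lists.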
#' @export
mrExecInternal.kvHDFSList <- function(data, setup = NULL, map = NULL, reduce = NULL, output = NULL, control = NULL, params = NULL) {
  setup2 <- expression({
    collect <- rhcollect
    counter <- rhcounter
    .libPaths(libPaths)
  })
  setup <- appendExpression(setup2, setup)

  params$libPaths <- .libPaths()

  mkd <- utils::getFromNamespace("mkdHDFSTempFolder", "Rhipe")

  if(inherits(output, "hdfsConn")) {
    if(is.expression(reduce))
      output$type <- "map" # force type = "map" (mapfile) when there is a reduce
  } else if(is.character(output)) {
    output <- hdfsConn(output)
  } else {
    output <- hdfsConn(mkd(file = "tmp_output"), autoYes = TRUE)
  }
  outFile <- rhfmt(output$loc, type = output$type)

  # set write.job.info to TRUE (restored to its previous value after the job)
  wji <- rhoptions()$write.job.info
  rhoptions(write.job.info = TRUE)

  conns <- lapply(data, function(x) getAttribute(x, "conn"))
  locs <- sapply(conns, function(x) x$loc)
  types <- sapply(conns, function(x) x$type)
  if(length(unique(types)) != 1)
    stop("Currently all inputs must have the same type ('map', 'seq', 'text')")

  # create a lookup from the input file being processed to its dataSourceName
  nms <- names(data)
  dataSourceName <- lapply(seq_along(data), function(i) nms[i])
  names(dataSourceName) <- locs
  params$dataSourceName <- dataSourceName

  # map preamble: use the "mapred.input.file" environment variable to figure
  # out which input data source the current split came from
  map2 <- expression({
    mif <- Sys.getenv("mapred.input.file")
    .ind <- which(sapply(names(dataSourceName), function(x) grepl(x, mif)))
    .dataSourceName <- dataSourceName[[.ind]]
  })
  map <- appendExpression(map2, map)

  # temporarily disable rhoptions()$copyObjects$auto for this job (restored below)
  co <- rhoptions()$copyObjects
  co2 <- co
  co2$auto <- FALSE
  rhoptions(copyObjects = co2)

  res <- rhwatch(
    setup = setup,
    map = map,
    reduce = reduce,
    input = rhfmt(locs, type = types[1]),
    output = outFile,
    mapred = control$mapred,
    # combiner = control$combiner,
    readback = FALSE,
    parameters = params
  )

  rhoptions(copyObjects = co)
  # set write.job.info back to what it was
  rhoptions(write.job.info = wji)

  # get counters into a list-of-lists format
  counters <- res[[1]]$counters
  for(i in seq_along(counters)) {
    nms <- rownames(counters[[i]])
    counters[[i]] <- as.list(counters[[i]])
    names(counters[[i]]) <- nms
  }

  list(data = output, counters = counters)
}
#' Specify Control Parameters for a RHIPE Job
#'
#' Specify control parameters for a RHIPE job. See \code{rhwatch} for details about each of the parameters.
#'
#' @param mapred,setup,combiner,cleanup,orderby,shared,jarfiles,zips,jobname arguments to \code{rhwatch} in RHIPE
#' @examples
#' \dontrun{
#' # input data on HDFS
#' d <- ddf(hdfsConn("/path/to/big/data/on/hdfs"))
#'
#' # set RHIPE / Hadoop parameters
#' # buffer sizes control how many k/v pairs are sent to map / reduce tasks at a time
#' # mapred.reduce.tasks is a Hadoop config parameter that controls # of reduce tasks
#' rhctl <- rhipeControl(mapred = list(
#'   rhipe_map_buff_size = 10000,
#'   mapred.reduce.tasks = 72,
#'   rhipe_reduce_buff_size = 1))
#'
#' # divide input data using these control parameters
#' divide(d, by = "var", output = hdfsConn("/path/to/output"), control = rhctl)
#' }
#' @export
rhipeControl <- function(mapred = NULL, setup = NULL, combiner = FALSE, cleanup = NULL, orderby = "bytes", shared = NULL, jarfiles = NULL, zips = NULL, jobname = "") {
  res <- list(mapred = mapred, setup = setup, combiner = combiner,
    cleanup = cleanup, orderby = orderby, shared = shared,
    jarfiles = jarfiles, zips = zips, jobname = jobname)
  class(res) <- c("rhipeControl", "list")
  res
}
#' @export
defaultControl.kvHDFS <- function(x) {
  res <- getOption("defaultRhipeControl")
  if(inherits(res, "rhipeControl")) {
    return(res)
  } else {
    return(rhipeControl())
  }
}
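
# A sketch of how a caller could supply a session-wide default (assuming the
# surrounding framework asks for defaultControl() when no control argument is
# given): set the "defaultRhipeControl" option to a rhipeControl object, e.g.
#   options(defaultRhipeControl = rhipeControl(mapred = list(mapred.reduce.tasks = 16)))
# defaultControl.kvHDFS will then return that object; otherwise it falls back
# to rhipeControl() with all defaults.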