Nothing
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Worker class
# Get current system time
currentTimeSecs <- function() {
as.numeric(Sys.time())
}
# Get elapsed time
elapsedSecs <- function() {
proc.time()[3]
}
compute <- function(mode, partition, serializer, deserializer, key,
colNames, computeFunc, inputData) {
if (mode > 0) {
if (deserializer == "row") {
# Transform the list of rows into a data.frame
# Note that the optional argument stringsAsFactors for rbind is
# available since R 3.2.4. So we set the global option here.
oldOpt <- getOption("stringsAsFactors")
options(stringsAsFactors = FALSE)
# Handle binary data types
if ("raw" %in% sapply(inputData[[1]], class)) {
inputData <- SparkR:::rbindRaws(inputData)
} else {
inputData <- do.call(rbind.data.frame, inputData)
}
options(stringsAsFactors = oldOpt)
names(inputData) <- colNames
} else {
# Check to see if inputData is a valid data.frame
stopifnot(deserializer == "byte" || deserializer == "arrow")
stopifnot(is.data.frame(inputData))
}
if (mode == 2) {
output <- computeFunc(key, inputData)
} else {
output <- computeFunc(inputData)
}
if (serializer == "row") {
# Transform the result data.frame back to a list of rows
output <- split(output, seq(nrow(output)))
} else {
# Serialize the output to a byte array
stopifnot(serializer == "byte" || serializer == "arrow")
}
} else {
output <- computeFunc(partition, inputData)
}
return(output)
}
outputResult <- function(serializer, output, outputCon) {
if (serializer == "byte") {
SparkR:::writeRawSerialize(outputCon, output)
} else if (serializer == "row") {
SparkR:::writeRowSerialize(outputCon, output)
} else if (serializer == "arrow") {
SparkR:::writeSerializeInArrow(outputCon, output)
} else {
# write lines one-by-one with flag
lapply(output, function(line) SparkR:::writeString(outputCon, line))
}
}
# Constants
specialLengths <- list(END_OF_STREAM = 0L, TIMING_DATA = -1L)
# Timing R process boot
bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
connectionTimeout <- as.integer(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
dirs <- strsplit(rLibDir, ",")[[1]]
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
# SparkR namespace
.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
inputCon <- socketConnection(
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
SparkR:::doServerAuth(inputCon, Sys.getenv("SPARKR_WORKER_SECRET"))
outputCon <- socketConnection(
port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
SparkR:::doServerAuth(outputCon, Sys.getenv("SPARKR_WORKER_SECRET"))
# read the index of the current partition inside the RDD
partition <- SparkR:::readInt(inputCon)
deserializer <- SparkR:::readString(inputCon)
serializer <- SparkR:::readString(inputCon)
# Include packages as required
packageNames <- unserialize(SparkR:::readRaw(inputCon))
for (pkg in packageNames) {
suppressPackageStartupMessages(library(as.character(pkg), character.only = TRUE))
}
# read function dependencies
funcLen <- SparkR:::readInt(inputCon)
computeFunc <- unserialize(SparkR:::readRawLen(inputCon, funcLen))
env <- environment(computeFunc)
parent.env(env) <- .GlobalEnv # Attach under global environment.
# Timing init envs for computing
initElap <- elapsedSecs()
# Read and set broadcast variables
numBroadcastVars <- SparkR:::readInt(inputCon)
if (numBroadcastVars > 0) {
for (bcast in seq(1:numBroadcastVars)) {
bcastId <- SparkR:::readInt(inputCon)
value <- unserialize(SparkR:::readRaw(inputCon))
SparkR:::setBroadcastValue(bcastId, value)
}
}
# Timing broadcast
broadcastElap <- elapsedSecs()
# Initial input timing
inputElap <- broadcastElap
# If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int
# as number of partitions to create.
numPartitions <- SparkR:::readInt(inputCon)
# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
mode <- SparkR:::readInt(inputCon)
if (mode > 0) {
colNames <- SparkR:::readObject(inputCon)
}
isEmpty <- SparkR:::readInt(inputCon)
computeInputElapsDiff <- 0
outputComputeElapsDiff <- 0
if (isEmpty != 0) {
if (numPartitions == -1) {
if (deserializer == "byte") {
# Now read as many characters as described in funcLen
data <- SparkR:::readDeserialize(inputCon)
} else if (deserializer == "string") {
data <- as.list(readLines(inputCon))
} else if (deserializer == "row" && mode == 2) {
dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon)
keys <- dataWithKeys$keys
data <- dataWithKeys$data
} else if (deserializer == "row") {
data <- SparkR:::readMultipleObjects(inputCon)
} else if (deserializer == "arrow" && mode == 2) {
dataWithKeys <- SparkR:::readDeserializeWithKeysInArrow(inputCon)
keys <- dataWithKeys$keys
data <- dataWithKeys$data
} else if (deserializer == "arrow" && mode == 1) {
data <- SparkR:::readDeserializeInArrow(inputCon)
# See https://stat.ethz.ch/pipermail/r-help/2010-September/252046.html
# rbind.fill might be an alternative to make it faster if plyr is installed.
# Also, note that, 'dapply' applies a function to each partition.
data <- do.call("rbind", data)
}
# Timing reading input data for execution
inputElap <- elapsedSecs()
if (mode > 0) {
if (mode == 1) {
output <- compute(mode, partition, serializer, deserializer, NULL,
colNames, computeFunc, data)
} else {
# gapply mode
outputs <- list()
for (i in seq_len(length(data))) {
# Timing reading input data for execution
computeStart <- elapsedSecs()
output <- compute(mode, partition, serializer, deserializer, keys[[i]],
colNames, computeFunc, data[[i]])
computeElap <- elapsedSecs()
if (serializer == "arrow") {
outputs[[length(outputs) + 1L]] <- output
} else {
outputResult(serializer, output, outputCon)
outputComputeElapsDiff <- outputComputeElapsDiff + (elapsedSecs() - computeElap)
}
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - computeStart)
}
if (serializer == "arrow") {
# See https://stat.ethz.ch/pipermail/r-help/2010-September/252046.html
# rbind.fill might be an alternative to make it faster if plyr is installed.
outputStart <- elapsedSecs()
combined <- do.call("rbind", outputs)
SparkR:::writeSerializeInArrow(outputCon, combined)
outputComputeElapsDiff <- elapsedSecs() - outputStart
}
}
} else {
output <- compute(mode, partition, serializer, deserializer, NULL,
colNames, computeFunc, data)
}
if (mode != 2) {
# Not a gapply mode
computeElap <- elapsedSecs()
outputResult(serializer, output, outputCon)
outputElap <- elapsedSecs()
computeInputElapsDiff <- computeElap - inputElap
outputComputeElapsDiff <- outputElap - computeElap
}
} else {
if (deserializer == "byte") {
# Now read as many characters as described in funcLen
data <- SparkR:::readDeserialize(inputCon)
} else if (deserializer == "string") {
data <- readLines(inputCon)
} else if (deserializer == "row") {
data <- SparkR:::readMultipleObjects(inputCon)
}
# Timing reading input data for execution
inputElap <- elapsedSecs()
res <- new.env()
# Step 1: hash the data to an environment
hashTupleToEnvir <- function(tuple) {
# NOTE: execFunction is the hash function here
hashVal <- computeFunc(tuple[[1]])
bucket <- as.character(hashVal %% numPartitions)
acc <- res[[bucket]]
# Create a new accumulator
if (is.null(acc)) {
acc <- SparkR:::initAccumulator()
}
SparkR:::addItemToAccumulator(acc, tuple)
res[[bucket]] <- acc
}
invisible(lapply(data, hashTupleToEnvir))
# Timing computing
computeElap <- elapsedSecs()
# Step 2: write out all of the environment as key-value pairs.
for (name in ls(res)) {
SparkR:::writeInt(outputCon, 2L)
SparkR:::writeInt(outputCon, as.integer(name))
# Truncate the accumulator list to the number of elements we have
length(res[[name]]$data) <- res[[name]]$counter
SparkR:::writeRawSerialize(outputCon, res[[name]]$data)
}
# Timing output
outputElap <- elapsedSecs()
computeInputElapsDiff <- computeElap - inputElap
outputComputeElapsDiff <- outputElap - computeElap
}
}
# Report timing
SparkR:::writeInt(outputCon, specialLengths$TIMING_DATA)
SparkR:::writeDouble(outputCon, bootTime)
SparkR:::writeDouble(outputCon, initElap - bootElap) # init
SparkR:::writeDouble(outputCon, broadcastElap - initElap) # broadcast
SparkR:::writeDouble(outputCon, inputElap - broadcastElap) # input
SparkR:::writeDouble(outputCon, computeInputElapsDiff) # compute
SparkR:::writeDouble(outputCon, outputComputeElapsDiff) # output
# End of output
SparkR:::writeInt(outputCon, specialLengths$END_OF_STREAM)
close(outputCon)
close(inputCon)
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.