# Imported from:
# https://github.com/apache/spark/blob/v1.6.2/R/pkg/R/utils.R
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Fast append to list by using an accumulator.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
#
# The accumulator should have three fields: size, counter and data.
# This function amortizes the allocation cost by doubling
# the size of the list every time it fills up.
addItemToAccumulator <- function(acc, item) {
  # `acc` is an environment, so the field updates below are visible to the
  # caller without returning the accumulator.
  filled <- acc$counter
  if (filled == acc$size) {
    # Backing list is full: double its capacity before storing the new item.
    grown <- 2 * acc$size
    acc$size <- grown
    length(acc$data) <- grown
  }
  slot <- filled + 1
  acc$counter <- slot
  acc$data[[slot]] <- item
}
initAccumulator <- function() {
  # Create an empty accumulator: an environment holding a one-slot list,
  # a capacity of 1 and a fill counter of 0.
  acc <- new.env()
  list2env(list(counter = 0, data = list(NULL), size = 1), envir = acc)
  acc
}
# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
# user defined function (UDF), and to examine variables in the UDF to decide
# if their values should be included in the new function environment.
# param
# node The current AST node in the traversal.
# oldEnv The original function environment.
# defVars An Accumulator of variable names defined in the function's calling environment,
# including function argument and local variable names.
# checkedFuncs An environment of function objects examined during cleanClosure. It can
# be considered as a "name"-to-"list of functions" mapping.
# newEnv A new function environment to store necessary function dependencies, an output argument.
processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
  # Walk one AST node of a user defined function. Names assigned or declared
  # as parameters are recorded in `defVars`; every other symbol is looked up
  # in `oldEnv` and its enclosures and, when found, copied into `newEnv`.
  # Called for its side effects on `defVars`, `checkedFuncs` and `newEnv`.
  nodeLen <- length(node)
  nodeType <- typeof(node)

  if (nodeLen > 1 && nodeType == "language") {
    # Recursive case: current AST node is an internal node, check its children.
    # Index vectors are built with seq_len() so that short or malformed calls
    # yield an empty loop instead of a backwards range such as 3:2.
    allIdx <- seq_len(nodeLen)
    if (length(node[[1]]) > 1) {
      for (i in allIdx) {
        processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
      }
    } else {
      # node[[1]] has length 1: check for some R special functions.
      nodeChar <- as.character(node[[1]])
      if (nodeChar == "{" || nodeChar == "(") {
        # Skip start symbol.
        for (i in allIdx[-1]) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "<-" || nodeChar == "=" || nodeChar == "<<-") {
        # Assignment ops.
        defVar <- node[[2]]
        if (length(defVar) == 1 && typeof(defVar) == "symbol") {
          # Add the defined variable name into defVars.
          addItemToAccumulator(defVars, as.character(defVar))
        } else {
          # Complex assignment target (e.g. an indexing call): examine it too.
          processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
        }
        for (i in allIdx[-(1:2)]) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "function") {
        # Function definition: its parameter names count as defined variables.
        for (arg in names(node[[2]])) {
          addItemToAccumulator(defVars, arg)
        }
        for (i in allIdx[-(1:2)]) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "$") {
        # Skip the field, only examine the object it is extracted from.
        processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
      } else if (nodeChar == "::" || nodeChar == ":::") {
        # Only the name on the right-hand side of the operator is examined.
        processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
      } else {
        for (i in allIdx) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      }
    }
  } else if (nodeLen == 1 && (nodeType == "symbol" || nodeType == "language")) {
    # Base case: current AST node is a leaf node and a symbol or a function call.
    nodeChar <- as.character(node)
    if (!nodeChar %in% defVars$data) {
      # Not a function parameter or local variable.
      func.env <- oldEnv
      topEnv <- parent.env(.GlobalEnv)
      emptyEnv <- emptyenv()
      # Search in the function environment and its enclosing environments up to
      # the global environment. There is no need to look into package
      # environments above the global environment, or into namespaces other
      # than sparklyr below it, as they are assumed to be loaded on workers.
      # The walk also stops at the empty environment, which has no parent, so
      # a chain that never reaches the global environment ends cleanly.
      while (!identical(func.env, topEnv) && !identical(func.env, emptyEnv)) {
        # Namespaces other than "sparklyr" will not be searched, and within
        # sparklyr only non-exported (internal) names are considered.
        if (!isNamespace(func.env) ||
            (getNamespaceName(func.env) == "sparklyr" &&
               !(nodeChar %in% getNamespaceExports("sparklyr")))) {
          # 'inherits' is FALSE since attached package environments do not need
          # to be searched. exists() can fail on names that are not valid
          # variable names; such failures are treated as "not found".
          if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
                       error = function(e) FALSE)) {
            obj <- get(nodeChar, envir = func.env, inherits = FALSE)
            if (is.function(obj)) {
              # The node refers to a function.
              funcList <- mget(nodeChar, envir = checkedFuncs, inherits = FALSE,
                               ifnotfound = list(list(NULL)))[[1]]
              found <- vapply(funcList,
                              function(func) identical(func, obj),
                              logical(1))
              if (any(found)) {
                # Function has already been examined, ignore.
                break
              }
              # Function has not been examined: record it and recursively
              # clean its closure.
              assign(nodeChar,
                     if (is.null(funcList[[1]])) {
                       list(obj)
                     } else {
                       append(funcList, obj)
                     },
                     envir = checkedFuncs)
              obj <- cleanClosure(obj, checkedFuncs)
            }
            assign(nodeChar, obj, envir = newEnv)
            break
          }
        }
        # Continue to search in enclosure.
        func.env <- parent.env(func.env)
      }
    }
  }
}
# Utility function to get user defined function (UDF) dependencies (closure).
# More specifically, this function captures the values of free variables defined
# outside a UDF, and stores them in the function's environment.
# param
# func A function whose closure needs to be captured.
# checkedFuncs An environment of function objects examined during cleanClosure. It can be
# considered as a "name"-to-"list of functions" mapping.
# return value
# a new version of func that has a correct environment (closure).
cleanClosure <- function(func, checkedFuncs = new.env()) {
  # Returns `func` with its environment replaced by a fresh child of the global
  # environment that processClosure() fills with the free variables found in
  # the function body. Non-function input is returned unchanged.
  if (is.function(func)) {
    newEnv <- new.env(parent = .GlobalEnv)
    func.body <- body(func)
    oldEnv <- environment(func)
    # defVars is an Accumulator of variable names defined in the function's
    # calling environment. First, the function's arguments are added to it.
    defVars <- initAccumulator()
    argNames <- names(as.list(args(func)))
    # The last entry of as.list(args(func)) is the body, not an argument, so it
    # is dropped. seq_len() with a floor of zero keeps the loop empty for
    # functions without arguments instead of counting backwards.
    for (i in seq_len(max(length(argNames) - 1L, 0L))) {
      addItemToAccumulator(defVars, argNames[i])
    }
    # Recursively examine variables in the function body.
    processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv)
    environment(func) <- newEnv
  }
  func
}
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.