# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#    http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Utilities and Helpers

# Given a JList<T>, returns an R list containing the same elements, the number
# of which is optionally upper bounded by `logicalUpperBound` (by default,
# return all elements).  Takes care of deserializations and type conversions.
# param
#   jList             A Java-side list reference; only its "size" and "get" methods are used.
#   flatten           If TRUE, per-element results are concatenated one level deep
#                     (unlist(recursive = FALSE)). It is forced to FALSE when a "row"
#                     partition is decoded.
#   logicalUpperBound Optional cap on the number of logical elements returned.
#   serializedMode    "byte", "string" or "row"; selects how raw elements are decoded.
# return value
#   An R list.
convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
  serializedMode = "byte") {
  arrSize <- callJMethod(jList, "size")

  # Datasets with serializedMode == "string" (such as an RDD directly generated by textFile()):
  # each partition is not dense-packed into one Array[Byte], and `arrSize`
  # here corresponds to number of logical elements. Thus we can prune here.
  if (serializedMode == "string" && !is.null(logicalUpperBound)) {
    arrSize <- min(arrSize, logicalUpperBound)
  }

  # seq_len() yields no indices when arrSize is 0, so an empty JList maps to list().
  results <- lapply(seq_len(arrSize) - 1L, function(index) {
    obj <- callJMethod(jList, "get", as.integer(index))

    # Assume it is either an R object or a Java obj ref.
    if (inherits(obj, "jobj")) {
      if (isInstanceOf(obj, "scala.Tuple2")) {
        # JavaPairRDD[Array[Byte], Array[Byte]].
        keyBytes <- callJMethod(obj, "_1")
        valBytes <- callJMethod(obj, "_2")
        res <- list(unserialize(keyBytes),
                    unserialize(valBytes))
      } else {
        stop("utils.R: convertJListToRList only supports ",
             "RDD[Array[Byte]] and ",
             "JavaPairRDD[Array[Byte], Array[Byte]] for now")
      }
    } else if (inherits(obj, "raw")) {
      if (serializedMode == "byte") {
        # RDD[Array[Byte]]. `obj` is one whole partition dense-packed together:
        # deserialize the whole partition first, then cap the number of elements below.
        res <- unserialize(obj)
      } else if (serializedMode == "row") {
        # For DataFrames that have been converted to RRDDs, readRowList reads each
        # row of the RRDD as a list and deserializes each element.
        res <- readRowList(obj)
        # `<<-` rebinds the `flatten` argument of this convertJListToRList call
        # (the enclosing frame, not a global), so rows are not flattened below.
        flatten <<- FALSE
      }
      # TODO: is it possible to distinguish element boundary so that we can
      # unserialize only what we need?
      if (!is.null(logicalUpperBound)) {
        res <- head(res, n = logicalUpperBound)
      }
    } else {
      # obj is of a primitive Java type, is simplified to R's
      # corresponding type.
      res <- list(obj)
    }
    res
  })

  if (flatten) {
    as.list(unlist(results, recursive = FALSE))
  } else {
    as.list(results)
  }
}

# Returns TRUE if `name` refers to an RDD in the given environment `env`
# (i.e. the object bound to that name, looked up with get(), inherits class "RDD").
isRDD <- function(name, env) {
  inherits(get(name, envir = env), "RDD")
}

#' Compute the hashCode of an object
#'
#' Java-style function to compute the hashCode for the given object. Returns
#' an integer value.
#' @details
#' This only works for integer, numeric and character types right now. Only the
#' first element of `key` is hashed. Any other type produces a warning and 0.
#' @param key the object to be hashed
#' @return the hash code as an integer
#' @examples
#' hashCode(1L) # 1
#' hashCode(1.0) # 1072693248
#' hashCode("1") # 49
#' @note hashCode since 1.4.0
hashCode <- function(key) {
  # inherits() is used instead of class(key) == "...": the latter yields a
  # length > 1 condition (an error in `if`) for objects with several classes.
  if (inherits(key, "integer")) {
    as.integer(key[[1]])
  } else if (inherits(key, "numeric")) {
    # Convert the double to long and then calculate the hash code: XOR the two
    # 32-bit words of the double's binary form (writeBin uses the platform byte order).
    rawVec <- writeBin(key[[1]], con = raw())
    intBits <- packBits(rawToBits(rawVec), "integer")
    as.integer(bitwXor(intBits[2], intBits[1]))
  } else if (inherits(key, "character")) {
    # TODO: SPARK-7839 means we might not have the native library available
    s <- key[[1]]
    if (nchar(s) == 0) {
      0L
    } else {
      # Byte values (0-255) of the string, folded as h <- 31 * h + byte.
      byteVals <- as.integer(charToRaw(s))
      hashC <- 0
      for (b in byteVals) {
        hashC <- mult31AndAdd(hashC, b)
      }
      as.integer(hashC)
    }
  } else {
    warning("Could not hash object, returning 0")
    0L
  }
}

# Helper function used to wrap a 'numeric' value to integer bounds.
# Useful for implementing C-like integer arithmetic.
# A value above .Machine$integer.max has 2 * integer.max + 2 (i.e. 2^32)
# subtracted; a value below -integer.max has it added. The shift is applied at
# most once, and anything in between is returned unchanged.
wrapInt <- function(value) {
  intMax <- .Machine$integer.max
  if (value > intMax) {
    return(value - 2 * intMax - 2)
  }
  if (value < -1 * intMax) {
    return(2 * intMax + value + 2)
  }
  value
}

# Multiply `val` by 31 and add `addVal` to the result. Ensures that
# integer-overflows are handled at every step.
# TODO: this function does not handle integer overflow well
# 31 * val is built as (val << 4) + (val << 3) + (val << 2) + (val << 1) + val.
# The terms are summed left to right with wrapInt() applied after each addition.
mult31AndAdd <- function(val, addVal) {
  terms <- c(bitwShiftL(val, c(4, 3, 2, 1, 0)), addVal)
  # NA terms (presumably from a shift that could not be represented) count as 0.
  terms[is.na(terms)] <- 0
  Reduce(function(acc, term) wrapInt(as.numeric(acc) + as.numeric(term)), terms)
}

# Create a new RDD with serializedMode == "byte".
# Return itself if already in "byte" format.
# Errors if `rdd` is not an object of class "RDD".
serializeToBytes <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) == "byte") {
    return(rdd)
  }
  # An identity lapply() over the RDD produces the new RDD described above.
  lapply(rdd, function(x) { x })
}

# Create a new RDD with serializedMode == "string".
# Return itself if already in "string" format.
# Each element is converted with toString(). Errors if `rdd` is not an "RDD".
serializeToString <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) == "string") {
    return(rdd)
  }
  stringRdd <- lapply(rdd, function(x) { toString(x) })
  # force it to create jrdd using "string"
  getJRDD(stringRdd, serializedMode = "string")
  stringRdd
}

# Fast append to list by using an accumulator.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
# The accumulator should have three fields: size, counter and data.
# This function amortizes the allocation cost by doubling
# the size of the list every time it fills up.
# `acc` is modified in place (it is an environment). Returns `item` invisibly.
addItemToAccumulator <- function(acc, item) {
  if (acc$counter == acc$size) {
    acc$size <- acc$size * 2
    length(acc$data) <- acc$size
  }
  acc$counter <- acc$counter + 1
  # Assign through `[<-` with list(item): `acc$data[[k]] <- NULL` would delete
  # slot k instead of storing NULL, leaving data shorter than size/counter.
  acc$data[acc$counter] <- list(item)
  invisible(item)
}

# Create an empty accumulator for addItemToAccumulator(): an environment with
# counter = 0 (items stored), size = 1 (allocated slots) and data = list(NULL).
initAccumulator <- function() {
  acc <- new.env()
  list2env(list(counter = 0, data = list(NULL), size = 1), envir = acc)
  acc
}

# Utility function to sort a list of key value pairs
# Used in unit tests
# Pairs are ordered by their first element (the key), ascending unless
# `decreasing` is TRUE. An empty list is returned unchanged.
sortKeyValueList <- function(kv_list, decreasing = FALSE) {
  # sapply() on an empty list returns list(), which order() rejects.
  if (length(kv_list) == 0) {
    return(kv_list)
  }
  keys <- sapply(kv_list, function(x) x[[1]])
  kv_list[order(keys, decreasing = decreasing)]
}

# Utility function to generate compact R lists from grouped rdd
# Used in Join-family functions
# param:
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
#   cnull Boolean list where each element determines whether the corresponding list should
#         be converted to list(NULL)
# return value
#   A list of two lists: the values tagged 1 and the values tagged 2, in input
#   order. A side with no values becomes list(NULL) when its cnull flag is TRUE,
#   otherwise list().
genCompactLists <- function(tagged_list, cnull) {
  n <- length(tagged_list)
  buckets <- list(vector("list", n), vector("list", n))
  filled <- c(0, 0)

  for (entry in tagged_list) {
    tag <- entry[[1]]
    filled[tag] <- filled[tag] + 1
    buckets[[tag]][filled[tag]] <- list(entry[[2]])
  }

  for (side in 1:2) {
    if (cnull[[side]] && filled[side] == 0) {
      buckets[[side]] <- list(NULL)
    } else {
      length(buckets[[side]]) <- filled[side]
    }
  }
  buckets
}


# Utility function to merge compact R lists
# Used in Join-family functions
# param:
#   left/right Two compact lists ready for Cartesian product
# return value
#   A list of list(l, r) pairs for every l in `left` and r in `right`,
#   iterating `right` fastest.
mergeCompactLists <- function(left, right) {
  pairs <- vector("list", length(left) * length(right))
  k <- 0
  for (l in left) {
    for (r in right) {
      k <- k + 1
      pairs[[k]] <- list(l, r)
    }
  }
  pairs
}

# Utility function to wrap the two operations above
# Used in Join-family functions
# param (same as genCompactLists):
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
#   cnull Boolean list where each element determines whether the corresponding list should
#         be converted to list(NULL)
# return value
#   The Cartesian product (via mergeCompactLists) of the two compact lists.
joinTaggedList <- function(tagged_list, cnull) {
  compact <- genCompactLists(tagged_list, cnull)
  mergeCompactLists(left = compact[[1]], right = compact[[2]])
}

# Utility function to reduce a key-value list with predicate
# Used in *ByKey functions
# param
#   pair key-value pair; pair[[1]] is the key, pair[[2]] the value, pair$hash its hash name
#   keys/vals env of key/value with hashes
#   updateOrCreatePred predicate function
#   updateFn update or merge function for existing pair, similar with `mergeVal` @combineByKey
#   createFn create function for new pair, similar with `createCombiner` @combinebykey
# When the predicate holds, vals[[hash]] becomes updateFn(old, value). Otherwise
# vals[[hash]] becomes createFn(value) and keys[[hash]] records the key.
updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
  hashKey <- pair$hash
  key <- pair[[1]]
  val <- pair[[2]]
  if (!updateOrCreatePred(pair)) {
    created <- do.call(createFn, list(val))
    assign(hashKey, created, envir = vals)
    assign(hashKey, key, envir = keys)
  } else {
    previous <- get(hashKey, envir = vals)
    assign(hashKey, do.call(updateFn, list(previous, val)), envir = vals)
  }
}

# Utility function to convert key&values envs into key-val list
# Returns one list(key, value) per name in `keys`, in ls() order.
convertEnvsToList <- function(keys, vals) {
  hashes <- ls(keys)
  out <- vector("list", length(hashes))
  for (i in seq_along(hashes)) {
    h <- hashes[i]
    out[[i]] <- list(keys[[h]], vals[[h]])
  }
  out
}

# Utility function to merge 2 environments with the second overriding values in the first
# env1 is changed in place
# Returns the list of copied values, in ls(env2) order.
overrideEnvs <- function(env1, env2) {
  copyOne <- function(key) {
    env1[[key]] <- env2[[key]]
  }
  lapply(ls(env2), copyOne)
}

# Utility function to capture the varargs into environment object
# Each named argument becomes a binding of the same name in a new environment.
varargsToEnv <- function(...) {
  # Based on http://stackoverflow.com/a/3057419/4577954
  args <- list(...)
  out <- new.env()
  for (key in names(args)) {
    out[[key]] <- args[[key]]
  }
  out
}

# Utility function to capture the varargs into environment object but all values are converted
# into string.
# Named arguments must be logical, numeric, character or NULL; anything else is
# an error. Logicals are stored as lower-case strings ("true"/"false"), NULL is
# stored as NULL, and other values go through as.character(). Unnamed arguments
# are skipped and reported in a single warning.
varargsToStrEnv <- function(...) {
  pairs <- list(...)
  nameList <- names(pairs)
  env <- new.env()
  ignoredNames <- list()

  if (is.null(nameList)) {
    # When all arguments are not named, names(..) returns NULL.
    ignoredNames <- pairs
  } else {
    for (i in seq_along(pairs)) {
      name <- nameList[i]
      if (identical(name, "")) {
        # When some of arguments are not named, name is "".
        ignoredNames <- append(ignoredNames, pairs[i])
      } else {
        # Look the value up by position: pairs[[name]] always returns the first
        # match, so a repeated name would silently reuse the first value.
        value <- pairs[[i]]
        if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
          stop("Unsupported type for ", name, " : ", toString(class(value)), ". ",
               "Supported types are logical, numeric, character and NULL.", call. = FALSE)
        }
        if (is.logical(value)) {
          env[[name]] <- tolower(as.character(value))
        } else if (is.null(value)) {
          env[[name]] <- value
        } else {
          env[[name]] <- as.character(value)
        }
      }
    }
  }

  if (length(ignoredNames) != 0) {
    warning("Unnamed arguments ignored: ", toString(ignoredNames), ".", call. = FALSE)
  }
  env
}

# Map a storage level name to the JVM org.apache.spark.storage.StorageLevel
# constant of the same name.
# param
#   newLevel One of the names in the default value ("DISK_ONLY" when omitted).
#            It is validated with match.arg(), so unknown names raise an error
#            and unambiguous prefixes are accepted.
# return value
#   The object returned by callJStatic() for that StorageLevel field.
getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                         "DISK_ONLY_2",
                                         "DISK_ONLY_3",
                                         "MEMORY_AND_DISK",
                                         "MEMORY_AND_DISK_2",
                                         "MEMORY_AND_DISK_SER",
                                         "MEMORY_AND_DISK_SER_2",
                                         "MEMORY_ONLY",
                                         "MEMORY_ONLY_2",
                                         "MEMORY_ONLY_SER",
                                         "MEMORY_ONLY_SER_2",
                                         "OFF_HEAP")) {
  # Keep match.arg()'s result: it resolves the default vector to a single name
  # and rejects names that are not StorageLevel constants.
  newLevel <- match.arg(newLevel)
  # Every accepted name is also the Java field name, so no per-name switch is needed.
  callJStatic("org.apache.spark.storage.StorageLevel", newLevel)
}

# Render a JVM StorageLevel object as "<SHORT_NAME> - <toString()>" when its
# flags match a known StorageLevel constant, or as plain toString() otherwise.
storageLevelToString <- function(levelObj) {
  useDisk <- callJMethod(levelObj, "useDisk")
  useMemory <- callJMethod(levelObj, "useMemory")
  useOffHeap <- callJMethod(levelObj, "useOffHeap")
  deserialized <- callJMethod(levelObj, "deserialized")
  replication <- callJMethod(levelObj, "replication")

  # Known constants keyed by "useDisk/useMemory/useOffHeap/deserialized/replication".
  knownLevels <- c(
    "FALSE/FALSE/FALSE/FALSE/1" = "NONE",
    "TRUE/FALSE/FALSE/FALSE/1" = "DISK_ONLY",
    "TRUE/FALSE/FALSE/FALSE/2" = "DISK_ONLY_2",
    "TRUE/FALSE/FALSE/FALSE/3" = "DISK_ONLY_3",
    "FALSE/TRUE/FALSE/TRUE/1" = "MEMORY_ONLY",
    "FALSE/TRUE/FALSE/TRUE/2" = "MEMORY_ONLY_2",
    "FALSE/TRUE/FALSE/FALSE/1" = "MEMORY_ONLY_SER",
    "FALSE/TRUE/FALSE/FALSE/2" = "MEMORY_ONLY_SER_2",
    "TRUE/TRUE/FALSE/TRUE/1" = "MEMORY_AND_DISK",
    "TRUE/TRUE/FALSE/TRUE/2" = "MEMORY_AND_DISK_2",
    "TRUE/TRUE/FALSE/FALSE/1" = "MEMORY_AND_DISK_SER",
    "TRUE/TRUE/FALSE/FALSE/2" = "MEMORY_AND_DISK_SER_2",
    "TRUE/TRUE/TRUE/FALSE/1" = "OFF_HEAP")
  flagKey <- paste(useDisk, useMemory, useOffHeap, deserialized, replication, sep = "/")
  shortName <- knownLevels[flagKey]

  fullInfo <- callJMethod(levelObj, "toString")
  if (is.na(shortName)) {
    fullInfo
  } else {
    paste(shortName, "-", fullInfo)
  }
}

# Utility function for functions where an argument needs to be integer but we want to allow
# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
# Returns as.integer(num). Warns, naming the caller's argument expression, when
# the coercion changes any non-NA value.
numToInt <- function(num) {
  intVal <- as.integer(num)
  # any(..., na.rm = TRUE) keeps the condition scalar for vector and NA inputs,
  # which would otherwise make `if` fail.
  if (any(intVal != num, na.rm = TRUE)) {
    # deparse() renders call arguments readably (e.g. "n/2"); passing the raw
    # call object to warning() would concatenate its parts ("/n2").
    argText <- paste(deparse(substitute(num)), collapse = " ")
    warning("Coercing ", argText, " to integer.")
  }
  intVal
}

# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
# user defined function (UDF), and to examine variables in the UDF to decide
# if their values should be included in the new function environment.
# param
#   node The current AST node in the traversal.
#   oldEnv The original function environment.
#   defVars An Accumulator of variables names defined in the function's calling environment,
#           including function argument and local variable names.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can
#                be considered as a "name"-to-"list of functions" mapping.
#   newEnv A new function environment to store necessary function dependencies, an output argument.
# return value
#   Not meaningful; the function works through its side effects on `defVars`,
#   `checkedFuncs` and `newEnv`.
processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
  nodeLen <- length(node)

  if (nodeLen > 1 && typeof(node) == "language") {
    # Recursive case: current AST node is an internal node, check for its children.
    if (length(node[[1]]) > 1) {
      for (i in 1:nodeLen) {
        processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
      }
    } else {
      # if node[[1]] is length of 1, check for some R special functions.
      nodeChar <- as.character(node[[1]])
      if (nodeChar == "{" || nodeChar == "(") {
        # Skip start symbol.
        for (i in 2:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "<-" || nodeChar == "=" ||
                   nodeChar == "<<-") {
        # Assignment Ops.
        defVar <- node[[2]]
        if (length(defVar) == 1 && typeof(defVar) == "symbol") {
          # Add the defined variable name into defVars.
          addItemToAccumulator(defVars, as.character(defVar))
        } else {
          processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
        }
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "function") {
        # Function definition.
        # Add parameter names.
        newArgs <- names(node[[2]])
        lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) })
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "$") {
        # Skip the field.
        processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
      } else if (nodeChar == "::" || nodeChar == ":::") {
        processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
      } else {
        for (i in 1:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      }
    }
  } else if (nodeLen == 1 &&
               (typeof(node) == "symbol" || typeof(node) == "language")) {
    # Base case: current AST node is a leaf node and a symbol or a function call.
    nodeChar <- as.character(node)
    if (!nodeChar %in% defVars$data) {
      # Not a function parameter or local variable.
      func.env <- oldEnv
      topEnv <- parent.env(.GlobalEnv)
      # Search in function environment, and function's enclosing environments
      # up to global environment. There is no need to look into package environments
      # above the global or namespace environment that is not SparkR below the global,
      # as they are assumed to be loaded on workers.
      while (!identical(func.env, topEnv)) {
        # Namespaces other than "SparkR" will not be searched.
        if (!isNamespace(func.env) ||
            (getNamespaceName(func.env) == "SparkR" &&
               !(nodeChar %in% getNamespaceExports("SparkR")) &&
                  # Note that generic S4 methods should not be set to the environment of
                  # cleaned closure. It does not work with R 4.0.0+. See also SPARK-31918.
                  nodeChar != "" && !methods::isGeneric(nodeChar, func.env))) {
          # Only include SparkR internals.

          # Set parameter 'inherits' to FALSE since we do not need to search in
          # attached package environments.
          if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
                       error = function(e) { FALSE })) {
            obj <- get(nodeChar, envir = func.env, inherits = FALSE)
            if (is.function(obj)) {
              # If the node is a function call.
              funcList <- mget(nodeChar, envir = checkedFuncs, inherits = FALSE,
                               ifnotfound = list(list(NULL)))[[1]]
              # TRUE for each already-recorded function that is this same object
              # and whose enclosure's parent is the environment being searched.
              found <- vapply(funcList, function(func) {
                identical(func, obj) &&
                  # Also check if the parent environment is identical to current parent
                  identical(parent.env(environment(func)), func.env)
              }, logical(1))
              if (any(found)) {
                # If function has been examined ignore
                break
              }
              # Function has not been examined, record it and recursively clean its closure.
              assign(nodeChar,
                     if (is.null(funcList[[1]])) {
                       list(obj)
                     } else {
                       append(funcList, obj)
                     },
                     envir = checkedFuncs)
              obj <- cleanClosure(obj, checkedFuncs)
            }
            assign(nodeChar, obj, envir = newEnv)
            break
          }
        }

        # Continue to search in enclosure.
        func.env <- parent.env(func.env)
      }
    }
  }
}

# Utility function to get user defined function (UDF) dependencies (closure).
# More specifically, this function captures the values of free variables defined
# outside a UDF, and stores them in the function's environment.
# param
#   func A function whose closure needs to be captured.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can be
#                considered as a "name"-to-"list of functions" mapping.
# return value
#   a new version of func that has a correct environment (closure). Non-function
#   inputs are returned unchanged.
cleanClosure <- function(func, checkedFuncs = new.env()) {
  if (is.function(func)) {
    newEnv <- new.env(parent = .GlobalEnv)
    func.body <- body(func)
    oldEnv <- environment(func)
    # defVars is an Accumulator of variables names defined in the function's calling
    # environment. First, function's arguments are added to defVars.
    defVars <- initAccumulator()
    # formals(args(func)) holds just the parameters (args() also covers primitives).
    # The former 1:(length(argNames) - 1) loop ran backwards over NULL names for
    # zero-argument functions.
    for (argName in names(formals(args(func)))) {
      addItemToAccumulator(defVars, argName)
    }
    # Recursively examine variables in the function body.
    processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv)
    environment(func) <- newEnv
  }
  func
}

# Append partition lengths to each partition in two input RDDs if needed.
# param
#   x An RDD.
#   other An RDD.
# return value
#   A list of two result RDDs.
# Lengths are appended when the serialization modes differ or `x` is in "byte" mode.
# Otherwise both RDDs are returned unchanged.
appendPartitionLengths <- function(x, other) {
  modeX <- getSerializedMode(x)
  if (modeX != getSerializedMode(other) || modeX == "byte") {
    # Append the number of elements in each partition to that partition so that we can later
    # know the boundary of elements from x and other.
    # Note that this appending also serves the purpose of reserialization, because even if
    # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
    # as a single byte array. For example, partitions of an RDD generated from partitionBy()
    # may be encoded as multiple byte arrays.
    appendLength <- function(part) {
      n <- length(part)
      # The marker stored is the element count plus one.
      part[[n + 1]] <- n + 1
      part
    }
    return(list(lapplyPartition(x, appendLength), lapplyPartition(other, appendLength)))
  }
  list(x, other)
}

# Perform zip or cartesian between elements from two RDDs in each partition
# param
#   rdd An RDD.
#   zip A boolean flag indicating this call is for zip operation or not.
# return value
#   A result RDD.
# In "byte" mode each partition is expected to hold the left elements, a
# left-length marker, the right elements and a right-length marker (presumably
# as written by appendPartitionLengths). Otherwise keys and values alternate.
mergePartitions <- function(rdd, zip) {
  serializerMode <- getSerializedMode(rdd)
  partitionFunc <- function(partIndex, part) {
    len <- length(part)
    if (len == 0) {
      return(part)
    }

    if (serializerMode == "byte") {
      lengthOfValues <- part[[len]]
      lengthOfKeys <- part[[len - lengthOfValues]]
      stopifnot(len == lengthOfKeys + lengthOfValues)

      # For zip operation, check if corresponding partitions
      # of both RDDs have the same number of elements.
      if (zip && lengthOfKeys != lengthOfValues) {
        stop("Can only zip RDDs with same number of elements ",
             "in each pair of corresponding partitions.")
      }

      keys <- if (lengthOfKeys > 1) part[1 : (lengthOfKeys - 1)] else list()
      values <- if (lengthOfValues > 1) part[(lengthOfKeys + 1) : (len - 1)] else list()

      if (!zip) {
        return(mergeCompactLists(keys, values))
      }
    } else {
      keys <- part[c(TRUE, FALSE)]
      values <- part[c(FALSE, TRUE)]
    }

    mapply(function(k, v) { list(k, v) },
           keys,
           values,
           SIMPLIFY = FALSE,
           USE.NAMES = FALSE)
  }

  PipelinedRDD(rdd, partitionFunc)
}

# Convert a named list to struct so that
# SerDe won't confuse between a normal named list and struct.
# Errors unless class(list) is exactly "list"; returns the input with class "struct".
listToStruct <- function(list) {
  stopifnot(class(list) == "list")
  structure(list, class = "struct")
}

# Convert a struct to a named list.
# Errors unless `struct` has class "struct". Returns it as a plain list.
structToList <- function(struct) {
  # Check the argument itself: the old check inspected class(list), i.e. the
  # base function `list`, so it failed on every call.
  stopifnot(class(struct) == "struct")

  # Assigning the basic type name "list" drops the explicit class attribute.
  class(struct) <- "list"
  struct
}

# Convert a named list to an environment to be passed to JVM.
# Every element must have a non-empty name (an empty list is allowed); each
# element becomes a binding of that name in a new environment.
convertNamedListToEnv <- function(namedList) {
  # Make sure each item in the list has a name
  itemNames <- names(namedList)
  stopifnot(
    if (is.null(itemNames)) {
      length(namedList) == 0
    } else {
      # names() reports unnamed items as "", so reject those as well as NA.
      !any(is.na(itemNames) | itemNames == "")
    })

  env <- new.env()
  for (name in itemNames) {
    env[[name]] <- namedList[[name]]
  }
  env
}

# Assign a new environment for attach() and with() methods.
# Each column of the SparkDataFrame is bound, under its own name, to the
# single-column selection data[, col, drop = FALSE].
assignNewEnv <- function(data) {
  stopifnot(class(data) == "SparkDataFrame")
  cols <- columns(data)
  stopifnot(length(cols) > 0)

  env <- new.env()
  for (col in cols) {
    assign(x = col, value = data[, col, drop = FALSE], envir = env)
  }
  env
}

# Utility function to split by ',' and whitespace, remove empty tokens.
# Returns a character vector of the remaining tokens in input order.
splitString <- function(input) {
  tokens <- unlist(strsplit(input, ",|\\s"))
  tokens[nzchar(tokens)]
}

# Build a java.util.Properties object from named arguments. Each name/value pair
# is set as strings via setProperty, iterating names in ls() order.
varargsToJProperties <- function(...) {
  pairs <- list(...)
  props <- newJObject("java.util.Properties")
  if (length(pairs) == 0) {
    return(props)
  }
  for (k in ls(pairs)) {
    callJMethod(props, "setProperty", as.character(k), as.character(pairs[[k]]))
  }
  props
}

# Run `script` with `combinedArgs`. On non-Windows platforms this uses system2()
# and honours `stdout`/`stderr`. On Windows it uses shell(), where `stdout` and
# `stderr` are not used and output is captured only when `wait` is TRUE.
launchScript <- function(script, combinedArgs, wait = FALSE, stdout = "", stderr = "") {
  if (.Platform$OS.type != "windows") {
    # http://stat.ethz.ch/R-manual/R-devel/library/base/html/system2.html
    # stdout = F means discard output
    # stdout = "" means to its console (default)
    # Note that the console of this child process might not be the same as the running R process.
    return(system2(script, combinedArgs, stdout = stdout, wait = wait, stderr = stderr))
  }
  # on Windows, intern = F seems to mean output to the console. (documentation on this is missing)
  commandLine <- paste(script, combinedArgs, sep = " ")
  shell(commandLine, translate = TRUE, wait = wait, intern = wait)
}

# Return the Java SparkContext stored as `.sparkRjsc` in `.sparkREnv`, or stop
# with an initialization hint when it has not been created yet.
getSparkContext <- function() {
  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    return(get(".sparkRjsc", envir = .sparkREnv))
  }
  stop("SparkR has not been initialized. Please call sparkR.session()")
}

# TRUE when `master` is exactly "local", "local[N]" or "local[*]".
isMasterLocal <- function(master) {
  localPattern <- "^local(\\[([0-9]+|\\*)\\])?$"
  grepl(localPattern, master, perl = TRUE)
}

# TRUE when `master` ends with "<lowercase letters>-client" (e.g. "yarn-client").
isClientMode <- function(master) {
  clientPattern <- "([a-z]+)-client$"
  grepl(clientPattern, master, perl = TRUE)
}

# TRUE when the R_PROFILE_USER environment variable points at a file ending in "shell.R".
isSparkRShell <- function() {
  profile <- Sys.getenv("R_PROFILE_USER")
  grepl(".*shell\\.R$", profile, perl = TRUE)
}

# Works identically with `callJStatic(...)` but throws a pretty formatted exception.
# Any error is handed to captureJVMException() together with the method name.
handledCallJStatic <- function(cls, method, ...) {
  onError <- function(e) captureJVMException(e, method)
  tryCatch(callJStatic(cls, method, ...), error = onError)
}

# Works identically with `callJMethod(...)` but throws a pretty formatted exception.
# Any error is handed to captureJVMException() together with the method name.
handledCallJMethod <- function(obj, method, ...) {
  onError <- function(e) captureJVMException(e, method)
  tryCatch(callJMethod(obj, method, ...), error = onError)
}

# Re-raise error `e` (from a JVM call to `method`) with a shorter message.
# A leading "Error in <call>: " is rewritten as "Error in <method> :". Known
# Spark/Java exception types are reduced to "<label> - <first line of message>".
# Any other error is re-raised with the (possibly rewritten) full text.
# This function always signals an error and never returns.
captureJVMException <- function(e, method) {
  rawmsg <- as.character(e)
  stacktrace <- if (any(grepl("^Error in .*?: ", rawmsg))) {
    # If the exception message starts with "Error in ...", this is possibly
    # "Error in invokeJava(...)". Here, it replaces the characters to
    # `paste("Error in", method, ":")` in order to identify which function
    # was called in JVM side.
    afterPrefix <- strsplit(rawmsg, "Error in .*?: ")[[1]]
    paste(paste("Error in", method, ":")[1], afterPrefix[2])
  } else {
    # Otherwise, do not convert the error message just in case.
    rawmsg
  }

  # Checked in this order. StreamingQueryException could wrap an
  # IllegalArgumentException, so it has to be looked for first.
  knownExceptions <- list(
    c("org.apache.spark.sql.streaming.StreamingQueryException: ", "streaming query error - "),
    c("java.lang.IllegalArgumentException: ", "illegal argument - "),
    c("org.apache.spark.sql.AnalysisException: ", "analysis error - "),
    c("org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ", "no such database - "),
    c("org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ", "no such table - "),
    c("org.apache.spark.sql.catalyst.parser.ParseException: ", "parse error - "))

  for (known in knownExceptions) {
    marker <- known[[1]]
    if (any(grepl(marker, stacktrace, fixed = TRUE))) {
      pieces <- strsplit(stacktrace, marker, fixed = TRUE)[[1]]
      # pieces[1] is the "Error in ..." lead-in. Only the first line of the JVM
      # message is kept; the "\n\tat ..." frames are dropped.
      firstLine <- strsplit(pieces[2], "\r?\n\tat")[[1]][1]
      stop(pieces[1], known[[2]], firstLine, call. = FALSE)
    }
  }
  stop(stacktrace, call. = FALSE)
}

# rbind a list of rows with raw (binary) columns
# @param inputData a list of rows, with each row a list
# @return data.frame with raw columns as lists
rbindRaws <- function(inputData) {
  row1 <- inputData[[1]]
  # Which columns are raw is decided from the first row. vapply(is.raw) always
  # gives a plain logical vector, whereas comparing against sapply(row1, class)
  # does not when some value has several classes (e.g. POSIXct).
  rawcolumns <- vapply(row1, is.raw, logical(1))

  listmatrix <- do.call(rbind, inputData)
  # A dataframe with all list columns
  out <- as.data.frame(listmatrix)
  # Collapse non-raw columns back to atomic vectors; raw columns stay lists.
  out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
  out
}

# Get basename without extension from URL.
# The last '/'-separated segment is taken; a trailing .gz/.bz2/.xz is removed,
# then one more alphanumeric extension after the last '.' is stripped.
basenameSansExtFromUrl <- function(url) {
  # Everything up to and including the last '/' acts as the separator.
  lastSegment <- tail(unlist(strsplit(url, "^.+/")), 1)
  # this is from file_path_sans_ext: first drop any compression extension...
  noCompression <- sub("[.](gz|bz2|xz)$", "", lastSegment)
  # ...then strip the extension after the last '.'.
  sub("([^.]+)\\.[[:alnum:]]+$", "\\1", noCompression)
}

# TRUE when `x` is an atomic vector of exactly one element.
isAtomicLengthOne <- function(x) {
  length(x) == 1 && is.atomic(x)
}

# TRUE when running on Windows (.Platform$OS.type is "windows").
is_windows <- function() {
  identical(.Platform$OS.type, "windows")
}

# TRUE when the HADOOP_HOME environment variable is set to a non-empty value.
hadoop_home_set <- function() {
  nzchar(Sys.getenv("HADOOP_HOME"))
}

# TRUE on non-Windows platforms; on Windows, TRUE only when HADOOP_HOME is set.
windows_with_hadoop <- function() {
  if (is_windows()) hadoop_home_set() else TRUE
}

# get0 not supported before R 3.2.0, so emulate it with mget():
# returns the value bound to x[1] in `envir` (searching enclosures when
# `inherits` is TRUE), or `ifnotfound` when there is no such binding.
getOne <- function(x, envir, inherits = TRUE, ifnotfound = NULL) {
  found <- mget(x[1L], envir = envir, inherits = inherits,
                ifnotfound = list(ifnotfound))
  found[[1L]]
}

# Returns a vector of parent directories, traversing up count times, starting with a full path
# e.g. traverseParentDirs("/Users/user/Library/Caches/spark/spark2.2", 1) should return
# this "/Users/user/Library/Caches/spark/spark2.2"
# and  "/Users/user/Library/Caches/spark"
# Traversal stops early at a path that is its own dirname (e.g. "/").
traverseParentDirs <- function(x, count) {
  dirs <- x
  current <- x
  remaining <- count
  while (remaining > 0 && dirname(current) != current) {
    current <- dirname(current)
    dirs <- c(dirs, current)
    remaining <- remaining - 1
  }
  dirs
}

