# R/utils.R

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Utilities and Helpers

# Given a JList<T>, returns an R list containing the same elements, the number
# of which is optionally upper bounded by `logicalUpperBound` (by default,
# return all elements).  Takes care of deserializations and type conversions.
convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
  serializedMode = "byte") {
  arrSize <- callJMethod(jList, "size")

  # Datasets with serializedMode == "string" (such as an RDD directly generated by textFile()):
  # each partition is not dense-packed into one Array[Byte], and `arrSize`
  # here corresponds to the number of logical elements. Thus we can prune here.
  if (serializedMode == "string" && !is.null(logicalUpperBound)) {
    arrSize <- min(arrSize, logicalUpperBound)
  }

  results <- if (arrSize > 0) {
    lapply(0 : (arrSize - 1),
          function(index) {
            obj <- callJMethod(jList, "get", as.integer(index))

            # Assume it is either an R object or a Java obj ref.
            if (inherits(obj, "jobj")) {
              if (isInstanceOf(obj, "scala.Tuple2")) {
                # JavaPairRDD[Array[Byte], Array[Byte]].

                keyBytes <- callJMethod(obj, "_1")
                valBytes <- callJMethod(obj, "_2")
                res <- list(unserialize(keyBytes),
                  unserialize(valBytes))
              } else {
                stop("utils.R: convertJListToRList only supports ",
                  "RDD[Array[Byte]] and ",
                  "JavaPairRDD[Array[Byte], Array[Byte]] for now")
              }
            } else {
              if (inherits(obj, "raw")) {
                if (serializedMode == "byte") {
                  # RDD[Array[Byte]]. `obj` is a whole partition.
                  res <- unserialize(obj)
                  # For serialized datasets, `obj` here corresponds to one whole
                  # partition dense-packed together. We deserialize the whole
                  # partition first, then cap the number of elements to be returned.
                } else if (serializedMode == "row") {
                  res <- readRowList(obj)
                  # For DataFrames that have been converted to RRDDs, we call readRowList
                  # which will read in each row of the RRDD as a list and deserialize
                  # each element.
                  flatten <<- FALSE
                  # Use global assignment to change the flatten flag. This means
                  # we don't have to worry about the default argument in other functions
                  # e.g. collect
                }
                # TODO: is it possible to distinguish element boundary so that we can
                # unserialize only what we need?
                if (!is.null(logicalUpperBound)) {
                  res <- head(res, n = logicalUpperBound)
                }
              } else {
                # obj is of a primitive Java type and has already been
                # simplified to the corresponding R type.
                res <- list(obj)
              }
            }
            res
          })
  } else {
    list()
  }

  if (flatten) {
    as.list(unlist(results, recursive = FALSE))
  } else {
    as.list(results)
  }
}

# Returns TRUE if `name` refers to an RDD in the given environment `env`
isRDD <- function(name, env) {
  obj <- get(name, envir = env)
  inherits(obj, "RDD")
}

#' Compute the hashCode of an object
#'
#' Java-style function to compute the hashCode for the given object. Returns
#' an integer value.
#'
#' @details
#' This only works for integer, numeric and character types right now.
#'
#' @param key the object to be hashed
#' @return the hash code as an integer
#' @examples
#'\dontrun{
#' hashCode(1L) # 1
#' hashCode(1.0) # 1072693248
#' hashCode("1") # 49
#'}
#' @note hashCode since 1.4.0
hashCode <- function(key) {
  if (class(key) == "integer") {
    as.integer(key[[1]])
  } else if (class(key) == "numeric") {
    # Convert the double to long and then calculate the hash code
    rawVec <- writeBin(key[[1]], con = raw())
    intBits <- packBits(rawToBits(rawVec), "integer")
    as.integer(bitwXor(intBits[2], intBits[1]))
  } else if (class(key) == "character") {
    # TODO: SPARK-7839 means we might not have the native library available
    n <- nchar(key)
    if (n == 0) {
      0L
    } else {
      asciiVals <- sapply(charToRaw(key), function(x) { strtoi(x, 16L) })
      hashC <- 0
      for (k in seq_len(length(asciiVals))) {
        hashC <- mult31AndAdd(hashC, asciiVals[k])
      }
      as.integer(hashC)
    }
  } else {
    warning("Could not hash object, returning 0")
    as.integer(0)
  }
}

# Helper function used to wrap a 'numeric' value to integer bounds.
# Useful for implementing C-like integer arithmetic
wrapInt <- function(value) {
  if (value > .Machine$integer.max) {
    value <- value - 2 * .Machine$integer.max - 2
  } else if (value < -1 * .Machine$integer.max) {
    value <- 2 * .Machine$integer.max + value + 2
  }
  value
}
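
# For example (illustrative, not run):
#   wrapInt(.Machine$integer.max + 2)  # -2147483647, i.e. 32-bit wrap-around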

# Multiply `val` by 31 and add `addVal` to the result. Ensures that
# integer-overflows are handled at every step.
#
# TODO: this function does not handle integer overflow well
mult31AndAdd <- function(val, addVal) {
  vec <- c(bitwShiftL(val, c(4, 3, 2, 1, 0)), addVal)
  vec[is.na(vec)] <- 0
  Reduce(function(a, b) {
          wrapInt(as.numeric(a) + as.numeric(b))
         },
         vec)
}
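
# For example (illustrative, not run):
#   mult31AndAdd(1, 2)  # 33, i.e. 31 * 1 + 2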

# Create a new RDD with serializedMode == "byte".
# Return itself if already in "byte" format.
serializeToBytes <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) != "byte") {
    ser.rdd <- lapply(rdd, function(x) { x })
    return(ser.rdd)
  } else {
    return(rdd)
  }
}

# Create a new RDD with serializedMode == "string".
# Return itself if already in "string" format.
serializeToString <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) != "string") {
    ser.rdd <- lapply(rdd, function(x) { toString(x) })
    # force it to create jrdd using "string"
    getJRDD(ser.rdd, serializedMode = "string")
    return(ser.rdd)
  } else {
    return(rdd)
  }
}

# Fast append to list by using an accumulator.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
#
# The accumulator should have three fields: size, counter and data.
# This function amortizes the allocation cost by doubling
# the size of the list every time it fills up.
addItemToAccumulator <- function(acc, item) {
  if (acc$counter == acc$size) {
    acc$size <- acc$size * 2
    length(acc$data) <- acc$size
  }
  acc$counter <- acc$counter + 1
  acc$data[[acc$counter]] <- item
}

initAccumulator <- function() {
  acc <- new.env()
  acc$counter <- 0
  acc$data <- list(NULL)
  acc$size <- 1
  acc
}
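
# A minimal usage sketch (illustrative, not run):
#   acc <- initAccumulator()
#   for (x in 1:5) addItemToAccumulator(acc, x)
#   acc$counter                  # 5
#   head(acc$data, acc$counter)  # the five items, in insertion order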

# Utility function to sort a list of key value pairs
# Used in unit tests
sortKeyValueList <- function(kv_list, decreasing = FALSE) {
  keys <- sapply(kv_list, function(x) x[[1]])
  kv_list[order(keys, decreasing = decreasing)]
}
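
# For example (illustrative, not run):
#   sortKeyValueList(list(list(2, "b"), list(1, "a")))
#   # list(list(1, "a"), list(2, "b"))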

# Utility function to generate compact R lists from grouped rdd
# Used in Join-family functions
# param:
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
#   cnull Boolean list where each element determines whether the corresponding list should
#         be converted to list(NULL)
genCompactLists <- function(tagged_list, cnull) {
  len <- length(tagged_list)
  lists <- list(vector("list", len), vector("list", len))
  index <- list(1, 1)

  for (x in tagged_list) {
    tag <- x[[1]]
    idx <- index[[tag]]
    lists[[tag]][[idx]] <- x[[2]]
    index[[tag]] <- idx + 1
  }

  len <- lapply(index, function(x) x - 1)
  for (i in (1:2)) {
    if (cnull[[i]] && len[[i]] == 0) {
      lists[[i]] <- list(NULL)
    } else {
      length(lists[[i]]) <- len[[i]]
    }
  }

  lists
}
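
# For example (illustrative, not run), given elements tagged 1L or 2L:
#   genCompactLists(list(list(1L, "a"), list(2L, "x"), list(1L, "b")),
#                   list(FALSE, FALSE))
#   # list(list("a", "b"), list("x"))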

# Utility function to merge compact R lists
# Used in Join-family functions
# param:
#   left/right Two compact lists ready for Cartesian product
mergeCompactLists <- function(left, right) {
  result <- list()
  length(result) <- length(left) * length(right)
  index <- 1
  for (i in left) {
    for (j in right) {
      result[[index]] <- list(i, j)
      index <- index + 1
    }
  }
  result
}
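
# For example (illustrative, not run):
#   mergeCompactLists(list(1, 2), list("a"))
#   # list(list(1, "a"), list(2, "a"))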

# Utility function wrapping the above two operations
# Used in Join-family functions
# param (same as genCompactLists):
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
#   cnull Boolean list where each element determines whether the corresponding list should
#         be converted to list(NULL)
joinTaggedList <- function(tagged_list, cnull) {
  lists <- genCompactLists(tagged_list, cnull)
  mergeCompactLists(lists[[1]], lists[[2]])
}
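
# For example (illustrative, not run):
#   joinTaggedList(list(list(1L, "x"), list(2L, "u"), list(1L, "y")),
#                  list(FALSE, FALSE))
#   # list(list("x", "u"), list("y", "u"))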

# Utility function to reduce a key-value list with predicate
# Used in *ByKey functions
# param
#   pair key-value pair
#   keys/vals env of key/value with hashes
#   updateOrCreatePred predicate function
#   updateFn update or merge function for an existing pair, similar to `mergeVal` in combineByKey
#   createFn create function for a new pair, similar to `createCombiner` in combineByKey
updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
  # Assumes hashVal is bound to `$hash`, and key/val sit at indices 1/2.
  hashVal <- pair$hash
  key <- pair[[1]]
  val <- pair[[2]]
  if (updateOrCreatePred(pair)) {
    assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), val)), envir = vals)
  } else {
    assign(hashVal, do.call(createFn, list(val)), envir = vals)
    assign(hashVal, key, envir = keys)
  }
}

# Utility function to convert key and value environments into a list of key-value pairs
convertEnvsToList <- function(keys, vals) {
  lapply(ls(keys),
         function(name) {
           list(keys[[name]], vals[[name]])
         })
}
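
# Illustrative reduce-by-key sketch using the two helpers above (not run;
# `pred` here is a stand-in for a real updateOrCreatePred):
#   keys <- new.env(); vals <- new.env()
#   p <- list("a", 1); p$hash <- "h1"
#   pred <- function(pair) exists(pair$hash, envir = vals, inherits = FALSE)
#   updateOrCreatePair(p, keys, vals, pred, `+`, identity)  # creates "a" -> 1
#   updateOrCreatePair(p, keys, vals, pred, `+`, identity)  # updates to 1 + 1
#   convertEnvsToList(keys, vals)  # list(list("a", 2))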

# Utility function to merge 2 environments with the second overriding values in the first
# env1 is changed in place
overrideEnvs <- function(env1, env2) {
  lapply(ls(env2),
         function(name) {
           env1[[name]] <- env2[[name]]
         })
}

# Utility function to capture the varargs into environment object
varargsToEnv <- function(...) {
  # Based on http://stackoverflow.com/a/3057419/4577954
  pairs <- list(...)
  env <- new.env()
  for (name in names(pairs)) {
    env[[name]] <- pairs[[name]]
  }
  env
}

# Utility function to capture the varargs into environment object but all values are converted
# into string.
varargsToStrEnv <- function(...) {
  pairs <- list(...)
  nameList <- names(pairs)
  env <- new.env()
  ignoredNames <- list()

  if (is.null(nameList)) {
    # When none of the arguments are named, names(...) returns NULL.
    ignoredNames <- pairs
  } else {
    for (i in seq_along(pairs)) {
      name <- nameList[i]
      value <- pairs[i]
      if (identical(name, "")) {
        # When some of the arguments are not named, their name is "".
        ignoredNames <- append(ignoredNames, value)
      } else {
        value <- pairs[[name]]
        if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
          stop("Unsupported type for ", name, " : ", toString(class(value)), ". ",
               "Supported types are logical, numeric, character and NULL.", call. = FALSE)
        }
        if (is.logical(value)) {
          env[[name]] <- tolower(as.character(value))
        } else if (is.null(value)) {
          env[[name]] <- value
        } else {
          env[[name]] <- as.character(value)
        }
      }
    }
  }

  if (length(ignoredNames) != 0) {
    warning("Unnamed arguments ignored: ", toString(ignoredNames), ".", call. = FALSE)
  }
  env
}
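
# For example (illustrative, not run):
#   env <- varargsToStrEnv(memory = "2g", cores = 2L, verbose = TRUE)
#   env$cores    # "2"
#   env$verbose  # "true"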

getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                         "DISK_ONLY_2",
                                         "DISK_ONLY_3",
                                         "MEMORY_AND_DISK",
                                         "MEMORY_AND_DISK_2",
                                         "MEMORY_AND_DISK_SER",
                                         "MEMORY_AND_DISK_SER_2",
                                         "MEMORY_ONLY",
                                         "MEMORY_ONLY_2",
                                         "MEMORY_ONLY_SER",
                                         "MEMORY_ONLY_SER_2",
                                         "OFF_HEAP")) {
  match.arg(newLevel)
  storageLevelClass <- "org.apache.spark.storage.StorageLevel"
  storageLevel <- switch(newLevel,
                         "DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
                         "DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
                         "DISK_ONLY_3" = callJStatic(storageLevelClass, "DISK_ONLY_3"),
                         "MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
                         "MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
                         "MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
                                                             "MEMORY_AND_DISK_SER"),
                         "MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass,
                                                               "MEMORY_AND_DISK_SER_2"),
                         "MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"),
                         "MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"),
                         "MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"),
                         "MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"),
                         "OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}

storageLevelToString <- function(levelObj) {
  useDisk <- callJMethod(levelObj, "useDisk")
  useMemory <- callJMethod(levelObj, "useMemory")
  useOffHeap <- callJMethod(levelObj, "useOffHeap")
  deserialized <- callJMethod(levelObj, "deserialized")
  replication <- callJMethod(levelObj, "replication")
  shortName <- if (!useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
    "NONE"
  } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
    "DISK_ONLY"
  } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
    "DISK_ONLY_2"
  } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 3) {
    "DISK_ONLY_3"
  } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
    "MEMORY_ONLY"
  } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
    "MEMORY_ONLY_2"
  } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
    "MEMORY_ONLY_SER"
  } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
    "MEMORY_ONLY_SER_2"
  } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
    "MEMORY_AND_DISK"
  } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
    "MEMORY_AND_DISK_2"
  } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
    "MEMORY_AND_DISK_SER"
  } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
    "MEMORY_AND_DISK_SER_2"
  } else if (useDisk && useMemory && useOffHeap && !deserialized && replication == 1) {
    "OFF_HEAP"
  } else {
    NULL
  }
  fullInfo <- callJMethod(levelObj, "toString")
  if (is.null(shortName)) {
    fullInfo
  } else {
    paste(shortName, "-", fullInfo)
  }
}

# Utility function for functions where an argument needs to be an integer but we want to allow
# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
numToInt <- function(num) {
  if (as.integer(num) != num) {
    warning("Coercing ", as.list(sys.call())[[2L]], " to integer.")
  }
  as.integer(num)
}
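
# For example (illustrative, not run):
#   numToInt(5)    # 5L, silently
#   numToInt(5.5)  # 5L, with a coercion warning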

# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
# user defined function (UDF), and to examine variables in the UDF to decide
# if their values should be included in the new function environment.
# param
#   node The current AST node in the traversal.
#   oldEnv The original function environment.
#   defVars An Accumulator of variable names defined in the function's calling environment,
#           including function argument and local variable names.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can
#               be considered as a "name"-to-"list of functions" mapping.
#   newEnv A new function environment to store necessary function dependencies, an output argument.
processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
  nodeLen <- length(node)

  if (nodeLen > 1 && typeof(node) == "language") {
    # Recursive case: current AST node is an internal node, check for its children.
    if (length(node[[1]]) > 1) {
      for (i in 1:nodeLen) {
        processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
      }
    } else {
      # If node[[1]] has length 1, check for some special R functions.
      nodeChar <- as.character(node[[1]])
      if (nodeChar == "{" || nodeChar == "(") {
        # Skip start symbol.
        for (i in 2:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "<-" || nodeChar == "=" ||
                   nodeChar == "<<-") {
        # Assignment Ops.
        defVar <- node[[2]]
        if (length(defVar) == 1 && typeof(defVar) == "symbol") {
          # Add the defined variable name into defVars.
          addItemToAccumulator(defVars, as.character(defVar))
        } else {
          processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
        }
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "function") {
        # Function definition.
        # Add parameter names.
        newArgs <- names(node[[2]])
        lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) })
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "$") {
        # Skip the field.
        processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
      } else if (nodeChar == "::" || nodeChar == ":::") {
        processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
      } else {
        for (i in 1:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      }
    }
  } else if (nodeLen == 1 &&
               (typeof(node) == "symbol" || typeof(node) == "language")) {
    # Base case: current AST node is a leaf node and a symbol or a function call.
    nodeChar <- as.character(node)
    if (!nodeChar %in% defVars$data) {
      # Not a function parameter or local variable.
      func.env <- oldEnv
      topEnv <- parent.env(.GlobalEnv)
      # Search in the function's environment and its enclosing environments up to
      # the global environment. There is no need to look into package environments
      # above the global environment, nor into non-SparkR namespace environments
      # below it, as they are assumed to be loaded on workers.
      while (!identical(func.env, topEnv)) {
        # Namespaces other than "SparkR" will not be searched.
        if (!isNamespace(func.env) ||
            (getNamespaceName(func.env) == "SparkR" &&
               !(nodeChar %in% getNamespaceExports("SparkR")) &&
                  # Note that generic S4 methods should not be set to the environment of
                  # cleaned closure. It does not work with R 4.0.0+. See also SPARK-31918.
                  nodeChar != "" && !methods::isGeneric(nodeChar, func.env))) {
          # Only include SparkR internals.

          # Set parameter 'inherits' to FALSE since we do not need to search in
          # attached package environments.
          if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
                       error = function(e) { FALSE })) {
            obj <- get(nodeChar, envir = func.env, inherits = FALSE)
            if (is.function(obj)) {
              # If the node is a function call.
              funcList <- mget(nodeChar, envir = checkedFuncs, inherits = FALSE,
                               ifnotfound = list(list(NULL)))[[1]]
              found <- sapply(funcList, function(func) {
                identical(func, obj) &&
                  # Also check if the parent environment is identical to the current parent.
                  identical(parent.env(environment(func)), func.env)
              })
              if (sum(found) > 0) {
                # If the function has already been examined, skip it.
                break
              }
              # Function has not been examined, record it and recursively clean its closure.
              assign(nodeChar,
                     if (is.null(funcList[[1]])) {
                       list(obj)
                     } else {
                       append(funcList, obj)
                     },
                     envir = checkedFuncs)
              obj <- cleanClosure(obj, checkedFuncs)
            }
            assign(nodeChar, obj, envir = newEnv)
            break
          }
        }

        # Continue to search in enclosure.
        func.env <- parent.env(func.env)
      }
    }
  }
}

# Utility function to get user defined function (UDF) dependencies (closure).
# More specifically, this function captures the values of free variables defined
# outside a UDF, and stores them in the function's environment.
# param
#   func A function whose closure needs to be captured.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can be
#               considered as a "name"-to-"list of functions" mapping.
# return value
#   a new version of func that has a correct environment (closure).
cleanClosure <- function(func, checkedFuncs = new.env()) {
  if (is.function(func)) {
    newEnv <- new.env(parent = .GlobalEnv)
    func.body <- body(func)
    oldEnv <- environment(func)
    # defVars is an Accumulator of variable names defined in the function's calling
    # environment. First, the function's arguments are added to defVars.
    defVars <- initAccumulator()
    argNames <- names(as.list(args(func)))
    for (i in seq_len(length(argNames) - 1)) {
      # Remove the ending NULL in pairlist.
      addItemToAccumulator(defVars, argNames[i])
    }
    # Recursively examine variables in the function body.
    processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv)
    environment(func) <- newEnv
  }
  func
}
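
# A minimal illustration (not run): the free variable `offset` is captured into
# the cleaned closure's environment, so the function can be serialized and
# shipped to workers independently of where it was defined:
#   offset <- 10
#   f <- function(x) { x + offset }
#   g <- cleanClosure(f)
#   get("offset", envir = environment(g))  # 10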

# Append partition lengths to each partition in two input RDDs if needed.
# param
#   x An RDD.
#   other An RDD.
# return value
#   A list of two result RDDs.
appendPartitionLengths <- function(x, other) {
  if (getSerializedMode(x) != getSerializedMode(other) ||
      getSerializedMode(x) == "byte") {
    # Append the number of elements in each partition to that partition so that we can later
    # know the boundary of elements from x and other.
    #
    # Note that this appending also serves the purpose of reserialization, because even if
    # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
    # as a single byte array. For example, partitions of an RDD generated from partitionBy()
    # may be encoded as multiple byte arrays.
    appendLength <- function(part) {
      len <- length(part)
      part[[len + 1]] <- len + 1
      part
    }
    x <- lapplyPartition(x, appendLength)
    other <- lapplyPartition(other, appendLength)
  }
  list(x, other)
}

# Perform a zip or cartesian product between elements from two RDDs in each partition
# param
#   rdd An RDD.
#   zip A boolean flag indicating whether this call is for a zip operation.
# return value
#   A result RDD.
mergePartitions <- function(rdd, zip) {
  serializerMode <- getSerializedMode(rdd)
  partitionFunc <- function(partIndex, part) {
    len <- length(part)
    if (len > 0) {
      if (serializerMode == "byte") {
        lengthOfValues <- part[[len]]
        lengthOfKeys <- part[[len - lengthOfValues]]
        stopifnot(len == lengthOfKeys + lengthOfValues)

        # For zip operation, check if corresponding partitions
        # of both RDDs have the same number of elements.
        if (zip && lengthOfKeys != lengthOfValues) {
          stop("Can only zip RDDs with same number of elements ",
               "in each pair of corresponding partitions.")
        }

        if (lengthOfKeys > 1) {
          keys <- part[1 : (lengthOfKeys - 1)]
        } else {
          keys <- list()
        }
        if (lengthOfValues > 1) {
          values <- part[(lengthOfKeys + 1) : (len - 1)]
        } else {
          values <- list()
        }

        if (!zip) {
          return(mergeCompactLists(keys, values))
        }
      } else {
        keys <- part[c(TRUE, FALSE)]
        values <- part[c(FALSE, TRUE)]
      }
      mapply(
        function(k, v) { list(k, v) },
        keys,
        values,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    } else {
      part
    }
  }

  PipelinedRDD(rdd, partitionFunc)
}

# Convert a named list to a struct so that
# SerDe won't confuse a normal named list with a struct
listToStruct <- function(list) {
  stopifnot(class(list) == "list")
  stopifnot(!is.null(names(list)))
  class(list) <- "struct"
  list
}

# Convert a struct to a named list
structToList <- function(struct) {
  stopifnot(class(struct) == "struct")
  class(struct) <- "list"
  struct
}

# Convert a named list to an environment to be passed to JVM
convertNamedListToEnv <- function(namedList) {
  # Make sure each item in the list has a name
  names <- names(namedList)
  stopifnot(
    if (is.null(names)) {
      length(namedList) == 0
    } else {
      !any(is.na(names))
    })

  env <- new.env()
  for (name in names) {
    env[[name]] <- namedList[[name]]
  }
  env
}

# Assign a new environment for attach() and with() methods
assignNewEnv <- function(data) {
  stopifnot(class(data) == "SparkDataFrame")
  cols <- columns(data)
  stopifnot(length(cols) > 0)

  env <- new.env()
  for (i in seq_len(length(cols))) {
    assign(x = cols[i], value = data[, cols[i], drop = FALSE], envir = env)
  }
  env
}

# Utility function to split by ',' and whitespace, remove empty tokens
splitString <- function(input) {
  Filter(nzchar, unlist(strsplit(input, ",|\\s")))
}
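
# For example (illustrative, not run):
#   splitString("a, b c")  # c("a", "b", "c")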

varargsToJProperties <- function(...) {
  pairs <- list(...)
  props <- newJObject("java.util.Properties")
  if (length(pairs) > 0) {
    lapply(ls(pairs), function(k) {
      callJMethod(props, "setProperty", as.character(k), as.character(pairs[[k]]))
    })
  }
  props
}

launchScript <- function(script, combinedArgs, wait = FALSE, stdout = "", stderr = "") {
  if (.Platform$OS.type == "windows") {
    scriptWithArgs <- paste(script, combinedArgs, sep = " ")
    # On Windows, intern = FALSE seems to mean output to the console
    # (documentation on this is missing).
    shell(scriptWithArgs, translate = TRUE, wait = wait, intern = wait)
  } else {
    # http://stat.ethz.ch/R-manual/R-devel/library/base/html/system2.html
    # stdout = F means discard output
    # stdout = "" means to its console (default)
    # Note that the console of this child process might not be the same as the running R process.
    system2(script, combinedArgs, stdout = stdout, wait = wait, stderr = stderr)
  }
}

getSparkContext <- function() {
  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
    stop("SparkR has not been initialized. Please call sparkR.session()")
  }
  sc <- get(".sparkRjsc", envir = .sparkREnv)
  sc
}

isMasterLocal <- function(master) {
  grepl("^local(\\[([0-9]+|\\*)\\])?$", master, perl = TRUE)
}
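
# For example (illustrative, not run):
#   isMasterLocal("local[4]")  # TRUE
#   isMasterLocal("yarn")      # FALSE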

isClientMode <- function(master) {
  grepl("([a-z]+)-client$", master, perl = TRUE)
}

isSparkRShell <- function() {
  grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}

# Works identically to `callJStatic(...)`, but throws a nicely formatted exception.
handledCallJStatic <- function(cls, method, ...) {
  result <- tryCatch(callJStatic(cls, method, ...),
                     error = function(e) {
                       captureJVMException(e, method)
                     })
  result
}

# Works identically to `callJMethod(...)`, but throws a nicely formatted exception.
handledCallJMethod <- function(obj, method, ...) {
  result <- tryCatch(callJMethod(obj, method, ...),
                     error = function(e) {
                       captureJVMException(e, method)
                     })
  result
}

captureJVMException <- function(e, method) {
  rawmsg <- as.character(e)
  if (any(grepl("^Error in .*?: ", rawmsg))) {
    # If the exception message starts with "Error in ...", this is possibly
    # "Error in invokeJava(...)". Here, it replaces the characters to
    # `paste("Error in", method, ":")` in order to identify which function
    # was called in JVM side.
    stacktrace <- strsplit(rawmsg, "Error in .*?: ")[[1]]
    rmsg <- paste("Error in", method, ":")
    stacktrace <- paste(rmsg[1], stacktrace[2])
  } else {
    # Otherwise, do not convert the error message just in case.
    stacktrace <- rawmsg
  }

  # StreamingQueryException could wrap an IllegalArgumentException, so look for that first
  if (any(grepl("org.apache.spark.sql.streaming.StreamingQueryException: ",
                stacktrace, fixed = TRUE))) {
    msg <- strsplit(stacktrace, "org.apache.spark.sql.streaming.StreamingQueryException: ",
                    fixed = TRUE)[[1]]
    # Extract "Error in ..." message.
    rmsg <- msg[1]
    # Extract the first message of JVM exception.
    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
    stop(rmsg, "streaming query error - ", first, call. = FALSE)
  } else if (any(grepl("java.lang.IllegalArgumentException: ", stacktrace, fixed = TRUE))) {
    msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]]
    # Extract "Error in ..." message.
    rmsg <- msg[1]
    # Extract the first message of JVM exception.
    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
    stop(rmsg, "illegal argument - ", first, call. = FALSE)
  } else if (any(grepl("org.apache.spark.sql.AnalysisException: ", stacktrace, fixed = TRUE))) {
    msg <- strsplit(stacktrace, "org.apache.spark.sql.AnalysisException: ", fixed = TRUE)[[1]]
    # Extract "Error in ..." message.
    rmsg <- msg[1]
    # Extract the first message of JVM exception.
    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
    stop(rmsg, "analysis error - ", first, call. = FALSE)
  } else if (any(grepl("org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ",
                       stacktrace, fixed = TRUE))) {
    msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ",
                    fixed = TRUE)[[1]]
    # Extract "Error in ..." message.
    rmsg <- msg[1]
    # Extract the first message of JVM exception.
    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
    stop(rmsg, "no such database - ", first, call. = FALSE)
  } else if (any(grepl("org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ",
                       stacktrace, fixed = TRUE))) {
    msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ",
                    fixed = TRUE)[[1]]
    # Extract "Error in ..." message.
    rmsg <- msg[1]
    # Extract the first message of JVM exception.
    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
    stop(rmsg, "no such table - ", first, call. = FALSE)
  } else if (any(grepl("org.apache.spark.sql.catalyst.parser.ParseException: ",
                       stacktrace, fixed = TRUE))) {
    msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.parser.ParseException: ",
                    fixed = TRUE)[[1]]
    # Extract "Error in ..." message.
    rmsg <- msg[1]
    # Extract the first message of JVM exception.
    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
    stop(rmsg, "parse error - ", first, call. = FALSE)
  } else {
    stop(stacktrace, call. = FALSE)
  }
}

# rbind a list of rows with raw (binary) columns
#
# @param inputData a list of rows, with each row a list
# @return data.frame with raw columns as lists
rbindRaws <- function(inputData) {
  row1 <- inputData[[1]]
  rawcolumns <- ("raw" == sapply(row1, class))

  listmatrix <- do.call(rbind, inputData)
  # A dataframe with all list columns
  out <- as.data.frame(listmatrix)
  out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
  out
}
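
# For example (illustrative, not run):
#   rows <- list(list(as.raw(c(1, 2)), 10), list(as.raw(c(3, 4)), 20))
#   df <- rbindRaws(rows)
#   df[[1]]  # a list of raw vectors; df[[2]] is the numeric vector c(10, 20)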

# Get basename without extension from URL
basenameSansExtFromUrl <- function(url) {
  # split by '/'
  splits <- unlist(strsplit(url, "^.+/"))
  last <- tail(splits, 1)
  # this is from file_path_sans_ext
  # first, remove any compression extension
  filename <- sub("[.](gz|bz2|xz)$", "", last)
  # then, strip extension by the last '.'
  sub("([^.]+)\\.[[:alnum:]]+$", "\\1", filename)
}
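
# For example (illustrative, not run):
#   basenameSansExtFromUrl("http://example.com/dist/spark-2.2.0.tar.gz")
#   # "spark-2.2.0"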

isAtomicLengthOne <- function(x) {
  is.atomic(x) && length(x) == 1
}

is_windows <- function() {
  .Platform$OS.type == "windows"
}

hadoop_home_set <- function() {
  !identical(Sys.getenv("HADOOP_HOME"), "")
}

windows_with_hadoop <- function() {
  !is_windows() || hadoop_home_set()
}

# get0 is not supported before R 3.2.0
getOne <- function(x, envir, inherits = TRUE, ifnotfound = NULL) {
  mget(x[1L], envir = envir, inherits = inherits, ifnotfound = list(ifnotfound))[[1L]]
}

# Returns a vector of parent directories, traversing up `count` times, starting with a full path.
# e.g. traverseParentDirs("/Users/user/Library/Caches/spark/spark2.2", 1) returns
# c("/Users/user/Library/Caches/spark/spark2.2",
#   "/Users/user/Library/Caches/spark")
traverseParentDirs <- function(x, count) {
  if (dirname(x) == x || count <= 0) x else c(x, Recall(dirname(x), count - 1))
}
