# R/install_spark.R

# Check if Spark can be installed in this system
spark_can_install <- function() {
  sparkDir <- spark_install_dir()
  if (dir.exists(sparkDir)) {
    file.access(sparkDir, 2) == 0
  } else {
    TRUE
  }
}

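# Illustrative sketch of the expansion performed below (not run; the resolved
# value depends on which versions are installed/published at the time):
#   spark_install_version_expand("3.4", installed_only = FALSE)
#   #> "3.4.1"   # i.e. the newest release whose version string starts with "3.4"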
spark_install_version_expand <- function(version, installed_only) {
  if (installed_only) {
    versions <- spark_installed_versions()$spark
  } else {
    versions <- spark_available_versions(show_minor = TRUE)$spark
  }

  sub_versions <- substr(versions, 1, nchar(version))

  versions <- versions[version == sub_versions]

  if (length(versions) > 0) {
    out <- sort(versions, decreasing = TRUE)[1]
  } else {
    # If the version specified is not a known available version, then don't
    # attempt to expand it.
    out <- version
  }
  out
}

#' Find a given Spark installation by version.
#'
#' @inheritParams spark_install
#'
#' @param installed_only Search only the locally installed versions?
#' @param latest Check for the latest version?
#' @param hint On failure, should a hint about how to install Spark be provided?
#'
#' @keywords internal
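#'
#' @examples
#' \dontrun{
#' # Illustrative call only; the version string is an assumption. Use
#' # spark_available_versions() / spark_installed_versions() for real values.
#' spark_install_find(version = "3.4", installed_only = FALSE)
#' }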
#' @export
spark_install_find <- function(version = NULL,
                               hadoop_version = NULL,
                               installed_only = TRUE,
                               latest = FALSE,
                               hint = FALSE) {
  if (identical(grepl("^[0-9]+\\.[0-9]+$", version), TRUE)) {
    version <- spark_install_version_expand(version, installed_only)
  }

  sparkVersion <- version
  hadoopVersion <- hadoop_version
  installedOnly <- installed_only

  versions <- spark_versions(latest = latest)
  if (!is.null(sparkVersion) && sparkVersion == "master") {
    versions <- versions[versions$installed, ]
  } else {
    if (installedOnly) {
      versions <- versions[versions$installed, ]
    }
    versions <- if (is.null(sparkVersion)) versions else versions[versions$spark == sparkVersion, ]
    versions <- if (is.null(hadoopVersion)) versions else versions[versions$hadoop == hadoopVersion, ]
  }

  if (NROW(versions) == 0) {
    warning(
      "The Spark version specified may not be available.\n",
      "Please consider running `spark_available_versions()` to list all known ",
      "available Spark versions."
    )
    component_name <- sprintf(
      "spark-%s-bin-hadoop%s", sparkVersion, hadoopVersion %||% "2.7"
    )
    package_name <- paste0(component_name, ".tgz")
    spark_dir <- spark_install_dir()

    list(
      componentName = component_name,
      packageName = package_name,
      packageRemotePath = sprintf(
        "https://archive.apache.org/dist/spark/spark-%s/%s",
        sparkVersion, package_name
      ),
      packageLocalPath = file.path(spark_dir, package_name),
      sparkDir = spark_dir,
      sparkVersionDir = file.path(spark_install_dir(), component_name)
    )
  } else {
    versions <- versions[with(versions, order(default, spark, hadoop_default, decreasing = TRUE)), ]
    spark_install_info(
      as.character(versions[1, ]$spark),
      as.character(versions[1, ]$hadoop),
      latest = latest
    )
  }
}

#' Determine the version that will be used by default if \code{version} is NULL
#'
#' @export
#'
#' @keywords internal
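#'
#' @examples
#' \dontrun{
#' # Returns a list of the form list(spark = "<version>", hadoop = "<version>");
#' # the exact values depend on what is installed or available at call time.
#' spark_default_version()
#' }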
spark_default_version <- function() {
  # if we have versions installed then use the same logic as spark_connect to figure out
  # which version we will bind to when we pass version = NULL and hadoop_version = NULL
  if (nrow(spark_installed_versions()) > 0) {
    version <- spark_install_find(version = NULL, hadoop_version = NULL, installed_only = TRUE, latest = FALSE)
    spark <- version$sparkVersion
    hadoop <- version$hadoopVersion
    # otherwise check available versions and take the default
  } else {
    versions <- spark_versions()
    versions <- subset(versions, versions$default == TRUE & versions$hadoop_default == TRUE)
    version <- versions[1, ]
    spark <- version$spark
    hadoop <- version$hadoop
  }

  list(
    spark = spark,
    hadoop = hadoop
  )
}

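# Assemble install metadata (directories, local package path, download URL) for
# a specific Spark/Hadoop pairing, preferring a copy found under the legacy
# (sparklyr 0.5) install directory when one already exists there.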
spark_install_info <- function(sparkVersion = NULL, hadoopVersion = NULL, latest = TRUE) {
  versionInfo <- spark_versions_info(sparkVersion, hadoopVersion, latest = latest)

  componentName <- versionInfo$componentName
  packageName <- versionInfo$packageName
  packageRemotePath <- versionInfo$packageRemotePath

  sparkDir <- c(spark_install_old_dir(), spark_install_dir())

  sparkVersionDir <- file.path(sparkDir, componentName)
  sparkDir <- sparkDir[dir.exists(sparkVersionDir)]
  sparkDir <- if (length(sparkDir) == 0) spark_install_dir() else sparkDir[[1]]

  sparkVersionDir <- file.path(sparkDir, componentName)

  list(
    sparkDir = sparkDir,
    packageLocalPath = file.path(sparkDir, packageName),
    packageRemotePath = packageRemotePath,
    sparkVersionDir = sparkVersionDir,
    sparkConfDir = file.path(sparkVersionDir, "conf"),
    sparkVersion = sparkVersion,
    hadoopVersion = hadoopVersion,
    installed = file.exists(sparkVersionDir)
  )
}

# Do not remove (it's used by the RStudio IDE)
spark_home <- function() {
  home <- Sys.getenv("SPARK_HOME", unset = NA)
  if (is.na(home)) {
    home <- NULL
  }
  home
}

#' Download and install various versions of Spark
#'
#' Install versions of Spark for use with local Spark connections
#'   (i.e. \code{spark_connect(master = "local")})
#'
#' @param version Version of Spark to install. See \code{spark_available_versions} for a list of supported versions.
#' @param hadoop_version Version of Hadoop to install. See \code{spark_available_versions} for a list of supported versions.
#' @param reset Attempts to reset settings to defaults.
#' @param logging Logging level to configure install. Supported options: "WARN", "INFO".
#' @param verbose Report information as Spark is downloaded / installed.
#' @param tarfile Path to a TAR file conforming to the pattern spark-###-bin-(hadoop)?###, where ###
#' references the Spark and Hadoop versions respectively.
#'
#' @return List with information about the installed version.
#'
#' @import utils
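#' @examples
#' \dontrun{
#' # Minimal sketch; the version numbers below are illustrative. Check
#' # spark_available_versions() for the versions that can actually be installed.
#' spark_install(version = "3.4")
#'
#' # Request a specific Spark/Hadoop pairing:
#' spark_install(version = "3.4.1", hadoop_version = "3")
#' }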
#' @export
spark_install <- function(version = NULL,
                          hadoop_version = NULL,
                          reset = TRUE,
                          logging = "INFO",
                          verbose = interactive()) {
  installInfo <- spark_install_find(version, hadoop_version, installed_only = FALSE, latest = TRUE)

  if (!dir.exists(installInfo$sparkDir)) {
    dir.create(installInfo$sparkDir, recursive = TRUE)
  }

  if (!dir.exists(installInfo$sparkVersionDir)) {
    if (verbose) {
      fmt <- paste(c(
        "Installing Spark %s for Hadoop %s or later.",
        "Downloading from:\n- '%s'",
        "Installing to:\n- '%s'"
      ), collapse = "\n")

      msg <- sprintf(
        fmt,
        installInfo$sparkVersion,
        installInfo$hadoopVersion,
        installInfo$packageRemotePath,
        aliased_path(installInfo$sparkVersionDir)
      )

      message(msg)
    }

    status <- suppressWarnings(download_file(
      installInfo$packageRemotePath,
      destfile = installInfo$packageLocalPath,
      quiet = !verbose
    ))

    if (status) {
      stopf("Failed to download Spark: download exited with status %s", status)
    }

    untar(
      tarfile = installInfo$packageLocalPath,
      exdir = installInfo$sparkDir,
      tar = "internal"
    )

    unlink(installInfo$packageLocalPath)

    if (verbose) {
      message("Installation complete.")
    }
  } else if (verbose) {
    fmt <- "Spark %s for Hadoop %s or later already installed."
    msg <- sprintf(fmt, installInfo$sparkVersion, installInfo$hadoopVersion)
    message(msg)
  }

  if (!file.exists(installInfo$sparkVersionDir)) {
    stop("Spark version not found.")
  }

  if (!identical(logging, NULL)) {
    tryCatch(
      {
        spark_conf_log4j_set_value(
          installInfo = installInfo,
          reset = reset,
          logging = logging
        )
      },
      error = function(e) {
        warning("Failed to set logging settings")
      }
    )
  }

  hiveSitePath <- file.path(installInfo$sparkConfDir, "hive-site.xml")
  hivePath <- NULL
  if (!file.exists(hiveSitePath) || reset) {
    tryCatch(
      {
        hiveProperties <- list(
          "javax.jdo.option.ConnectionURL" = "jdbc:derby:memory:databaseName=metastore_db;create=true",
          "javax.jdo.option.ConnectionDriverName" = "org.apache.derby.jdbc.EmbeddedDriver"
        )

        if (os_is_windows()) {
          hivePath <- normalizePath(
            file.path(installInfo$sparkVersionDir, "tmp", "hive"),
            mustWork = FALSE,
            winslash = "/"
          )

          hiveProperties <- c(hiveProperties, list(
            "hive.exec.scratchdir" = hivePath,
            "hive.exec.local.scratchdir" = hivePath,
            "hive.metastore.warehouse.dir" = hivePath
          ))
        }

        spark_hive_file_set_value(hiveSitePath, hiveProperties)
      },
      error = function(e) {
        warning("Failed to apply custom hive-site.xml configuration")
      }
    )
  }

  spark_conf <- list()
  if (os_is_windows()) {
    spark_conf[["spark.local.dir"]] <- normalizePath(
      file.path(installInfo$sparkVersionDir, "tmp", "local"),
      mustWork = FALSE,
      winslash = "/"
    )
  }

  if (!is.null(hivePath)) {
    spark_conf[["spark.sql.warehouse.dir"]] <- hivePath
  }

  tryCatch(
    {
      spark_conf_file_set_value(
        installInfo,
        spark_conf,
        reset
      )
    },
    error = function(e) {
      warning("Failed to set spark-defaults.conf settings")
    }
  )

  invisible(installInfo)
}

#' @rdname spark_install
#'
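#' @examples
#' \dontrun{
#' # Remove a previously installed build (the version numbers are illustrative):
#' spark_uninstall(version = "3.4.1", hadoop_version = "3")
#' }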
#' @export
spark_uninstall <- function(version, hadoop_version) {
  info <- spark_versions_info(version, hadoop_version)
  sparkDir <- file.path(c(spark_install_old_dir(), spark_install_dir()), info$componentName)
  if (any(dir.exists(sparkDir))) {
    unlink(sparkDir, recursive = TRUE)

    if (all(!(dir.exists(sparkDir)))) {
      message(info$componentName, " successfully uninstalled.")
    } else {
      stop("Failed to completely uninstall ", info$componentName)
    }

    invisible(TRUE)
  } else {
    message(info$componentName, " not found (no uninstall performed)")
    invisible(FALSE)
  }
}

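# Resolve a path whose first segment may be a Windows environment-variable
# reference, expanded via Sys.getenv(); on other platforms the path is simply
# normalized. Illustrative call (not run):
#   spark_resolve_envpath("%LOCALAPPDATA%/spark")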
spark_resolve_envpath <- function(path_with_end) {
  if (os_is_windows()) {
    parts <- strsplit(path_with_end, "/")[[1]]
    first <- gsub("%", "", parts[[1]])
    if (nchar(Sys.getenv(first)) > 0) parts[[1]] <- Sys.getenv(first)
    do.call("file.path", as.list(parts))
  } else {
    normalizePath(path_with_end, mustWork = FALSE)
  }
}

#' @rdname spark_install
#' @importFrom jsonlite fromJSON
#' @export
spark_install_dir <- function() {
  json_pkg <- system.file("extdata/config.json", package = packageName())
  config <- fromJSON(json_pkg)
  getOption("spark.install.dir", spark_resolve_envpath(config$dirs[[.Platform$OS.type]]))
}

# Used for backwards compatibility with sparklyr 0.5 installation path
spark_install_old_dir <- function() {
  getOption("spark.install.dir", rappdirs::app_dir("spark", "rstudio")$cache())
}

#' @rdname spark_install
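#' @examples
#' \dontrun{
#' # Install from a manually downloaded archive; the path below is illustrative
#' # and must follow the spark-<version>-bin-(hadoop)<version> naming pattern.
#' spark_install_tar("~/Downloads/spark-3.4.1-bin-hadoop3.tgz")
#' }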
#' @export
spark_install_tar <- function(tarfile) {
  if (!file.exists(tarfile)) {
    stop("The file \"", tarfile, "\", does not exist.")
  }

  filePattern <- spark_versions_file_pattern()
  fileName <- basename(tarfile)
  if (length(grep(filePattern, fileName)) == 0) {
    stop(
      "The given file does not conform with the following pattern: ", filePattern
    )
  }

  untar(
    tarfile = tarfile,
    exdir = spark_install_dir(),
    tar = "internal"
  )
}

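# Write logging overrides into the installed Spark's log4j configuration,
# handling both the log4j 1.x template ("log4j.properties.template") and the
# log4j 2 template ("log4j2.properties.template") shipped with newer Spark
# releases. A caller-supplied `properties` list replaces the defaults built here.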
spark_conf_log4j_set_value <- function(installInfo, properties = NULL,
                                       reset = TRUE, logging = "INFO") {

  log_template <- NULL
  log_properties <- NULL
  log_template_v1 <- "log4j.properties.template"
  log_template_v2 <- "log4j2.properties.template"

  spark_conf_dir <- installInfo$sparkConfDir

  dir_contents <- list.files(spark_conf_dir)

  if (any(dir_contents == log_template_v2)) {
    log_template <- log_template_v2
    log_properties <- list(
      "rootLogger.level" = paste0("rootLogger.level = ", tolower(logging)),
      "rootLogger.appenderRef.file.ref" = "rootLogger.appenderRef.file.ref = File",
      "appender.file.type" = "appender.file.type = File",
      "appender.file.name" = "appender.file.name = File",
      "appender.file.append" = "appender.file.append = true",
      "appender.file.layout.type" = "appender.file.layout.type = PatternLayout",
      "appender.file.layout.pattern" =  "appender.file.layout.pattern = %d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex",
      "logger.jetty.name" = "logger.jetty.name = org.eclipse.jetty",
      "logger.jetty.level" = "logger.jetty.level = warn"
    )
    if (os_is_windows()) {
      log_properties <- c(
        log_properties,
        "appender.file.fileName" = "appender.file.fileName = logs/log4j.spark.log"
      )
    }
  } else if (any(dir_contents == log_template_v1)) {
    log_template <- log_template_v1
    log_properties <- list(
      "log4j.rootCategory" = paste0("log4j.rootCategory=", logging, ",console,localfile"),
      "log4j.appender.localfile" = "log4j.appender.localfile=org.apache.log4j.DailyRollingFileAppender",
      "log4j.appender.localfile.layout" = "log4j.appender.localfile.layout=org.apache.log4j.PatternLayout",
      "log4j.appender.localfile.layout.ConversionPattern" = "log4j.appender.localfile.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n"
    )
    if (os_is_windows()) {
      log_properties <- c(
        log_properties,
        "log4j.appender.localfile.file" = "log4j.appender.localfile.file=logs/log4j.spark.log"
      )
    }
  }

  if (is.null(log_template)) stop("No log4j template file found")

  if (!is.null(properties)) log_properties <- properties

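  # Drop the trailing ".template" (9 characters) to obtain the active file name,
  # e.g. "log4j2.properties.template" -> "log4j2.properties"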
  log_file <- substr(log_template, 1, nchar(log_template) - 9)

  log_path <- file.path(spark_conf_dir, log_file)
  log_path_template <- file.path(spark_conf_dir, log_template)

  if (!file.exists(log_path) || reset) {
    file.copy(
      from = log_path_template,
      to = log_path,
      overwrite = TRUE
    )
  }

  file_log <- file(log_path)
  lines <- readLines(file_log)

  lines[[length(lines) + 1]] <- ""

  lapply(names(log_properties), function(property) {
    value <- log_properties[[property]]
    pattern <- paste(property, "=.*", sep = "")

    if (length(grep(pattern, lines)) > 0) {
      lines <<- gsub(pattern, value, lines, perl = TRUE)
    } else {
      lines[[length(lines) + 1]] <<- value
    }
  })

  writeLines(lines, file_log)
  close(file_log)
}

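# Write a minimal hive-site.xml containing the supplied name/value properties.
# Illustrative output for list("some.name" = "some.value") (not run):
#   <configuration>
#     <property>
#       <name>some.name</name>
#       <value>some.value</value>
#     </property>
#   </configuration>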
spark_hive_file_set_value <- function(hivePath, properties) {
  lines <- list()
  lines[[length(lines) + 1]] <- "<configuration>"

  lapply(names(properties), function(property) {
    value <- properties[[property]]

    lines[[length(lines) + 1]] <<- "  <property>"
    lines[[length(lines) + 1]] <<- paste0("    <name>", property, "</name>")
    lines[[length(lines) + 1]] <<- paste0("    <value>", value, "</value>")
    lines[[length(lines) + 1]] <<- "  </property>"
  })

  lines[[length(lines) + 1]] <- "</configuration>"

  hiveFile <- file(hivePath)
  writeLines(unlist(lines), hiveFile)
  close(hiveFile)
}

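# Update spark-defaults.conf under the installed Spark's conf directory,
# (re)creating it from the bundled template when the file is missing or `reset`
# is TRUE, then replacing or appending each supplied property.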
spark_conf_file_set_value <- function(installInfo, properties, reset) {
  confPropertiesPath <- file.path(installInfo$sparkConfDir, "spark-defaults.conf")
  if (!file.exists(confPropertiesPath) || reset) {
    confTemplatePath <- file.path(installInfo$sparkConfDir, "spark-defaults.conf.template")
    file.copy(confTemplatePath, confPropertiesPath, overwrite = TRUE)
  }

  confPropertiesFile <- file(confPropertiesPath)
  lines <- readLines(confPropertiesFile)

  lines[[length(lines) + 1]] <- ""

  lapply(names(properties), function(property) {
    value <- paste0(property, "     ", properties[[property]])
    pattern <- paste(property, ".*", sep = "")

    if (length(grep(pattern, lines)) > 0) {
      lines <<- gsub(pattern, value, lines, perl = TRUE)
    } else {
      lines[[length(lines) + 1]] <<- value
    }
  })

  writeLines(lines, confPropertiesFile)
  close(confPropertiesFile)
}
