# Reads a YARN configuration property for the yarn-cluster code paths.
#
# Despite its name, `scon` is the property name (callers in this file pass
# strings such as "yarn.http.policy"). The lookup is delegated to
# spark_yarn_get_conf_property(). Callers in this file treat a zero-length
# result as "property not set".
spark_yarn_cluster_get_conf_property <- function(scon) {
  propertyValue <- spark_yarn_get_conf_property(scon)
  propertyValue
}
# Polls the YARN ResourceManager until the application submitted by this
# session appears, and returns its application id.
#
# Applications are listed through the RM REST endpoint `/ws/v1/cluster/apps`,
# restricted to SPARK applications started at or after `start_time`. The
# candidate is then selected in one of two ways:
#   * By submitting user. This is the default whenever a user name is known.
#     The name comes from `sparklyr.yarn.cluster.lookup.username`, falling
#     back to the USER environment variable. The config key
#     `sparklyr.yarn.cluster.lookup.byname` toggles this mode.
#   * By application name, which must match the regular expression
#     `sparklyr.yarn.cluster.lookup.prefix` (default "sparklyr").
#
# Args:
#   config: spark configuration consulted through spark_config_value().
#   start_time: value sent as the `startedTimeBegin` query parameter.
#   rm_webapp: ResourceManager web address, including the protocol.
#
# Returns the `id` field of the single matching application.
#
# Errors when more than one application matches, or when no match is found
# within `sparklyr.yarn.cluster.start.timeout` seconds (default 30).
spark_yarn_cluster_get_app_id <- function(config, start_time, rm_webapp) {
  property <- "id"
  waitSeconds <- spark_config_value(config, "sparklyr.yarn.cluster.start.timeout", 30)
  commandStart <- Sys.time()
  propertyValue <- NULL
  yarnApps <- NULL

  appLookupPrefix <- spark_config_value(config, "sparklyr.yarn.cluster.lookup.prefix", "sparklyr")

  # An explicitly configured user name must take precedence over the
  # environment. Previously it was ignored whenever USER was set.
  systemUser <- Sys.getenv("USER", unset = "")
  appLookupUser <- spark_config_value(
    config,
    "sparklyr.yarn.cluster.lookup.username",
    if (nzchar(systemUser)) systemUser else NULL
  )
  hasLookupUser <- length(appLookupUser) == 1 &&
    !is.na(appLookupUser) &&
    nzchar(appLookupUser)

  # Lookup by user is only possible when a user name is actually known.
  # Otherwise fall back to matching on the application name instead of
  # sending an empty `user=` filter.
  lookupByUser <- spark_config_value(config, "sparklyr.yarn.cluster.lookup.byname", hasLookupUser)
  appLookupUseUser <- isTRUE(as.logical(lookupByUser)) && hasLookupUser

  resourceManagerQuery <- paste0(
    rm_webapp,
    "/ws/v1/cluster/apps?startedTimeBegin=",
    start_time,
    "&applicationType=SPARK",
    if (appLookupUseUser) {
      paste0("&user=", utils::URLencode(as.character(appLookupUser), reserved = TRUE))
    } else {
      ""
    }
  )

  # TRUE when a single RM application record belongs to this session.
  isSessionApp <- function(app) {
    if (appLookupUseUser) {
      # Exact comparison: the user name is not a regular expression, and it
      # must not match longer names that merely contain it.
      identical(as.character(app$user), as.character(appLookupUser))
    } else {
      # isTRUE() guards against records without a name, which would
      # otherwise yield logical(0) and misalign Filter().
      isTRUE(grepl(appLookupPrefix, app$name))
    }
  }

  while (length(propertyValue) == 0 && commandStart + waitSeconds > Sys.time()) {
    resourceManagerResponse <- httr::GET(resourceManagerQuery)
    yarnApps <- httr::content(resourceManagerResponse)

    # The RM replies {"apps": {"app": [ ... ]}}, or {"apps": null} when there
    # is no match. Filter the individual records in `apps$app`. Filtering
    # `apps` itself only ever inspected the first record and made the
    # multiple-application check below unreachable.
    candidateApps <- if (is.list(yarnApps) && is.list(yarnApps$apps)) yarnApps$apps$app else NULL
    newSparklyrApps <- Filter(isSessionApp, candidateApps)

    if (length(newSparklyrApps) > 1) {
      stop("Multiple sparklyr apps submitted at once to this yarn cluster, aborting, please retry")
    }
    if (length(newSparklyrApps) == 1) {
      newSparklyrApp <- newSparklyrApps[[1]]
      if (property %in% names(newSparklyrApp)) {
        propertyValue <- newSparklyrApp[[property]]
      }
    }
    if (length(propertyValue) == 0) Sys.sleep(1)
  }

  if (length(propertyValue) == 0) {
    withr::with_options(list(
      warning.length = 8000
    ), {
      stop(
        "Failed to retrieve new sparklyr yarn application from ",
        resourceManagerQuery, " after ", format(Sys.time() - commandStart, digits = 1),
        ", check yarn.resourcemanager.webapp.address under yarn-site.xml. Last result: ",
        yarnApps
      )
    })
  }
  propertyValue
}
# Fetches one field of a YARN application from the ResourceManager REST
# endpoint `<rm_webapp>/ws/v1/cluster/apps/<appId>`.
#
# Args:
#   rm_webapp: ResourceManager web address, including the protocol.
#   appId: YARN application id.
#   property: name of the field to read from the response's `app` object.
#   errorMessage: text appended right after the application id in the error
#     raised when the field is missing (e.g. a hint about a timeout setting).
#
# Returns the value of `property`. Raises an error that embeds the raw
# response when the response has no `app` object or the field is absent.
spark_yarn_cluster_get_app_property <- function(rm_webapp, appId, property, errorMessage = "") {
  appUrl <- paste0(rm_webapp, "/ws/v1/cluster/apps/", appId)
  appInfo <- httr::content(httr::GET(appUrl))

  fieldAvailable <- "app" %in% names(appInfo) && property %in% names(appInfo$app)
  if (fieldAvailable) {
    appInfo$app[[property]]
  } else {
    # The raw response can be long; raise the message limit so it is not cut.
    withr::with_options(
      list(warning.length = 8000),
      stop(
        "Failed to retrieve '", property, "' from ", appId, errorMessage, ". Last result: ",
        appInfo
      )
    )
  }
}
# Polls the ResourceManager for application `appId` while `condition` holds,
# for at most `waitSeconds` seconds.
#
# Args:
#   rm_webapp: ResourceManager web address, including the protocol.
#   appId: YARN application id.
#   waitSeconds: upper bound, in seconds, on the total polling time.
#   condition: function that receives the response's `app` object and
#     returns TRUE while polling should continue.
#
# Polls every second during the first minute and every 30 seconds after
# that, never sleeping past the deadline. Returns NULL invisibly.
spark_yarn_cluster_while_app <- function(rm_webapp, appId, waitSeconds, condition) {
  commandStart <- Sys.time()
  deadline <- commandStart + waitSeconds
  resourceManagerQuery <- paste0(rm_webapp, "/ws/v1/cluster/apps/", appId)

  while (deadline > Sys.time()) {
    resourceManagerResponse <- httr::GET(resourceManagerQuery)
    yarnResponse <- httr::content(resourceManagerResponse)
    app <- if (is.list(yarnResponse)) yarnResponse$app else NULL

    # A response without an `app` object (e.g. a transient RM error) says
    # nothing about the application, so keep waiting. Previously `condition`
    # could return logical(0) here and make `if` fail.
    keepWaiting <- is.null(app) || isTRUE(condition(app))
    if (!keepWaiting) break

    # Measure elapsed time explicitly in seconds. Comparing a bare difftime
    # with 60 uses its auto-selected unit (minutes once past 60 seconds), so
    # the slower poll interval was never reached.
    elapsedSeconds <- as.numeric(difftime(Sys.time(), commandStart, units = "secs"))
    sleepTime <- if (elapsedSeconds > 60) 30 else 1
    remainingSeconds <- as.numeric(difftime(deadline, Sys.time(), units = "secs"))
    Sys.sleep(max(0, min(sleepTime, remainingSeconds)))
  }
  invisible(NULL)
}
# Checks whether a ResourceManager answers on its cluster-info REST endpoint.
#
# Args:
#   rm_webapp: ResourceManager address without protocol (as stored in
#     yarn-site.xml). The protocol from spark_yarn_cluster_get_protocol() is
#     prepended here.
#
# Returns TRUE when `<protocol>://<rm_webapp>/ws/v1/cluster/info` responds
# without an HTTP error status, and FALSE otherwise. Failures are reported
# as warnings rather than errors, so a caller can move on to another
# ResourceManager.
spark_yarn_cluster_resource_manager_is_online <- function(rm_webapp) {
  rmQuery <- paste0(rm_webapp, "/ws/v1/cluster/info")
  tryCatch(
    {
      rmResult <- httr::GET(paste0(spark_yarn_cluster_get_protocol(), "://", rmQuery))
      if (httr::http_error(rmResult)) {
        warning("Failed to open ", rmQuery, " with status ", httr::status_code(rmResult), ". ")
        FALSE
      } else {
        TRUE
      }
    },
    error = function(err) {
      # conditionMessage() keeps only the error text. Passing the condition
      # object itself to warning() also pasted in its deparsed call.
      warning("Failed to open ", rmQuery, ". ", conditionMessage(err))
      FALSE
    }
  )
}
# Resolves the address (without protocol) of the YARN ResourceManager web
# application from yarn-site.xml.
#
# Without High Availability, this is `yarn.resourcemanager.webapp.address`.
# If that is missing, the host of `yarn.resourcemanager.address` is used with
# port 8088.
#
# With High Availability (`yarn.resourcemanager.ha.enabled`), every RM id in
# `yarn.resourcemanager.ha.rm-ids` is probed, with the local
# `yarn.resourcemanager.ha.id` tried first. Property families are tried in
# order of preference: https webapp, http webapp, admin address. The first
# responding RM in the most preferred family wins.
#
# Returns the address string. Raises an error when no address can be
# determined.
spark_yarn_cluster_get_resource_manager_webapp <- function() {
  rmHighAvailability <- spark_yarn_cluster_get_conf_property("yarn.resourcemanager.ha.enabled")
  rmHighAvailability <- length(rmHighAvailability) > 0 &&
    any(grepl("TRUE", rmHighAvailability, ignore.case = TRUE))

  mainRMWebapp <- "yarn.resourcemanager.webapp.address"
  if (rmHighAvailability) {
    rmHighAvailabilityId <- spark_yarn_cluster_get_conf_property("yarn.resourcemanager.ha.id")
    rmHighAvailabilityIds <- spark_yarn_cluster_get_conf_property("yarn.resourcemanager.ha.rm-ids")
    # Tolerate a missing property and spaces around the commas. A missing
    # value leaves no candidates and ends in the error below, instead of
    # crashing inside strsplit().
    rmHighAvailabilityIds <- trimws(unlist(strsplit(as.character(rmHighAvailabilityIds), ",")))
    rmHighAvailabilityIds <- rmHighAvailabilityIds[nzchar(rmHighAvailabilityIds)]
    if (length(rmHighAvailabilityId) > 0) {
      rmHighAvailabilityId <- trimws(as.character(rmHighAvailabilityId))
      # Probe the local RM first.
      rmHighAvailabilityIds <- c(rmHighAvailabilityId, setdiff(rmHighAvailabilityIds, rmHighAvailabilityId))
    }

    mainRMWebapp <- NULL
    propCandidates <- c(
      "yarn.resourcemanager.webapp.https.address.",
      "yarn.resourcemanager.webapp.address.",
      "yarn.resourcemanager.admin.address."
    )
    for (propCandidate in propCandidates) {
      for (rmId in rmHighAvailabilityIds) {
        rmCandidate <- paste0(propCandidate, rmId)
        rmCandidateValue <- spark_yarn_cluster_get_conf_property(rmCandidate)
        # Nothing to probe when this RM has no address of this kind.
        if (length(rmCandidateValue) == 0) next
        if (spark_yarn_cluster_resource_manager_is_online(rmCandidateValue)) {
          mainRMWebapp <- rmCandidate
          break
        }
      }
      # `break` above only leaves the inner loop. Stop at the first online
      # candidate, so a less preferred property family cannot override it.
      if (!is.null(mainRMWebapp)) break
    }
    if (is.null(mainRMWebapp)) {
      stop("Failed to find online resource manager under High Availability cluster.")
    }
  }

  mainRMWebappValue <- spark_yarn_cluster_get_conf_property(mainRMWebapp)
  if (length(mainRMWebappValue) == 0) {
    if (rmHighAvailability) {
      stop("Failed to retrieve ", mainRMWebapp, " from yarn-site.xml")
    }
    mainRM <- "yarn.resourcemanager.address"
    mainRMValue <- spark_yarn_cluster_get_conf_property(mainRM)
    if (length(mainRMValue) == 0) {
      stop("Failed to retrieve ", mainRMWebapp, " from yarn-site.xml")
    }
    # Keep the host of the RM address and use 8088 as the web port.
    mainRMWebappValue <- paste(sub(":[0-9]+$", "", mainRMValue), 8088, sep = ":")
  }
  mainRMWebappValue
}
# Picks the protocol used to reach the ResourceManager web application:
# "https" when `yarn.http.policy` is HTTPS_ONLY (compared case-insensitively),
# and "http" otherwise, including when the property is not set.
spark_yarn_cluster_get_protocol <- function() {
  httpPolicy <- spark_yarn_cluster_get_conf_property("yarn.http.policy")
  httpsOnly <- length(httpPolicy) > 0 && toupper(httpPolicy) == "HTTPS_ONLY"
  if (httpsOnly) "https" else "http"
}
# Locates the host running the ApplicationMaster of the sparklyr application
# submitted in yarn-cluster mode.
#
# Steps:
#   1. Resolve the ResourceManager web address and protocol.
#   2. Find the new application id.
#   3. Wait up to `sparklyr.yarn.cluster.accepted.timeout` seconds (default
#      30) for the application to leave NEW/NEW_SAVING/SUBMITTED.
#   4. Wait up to `sparklyr.yarn.cluster.hostaddress.timeout` seconds
#      (default 30) for `amHostHttpAddress` to be reported.
#
# Args:
#   config: spark configuration consulted through spark_config_value().
#   start_time: forwarded to spark_yarn_cluster_get_app_id().
#
# Returns the host part of `amHostHttpAddress` (the text before the first
# ":"). Raises an error when the application is not accepted in time, ends
# up in an unexpected state, or never reports its host address.
spark_yarn_cluster_get_gateway <- function(config, start_time) {
  resourceManagerWebapp <- spark_yarn_cluster_get_resource_manager_webapp()
  if (length(resourceManagerWebapp) == 0) {
    stop("Yarn Cluster mode uses `yarn.resourcemanager.webapp.address` but is not present in yarn-site.xml")
  }
  resourceManagerWebapp <- paste0(spark_yarn_cluster_get_protocol(), "://", resourceManagerWebapp)

  appId <- spark_yarn_cluster_get_app_id(
    config,
    start_time,
    resourceManagerWebapp
  )

  pendingStates <- c("NEW", "NEW_SAVING", "SUBMITTED")
  waitAcceptedSeconds <- spark_config_value(config, "sparklyr.yarn.cluster.accepted.timeout", 30)
  spark_yarn_cluster_while_app(
    resourceManagerWebapp,
    appId,
    waitAcceptedSeconds,
    function(app) {
      toupper(app$state) %in% pendingStates
    }
  )

  currentState <- spark_yarn_cluster_get_app_property(
    resourceManagerWebapp,
    appId,
    "state"
  )
  if (toupper(currentState) %in% pendingStates) {
    stop(
      "Yarn application ", appId, " and state ", currentState, " ",
      "was not accepted after ", waitAcceptedSeconds, " seconds. ",
      "Please check that the cluster has enough available resources or increase ",
      "the wait time by changing 'config$sparklyr.yarn.cluster.accepted.timeout'."
    )
  }
  # A fast-starting application can already be RUNNING when it is polled.
  # Both ACCEPTED and RUNNING mean the scheduler accepted it, so only other
  # states (FAILED, KILLED, ...) are errors.
  if (!toupper(currentState) %in% c("ACCEPTED", "RUNNING")) {
    stop(
      "Yarn submission changed to state '", currentState, "' while 'ACCEPTED' ",
      "state was expected for app: ", appId
    )
  }

  # There is sometimes a delay to assign the host address, even after the
  # app is in the ACCEPTED state.
  waitHostAddressSeconds <- spark_config_value(config, "sparklyr.yarn.cluster.hostaddress.timeout", 30)
  spark_yarn_cluster_while_app(
    resourceManagerWebapp,
    appId,
    waitHostAddressSeconds,
    function(app) {
      !"amHostHttpAddress" %in% names(app)
    }
  )
  amHostHttpAddress <- spark_yarn_cluster_get_app_property(
    resourceManagerWebapp,
    appId,
    "amHostHttpAddress",
    ", try adjusting 'config$sparklyr.yarn.cluster.hostaddress.timeout'"
  )
  strsplit(amHostHttpAddress, ":")[[1]][[1]]
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.