Nothing
#' @include shell_connection.R
# Report whether `master` looks like a sparklyr gateway address.
#
# Accepted form: an optional "sparklyr://" prefix, an address containing no
# colon, a numeric port, and an optional numeric "/session" suffix.
# Returns TRUE when at least one element of `master` matches, FALSE otherwise.
master_is_gateway <- function(master) {
  gateway_pattern <- "^(sparklyr://)?[^:]+:[0-9]+(/[0-9]+)?$"
  any(grepl(gateway_pattern, master))
}
# Open a connection through a sparklyr gateway.
#
# master: gateway address in the form [sparklyr://]address:port[/session];
#         validated with master_is_gateway().
# config: configuration object, passed through unchanged to
#         spark_connect_gateway() and spark_gateway_connection().
#
# Returns the object produced by spark_gateway_connection(). Raises an error
# when `master` is malformed, when the gateway handshake yields NULL, or when
# no connection object is produced.
gateway_connection <- function(master, config) {
  if (!master_is_gateway(master)) {
    stop("sparklyr gateway master expected to be formatted as sparklyr://address:port")
  }

  # The "sparklyr://" prefix is optional in the accepted format, so the split
  # on "//" may yield a single piece; always take the trailing piece rather
  # than assuming a second element exists.
  protocol <- strsplit(master, "//")[[1]]
  endpoint <- protocol[[length(protocol)]]

  components <- strsplit(endpoint, ":")[[1]]
  gatewayAddress <- components[[1]]

  # components[[2]] is "port" or "port/session".
  portAndSession <- strsplit(components[[2]], "/")[[1]]
  gatewayPort <- as.integer(portAndSession[[1]])
  sessionId <- if (length(portAndSession) > 1) as.integer(portAndSession[[2]]) else 0

  gatewayInfo <- spark_connect_gateway(
    gatewayAddress = gatewayAddress,
    gatewayPort = gatewayPort,
    sessionId = sessionId,
    config = config
  )
  if (is.null(gatewayInfo)) {
    stop("Failed to connect to gateway: ", master)
  }

  sc <- spark_gateway_connection(master, config, gatewayInfo, gatewayAddress)
  # Check the connection that was just created (not gatewayInfo, which was
  # already validated above).
  if (is.null(sc)) {
    stop("Failed to open connection from gateway: ", master)
  }

  sc
}
# Build a gateway connection object on top of an established gateway.
#
# master:         master string, stored on the connection object.
# config:         configuration; "sparklyr.backend.interval" (default 1) is
#                 read from it and used as the socket timeout.
# gatewayInfo:    list providing `backendPort` (port to connect to) and
#                 `gateway` (connection closed on failure and stored on the
#                 result).
# gatewayAddress: host used for both socket connections.
#
# Returns the object created by new_spark_gateway_connection(). If either
# socket cannot be opened, any socket opened so far and the gateway connection
# are closed and an error is raised.
spark_gateway_connection <- function(master, config, gatewayInfo, gatewayAddress) {
  # Pre-initialise so the error handler can tell which sockets were opened.
  backend <- NULL
  monitoring <- NULL

  tryCatch(
    {
      interval <- spark_config_value(config, "sparklyr.backend.interval", 1)

      # Blocking mode is used only when a positive interval is configured.
      backend <- socketConnection(
        host = gatewayAddress,
        port = gatewayInfo$backendPort,
        server = FALSE,
        blocking = interval > 0,
        open = "wb",
        timeout = interval
      )
      class(backend) <- c(class(backend), "shell_backend")

      # Second socket, opened against the same host and port as `backend`.
      monitoring <- socketConnection(
        host = gatewayAddress,
        port = gatewayInfo$backendPort,
        server = FALSE,
        blocking = interval > 0,
        open = "wb",
        timeout = interval
      )
      class(monitoring) <- c(class(monitoring), "shell_backend")
    },
    error = function(err) {
      # Release whatever was opened before the failure so a half-established
      # connection does not leak sockets.
      if (!is.null(monitoring)) {
        close(monitoring)
      }
      if (!is.null(backend)) {
        close(backend)
      }
      close(gatewayInfo$gateway)
      stop("Failed to open connection to backend:", err$message)
    }
  )

  # create the shell connection
  sc <- new_spark_gateway_connection(list(
    # spark_connection
    master = master,
    method = "gateway",
    app_name = "sparklyr",
    config = config,
    state = new.env(),
    # spark_gateway_connection : spark_shell_connection
    spark_home = NULL,
    backend = backend,
    monitoring = monitoring,
    gateway = gatewayInfo$gateway,
    output_file = NULL
  ))

  # stop shell on R exit
  reg.finalizer(baseenv(), function(x) {
    if (connection_is_open(sc)) {
      stop_shell(sc)
    }
  }, onexit = TRUE)

  sc
}
# connection_is_open() method for spark_gateway_connection objects: bound
# directly to the spark_shell_connection method, so both classes share one
# implementation. The right-hand side is looked up when this file is
# evaluated, so connection_is_open.spark_shell_connection must already exist
# (presumably provided by shell_connection.R -- confirm).
#' @export
connection_is_open.spark_gateway_connection <- connection_is_open.spark_shell_connection
# spark_log() method for gateway connections. Always raises an error: the
# arguments are accepted for signature compatibility but never used.
#' @export
spark_log.spark_gateway_connection <- function(sc, n = 100, filter = NULL, ...) {
  unavailable <- "spark_log is not available while connecting through an sparklyr gateway"
  stop(unavailable)
}
# spark_web() method for gateway connections. Always raises an error: the
# arguments are accepted for signature compatibility but never used.
#' @export
spark_web.spark_gateway_connection <- function(sc, ...) {
  unavailable <- "spark_web is not available while connecting through an sparklyr gateway"
  stop(unavailable)
}
# The remaining methods for spark_gateway_connection are bound directly to
# their spark_shell_connection counterparts, so gateway connections share the
# shell connection implementations of invoke_method(), j_invoke_method() and
# print_jobj(). Each right-hand side is looked up when this file is evaluated,
# so the shell connection methods must already be defined at that point.
#' @export
invoke_method.spark_gateway_connection <- invoke_method.spark_shell_connection
#' @export
j_invoke_method.spark_gateway_connection <- j_invoke_method.spark_shell_connection
#' @export
print_jobj.spark_gateway_connection <- print_jobj.spark_shell_connection
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.