R/worker_gateway.R

Defines functions: wait_connect_gateway, spark_gateway_commands, query_gateway_for_port, spark_connect_gateway

# Repeatedly try to open a client socket to the gateway until it succeeds or
# the wait budget is exhausted.
#
# Arguments:
#   gatewayAddress  Host passed to socketConnection().
#   gatewayPort     Port passed to socketConnection().
#   config          Configuration object handed to spark_config_value().
#   isStarting      When TRUE the budget comes from
#                   "sparklyr.gateway.start.timeout" (default 60), otherwise
#                   from "sparklyr.gateway.connect.timeout" (default 1).
#
# Returns the open socket connection, or NULL when no attempt succeeded
# within the budget. Connection errors and warnings are deliberately
# discarded: a failed attempt simply leads to another retry.
wait_connect_gateway <- function(gatewayAddress, gatewayPort, config, isStarting) {
  waitSeconds <- if (isStarting) {
    spark_config_value(config, "sparklyr.gateway.start.timeout", 60)
  } else {
    spark_config_value(config, "sparklyr.gateway.connect.timeout", 1)
  }

  # These settings do not change between retries, so read them once instead of
  # on every iteration.
  timeout <- spark_config_value(config, "sparklyr.monitor.timeout", 1)
  startWait <- spark_config_value(config, "sparklyr.gateway.start.wait", 50 / 1000)

  gateway <- NULL
  deadline <- Sys.time() + waitSeconds

  while (is.null(gateway) && Sys.time() < deadline) {
    # A failed attempt yields NULL so the loop retries.
    gateway <- tryCatch(
      suppressWarnings(
        socketConnection(host = gatewayAddress,
                         port = gatewayPort,
                         server = FALSE,
                         blocking = TRUE,
                         open = "rb",
                         timeout = timeout)
      ),
      error = function(err) NULL
    )

    # Only pause between failed attempts; once connected there is nothing to
    # wait for, so return without the extra delay.
    if (is.null(gateway)) {
      Sys.sleep(startWait)
    }
  }

  gateway
}

# Lookup table of the numeric command codes sent to the gateway.
#
# Returns a named list with the entries "GetPorts" (0) and
# "RegisterInstance" (1).
spark_gateway_commands <- function() {
  commandCodes <- list(0, 1)
  names(commandCodes) <- c("GetPorts", "RegisterInstance")
  commandCodes
}

# Ask an already-open gateway connection for the ports tied to a session.
#
# The request is three integers: the "GetPorts" command code, `sessionId`,
# and a wait time (the start timeout when `isStarting`, otherwise 0). The
# reply is read as three integers: a backend session id, the redirect gateway
# port and the backend port.
#
# Arguments:
#   gateway     Connection the request is written to and the reply read from.
#   sessionId   Session identifier sent with the request.
#   config      Configuration object handed to spark_config_value().
#   isStarting  Selects "sparklyr.gateway.start.timeout" (default 60) over
#               "sparklyr.gateway.connect.timeout" (default 1) as the local
#               wait budget, and decides how an incomplete reply is reported.
#
# Returns list(gateway, backendPort, redirectGatewayPort). If any of the
# three reply values is empty, raises an error when `isStarting` and returns
# NULL otherwise.
query_gateway_for_port <- function(gateway, sessionId, config, isStarting) {
  if (isStarting) {
    waitSeconds <- spark_config_value(config, "sparklyr.gateway.start.timeout", 60)
    requestedWait <- waitSeconds
  } else {
    waitSeconds <- spark_config_value(config, "sparklyr.gateway.connect.timeout", 1)
    requestedWait <- 0
  }

  getPortsCommand <- spark_gateway_commands()[["GetPorts"]]
  sparklyr:::writeInt(gateway, getPortsCommand)
  sparklyr:::writeInt(gateway, sessionId)
  sparklyr:::writeInt(gateway, requestedWait)

  # Poll for the session id until something arrives or the budget runs out.
  sessionReply <- NULL
  deadline <- Sys.time() + waitSeconds
  repeat {
    if (length(sessionReply) > 0 || Sys.time() >= deadline) {
      break
    }
    sessionReply <- sparklyr:::readInt(gateway)
    Sys.sleep(0.1)
  }

  # The two ports follow the session id on the wire.
  redirectGatewayPort <- sparklyr:::readInt(gateway)
  backendPort <- sparklyr:::readInt(gateway)

  replyLengths <- c(
    length(sessionReply),
    length(redirectGatewayPort),
    length(backendPort)
  )

  if (any(replyLengths == 0)) {
    if (!isStarting) {
      return(NULL)
    }
    stop("sparklyr gateway did not respond while retrieving ports information after ", waitSeconds, " seconds")
  }

  list(
    gateway = gateway,
    backendPort = backendPort,
    redirectGatewayPort = redirectGatewayPort
  )
}

# Connect to a gateway and resolve the backend port for a session, following
# a redirect to another gateway port when the gateway reports one.
#
# Arguments:
#   gatewayAddress  Host of the gateway.
#   gatewayPort     Port of the gateway to try first.
#   sessionId       Session identifier to look up.
#   config          Configuration object forwarded to the helper functions.
#   isStarting      When TRUE, failures are raised as errors; when FALSE
#                   (the default), failures yield NULL.
#
# Returns list(gateway, backendPort) where `gateway` is the open connection.
# Returns NULL when not starting and the gateway is unreachable, does not
# answer the ports query, or reports a redirect port of 0. Raises an error in
# those same situations when `isStarting` is TRUE.
#
# The connection opened here is closed on every path that does not return it
# to the caller, including when the ports query itself raises an error.
spark_connect_gateway <- function(
  gatewayAddress,
  gatewayPort,
  sessionId,
  config,
  isStarting = FALSE) {

  # try connecting to existing gateway
  gateway <- wait_connect_gateway(gatewayAddress, gatewayPort, config, isStarting)

  if (is.null(gateway)) {
    if (isStarting) {
      stop(
        "Gateway in port (", gatewayPort, ") did not respond.")
    }

    return(NULL)
  }

  # If the query raises, close the socket before re-signalling the same
  # condition; otherwise the connection would be leaked.
  gatewayPortsQuery <- tryCatch(
    query_gateway_for_port(gateway, sessionId, config, isStarting),
    error = function(err) {
      # Best effort: a failure to close must not mask the original error.
      try(close(gateway), silent = TRUE)
      stop(err)
    }
  )

  if (is.null(gatewayPortsQuery) && !isStarting) {
    close(gateway)
    return(NULL)
  }

  redirectGatewayPort <- gatewayPortsQuery$redirectGatewayPort
  backendPort <- gatewayPortsQuery$backendPort

  # The error message below treats a redirect port of 0 as "session not
  # registered" on this gateway.
  if (redirectGatewayPort == 0) {
    close(gateway)

    if (isStarting) {
      stop("Gateway in port (", gatewayPort, ") does not have the requested session registered")
    }

    return(NULL)
  }

  # A different port means the session lives behind another gateway: drop this
  # connection and repeat the handshake against the reported port.
  if (redirectGatewayPort != gatewayPort) {
    close(gateway)

    return(
      spark_connect_gateway(gatewayAddress, redirectGatewayPort, sessionId, config, isStarting)
    )
  }

  list(
    gateway = gateway,
    backendPort = backendPort
  )
}
javierluraschi/sparkworker documentation built on May 18, 2019, 5:56 p.m.