# R/AzureSpark.R

#' Create new Spark Session.
#'
#' Starts an interactive Livy session by POSTing to
#' `https://<clustername>.azurehdinsight.net/livy/sessions` with HTTP basic
#' authentication. Arguments that are not supplied are read from
#' `azureActiveContext`. The resolved values and the id of the new session
#' are assigned back onto `azureActiveContext`.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureAuthenticate
#' @inheritParams azureCreateHDI
#'
#' @param kind Session kind sent as the `kind` field of the request body.
#'   When omitted, the value stored in `azureActiveContext` is used; if the
#'   context holds none, the default `"spark"` applies.
#'
#' @return The `id` field of the service response. An error is raised when
#'   a required value is missing or the response status is not 201.
#'
#' @family Spark functions
#' @export
azureSparkNewSession <- function(azureActiveContext, clustername, hdiAdmin,
                                 hdiPassword, kind = "spark", verbose = FALSE) {

  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  if (missing(kind)) {
    KI <- azureActiveContext$kind
    # missing(kind) is TRUE whenever the default is in effect, so without this
    # fallback the documented default would never reach the request.
    if (!length(KI)) {
      KI <- kind
    }
  } else {
    KI <- kind
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword  provided")
  }
  # Validate the resolved value, not the raw argument.
  if (!length(KI)) {
    stop("Error: No Valid kind provided")
  }

  # Remember the resolved settings on the context for later calls.
  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN
  azureActiveContext$kind <- KI

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions")

  bodyI <- list(kind = KI)

  r <- POST(URL, add_headers(.headers = c(`Content-type` = "application/json")),
            authenticate(HA, HP), body = bodyI, encode = "json", verbosity)

  if (status_code(r) != 201) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  rl <- content(r, "text", encoding = "UTF-8")
  df <- fromJSON(rl)

  # Stored as a string so later calls can paste it straight into a URL.
  azureActiveContext$sessionID <- toString(df$id)
  return(df$id)
}


#' List Spark Sessions.
#'
#' Issues a GET request to
#' `https://<clustername>.azurehdinsight.net/livy/sessions` with HTTP basic
#' authentication and tabulates the `sessions` element of the response.
#' Arguments that are not supplied are read from `azureActiveContext`; the
#' resolved values are assigned back onto it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @return A data frame with columns `ID`, `appID`, `state`, `proxyUser` and
#'   `kind`, one row per session. An error is raised when a required value is
#'   missing, the response status is not 200, or no session is listed.
#'
#' @family Spark functions
#' @export
azureSparkListSessions <- function(azureActiveContext, clustername, hdiAdmin,
                                   hdiPassword, verbose = FALSE) {
  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword  provided")
  }

  # Remember the resolved settings on the context for later calls.
  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions")

  r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
           authenticate(HA, HP), verbosity)

  # Fail with the status code rather than a JSON parse error (or a misleading
  # "No Sessions available") when the request itself was rejected.
  if (status_code(r) != 200) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  rl <- content(r, "text", encoding = "UTF-8")
  df <- fromJSON(rl)

  dfn <- as.data.frame(df$sessions$id)
  clust <- nrow(dfn)
  if (clust == 0) {
    stop("No Sessions available")
  }
  rows <- seq_len(clust)
  dfn[rows, 2] <- df$sessions$appId
  dfn[rows, 3] <- df$sessions$state
  dfn[rows, 4] <- df$sessions$proxyUser
  dfn[rows, 5] <- df$sessions$kind
  colnames(dfn) <- c("ID", "appID", "state", "proxyUser", "kind")

  return(dfn)
}


#' Stop a Spark Session.
#'
#' Sends a DELETE request for
#' `https://<clustername>.azurehdinsight.net/livy/sessions/<sessionID>` using
#' HTTP basic authentication. `azureCheckToken()` is called on the context
#' first. Arguments that are not supplied are read from `azureActiveContext`;
#' the resolved values are assigned back onto it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @return `TRUE` when the response status is 200. An error is raised when a
#'   required value is missing, on status 404 (session not found), or on any
#'   other status.
#'
#' @family Spark functions
#' @export
azureSparkStopSession <- function(azureActiveContext, clustername, hdiAdmin,
                                  hdiPassword, sessionID, verbose = FALSE) {

  azureCheckToken(azureActiveContext)

  # Explicit arguments win; anything omitted comes from the context.
  cluster <- if (missing(clustername)) azureActiveContext$clustername else clustername
  admin <- if (missing(hdiAdmin)) azureActiveContext$hdiAdmin else hdiAdmin
  password <- if (missing(hdiPassword)) azureActiveContext$hdiPassword else hdiPassword
  session <- if (missing(sessionID)) azureActiveContext$sessionID else sessionID
  verbosity <- set_verbosity(verbose)

  if (!length(cluster)) stop("Error: No clustername provided")
  if (!length(admin)) stop("Error: No hdiAdmin provided")
  if (!length(password)) stop("Error: No hdiPassword  provided")
  if (!length(session)) stop("Error: No sessionID provided")

  # Remember the resolved settings on the context for later calls.
  azureActiveContext$hdiAdmin <- admin
  azureActiveContext$hdiPassword <- password
  azureActiveContext$clustername <- cluster
  azureActiveContext$sessionID <- session

  endpoint <- paste0("https://", cluster,
                     ".azurehdinsight.net/livy/sessions/", session)
  jsonHeader <- add_headers(.headers = c(`Content-type` = "application/json"))

  r <- DELETE(endpoint, jsonHeader, authenticate(admin, password), verbosity)

  code <- status_code(r)
  if (code == "404") {
    stop(paste("sessionID not found (", code, ")"))
  }
  if (code != "200") {
    stop(paste("Error Return Code:", code))
  }
  TRUE
}


#' Send Spark Statements/commands (REPL/Interactive mode).
#'
#' POSTs `CMD` as a statement to
#' `https://<clustername>.azurehdinsight.net/livy/sessions/<sessionID>/statements`
#' and, unless the statement is already `"available"`, polls it until it is
#' no longer waiting or running. Arguments that are not supplied are read
#' from `azureActiveContext`; the resolved values are assigned back onto it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param CMD Code to run; sent as the `code` field of the request body.
#'
#' @return A character string built with `toString()` from the `output$data`
#'   element of the statement response. An error is raised when a required
#'   value is missing, on status 404 (session not found), or on any other
#'   status than 201.
#'
#' @family Spark functions
#' @export
azureSparkCMD <- function(azureActiveContext, CMD, clustername, hdiAdmin,
                          hdiPassword, sessionID, verbose = FALSE) {

  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  if (missing(sessionID)) {
    SI <- azureActiveContext$sessionID
  } else {
    SI <- sessionID
  }
  if (missing(CMD)) {
    stop("Error: No CMD provided")
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword  provided")
  }
  if (!length(SI)) {
    stop("Error: No sessionID provided")
  }

  # Remember the resolved settings on the context for later calls.
  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN
  azureActiveContext$sessionID <- SI

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/sessions/",
                SI, "/statements")

  bodyI <- list(code = CMD)

  r <- POST(URL, add_headers(.headers = c(`Content-type` = "application/json")),
            authenticate(HA, HP), body = bodyI, encode = "json", verbosity)

  rl <- content(r, "text", encoding = "UTF-8")
  rh <- headers(r)

  if (status_code(r) == 404) {
    stop(paste("sessionID not found (", status_code(r), ")"))
  }
  if (status_code(r) != 201) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(rl)
  if (df$state == "available") {
    RET <- df$output$data
    return(toString(RET))
  }

  DUR <- 2
  if (length(rh$location)) {
    URL <- paste0("https://", CN, ".azurehdinsight.net/livy/", rh$location)
  } else {
    # No Location header in the response: address the statement by the id
    # returned in the body instead of polling a truncated URL.
    URL <- paste0(URL, "/", df$id)
  }

  message(paste("CMD Running: ", Sys.time()))
  message("Running(R), Completed(C)")

  # A statement that is still queued ("waiting") has not finished either;
  # polling only on "running" would return empty output for it.
  while (df$state %in% c("waiting", "running")) {
    Sys.sleep(DUR)
    # Back off from 2 to at most 5 seconds between polls.
    if (DUR < 5) {
      DUR <- DUR + 1
    }
    message("R")
    r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
             authenticate(HA, HP), verbosity)
    rl <- content(r, "text", encoding = "UTF-8")
    df <- fromJSON(rl)
  }
  message("C")
  message("Finished Running statement: ", Sys.time())
  RET <- df$output$data[1]
  return(toString(RET))
}


#' Submit Spark Job (Batch mode).
#'
#' POSTs a batch request to
#' `https://<clustername>.azurehdinsight.net/livy/batches` and polls the
#' batch until it is no longer starting or running. Arguments that are not
#' supplied are read from `azureActiveContext`; the resolved values are
#' assigned back onto it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param FILE Value sent as the `file` field of the batch request body.
#' @param log If `"URL"` (the default), the driver log URL reported while
#'   polling is stored in `azureActiveContext$log`. For any other value the
#'   text fetched from that URL is stored instead.
#'
#' @return The final `state` of the batch. If the batch is reported as
#'   `"available"` straight after submission, its `output$data` is returned
#'   instead. An error is raised when a required value is missing, on status
#'   404, or on any other status than 201.
#'
#' @family Spark functions
#' @export
azureSparkJob <- function(azureActiveContext, FILE, clustername, hdiAdmin,
                          hdiPassword, log = "URL", verbose = FALSE) {

  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  if (missing(FILE)) {
    stop("Error: No FILE provided")
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword  provided")
  }

  # Remember the resolved settings on the context for later calls.
  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches")

  bodyI <- list(file = FILE)

  r <- POST(URL, add_headers(.headers = c(`Content-type` = "application/json")),
            authenticate(HA, HP), body = bodyI, encode = "json", verbosity)

  rl <- content(r, "text", encoding = "UTF-8")

  if (status_code(r) == 404) {
    stop(paste("sessionID not found (", status_code(r), ")"))
  }
  if (status_code(r) != 201) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  df <- fromJSON(rl)
  if (df$state == "available") {
    return(df$output$data)
  }

  DUR <- 2
  BI <- df$id
  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches/", BI)

  message(paste("CMD Running: ", Sys.time()))
  message("Running(R), Completed(C)")
  LOGURL2 <- ""

  # A freshly submitted batch may not be "running" yet; keep polling through
  # the start-up states as well, otherwise the loop is skipped and the
  # start-up state is returned as if it were final.
  while (df$state %in% c("not_started", "starting", "running")) {
    Sys.sleep(DUR)
    # Back off from 2 to at most 5 seconds between polls.
    if (DUR < 5) {
      DUR <- DUR + 1
    }
    message("R")
    r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
             authenticate(HA, HP), verbosity)
    rl <- content(r, "text", encoding = "UTF-8")
    df <- fromJSON(rl)
    # The Livy REST API documents this key as `driverLogUrl`; the lower-case
    # spelling this function used to read is kept as a fallback.
    driverLog <- df$appInfo$driverLogUrl
    if (!length(driverLog)) {
      driverLog <- df$appInfo$driverlogUrl
    }
    if (length(driverLog)) {
      LOGURL2 <- driverLog
    }
  }
  message("C")
  STATE <- df$state
  message("")
  message(paste("Finished Running statement: ", Sys.time()))

  print("LOGURL")
  print(LOGURL2)

  if (log == "URL" || !nzchar(LOGURL2)) {
    if (log != "URL") {
      # Without a URL there is nothing to download; warn instead of failing
      # so the caller still receives the job state.
      warning("No driver log URL was reported; log content not retrieved")
    }
    azureActiveContext$log <- LOGURL2
  } else {
    r <- GET(LOGURL2, add_headers(.headers = c(`Content-type` = "application/json")),
             authenticate(HA, HP), verbosity)
    rl <- content(r, "text", encoding = "UTF-8")
    azureActiveContext$log <- rl
  }
  return(STATE)
}


#' List Spark Jobs (Batch mode).
#'
#' Issues a GET request to
#' `https://<clustername>.azurehdinsight.net/livy/batches` with HTTP basic
#' authentication and tabulates the `sessions` element of the response.
#' Arguments that are not supplied are read from `azureActiveContext`; the
#' resolved values are assigned back onto it.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @family Spark functions
#' @return A data frame with columns `ID`, `appID` and `state`, one row per
#'   listed job. An error is raised when a required value is missing, the
#'   response status is not 200, or nothing is listed.
#' @export
azureSparkListJobs <- function(azureActiveContext, clustername, hdiAdmin,
                               hdiPassword, verbose = FALSE) {
  if (missing(clustername)) {
    CN <- azureActiveContext$clustername
  } else {
    CN <- clustername
  }
  if (missing(hdiAdmin)) {
    HA <- azureActiveContext$hdiAdmin
  } else {
    HA <- hdiAdmin
  }
  if (missing(hdiPassword)) {
    HP <- azureActiveContext$hdiPassword
  } else {
    HP <- hdiPassword
  }
  verbosity <- set_verbosity(verbose)

  if (!length(CN)) {
    stop("Error: No Valid clustername provided")
  }
  if (!length(HA)) {
    stop("Error: No Valid hdiAdmin provided")
  }
  if (!length(HP)) {
    stop("Error: No Valid hdiPassword  provided")
  }

  # Remember the resolved settings on the context for later calls.
  azureActiveContext$hdiAdmin <- HA
  azureActiveContext$hdiPassword <- HP
  azureActiveContext$clustername <- CN

  URL <- paste0("https://", CN, ".azurehdinsight.net/livy/batches")

  r <- GET(URL, add_headers(.headers = c(`Content-type` = "application/json")),
           authenticate(HA, HP), verbosity)

  # Fail with the status code rather than a JSON parse error (or a misleading
  # "No Sessions available") when the request itself was rejected.
  if (status_code(r) != 200) {
    stop(paste("Error Return Code:", status_code(r)))
  }

  rl <- content(r, "text", encoding = "UTF-8")
  df <- fromJSON(rl)

  # The batch entries are read from the `sessions` element of the response.
  dfn <- as.data.frame(df$sessions$id)
  clust <- nrow(dfn)
  if (clust == 0) {
    stop("No Sessions available")
  }
  rows <- seq_len(clust)
  dfn[rows, 2] <- df$sessions$appId
  dfn[rows, 3] <- df$sessions$state

  colnames(dfn) <- c("ID", "appID", "state")

  return(dfn)
}


#' Show Spark log Output.
#'
#' Opens a URL with `browseURL()`: the `URL` argument when it is supplied,
#' otherwise the value stored in `azureActiveContext$log`.
#'
#' @inheritParams setAzureContext
#' @inheritParams azureSparkNewSession
#'
#' @param URL URL to open. When omitted, `azureActiveContext$log` is used.
#'
#' @return `TRUE`.
#'
#' @family Spark functions
#' @export
azureSparkShowURL <- function(azureActiveContext, URL) {
  # The context is only touched when no explicit URL was given.
  target <- if (missing(URL)) azureActiveContext$log else URL
  browseURL(target)
  TRUE
}
# ymmah/AzureSMR documentation built on May 6, 2019, 10:53 a.m.