R/databricks_execute.R

Defines functions databricks_execute

Documented in databricks_execute

#'
#' Remote execution of commands on a Databricks cluster.
#'
#' This function sends commands to an execution context on an existing
#' Databricks cluster via REST API.  It requires a context_id from
#' \code{create_execution_context}.  Commands must be compatible with the
#' language of the execution context - 'r', 'python', 'scala', or 'sql'.
#' Will attempt to return a data.frame but if the execution hasn't finished will return
#' the status of execution.  If your command does not return a data.frame output may
#' vary considerably, or fail.
#'
#' The API endpoint for creating the execution context is is '1.2/commands/execute'.
#'   For all details on API calls please see the official documentation at
#' \url{https://docs.databricks.com/dev-tools/api/latest/}.
#'
#' @param command A string containing commands for remote execution on Databricks.
#' @param context The list generated by \code{create_execution_context}
#' @param verbose If TRUE, will print the API response to the console.  Defaults to
#' FALSE.
#' @param ... Additional options to be passed to \code{data.table::fread} which is used to
#' parse the API response.
#' @return A list with two components:
#' \itemize{
#'     \item \emph{response} - The full API response.
#'     \item \emph{data} - The data as a data.frame.
#' }
#' @examples
#' # Using netrc
#' context <- create_execution_context(workspace = "https://eastus2.azuredatabricks.net",
#'                   language = "r",
#'                   cluster_id = "1017-337483-jars232")
#'
#' ## Use the context to execute a command on Databricks
#' command <- "iris[1, ]"
#' result <- databricks_execute(command, context)
#'
#' ## Access dataframe
#' result$data
#'

databricks_execute <- function(command, context, verbose = F, ...) {

  payload <- paste0('{
                    "language": "', context$language, '",
                    "clusterId": "', context$cluster_id, '",
                    "contextId": "', context$context_id, '",
                    "command": "', command, '"
                    }')

  ## Send command via REST, using netrc for auth by default
  if (is.null(context$token)) {

    use_netrc <- httr::config(netrc = 1)
    execute_response <- httr::with_config(use_netrc, {
      httr::POST(url = paste0(workspace, "/api/1.2/commands/execute"),
                 httr::content_type_json(),
                 body = payload)
    })
  }

  else {

    ## Bearer Authentication
    headers <- c(
      Authorization = paste("Bearer", context$token)
    )

    execute_response <- httr::POST(url = paste0(workspace, "/api/1.2/commands/execute"),
                                   httr::add_headers(.headers = headers),
                                   httr::content_type_json(),
                                   body = payload)
  }

  ## Extract command ID from response
  command_id <- jsonlite::fromJSON(rawToChar(execute_response$content))$id

  # If the command hasn't finished executing, poll the API until it has
  repeat{

    # Get result from status endpoint with command_id
    if (is.null(context$token)) {

      use_netrc <- httr::config(netrc = 1)
      status_response <- httr::with_config(use_netrc, {
        httr::GET(url = paste0(workspace, "/api/1.2/commands/status",
                               "?clusterId=", context$cluster_id,
                               "&contextId=", context$context_id,
                               "&commandId=", command_id))
      })

      # Share status
      message(
        "Command Status: ", jsonlite::fromJSON(rawToChar(status_response$content))$status
      )

    } else {

      status_response <- httr::GET(url = paste0(workspace, "/api/1.2/commands/status",
                                                "?clusterId=", context$cluster_id,
                                                "&contextId=", context$context_id,
                                                "&commandId=", command_id),
                                   httr::add_headers(.headers = headers))
      message(
        "Command Status: ", jsonlite::fromJSON(rawToChar(status_response$content))$status
      )
    }

    if (jsonlite::fromJSON(rawToChar(status_response$content))$status == "Finished") {
      break
    }

    # If execution hasn't finished, wait a second and try again
    Sys.sleep(1)
  }

  # could make this nested to account for both API calls used in this function
  if (verbose == T) {
    ## Successful request message
    if (status_response$status_code[1] == 200) {
      message(paste0(
        "Response Code: ", status_response$status_code[1],
        "\nCommand Status: ", jsonlite::fromJSON(rawToChar(status_response$content))$status,
        "\nCommand ID: ", command_id
      ))
    }

    ## Unsuccessful request message
    else {
      return(message(paste0(
        "Status: ", status_response$status_code[1],
        "\nThe request was not successful:\n\n", jsonlite::prettify(status_response)
      )))
    }
  }

  # Try to extract HTML snippet from API response
  tryCatch(
    {
      txt <- xml2::read_html(jsonlite::fromJSON(rawToChar(status_response$content))$results$data) %>%
        rvest::html_children() %>%
        xml2::xml_text()

      # Convert text to data.table if returning a data frame
      df <- suppressWarnings(suppressMessages(data.table::setDF(data.table::fread(txt, drop = ...))))

      results <- list(response = status_response,
                      data = df)

      results
    },
    error = function(e) {
      cat("There was a problem parsing the results - output must be a data.frame.
                Please check your code and try again.")
      jsonlite::prettify(status_response)
    }
  )
}
RafiKurlansik/bricksteR documentation built on Oct. 13, 2022, 6:58 a.m.