#'
#' Remote execution of commands on a Databricks cluster.
#'
#' This function sends a command to an execution context on an existing
#' Databricks cluster via REST API, then polls the command status once per
#' second until the command reaches a terminal state ('Finished', 'Error' or
#' 'Cancelled'). It requires a context from
#' \code{create_execution_context}. Commands must be compatible with the
#' language of the execution context - 'r', 'python', 'scala', or 'sql'.
#' Will attempt to return a data.frame. If your command does not return a
#' data.frame the output may vary considerably, or parsing may fail.
#'
#' The API endpoint for executing the command is '1.2/commands/execute' and
#' the endpoint polled for its status is '1.2/commands/status'.
#' For all details on API calls please see the official documentation at
#' \url{https://docs.databricks.com/dev-tools/api/latest/}.
#'
#' Authentication uses a bearer token when \code{context$token} is set and
#' falls back to netrc otherwise.
#'
#' @param command A single string containing commands for remote execution on
#' Databricks.
#' @param context The list generated by \code{create_execution_context}. The
#' elements \code{language}, \code{cluster_id} and \code{context_id} are read,
#' plus the optional \code{token} and \code{workspace}. When
#' \code{workspace} is absent, a character variable named \code{workspace}
#' visible from the function's enclosing environment is used instead.
#' @param verbose If TRUE, will print the response code, command status and
#' command id to the console. Defaults to FALSE.
#' @param ... Additional options to be passed to \code{data.table::fread} which
#' is used to parse the API response.
#' @return If the output could be parsed, a list with two components:
#' \itemize{
#' \item \emph{response} - The full API response.
#' \item \emph{data} - The data as a data.frame.
#' }
#' If the output could not be parsed, a notice is printed and the prettified
#' JSON body of the status response is returned instead. An error is raised
#' when either API request does not return HTTP status 200.
#' @examples
#' \dontrun{
#' # Using netrc
#' context <- create_execution_context(workspace = "https://eastus2.azuredatabricks.net",
#'                                     language = "r",
#'                                     cluster_id = "1017-337483-jars232")
#'
#' ## Use the context to execute a command on Databricks
#' command <- "iris[1, ]"
#' result <- databricks_execute(command, context)
#'
#' ## Access dataframe
#' result$data
#' }
#'
databricks_execute <- function(command, context, verbose = FALSE, ...) {
  if (!is.character(command) || length(command) != 1L) {
    stop("`command` must be a single string.", call. = FALSE)
  }

  # Resolve the workspace URL. Prefer the value carried by the context; fall
  # back to a `workspace` variable reachable from the enclosing environment,
  # which is the lookup the previous implementation relied on implicitly.
  workspace <- context[["workspace"]]
  if (is.null(workspace)) {
    workspace <- get0("workspace", envir = parent.env(environment()),
                      mode = "character", inherits = TRUE)
  }
  if (!is.character(workspace) || length(workspace) != 1L) {
    stop("No workspace URL found: `context$workspace` must be a single string.",
         call. = FALSE)
  }

  # Build the URL of a 1.2 commands endpoint ("execute" or "status").
  api_url <- function(endpoint) {
    paste0(workspace, "/api/1.2/commands/", endpoint)
  }

  # Issue a request with `verb` (httr::GET or httr::POST), using netrc for
  # auth by default and bearer authentication when the context has a token.
  send_request <- function(verb, url, ...) {
    if (is.null(context$token)) {
      httr::with_config(httr::config(netrc = 1), verb(url = url, ...))
    } else {
      headers <- c(Authorization = paste("Bearer", context$token))
      verb(url = url, httr::add_headers(.headers = headers), ...)
    }
  }

  # Raw response body as text.
  body_text <- function(response) {
    rawToChar(response$content)
  }

  # Abort with the response body when a request did not succeed.
  check_http <- function(response, what) {
    if (response$status_code[1] != 200) {
      stop("The ", what, " request was not successful (status ",
           response$status_code[1], "):\n", body_text(response),
           call. = FALSE)
    }
  }

  # Serialise with jsonlite so quotes, backslashes and newlines inside the
  # command are escaped instead of corrupting a hand-pasted JSON string.
  payload <- jsonlite::toJSON(
    list(
      language = context$language,
      clusterId = context$cluster_id,
      contextId = context$context_id,
      command = command
    ),
    auto_unbox = TRUE
  )

  ## Send command via REST
  execute_response <- send_request(
    httr::POST, api_url("execute"),
    httr::content_type_json(),
    body = as.character(payload)
  )
  check_http(execute_response, "execute")

  ## Extract command ID from response
  command_id <- jsonlite::fromJSON(body_text(execute_response))$id
  if (is.null(command_id)) {
    stop("The execute response did not contain a command id:\n",
         body_text(execute_response), call. = FALSE)
  }

  # Poll the status endpoint until the command reaches a terminal state.
  # Stopping on 'Error' and 'Cancelled' too avoids polling forever when the
  # command never becomes 'Finished'.
  terminal_states <- c("Finished", "Error", "Cancelled")
  repeat {
    status_response <- send_request(
      httr::GET, api_url("status"),
      query = list(
        clusterId = context$cluster_id,
        contextId = context$context_id,
        commandId = command_id
      )
    )
    check_http(status_response, "status")

    status_body <- jsonlite::fromJSON(body_text(status_response))
    command_status <- status_body$status
    if (!is.character(command_status) || length(command_status) != 1L) {
      stop("The status response did not contain a command status:\n",
           body_text(status_response), call. = FALSE)
    }

    # Share status
    message("Command Status: ", command_status)
    if (command_status %in% terminal_states) {
      break
    }
    # If execution hasn't finished, wait a second and try again
    Sys.sleep(1)
  }

  if (isTRUE(verbose)) {
    message(paste0(
      "Response Code: ", status_response$status_code[1],
      "\nCommand Status: ", command_status,
      "\nCommand ID: ", command_id
    ))
  }

  if (!identical(command_status, "Finished")) {
    warning("Command ended with status '", command_status, "'.",
            call. = FALSE)
  }

  # Try to extract HTML snippet from API response
  tryCatch(
    {
      html <- xml2::read_html(status_body$results$data)
      txt <- xml2::xml_text(rvest::html_children(html))
      # Convert text to a data.frame. `text =` makes fread treat the remote
      # output strictly as data; passed as `input`, a single-line string
      # could be interpreted as a file name or a shell command.
      df <- suppressWarnings(suppressMessages(
        data.table::setDF(data.table::fread(text = txt, ...))
      ))
      list(response = status_response, data = df)
    },
    error = function(e) {
      cat(
        "There was a problem parsing the results - ",
        "output must be a data.frame.\n",
        "Please check your code and try again.",
        sep = ""
      )
      # prettify() needs JSON text, not the httr response object.
      jsonlite::prettify(body_text(status_response))
    }
  )
}
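# NOTE: the two lines below are residue from the web page this file was
# copied from; they are kept as comments so the file parses as R.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.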