## R/dbxClusterAdmin.R
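##
## This file uses httr (GET, POST, add_headers, content), jsonlite (fromJSON),
## infuse, data.table, and progress. Inside the installed package these come in
## through the package imports; a rough sketch for sourcing the file standalone
## would be:
## library(httr); library(jsonlite); library(infuse); library(data.table); library(progress)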


## Helper: apply f over the list s, flatten the result, and coerce with h
## (character by default).
ul <- function(s, f, h = as.character) h(unlist(lapply(s, f)))
## Helper: parse an httr response. If the body comes back as character, treat it
## as a JSON error payload and return its error_code/message; otherwise return
## the parsed content.
tryParsing <- function(r){
    s <- content(r)
    if(is.character(s)) {
        rq <- fromJSON(s)
        err <- sprintf("%s:: %s", rq$error_code, rq$message)
        return(list(status = FALSE, content = err))
    } else {
        return(list(status = TRUE, content = fromJSON(s)))
    }
}
##' Get the different versions of Spark available
##' @param token API token obtained from databricks
##' @param instance the databricks instance (shard) name
##' @details see https://docs.databricks.com/api/latest/clusters.html#spark-versions
##' @return a list with the default version key and a data table of versions
##' @export
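##' @examples
##' \dontrun{
##' ## A minimal sketch, assuming options("databricks") already holds a valid
##' ## token and instance, e.g.
##' ## options(databricks = list(token = "<token>", instance = "<instance>")).
##' sv <- dbxSparkVersions()
##' sv$default   # default Spark version key
##' sv$versions  # data.table of available version keys and names
##' }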
dbxSparkVersions<-
    function(token = options("databricks")[[1]]$token
            ,instance = options("databricks")[[1]]$instance)
{

    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/spark-versions",instance=instance)
    res <- GET(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)))
    res <- fromJSON(content(res,'text'))
    if(!is.null(res$error_code)){
        stop(infuse("dbx: {{code}}:{{msg}}",code=res$error_code,msg=res$message))
    }else {
        defaultVersion <- res$default_version_key
        f <- data.table(keys = unlist(lapply(res$versions, "[[", "key")),
                        name = unlist(lapply(res$versions, "[[", "name")))
        f <- f[order(keys)]
        f[, default := (keys == defaultVersion)]
        list(default = defaultVersion, versions = f)
    }
}

##' Get the different availability zones
##' @param token API token obtained from databricks
##' @param instance the databricks instance (shard) name
##' @details see https://docs.databricks.com/api/latest/clusters.html#list-zones
##' @return a list with the default zone and a data table of zones
##' @export
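##' @examples
##' \dontrun{
##' ## A minimal sketch, assuming the "databricks" option holds a token and instance.
##' z <- dbxZones()
##' z$default  # default availability zone
##' z$zones    # data.table of all zones
##' }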
dbxZones<-
    function(token = options("databricks")[[1]]$token
            ,instance = options("databricks")[[1]]$instance)
{

    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/list-zones",instance=instance)
    res <- GET(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)))
    res <- fromJSON(content(res,'text'))
    if(!is.null(res$error_code)){
        stop(infuse("dbx: {{code}}:{{msg}}",code=res$error_code,msg=res$message))
    }else {
        f <- data.table(zones= res$zones)
        list(default = res$default_zone, zones=f)
    }
}

##' Get the different node types
##' @param token API token obtained from databricks
##' @param instance the databricks instance (shard) name
##' @details see https://docs.databricks.com/api/latest/clusters.html#list-node-types
##' @return a list with the default node type and a data table of node types
##' @export
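##' @examples
##' \dontrun{
##' ## A minimal sketch: list node types and filter the returned data.table,
##' ## assuming the "databricks" option holds a token and instance.
##' nt <- dbxNodeTypes()
##' nt$default
##' nt$nodes[num_cores >= 8 & is_deprecated == FALSE]
##' }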
dbxNodeTypes<-
    function(token = options("databricks")[[1]]$token
            ,instance = options("databricks")[[1]]$instance)
{

    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/list-node-types",instance=instance)
    res <- GET(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)))
    res <- fromJSON(content(res,'text'))
    if(!is.null(res$error_code)){
        stop(infuse("dbx: {{code}}:{{msg}}",code=res$error_code,msg=res$message))
    }else {
        ## see https://docs.databricks.com/api/latest/clusters.html#clusternodetype
        x <- res$node_types
        f <- data.table(
            node_type_id = ul(x,function(k) k$node_type_id),
            memory_mb = ul(x,function(k) k$memory_mb,as.numeric),
            num_cores = ul(x,function(k) k$num_cores,as.numeric),
            description = ul(x,function(k) k$description),
            instance_type_id = ul(x,function(k) k$instance_type_id),
            is_deprecated = ul(x,function(k) k$is_deprecated,as.logical)
        )
        setkey(f,"node_type_id")
        return(list(default = res$default_node_type_id, nodes =f))
    }
}

## NOTE: dbxDelete is defined a second time later in this file; the later
## definition (which also destroys the cluster's execution context) is the one
## that takes effect when the file is loaded.
##' Delete a cluster
##' @param cluster_id a string
##' @details see https://docs.databricks.com/api/latest/clusters.html#delete
##' @return the parsed JSON response
##' @export
dbxDelete <- function(cluster_id
                    , token = options("databricks")[[1]]$token
                    , instance = options("databricks")[[1]]$instance)
{


    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/delete",instance=instance)
    body <- list("cluster_id" = as.character(cluster_id))
    
    res <- POST(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)),
                body = body
              , encode = "json")
    fromJSON(content(res, as = 'text'))
}


##' Restart a cluster
##' @param cluster_id a string
##' @details see https://docs.databricks.com/api/latest/clusters.html#restart
##' @return the parsed JSON response
##' @export
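##' @examples
##' \dontrun{
##' ## A minimal sketch; "<cluster-id>" is a placeholder for a real cluster id.
##' dbxRestart("<cluster-id>")
##' }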
dbxRestart <- function(cluster_id
                    , token = options("databricks")[[1]]$token
                    , instance = options("databricks")[[1]]$instance)
{


    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/restart",instance=instance)
    body <- list("cluster_id" = as.character(cluster_id))
    res <- POST(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)),
                body = body
              , encode = "json")
    fromJSON(content(res, as = 'text'))
}


##' Starts a terminated Spark cluster given its id
##' @param cluster_id a string
##' @param wait if TRUE, block until the cluster reports RUNNING
##' @param verbose verbosity level; values above 3 print the request URL and response
##' @details see https://docs.databricks.com/api/latest/clusters.html#start
##' @return the parsed JSON response from the start call
##' @export
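##' @examples
##' \dontrun{
##' ## A minimal sketch: start a terminated cluster and block until it is RUNNING.
##' ## "<cluster-id>" is a placeholder for a real cluster id.
##' cl <- dbxStart("<cluster-id>", wait = TRUE, verbose = 4)
##' }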
dbxStart <- function(cluster_id
                     , wait = TRUE
                     , token = options("databricks")[[1]]$token
                     , instance = options("databricks")[[1]]$instance
                     , verbose = 0)
{


    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/start",instance=instance)
    body <- list("cluster_id" = as.character(cluster_id))
    if(verbose>3) print(url)
    res <- POST(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)),
                body = body
              , encode = "json")
    cl <- fromJSON(content(res,as='text'))
    if(verbose>3) print(cl)
    require(progress)
    pb <- progress_bar$new(format = "Cluster :idx is :state, elapsed: :elapsed", clear = FALSE, total = 1e7, width = 60)
    if(wait){
        pb$tick(0)
        if(verbose>4) print(list("cl",cl))
        if(is.null(cl$"error_code")){
            while(TRUE){
                st <- dbxGet(cluster_id, token = token, instance = instance)$state
                if(verbose>4) print(st)
                pb$tick(tokens = list(idx = as.character(cluster_id), state = st))
                if(!is.null(st) && st !='RUNNING') {
                    if(st=="TERMINATED") stop(sprintf("Cluster %s got TERMINATED" ,as.character(cluster_id)))
                    Sys.sleep(5)
                } else {
                    cat("\n");
                    break
                }
            }
        }else{
            if(grepl("Running", cl$"message")) {Sys.sleep(3);return(cl)}
            stop(sprintf("problem starting cluster %s", cl$message))
        }
    }
    cl
}

##' Returns an autoscale structure
##' @param min_workers the minimum number of workers the cluster can scale down to when underutilized; also the initial number of workers after creation
##' @param max_workers the maximum number of workers the cluster can scale up to when overloaded
##' @details see https://docs.databricks.com/api/latest/clusters.html#clusterautoscale
##' @return an object of class autoscale
##' @export
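##' @examples
##' \dontrun{
##' ## A minimal sketch: an autoscale range of 2 to 8 workers, passed to dbxResize.
##' asc <- autoscale(2, 8)
##' dbxResize("<cluster-id>", num_workers = asc)
##' }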
autoscale <- function(min_workers, max_workers)
{
    if(max_workers < min_workers) stop("max_workers must be at least min_workers")
    x <- list(min_workers = as.integer(min_workers),
              max_workers = as.integer(max_workers))
    class(x) <- "autoscale"
    return(x)
}



##' Resize a Spark cluster given its id and num_workers
##' @param cluster_id a string
##' @param num_workers an integer or of class autoscale
##' @details see https://docs.databricks.com/api/latest/clusters.html#resize
##' @return the parsed JSON response
##' @export
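##' @examples
##' \dontrun{
##' ## A minimal sketch; note a fixed size must be an integer (use the L suffix).
##' dbxResize("<cluster-id>", num_workers = 4L)
##' dbxResize("<cluster-id>", num_workers = autoscale(2, 8))
##' }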
dbxResize <- function(cluster_id
                    , num_workers
                    , token = options("databricks")[[1]]$token
                    , instance = options("databricks")[[1]]$instance)
{


    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/resize",instance=instance)
    if(!(is(num_workers,"integer") || is(num_workers,"autoscale"))){
        stop("num_workers must be an integer or of class autoscale")
    }
    body <- list("cluster_id" = as.character(cluster_id))
    if(is(num_workers,"integer")){
        body$num_workers = num_workers
    }else{
        body$autoscale <- num_workers
    }
    res <- POST(url, add_headers(Authorization = infuse("Bearer {{token}}", token = token)),
                body = body
              , encode = "json")
    fromJSON(content(res,as='text'))
}
                                  
                                  
##' Get information about a cluster
##' @param cluster_id a string
##' @details see https://docs.databricks.com/api/latest/clusters.html#get
##' @return the parsed JSON response describing the cluster
##' @export
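##' @examples
##' \dontrun{
##' ## A minimal sketch: poll a cluster's state.
##' info <- dbxGet("<cluster-id>")
##' info$state  # e.g. "PENDING", "RUNNING", "TERMINATED"
##' }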
dbxGet <- function(cluster_id
                    , token = options("databricks")[[1]]$token
                    , instance = options("databricks")[[1]]$instance)
{


    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/get",instance=instance)
    body <- list("cluster_id" = as.character(cluster_id))
    res <- POST(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)),
                body = body
              , encode = "json")
    fromJSON(content(res,as='text'))
#   r<- tryParsing(res)
#   if(r$status) r$content else stop(r$content)
}

##' Delete (terminate) a cluster, destroying its execution context first
##' @param cluster_id a string
##' @details see https://docs.databricks.com/api/latest/clusters.html#delete
##' @return the parsed JSON response
##' @export
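##' @examples
##' \dontrun{
##' ## A minimal sketch: terminates the cluster after destroying its execution context.
##' dbxDelete("<cluster-id>")
##' }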
dbxDelete <- function(cluster_id
                    , token = options("databricks")[[1]]$token
                    , instance = options("databricks")[[1]]$instance)
{


    if(is.null(token) || is.null(instance)) stop("Must provide a token and instance")
    dbxCtxDestroy(NULL,instance=instance, clusterId=cluster_id)
    url <- infuse("https://{{instance}}.cloud.databricks.com/api/2.0/clusters/delete",instance=instance)
    body <- list("cluster_id" = as.character(cluster_id))
    res <- POST(url, add_headers(Authorization= infuse("Bearer {{token}}",token=token)),
                body = body
              , encode = "json")
    fromJSON(content(res,as='text'))
    
#   r<- tryParsing(res)
#   if(r$status) r$content else stop(r$content)
}