R/cluster.R

Defines functions getPoolWorkers stopCluster getClusterList getCluster makeCluster generateClusterConfig

Documented in generateClusterConfig getCluster getClusterList makeCluster stopCluster

#' Creates a configuration file for the user's cluster setup.
#'
#' @param fileName Cluster settings file name
#' @return The request to the Batch service was successful.
#' @examples {
#' generateClusterConfig("test_config.json")
#' generateClusterConfig("test_config.json")
#' }
#'
#' @export
generateClusterConfig <- function(fileName) {
  if (!file.exists(fileName) ||
      !file.exists(paste0(getwd(), "/", fileName))) {
    config <- list(
      name = "myPoolName",
      vmSize = "Standard_D2_v2",
      maxTasksPerNode = 1,
      poolSize = list(
        dedicatedNodes = list(min = 3,
                              max = 3),
        lowPriorityNodes = list(min = 3,
                                max = 3),
        autoscaleFormula = "QUEUE"
      ),
      containerImage = "rocker/tidyverse:latest",
      rPackages = list(
        cran = vector(),
        github = vector(),
        bioconductor = vector()
      ),
      commandLine = vector(),
      subnetId = ""
    )

    configJson <-
      jsonlite::toJSON(config, auto_unbox = TRUE, pretty = TRUE)
    write(configJson, file = paste0(getwd(), "/", fileName))

    print(
      sprintf(
        "A cluster settings has been generated %s. Please enter your cluster specification.",
        paste0(getwd(), "/", fileName)
      )
    )
    print(
      "Note: To maximize all CPU cores, set the maxTasksPerNode property up to 4x the number of cores for the VM size."
    )
  }
}

#' Creates an Azure cloud-enabled cluster.
#'
#' @param cluster Cluster configuration object or file name
#' @param fullName A boolean flag for checking the file full name
#' @param wait A boolean flag to wait for all nodes to boot up
#' @param resourceFiles A list of files that Batch will download to the compute node before running the command line
#'
#' @return The request to the Batch service was successful.
#' @examples
#' \dontrun{
#' cluster <- makeCluster("cluster_config.json", fullName = TRUE, wait = TRUE)
#' }
#' @export
makeCluster <-
  function(cluster = "cluster.json",
           fullName = FALSE,
           wait = TRUE,
           resourceFiles = list()) {
    if (class(cluster) == "character") {
      if (fullName) {
        poolConfig <- rjson::fromJSON(file = paste0(cluster))
      }
      else {
        poolConfig <-
          rjson::fromJSON(file = paste0(getwd(), "/", cluster))
      }
    } else if (class(cluster) == "list") {
      poolConfig <- cluster
    } else {
      stop(sprintf(
        "cluster setting type is not supported: %s\n",
        class(cluster)
      ))
    }

    config <- getOption("az_config")
    if (is.null(config)) {
      stop("Credentials were not set.")
    }

    installCranCommand <- NULL
    installGithubCommand <- NULL
    installBioconductorCommand <- NULL

    packages <- c()
    if (!is.null(poolConfig$rPackages) &&
        !is.null(poolConfig$rPackages$cran) &&
        length(poolConfig$rPackages$cran) > 0) {
      installCranCommand <-
        getPoolPackageInstallationCommand("cran", poolConfig$rPackages$cran)
      packages <- c(packages, installCranCommand)
    }

    if (!is.null(poolConfig$rPackages) &&
        !is.null(poolConfig$rPackages$github) &&
        length(poolConfig$rPackages$github) > 0) {

      if (!is.null(config$githubAuthenticationToken) &&
          config$githubAuthenticationToken != "") {
        installGithubCommand <-
          getPoolPackageInstallationCommand("github", poolConfig$rPackages$github, config$githubAuthenticationToken)
      }
      else {
        installGithubCommand <-
          getPoolPackageInstallationCommand("github", poolConfig$rPackages$github)
      }

      packages <- c(packages, installGithubCommand)
    }

    if (!is.null(poolConfig$rPackages) &&
        !is.null(poolConfig$rPackages$bioconductor) &&
        length(poolConfig$rPackages$bioconductor) > 0) {
      installBioconductorCommand <-
        getPoolPackageInstallationCommand("bioconductor", poolConfig$rPackages$bioconductor)
      packages <- c(packages, installBioconductorCommand)
    }

    if (length(packages) == 0) {
      packages <- NULL
    }

    # install docker
    containerConfiguration <- list(
      type = "dockerCompatible"
    )

    dockerImage <- "rocker/tidyverse:latest"
    if (!is.null(poolConfig$containerImage) &&
        nchar(poolConfig$containerImage) > 0) {
      dockerImage <- poolConfig$containerImage
    }

    containerConfiguration$containerImageNames <-
      list(dockerImage,
           "brianlovedocker/doazureparallel-merge-dockerfile:0.12.1")

    config$containerImage <- dockerImage

    # Note: Revert it to master once PR is approved
    dockerInstallCommand <- c(
      paste0(
        "wget https://raw.githubusercontent.com/Azure/doAzureParallel/",
        "master/inst/startup/install_bioconductor.R"
      ),
      paste0(
        "wget https://raw.githubusercontent.com/Azure/doAzureParallel/",
        "master/inst/startup/install_custom.R"
      ),
      "chmod u+x install_bioconductor.R"
    )

    commandLine <- c(dockerInstallCommand,
                    'mkdir -p $AZ_BATCH_NODE_SHARED_DIR/R/packages',
                    'chmod -R 0777 $AZ_BATCH_NODE_SHARED_DIR/R')

    # log into private registry if registry credentials were provided
    if (!is.null(config$dockerAuthentication) &&
        nchar(config$dockerAuthentication$username) > 0 &&
        nchar(config$dockerAuthentication$password) > 0 &&
        nchar(config$dockerAuthentication$registry) > 0) {

      username <- config$dockerAuthentication$username
      password <- config$dockerAuthentication$password
      registry <- config$dockerAuthentication$registry

      containerConfiguration$containerRegistries <- list(
        list(password = password,
             username = username,
             registryServer = registry)
      )
    }

    if (!is.null(poolConfig$commandLine)) {
      commandLine <- c(commandLine, poolConfig$commandLine)
    }

    if (!is.null(packages)) {
      # install packages
      commandLine <-
        c(commandLine,
          dockerRunCommand(dockerImage, packages, NULL, FALSE, FALSE))
    }

    environmentSettings <- data.frame(name = character(), value = character())
    if (!is.null(config$githubAuthenticationToken) &&
        config$githubAuthenticationToken != "") {
      environmentSettings <- rbind(
        environmentSettings,
        data.frame(name = "GITHUB_PAT",
                   value = config$githubAuthenticationToken)
      )
    }

    if (!is.null(config$applicationInsights) &&
        nchar(config$applicationInsights$applicationId) > 0 &&
        nchar(config$applicationInsights$instrumentationKey) > 0) {
      environmentSettings <- rbind(
        environmentSettings,
        data.frame(name = "APP_INSIGHTS_APP_ID",
                   value = config$applicationInsights$applicationId),
        data.frame(name = "APP_INSIGHTS_INSTRUMENTATION_KEY",
                   value = config$applicationInsights$instrumentationKey))

      commandLine <- c(commandLine,
                       "wget https://github.com/Azure/batch-insights/releases/download/go-beta.1/batch-insights",
                       "chmod +x ./batch-insights")
    }

    networkConfiguration <- NULL
    if (!is.null(poolConfig$subnetId) &&
        poolConfig$subnetId != "") {
      networkConfiguration <-
        list(
          subnetId = poolConfig$subnetId
        )
    }

    if (!is.null(poolConfig[["pool"]])) {
      validation$isValidDeprecatedClusterConfig(cluster)
      poolConfig <- poolConfig[["pool"]]
    }
    else {
      validation$isValidClusterConfig(cluster)
    }

    tryCatch({
      validation$isValidPoolName(poolConfig$name)
    },
    error = function(e) {
      stop(paste("Invalid pool name: \n",
                 e))
    })

    printCluster(poolConfig, resourceFiles)

    response <- BatchUtilitiesOperations$addPool(
      pool = poolConfig,
      packages = packages,
      environmentSettings = environmentSettings,
      resourceFiles = resourceFiles,
      commandLine = commandLine,
      networkConfiguration = networkConfiguration,
      containerConfiguration = containerConfiguration,
      applicationInsights = config$applicationInsights
    )

    if (nchar(response) > 0) {
      responseObj <- rjson::fromJSON(response)
      errorMessage <- getHttpErrorMessage(responseObj)

      if (responseObj$code == "PoolBeingDeleted") {
        message <- paste(
          "Cluster '%s' already exists and is being deleted.",
          "Another cluster with the same name cannot be created",
          "until it is deleted. Please wait for the cluster to be deleted",
          "or create one with a different name"
        )

      if (wait == TRUE) {
        pool <- config$batchClient$poolOperations$getPool(poolConfig$name)

        cat(sprintf(message,
                    poolConfig$name),
            fill = TRUE)

        while (!is.null(pool) && !is.null(pool$state) && pool$state == "deleting") {
          cat(".")
          Sys.sleep(10)
          pool <- config$batchClient$poolOperations$getPool(
            poolConfig$name)
        }

        cat("\n")

        response <- BatchUtilitiesOperations$addPool(
          pool = poolConfig,
          packages = packages,
          environmentSettings = environmentSettings,
          resourceFiles = resourceFiles,
          commandLine = commandLine
        )

        if (nchar(response) > 0) {
          responseObj <- rjson::fromJSON(response)
          errorMessage <- getHttpErrorMessage(responseObj)
        }
        else {
          responseObj <- NULL
          errorMessage <- NULL
        }
      }
      else {
        stop(sprintf(message,
                     poolConfig$name))
        }
      }

      if (nchar(response) > 0) {
        if (responseObj$code == "AuthenticationFailed") {
          stop(paste0("Check your credentials and try again.\r\n", errorMessage))
        }
        else {
          if (responseObj$code != "PoolExists") {
            stop(errorMessage)
          }
        }
      }
    }

    pool <- config$batchClient$poolOperations$getPool(
      poolConfig$name)

    if (grepl("PoolExists", response)) {
      cat(
        sprintf(
          "The specified cluster '%s' already exists. Cluster '%s' will be used.",
          pool$id,
          pool$id
        ),
        fill = TRUE
      )

      clusterNodeMismatchWarning <-
        paste(
          "There is a mismatched between the requested cluster %s",
          "nodes min/max '%s'/'%s' and the existing cluster %s nodes '%s'.",
          "Use the 'resizeCluster' function to get the correct amount",
          "of workers."
        )

      if (!(
        poolConfig$poolSize$dedicatedNodes$min <= pool$targetDedicatedNodes &&
        pool$targetDedicatedNodes <= poolConfig$poolSize$dedicatedNodes$max
      )) {
        dedicatedLabel <- "dedicated"
        warning(
          sprintf(
            clusterNodeMismatchWarning,
            dedicatedLabel,
            poolConfig$poolSize$dedicatedNodes$min,
            poolConfig$poolSize$dedicatedNodes$max,
            dedicatedLabel,
            pool$targetDedicatedNodes
          )
        )
      }

      if (!(
        poolConfig$poolSize$lowPriorityNodes$min <= pool$targetLowPriorityNodes &&
        pool$targetLowPriorityNodes <= poolConfig$poolSize$lowPriorityNodes$max
      )) {
        lowPriorityLabel <- "low priority"

        warning(
          sprintf(
            clusterNodeMismatchWarning,
            lowPriorityLabel,
            poolConfig$poolSize$lowPriorityNodes$min,
            poolConfig$poolSize$lowPriorityNodes$max,
            lowPriorityLabel,
            pool$targetLowPriorityNodes
          )
        )
      }
    }

    if (wait) {
      if (!grepl("PoolExists", response)) {
        waitForNodesToComplete(poolConfig$name, 60000)
      }
    }

    cat("Your cluster has been registered.", fill = TRUE)
    cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes),
        fill = TRUE)
    cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes),
        fill = TRUE)
    config$poolId <- poolConfig$name
    options("az_config" = config)
    return(getOption("az_config"))
  }

#' Gets the cluster from your Azure account.
#'
#' @param clusterName The cluster configuration that was created in \code{makeCluster}
#'
#' @examples
#' \dontrun{
#' cluster <- getCluster("myCluster")
#' }
#' @export
getCluster <- function(clusterName, verbose = TRUE) {
  config <- getConfiguration()

  pool <- config$batchClient$poolOperations$getPool(
    clusterName)

  if (!is.null(pool$code) && !is.null(pool$message)) {
    stop(sprintf("Code: %s - Message: %s", pool$code, pool$message))
  }

  if (!is.null(pool$resizeErrors)) {
    cat("\n")

    resizeErrors <- ""
    for (i in 1:length(pool$resizeErrors)) {
      resizeErrors <-
        paste0(
          resizeErrors,
          sprintf(
            "Code: %s - Message: %s \n",
            pool$resizeErrors[[i]]$code,
            pool$resizeErrors[[i]]$message
          )
        )
    }

    stop(resizeErrors)
  }

  config <- getOption("az_config")
  nodes <- config$batchClient$poolOperations$listPoolNodes(
    clusterName)

  if (!is.null(nodes$value) && length(nodes$value) > 0) {
    nodesInfo <- .processNodeCount(nodes)
    nodesState <- nodesInfo$nodesState
    nodesWithFailures <- nodesInfo$nodesWithFailures

    if (verbose == TRUE) {
      cat("\nnodes:", fill = TRUE)
      cat(sprintf("\tidle:                %s", nodesState$idle), fill = TRUE)
      cat(sprintf("\tcreating:            %s", nodesState$creating), fill = TRUE)
      cat(sprintf("\tstarting:            %s", nodesState$starting), fill = TRUE)
      cat(sprintf("\twaitingforstarttask: %s", nodesState$waitingforstarttask), fill = TRUE)
      cat(sprintf("\tstarttaskfailed:     %s", nodesState$starttaskfailed), fill = TRUE)
      cat(sprintf("\tpreempted:           %s", nodesState$preempted), fill = TRUE)
      cat(sprintf("\trunning:             %s", nodesState$running), fill = TRUE)
      cat(sprintf("\tother:               %s", nodesState$other), fill = TRUE)
    }

    .showNodesFailure(nodesWithFailures)
  }

  cat("Your cluster has been registered.", fill = TRUE)

  config <- getOption("az_config")
  config$targetDedicatedNodes <- pool$targetDedicatedNodes
  config$targetLowPriorityNodes <- pool$targetLowPriorityNodes
  cat(sprintf("Dedicated Node Count: %i", pool$targetDedicatedNodes),
      fill = TRUE)
  cat(sprintf("Low Priority Node Count: %i", pool$targetLowPriorityNodes),
      fill = TRUE)

  config$poolId <- clusterName
  options("az_config" = config)
  return(config)
}

#' Get a list of clusters by state from the given filter
#'
#' @param filter A filter containing cluster state
#'
#' @examples
#' \dontrun{
#' getClusterList()
#' }
#' @export
getClusterList <- function(filter = NULL) {
  filterClause <- ""

  if (!is.null(filter)) {
    if (!is.null(filter$state)) {
      for (i in 1:length(filter$state)) {
        filterClause <-
          paste0(filterClause,
                 sprintf("state eq '%s'", filter$state[i]),
                 " or ")
      }

      filterClause <-
        substr(filterClause, 1, nchar(filterClause) - 3)
    }
  }

  config <- getOption("az_config")
  pools <- config$batchClient$poolOperations$listPools(
    query = list(
      "$filter" = filterClause,
      "$select" = paste0("id,state,allocationState,vmSize,currentDedicatedNodes,",
                         "targetDedicatedNodes,currentLowPriorityNodes,targetLowPriorityNodes")
    )
  )

  count <- length(pools$value)
  id <- character(count)
  state <- character(count)
  allocationState <- character(count)
  vmSize <- integer(count)
  currentDedicatedNodes <- integer(count)
  targetDedicatedNodes <- integer(count)
  currentLowPriorityNodes <- integer(count)
  targetLowPriorityNodes <- integer(count)

  if (count > 0) {
    if (is.null(pools$value[[1]]$id)) {
      stop(pools$value)
    }
    for (j in 1:length(pools$value)) {
      id[j] <- pools$value[[j]]$id
      state[j] <- pools$value[[j]]$state
      allocationState[j] <- pools$value[[j]]$allocationState
      vmSize[j] <- pools$value[[j]]$vmSize
      currentDedicatedNodes[j] <- pools$value[[j]]$currentDedicatedNodes
      targetDedicatedNodes[j] <- pools$value[[j]]$targetDedicatedNodes
      currentLowPriorityNodes[j] <- pools$value[[j]]$currentLowPriorityNodes
      targetLowPriorityNodes[j] <- pools$value[[j]]$targetLowPriorityNodes
    }
  }

  return (
    data.frame(
      Id = id,
      State = state,
      AllocationState = allocationState,
      VmSize = vmSize,
      CurrentDedicatedNodes = currentDedicatedNodes,
      targetDedicatedNodes = targetDedicatedNodes,
      currentLowPriorityNodes = currentLowPriorityNodes,
      targetLowPriorityNodes = targetLowPriorityNodes
    )
  )
}

#' Deletes the cluster from your Azure account.
#'
#' @param cluster The cluster configuration that was created in \code{makeCluster}
#'
#' @examples
#' \dontrun{
#' clusterConfiguration <- makeCluster("cluster_settings.json")
#' stopCluster(clusterConfiguration)
#' }
#' @export
stopCluster <- function(cluster) {
  config <- getOption("az_config")
  config$batchClient$poolOperations$deletePool(
    cluster$poolId)

  print(sprintf("Your %s cluster is being deleted.", cluster$poolId))
}

getPoolWorkers <- function(poolId, ...) {
  args <- list(...)
  raw <- !is.null(args$RAW)

  config <- getOption("az_config")
  nodes <- config$batchClient$poolOperations$listPoolNodes(
    poolId)

  if (length(nodes$value) > 0) {
    for (i in 1:length(nodes$value)) {
      print(
        sprintf(
          "Node: %s - %s - %s",
          nodes$value[[i]]$id,
          nodes$value[[i]]$state,
          nodes$value[[i]]$ipAddress
        )
      )
    }
  }
  else{
    print("There are currently no nodes in the pool.")
  }

  if (raw) {
    return(nodes)
  }
}
Azure/doAzureParallel documentation built on May 22, 2021, 4:39 a.m.