R/clusters.R


#' Create a Cluster
#'
#' @param name Cluster name requested by the user. This doesn’t have to be
#' unique. If not specified at creation, the cluster name will be an empty
#' string.
#' @param spark_version The runtime version of the cluster. You can retrieve a
#' list of available runtime versions by using [db_cluster_runtime_versions()].
#' @param spark_conf Named list. An object containing a set of optional,
#' user-specified Spark configuration key-value pairs. You can also pass in a
#' string of extra JVM options to the driver and the executors via
#' `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions`
#' respectively. E.g. `list("spark.speculation" = TRUE,
#' "spark.streaming.ui.retainedBatches" = 5)`.
#' @param num_workers Number of worker nodes that this cluster should have. A
#' cluster has one Spark driver and `num_workers` executors for a total of
#' `num_workers` + 1 Spark nodes.
#' @param autoscale Instance of [cluster_autoscale()].
#' @param node_type_id The node type for the worker nodes.
#' [db_cluster_list_node_types()] can be used to see available node types.
#' @param cloud_attrs Attributes related to clusters running on specific cloud
#' provider. Defaults to [aws_attributes()]. Must be one of [aws_attributes()],
#' [azure_attributes()], [gcp_attributes()].
#' @param driver_node_type_id The node type of the Spark driver. This field is
#' optional; if unset, the driver node type will be set as the same value as
#' `node_type_id` defined above. [db_cluster_list_node_types()] can be used to
#' see available node types.
#' @param custom_tags Named list. An object containing a set of tags for cluster
#' resources. Databricks tags all cluster resources with these tags in addition
#' to `default_tags`. Databricks allows at most 45 custom tags.
#' @param log_conf Instance of [cluster_log_conf()].
#' @param init_scripts Instance of [init_script_info()].
#' @param spark_env_vars Named list. User-specified environment variable
#' key-value pairs. To specify an additional set of `SPARK_DAEMON_JAVA_OPTS`,
#' we recommend appending them to `$SPARK_DAEMON_JAVA_OPTS` as shown in the
#' following example, which ensures that all default Databricks managed
#' environment variables are included as well. E.g.
#' `list(SPARK_DAEMON_JAVA_OPTS = "$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true")`
#' @param autotermination_minutes Automatically terminates the cluster after it
#' is inactive for this time in minutes. If not set, this cluster will not be
#' automatically terminated. If specified, the threshold must be between 10 and
#' 10000 minutes. You can also set this value to 0 to explicitly disable
#' automatic termination. Defaults to 120.
#' @param ssh_public_keys List. SSH public key contents that will be added to each
#' Spark node in this cluster. The corresponding private keys can be used to
#' login with the user name ubuntu on port 2200. Up to 10 keys can be specified.
#' @param enable_elastic_disk When enabled, this cluster will dynamically
#' acquire additional disk space when its Spark workers are running low on
#' disk space.
#' @param driver_instance_pool_id ID of the instance pool to use for the
#' driver node. You must also specify `instance_pool_id`. Optional.
#' @param instance_pool_id ID of the instance pool to use for cluster nodes. If
#' `driver_instance_pool_id` is present, `instance_pool_id` is used for worker
#' nodes only. Otherwise, it is used for both the driver and worker nodes.
#' Optional.
#' @param idempotency_token An optional token that can be used to guarantee the
#' idempotency of cluster creation requests. If an active cluster with the
#' provided token already exists, the request will not create a new cluster,
#' but it will return the ID of the existing cluster instead. The existence of a
#' cluster with the same token is not checked against terminated clusters. If
#' you specify the idempotency token, upon failure you can retry until the
#' request succeeds. Databricks guarantees that exactly one cluster will be
#' launched with that idempotency token. This token should have at most 64
#' characters.
#' @param apply_policy_default_values Boolean (Default: `TRUE`), whether to use
#' policy default values for missing cluster attributes.
#' @param enable_local_disk_encryption Boolean (Default: `TRUE`), whether
#' encryption of disks locally attached to the cluster is enabled.
#' @param docker_image Instance of [docker_image()].
#' @param policy_id String, ID of a cluster policy.
#' @param kind The kind of compute described by this compute specification.
#' @param data_security_mode Data security mode decides what data governance
#' model to use when accessing data from a cluster.
#' @inheritParams auth_params
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Create a new Apache Spark cluster. This method acquires new instances from
#' the cloud provider if necessary. This method is asynchronous; the returned
#' `cluster_id` can be used to poll the cluster state ([db_cluster_get()]).
#' When this method returns, the cluster is in a `PENDING` state. The cluster is
#' usable once it enters a `RUNNING` state.
#'
#' Databricks may not be able to acquire some of the requested nodes, due to
#' cloud provider limitations or transient network issues. If Databricks
#' acquires at least 85% of the requested on-demand nodes, cluster creation will
#' succeed. Otherwise the cluster will terminate with an informative error
#' message.
#'
#' You cannot specify both `autoscale` and `num_workers`; choose one or the
#' other.
#'
#' [More Documentation](https://docs.databricks.com/api/workspace/clusters/create).
#'
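#' A minimal sketch of creating a small fixed-size cluster. The runtime key
#' and node type below are illustrative placeholders; look up real values with
#' [db_cluster_runtime_versions()] and [db_cluster_list_node_types()].
#'
#' ```r
#' new_cluster <- db_cluster_create(
#'   name = "ad-hoc-analysis",
#'   spark_version = "14.3.x-scala2.12", # illustrative runtime key
#'   node_type_id = "m5.xlarge",         # illustrative AWS node type
#'   num_workers = 2,
#'   autotermination_minutes = 60,
#'   idempotency_token = "ad-hoc-analysis" # optional; makes retrying safe
#' )
#'
#' # poll the returned cluster_id until the cluster reaches `RUNNING`
#' db_cluster_get(new_cluster$cluster_id)$state
#' ```
#'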
#' @family Clusters API
#'
#' @export
db_cluster_create <- function(
  name,
  spark_version,
  node_type_id,
  num_workers = NULL,
  autoscale = NULL,
  spark_conf = list(),
  cloud_attrs = aws_attributes(),
  driver_node_type_id = NULL,
  custom_tags = list(),
  init_scripts = list(),
  spark_env_vars = list(),
  autotermination_minutes = 120,
  log_conf = NULL,
  ssh_public_keys = NULL,
  driver_instance_pool_id = NULL,
  instance_pool_id = NULL,
  idempotency_token = NULL,
  enable_elastic_disk = TRUE,
  apply_policy_default_values = TRUE,
  enable_local_disk_encryption = TRUE,
  docker_image = NULL,
  policy_id = NULL,
  kind = c("CLASSIC_PREVIEW"),
  data_security_mode = c(
    "NONE",
    "SINGLE_USER",
    "USER_ISOLATION",
    "LEGACY_TABLE_ACL",
    "LEGACY_PASSTHROUGH",
    "LEGACY_SINGLE_USER",
    "LEGACY_SINGLE_USER_STANDARD",
    "DATA_SECURITY_MODE_STANDARD",
    "DATA_SECURITY_MODE_DEDICATED",
    "DATA_SECURITY_MODE_AUTO"
  ),
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  # input checks
  # - cloud_attrs must be of class AwsAttributes/AzureAttributes/GcpAttributes
  # - if specified, log_conf must be of class ClusterLogConf
  # - all values in init_scripts must be from init_script_info() (class: InitScriptInfo)
  # - if specified, docker_image must be of class DockerImage

  kind <- match.arg(kind)
  data_security_mode <- match.arg(data_security_mode)

  body <- list(
    cluster_name = name,
    spark_version = spark_version,
    spark_conf = spark_conf,
    node_type_id = node_type_id,
    driver_node_type_id = driver_node_type_id,
    custom_tags = custom_tags,
    log_conf = log_conf,
    init_scripts = init_scripts,
    spark_env_vars = spark_env_vars,
    autotermination_minutes = autotermination_minutes,
    ssh_public_keys = ssh_public_keys,
    enable_elastic_disk = enable_elastic_disk,
    driver_instance_pool_id = driver_instance_pool_id,
    instance_pool_id = instance_pool_id,
    idempotency_token = idempotency_token,
    apply_policy_default_values = apply_policy_default_values,
    enable_local_disk_encryption = enable_local_disk_encryption,
    docker_image = docker_image,
    policy_id = policy_id,
    data_security_mode = data_security_mode,
    kind = kind
  )

  if (is.null(num_workers)) {
    stopifnot(is.cluster_autoscale(autoscale))
    body[["autoscale"]] <- unclass(autoscale)
  } else {
    body[["num_workers"]] <- num_workers
  }

  if (is.aws_attributes(cloud_attrs)) {
    body[["aws_attributes"]] <- unclass(cloud_attrs)
  } else if (is.azure_attributes(cloud_attrs)) {
    body[["azure_attributes"]] <- unclass(cloud_attrs)
  } else if (is.gcp_attributes(cloud_attrs)) {
    body[["gcp_attributes"]] <- unclass(cloud_attrs)
  } else {
    cli::cli_abort(c(
      "Invalid cloud attributes specification:",
      "i" = "Use {.fn aws_attributes}, {.fn azure_attributes}, or {.fn gcp_attributes} for {.arg cloud_attr}"
    ))
  }

  req <- db_request(
    endpoint = "clusters/create",
    method = "POST",
    version = "2.0",
    body = body,
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)
  } else {
    req
  }
}

#' Edit a Cluster
#'
#' Edit the configuration of a cluster to match the provided attributes and
#' size.
#'
#' @param cluster_id Canonical identifier for the cluster.
#' @inheritParams auth_params
#' @inheritParams db_cluster_create
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' You can edit a cluster if it is in a `RUNNING` or `TERMINATED` state. If you
#' edit a cluster while it is in a `RUNNING` state, it will be restarted so that
#' the new attributes can take effect. If you edit a cluster while it is in a
#' `TERMINATED` state, it will remain `TERMINATED`. The next time it is started
#' using the clusters/start API, the new attributes will take effect. An attempt
#' to edit a cluster in any other state will be rejected with an `INVALID_STATE`
#' error code.
#'
#' Clusters created by the Databricks Jobs service cannot be edited.
#'
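#' A hedged sketch of editing a running cluster. Note that `spark_version` and
#' `node_type_id` must be resupplied even when they are not changing; the
#' cluster id below is illustrative.
#'
#' ```r
#' existing <- db_cluster_get("1234-567890-abc123")
#'
#' db_cluster_edit(
#'   cluster_id = existing$cluster_id,
#'   spark_version = existing$spark_version, # unchanged, but required
#'   node_type_id = existing$node_type_id,   # unchanged, but required
#'   num_workers = 4,
#'   autotermination_minutes = 30
#' )
#' ```
#'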
#' @family Clusters API
#'
#' @export
db_cluster_edit <- function(
  cluster_id,
  spark_version,
  node_type_id,
  num_workers = NULL,
  autoscale = NULL,
  name = NULL,
  spark_conf = NULL,
  cloud_attrs = NULL,
  driver_node_type_id = NULL,
  custom_tags = NULL,
  init_scripts = NULL,
  spark_env_vars = NULL,
  autotermination_minutes = NULL,
  log_conf = NULL,
  ssh_public_keys = NULL,
  driver_instance_pool_id = NULL,
  instance_pool_id = NULL,
  idempotency_token = NULL,
  enable_elastic_disk = NULL,
  apply_policy_default_values = NULL,
  enable_local_disk_encryption = NULL,
  docker_image = NULL,
  policy_id = NULL,
  kind = c("CLASSIC_PREVIEW"),
  data_security_mode = c(
    "NONE",
    "SINGLE_USER",
    "USER_ISOLATION",
    "LEGACY_TABLE_ACL",
    "LEGACY_PASSTHROUGH",
    "LEGACY_SINGLE_USER",
    "LEGACY_SINGLE_USER_STANDARD",
    "DATA_SECURITY_MODE_STANDARD",
    "DATA_SECURITY_MODE_DEDICATED",
    "DATA_SECURITY_MODE_AUTO"
  ),
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  # NOTES:
  # edit is annoying and requires node size/spark version for edit even if
  # they aren't being changed from existing config

  # input checks
  # - cloud_attrs must be of class AwsAttributes/AzureAttributes/GcpAttributes
  # - if specified, log_conf must be of class ClusterLogConf
  # - all values in init_scripts must be from init_script_info() (class: InitScriptInfo)
  # - if specified, docker_image must be of class DockerImage
  kind <- match.arg(kind)
  data_security_mode <- match.arg(data_security_mode)

  body <- list(
    cluster_id = cluster_id,
    cluster_name = name,
    spark_version = spark_version,
    spark_conf = spark_conf,
    node_type_id = node_type_id,
    driver_node_type_id = driver_node_type_id,
    custom_tags = custom_tags,
    log_conf = log_conf,
    init_scripts = init_scripts,
    spark_env_vars = spark_env_vars,
    autotermination_minutes = autotermination_minutes,
    ssh_public_keys = ssh_public_keys,
    enable_elastic_disk = enable_elastic_disk,
    driver_instance_pool_id = driver_instance_pool_id,
    instance_pool_id = instance_pool_id,
    idempotency_token = idempotency_token,
    apply_policy_default_values = apply_policy_default_values,
    enable_local_disk_encryption = enable_local_disk_encryption,
    docker_image = docker_image,
    policy_id = policy_id,
    data_security_mode = data_security_mode,
    kind = kind
  )

  if (!(is.null(num_workers) && is.null(autoscale))) {
    if (is.null(num_workers)) {
      stopifnot(is.cluster_autoscale(autoscale))
      body[["autoscale"]] <- unclass(autoscale)
    } else {
      body[["num_workers"]] <- num_workers
    }
  }

  if (!is.null(cloud_attrs)) {
    if (is.aws_attributes(cloud_attrs)) {
      body[["aws_attributes"]] <- unclass(cloud_attrs)
    } else if (is.azure_attributes(cloud_attrs)) {
      body[["azure_attributes"]] <- unclass(cloud_attrs)
    } else if (is.gcp_attributes(cloud_attrs)) {
      body[["gcp_attributes"]] <- unclass(cloud_attrs)
    } else {
      cli::cli_abort(c(
        "Invalid cloud attributes specification:",
        "i" = "Use {.fn aws_attributes}, {.fn azure_attributes}, or {.fn gcp_attributes} for {.arg cloud_attrs}"
      ))
    }
  }

  body <- purrr::discard(body, is.null)

  req <- db_request(
    endpoint = "clusters/edit",
    method = "POST",
    version = "2.0",
    body = body,
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)
  } else {
    req
  }
}

#' Cluster Action Helper Function
#'
#' @param action One of `start`, `restart`, `delete`, `permanent-delete`, `pin`,
#' `unpin`.
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
db_cluster_action <- function(
  cluster_id,
  action = c("start", "restart", "delete", "permanent-delete", "pin", "unpin"),
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  # resolve action against the allowed set before building the endpoint path
  action <- match.arg(action)

  body <- list(
    cluster_id = cluster_id
  )

  req <- db_request(
    endpoint = paste0("clusters/", action),
    method = "POST",
    version = "2.0",
    body = body,
    host = host,
    token = token
  )

  req <- req |>
    httr2::req_body_json(body)

  if (perform_request) {
    httr2::req_perform(req)
    NULL
  } else {
    req
  }
}

#' Start a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Start a terminated cluster given its ID.
#'
#' This is similar to [db_cluster_create()], except:
#' * The terminated cluster ID and attributes are preserved.
#' * The cluster starts with the last specified cluster size. If the terminated
#' cluster is an autoscaling cluster, the cluster starts with the minimum number
#' of nodes.
#' * If the cluster is in the `RESTARTING` state, a `400` error is returned.
#' * You cannot start a cluster launched to run a job.
#'
#' @family Clusters API
#'
#' @export
db_cluster_start <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  db_cluster_action(cluster_id, "start", host, token, perform_request)
}

#' Restart a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' The cluster must be in the `RUNNING` state.
#'
#' @family Clusters API
#'
#' @export
db_cluster_restart <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  db_cluster_action(cluster_id, "restart", host, token, perform_request)
}

#' Delete/Terminate a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @inherit db_cluster_terminate details
#'
#' @family Clusters API
#'
#' @export
db_cluster_delete <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  db_cluster_action(cluster_id, "delete", host, token, perform_request)
}

#' Delete/Terminate a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' The cluster is removed asynchronously. Once the termination has completed,
#' the cluster will be in the `TERMINATED` state. If the cluster is already in a
#' `TERMINATING` or `TERMINATED` state, nothing will happen.
#'
#' Unless a cluster is pinned, 30 days after the cluster is terminated, it is
#' permanently deleted.
#'
#' @family Clusters API
#'
#' @export
db_cluster_terminate <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  db_cluster_action(cluster_id, "delete", host, token, perform_request)
}

#' Permanently Delete a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' If the cluster is running, it is terminated and its resources are
#' asynchronously removed. If the cluster is terminated, then it is immediately
#' removed.
#'
#' You cannot perform any action, including retrieving the cluster’s
#' permissions, on a permanently deleted cluster. A permanently deleted cluster
#' is also no longer returned in the cluster list.
#'
#' @family Clusters API
#'
#' @export
db_cluster_perm_delete <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  db_cluster_action(
    cluster_id,
    "permanent-delete",
    host,
    token,
    perform_request
  )
}

#' Pin a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Ensure that an all-purpose cluster configuration is retained even after a
#' cluster has been terminated for more than 30 days. Pinning ensures that the
#' cluster is always returned by [db_cluster_list()]. Pinning a cluster that is
#' already pinned has no effect.
#'
#' @family Clusters API
#'
#' @export
db_cluster_pin <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  db_cluster_action(cluster_id, "pin", host, token, perform_request)
}

#' Unpin a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Allows the cluster to eventually be removed from the list returned by
#' [db_cluster_list()]. Unpinning a cluster that is not pinned has no effect.
#'
#' @family Clusters API
#'
#' @export
db_cluster_unpin <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  db_cluster_action(cluster_id, "unpin", host, token, perform_request)
}

#' Resize a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details The cluster must be in the `RUNNING` state.
#'
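#' A short sketch of both resize modes. The cluster id is illustrative, and
#' the `cluster_autoscale()` call assumes its `min_workers`/`max_workers`
#' interface.
#'
#' ```r
#' # fixed size: scale to exactly 8 workers
#' db_cluster_resize("1234-567890-abc123", num_workers = 8)
#'
#' # autoscaling: let Databricks scale between 2 and 8 workers
#' db_cluster_resize(
#'   "1234-567890-abc123",
#'   autoscale = cluster_autoscale(min_workers = 2, max_workers = 8)
#' )
#' ```
#'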
#' @family Clusters API
#'
#' @export
db_cluster_resize <- function(
  cluster_id,
  num_workers = NULL,
  autoscale = NULL,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  if (is.null(num_workers) && is.null(autoscale)) {
    cli::cli_abort(
      "Must specify either {.arg num_workers} or {.arg autoscale}."
    )
  }

  body <- list(
    cluster_id = cluster_id
  )

  if (is.null(num_workers)) {
    stopifnot(is.cluster_autoscale(autoscale))
    body[["autoscale"]] <- unclass(autoscale)
  } else {
    body[["num_workers"]] <- num_workers
  }

  req <- db_request(
    endpoint = "clusters/resize",
    method = "POST",
    version = "2.0",
    body = body,
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)
  } else {
    req
  }
}

#' Get Details of a Cluster
#'
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Retrieve the information for a cluster given its identifier. Clusters can be
#' described while they are running or up to 30 days after they are terminated.
#'
#' @family Clusters API
#'
#' @export
db_cluster_get <- function(
  cluster_id,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  body <- list(
    cluster_id = cluster_id
  )

  req <- db_request(
    endpoint = "clusters/get",
    method = "GET",
    version = "2.0",
    body = body,
    host = host,
    token = token
  )

  req <- req |>
    httr2::req_body_json(body)

  if (perform_request) {
    req |>
      httr2::req_perform() |>
      httr2::resp_body_json()
  } else {
    req
  }
}

#' List Clusters
#'
#' @inheritParams auth_params
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Return information about all pinned clusters, active clusters, up to 150 of
#' the most recently terminated all-purpose clusters in the past 30 days, and up
#' to 30 of the most recently terminated job clusters in the past 30 days.
#'
#' For example, if there is 1 pinned cluster, 4 active clusters, 45 terminated
#' all-purpose clusters in the past 30 days, and 50 terminated job clusters in
#' the past 30 days, then this API returns:
#' * the 1 pinned cluster
#' * 4 active clusters
#' * All 45 terminated all-purpose clusters
#' * The 30 most recently terminated job clusters
#'
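#' The response is a nested list; a small sketch of flattening a few fields
#' into a data frame with purrr (already used elsewhere in this package):
#'
#' ```r
#' clusters <- db_cluster_list()
#'
#' purrr::map_dfr(clusters, function(x) {
#'   list(
#'     cluster_id = x$cluster_id,
#'     cluster_name = x$cluster_name,
#'     state = x$state
#'   )
#' })
#' ```
#'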
#' @family Clusters API
#'
#' @export
db_cluster_list <- function(
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  req <- db_request(
    endpoint = "clusters/list",
    method = "GET",
    version = "2.0",
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)$clusters
  } else {
    req
  }
}

#' List Available Cluster Node Types
#'
#' @inheritParams auth_params
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Return a list of supported Spark node types. These node types can be used to
#' launch a cluster.
#'
#' @family Clusters API
#'
#' @export
db_cluster_list_node_types <- function(
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  req <- db_request(
    endpoint = "clusters/list-node-types",
    method = "GET",
    version = "2.0",
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)
  } else {
    req
  }
}

#' List Available Databricks Runtime Versions
#'
#' @inheritParams auth_params
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Return the list of available runtime versions. These versions can be used to
#' launch a cluster.
#'
#' @family Clusters API
#'
#' @export
db_cluster_runtime_versions <- function(
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  req <- db_request(
    endpoint = "clusters/spark-versions",
    method = "GET",
    version = "2.0",
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)
  } else {
    req
  }
}

#' List Availability Zones (AWS Only)
#'
#' @inheritParams auth_params
#' @inheritParams db_sql_warehouse_create
#'
#' @family Clusters API
#'
#' @details
#' **Amazon Web Services (AWS) ONLY!**
#' Return a list of availability zones in which clusters can be created
#' (e.g. `us-west-2a`). These zones can be used to launch a cluster.
#'
#' @export
db_cluster_list_zones <- function(
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  req <- db_request(
    endpoint = "clusters/list-zones",
    method = "GET",
    version = "2.0",
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)
  } else {
    req
  }
}

#' List Cluster Activity Events
#'
#' @param cluster_id The ID of the cluster to retrieve events about.
#' @param start_time The start time in epoch milliseconds. If empty, returns
#' events starting from the beginning of time.
#' @param end_time The end time in epoch milliseconds. If empty, returns events
#' up to the current time.
#' @param event_types List. Optional set of event types to filter by. Default
#' is to return all events. [Event Types](https://docs.databricks.com/api/workspace/clusters/events#events).
#' @param order Either `DESC` (default) or `ASC`.
#' @param offset The offset in the result set. Defaults to 0 (no offset). When
#' an offset is specified and the results are requested in descending order, the
#' end_time field is required.
#' @param limit Maximum number of events to include in a page of events.
#' Defaults to 50, and maximum allowed value is 500.
#' @inheritParams auth_params
#' @inheritParams db_sql_warehouse_create
#'
#' @details
#' Retrieve a list of events about the activity of a cluster. You can retrieve
#' events from active clusters (running, pending, or reconfiguring) and
#' terminated clusters within 30 days of their last termination. This API is
#' paginated. If there are more events to read, the response includes all the
#' parameters necessary to request the next page of events.
#'
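#' A hedged sketch of filtering events and requesting a subsequent page. The
#' cluster id, event types, and timestamp are illustrative.
#'
#' ```r
#' events <- db_cluster_events(
#'   cluster_id = "1234-567890-abc123",
#'   event_types = list("TERMINATING", "RESTARTING"),
#'   limit = 50
#' )
#'
#' # when paging descending results, pin end_time and advance the offset
#' more_events <- db_cluster_events(
#'   cluster_id = "1234-567890-abc123",
#'   event_types = list("TERMINATING", "RESTARTING"),
#'   end_time = 1700000000000, # illustrative epoch milliseconds
#'   offset = 50,
#'   limit = 50
#' )
#' ```
#'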
#' @family Clusters API
#'
#' @export
db_cluster_events <- function(
  cluster_id,
  start_time = NULL,
  end_time = NULL,
  event_types = NULL,
  order = c("DESC", "ASC"),
  offset = 0,
  limit = 50,
  host = db_host(),
  token = db_token(),
  perform_request = TRUE
) {
  order <- match.arg(order, several.ok = FALSE)
  stopifnot(
    offset >= 0,
    limit > 0 && limit <= 500
  )

  body <- list(
    cluster_id = cluster_id,
    # epoch millisecond timestamps exceed 32-bit integer range, keep them numeric
    start_time = as.numeric(start_time),
    end_time = as.numeric(end_time),
    order = order,
    offset = as.integer(offset),
    limit = as.integer(limit)
  )

  req <- db_request(
    endpoint = "clusters/events",
    method = "POST",
    version = "2.0",
    body = body,
    host = host,
    token = token
  )

  if (perform_request) {
    db_perform_request(req)$events
  } else {
    req
  }
}

### Higher Functions ###########################################################

#' Get and Start Cluster
#'
#' @param polling_interval Number of seconds to wait between status checks
#' @inheritParams auth_params
#' @inheritParams db_cluster_edit
#' @param silent Boolean (default: `FALSE`), if `TRUE` suppresses the cluster
#' state progress messages.
#'
#' @details Get information regarding a Databricks cluster. If the cluster is
#' inactive it will be started, and the function waits until the cluster is
#' active before returning.
#'
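#' A short usage sketch (the cluster id is illustrative):
#'
#' ```r
#' cluster <- get_and_start_cluster(
#'   cluster_id = "1234-567890-abc123",
#'   polling_interval = 10
#' )
#' cluster$state
#' ```
#'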
#' @seealso [db_cluster_get()] and [db_cluster_start()].
#'
#' @family Clusters API
#' @family Cluster Helpers
#'
#' @return The cluster information returned by [db_cluster_get()].
#' @export
get_and_start_cluster <- function(
  cluster_id,
  polling_interval = 5,
  host = db_host(),
  token = db_token(),
  silent = FALSE
) {
  # get cluster status
  cluster_status <- db_cluster_get(
    cluster_id = cluster_id,
    host = host,
    token = token
  )

  if (!silent) {
    msg <- "{.header Checking cluster:} {.emph '{cluster_id}'}"
    msg_done <- "{.header Checking cluster:} {.emph {cluster_status$cluster_name}}"
    cli::cli_progress_step(msg, msg_done)
  }

  # if the cluster isn't running, start it
  if (
    !cluster_status$state %in%
      c("RUNNING", "PENDING", "RESIZING", "STARTING", "RESTARTING")
  ) {
    db_cluster_start(cluster_id = cluster_id, host = host, token = token)
  }

  # wait for cluster to become active
  if (!silent) {
    msg <- "{.header Attaching to cluster:} {.emph [{cluster_status$state}] '{cluster_status$state_message}'}"
    msg_done <- "{.header Attached to cluster}"
    msg_failed <- "{.header Cluster entered [{cluster_status$state}] state}"
    cli::cli_progress_step(msg, msg_done, msg_failed)
  }

  while (cluster_status$state != "RUNNING") {
    Sys.sleep(polling_interval)
    cluster_status <- db_cluster_get(
      cluster_id = cluster_id,
      host = host,
      token = token
    )
    if (!silent) {
      cli::cli_progress_update()
    }
    if (cluster_status$state %in% c("TERMINATED", "TERMINATING")) {
      if (!silent) {
        cli::cli_progress_done(result = "failed")
      }
      break
    }
  }

  if (!silent) {
    cli::cli_progress_done()
    cli::cli_end()
  }

  cluster_status
}


#' Get Latest Databricks Runtime (DBR)
#'
#' @inheritParams auth_params
#' @inheritParams db_sql_warehouse_create
#' @param lts Boolean, if `TRUE` returns only LTS runtimes
#' @param ml Boolean, if `TRUE` returns only ML runtimes
#' @param gpu Boolean, if `TRUE` returns only ML GPU runtimes
#' @param photon Boolean, if `TRUE` returns only photon runtimes
#'
#' @details
#' Some runtime combinations are not possible, such as an ML/GPU runtime
#' combined with Photon. This function does not permit invalid combinations.
#'
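#' A sketch of selecting the latest LTS runtime and passing its key to
#' [db_cluster_create()] (the node type and other create arguments are
#' illustrative):
#'
#' ```r
#' latest_lts <- get_latest_dbr(lts = TRUE, ml = FALSE, gpu = FALSE, photon = FALSE)
#'
#' db_cluster_create(
#'   name = "lts-cluster",
#'   spark_version = latest_lts$key,
#'   node_type_id = "m5.xlarge", # illustrative node type
#'   num_workers = 2
#' )
#' ```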
#'
#' @family Clusters API
#' @family Cluster Helpers
#'
#' @return Named list
#' @export
#'
#' @importFrom rlang .data
get_latest_dbr <- function(
  lts,
  ml,
  gpu,
  photon,
  host = db_host(),
  token = db_token()
) {
  # don't allow impossible combinations
  if (gpu) {
    if (!ml) {
      cli::cli_abort("{.arg gpu} runtime only available for {.arg ml} versions")
    }
  }

  if ((gpu || ml) && photon) {
    cli::cli_abort(
      "Cannot use {.arg ml}/{.arg gpu} runtimes with {.arg photon}"
    )
  }

  runtimes <- db_cluster_runtime_versions(host = host, token = token)

  runtimes_adj <- runtimes[[1]] |>
    purrr::map_dfr(function(x) {
      list(key = x[["key"]], name = x[["name"]])
    }) |>
    dplyr::mutate(
      version = as.numeric(gsub("^(\\d+\\.\\d)\\..*", "\\1", .data$key)),
      lts = grepl("LTS", .data$name),
      ml = grepl("ml", .data$key),
      gpu = grepl("gpu", .data$key),
      photon = grepl("photon", .data$key),
    ) |>
    dplyr::arrange(dplyr::desc(version))

  runtime_matches <- runtimes_adj |>
    dplyr::filter(
      .data$lts == {{ lts }},
      .data$ml == {{ ml }},
      .data$gpu == {{ gpu }},
      .data$photon == {{ photon }}
    ) |>
    dplyr::slice_head(n = 1)

  list(
    key = runtime_matches[["key"]],
    name = runtime_matches[["name"]]
  )
}
