R/Result.R

Defines functions AthenaResult

#' @include Connection.R
#' @include utils.R
#' @include fetch_utils.R
NULL

AthenaResult <- function(conn,
                         statement = NULL,
                         s3_staging_dir = NULL,
                         unload = FALSE) {
  stopifnot(is.character(statement))
  response <- new.env(parent = emptyenv())
  response[["Query"]] <- statement

  s3_file <- NULL

  if (athena_option_env$cache_size > 0) {
    ll <- check_cache(statement, conn@info$work_group)
    response[["QueryExecutionId"]] <- ll[[1]]
    s3_file <- if (identical(ll[[2]], "")) NULL else ll[[2]]
  }
  # modify sql statement if user requests AWS Athena unload
  if (unload) {
    s3_file <- s3_file %||% uuid::UUIDgenerate()
    statement <- sprintf(
      "UNLOAD (\n%s)\nTO '%s'\nWITH (format = 'PARQUET',compression = 'SNAPPY')",
      statement, file.path(gsub("/$", "", s3_staging_dir), s3_file)
    )
  }

  if (is.null(response$QueryExecutionId)) {
    retry_api_call(
      response[["QueryExecutionId"]] <- py_to_r(conn@ptr$Athena$start_query_execution(
        ClientRequestToken = uuid::UUIDgenerate(),
        QueryString = statement,
        QueryExecutionContext = list(Database = conn@info$dbms.name),
        ResultConfiguration = ResultConfiguration(conn),
        WorkGroup = conn@info$work_group
      )$QueryExecutionId)
    )
  }
  on.exit(if (!is.null(conn@info$expiration)) time_check(conn@info$expiration))
  response[["UnloadDir"]] <- s3_file
  new("AthenaResult", connection = conn, info = response)
}

#' @rdname AthenaConnection
#' @export
setClass(
  "AthenaResult",
  contains = "DBIResult",
  slots = list(
    connection = "AthenaConnection",
    info = "environment"
  )
)

#' Clear Results
#'
#' Frees all resources (local and Athena) associated with result set. It does this by removing query output in AWS S3 Bucket,
#' stopping query execution if still running and removed the connection resource locally.
#'
#' @note If the user does not have permission to remove AWS S3 resource from AWS Athena output location, then an AWS warning will be returned.
#'       It is better use query caching or optionally prevent clear AWS S3 resource using \code{\link{RAthena_options}}
#'       so that the warning doesn't repeatedly show.
#' @name dbClearResult
#' @inheritParams DBI::dbClearResult
#' @return \code{dbClearResult()} returns \code{TRUE}, invisibly.
#' @seealso \code{\link[DBI]{dbIsValid}}
#' @examples
#' \dontrun{
#' # Note:
#' # - Require AWS Account to run below example.
#' # - Different connection methods can be used please see `RAthena::dbConnect` documnentation
#'
#' library(DBI)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' res <- dbSendQuery(con, "show databases")
#' dbClearResult(res)
#'
#' # Check if connection if valid after closing connection
#' dbDisconnect(con)
#' }
#' @docType methods
NULL

#' @rdname dbClearResult
#' @export
setMethod(
  "dbClearResult", "AthenaResult",
  function(res, ...) {
    if (!dbIsValid(res)) {
      warning("Result already cleared", call. = FALSE)
    } else {
      # stops resource if query is still running
      retry_api_call(res@connection@ptr$Athena$stop_query_execution(
        QueryExecutionId = res@info[["QueryExecutionId"]]
      ))

      # checks status of query
      if (is.null(res@info[["Status"]])) {
        retry_api_call(query_execution <- py_to_r(res@connection@ptr$Athena$get_query_execution(
          QueryExecutionId = res@info$QueryExecutionId
        )))
        res@info[["OutputLocation"]] <-
          query_execution[["QueryExecution"]][["ResultConfiguration"]][["OutputLocation"]]
        res@info[["StatementType"]] <-
          query_execution[["QueryExecution"]][["StatementType"]]
      }

      # Don't clear S3 resource for caching or skipping
      if (athena_option_env$cache_size == 0 & athena_option_env$clear_s3_resource) {
        result_info <- split_s3_uri(res@info[["OutputLocation"]])

        # Output Python error as warning if s3 resource can't be dropped
        tryCatch(
          res@connection@ptr$S3$delete_object(
            Bucket = result_info[["bucket"]], Key = paste0(result_info[["key"]], ".metadata")
          ),
          error = function(e) py_warning(e)
        )

        # remove manifest csv created with CTAS statements
        if (res@info[["StatementType"]] == "DDL" || !is.null(res@info[["UnloadDir"]])) {
          tryCatch(
            {
              res@connection@ptr$S3$delete_object(
                Bucket = result_info[["bucket"]],
                Key = paste0(result_info[["key"]], "-manifest.csv")
              )
            },
            error = function(e) NULL
          )
        }
        # remove AWS Athena results
        if (is.null(res@info[["UnloadDir"]])) {
          tryCatch(
            res@connection@ptr$S3$delete_object(
              Bucket = result_info[["bucket"]], Key = result_info[["key"]]
            ),
            error = function(e) NULL
          )
        } else {
          # Check S3 Prefix for AWS Athena results
          result_info <- split_s3_uri(res@connection@info[["s3_staging"]])
          result_info$key <- file.path(gsub("/$", "", result_info$key), res@info$UnloadDir)
          all_keys <- list()
          i <- 1
          # Get all s3 objects linked to table
          kwargs <- list(Bucket = result_info[["bucket"]], Prefix = result_info[["key"]])
          token <- ""
          while (!is.null(token)) {
            kwargs[["ContinuationToken"]] <- (if (!nzchar(token)) NULL else token)
            objects <- py_to_r(do.call(res@connection@ptr$S3$list_objects_v2, kwargs))
            all_keys[[i]] <- lapply(objects$Contents, function(x) list(Key = x$Key))
            token <- objects$NextContinuationToken
            i <- i + 1
          }

          all_keys <- unlist(all_keys, recursive = FALSE, use.names = FALSE)

          # Only remove if files are found
          if (length(all_keys) > 0) {
            # Delete S3 files in batch size 1000
            key_parts <- split_vec(all_keys, 1000)
            for (i in seq_along(key_parts)) {
              tryCatch(
                {
                  res@connection@ptr$S3$delete_objects(
                    Bucket = result_info[["bucket"]],
                    Delete = list(Objects = key_parts[[i]])
                  )
                },
                error = function(e) NULL
              )
            }
          }
        }
      }
      # remove query information
      rm(list = ls(all.names = TRUE, envir = res@info), envir = res@info)
    }
    return(invisible(TRUE))
  }
)

#' Fetch records from previously executed query
#'
#' Currently returns the top n elements (rows) from result set or returns entire table from Athena.
#' @name dbFetch
#' @param n maximum number of records to retrieve per fetch. Use \code{n = -1} or \code{n = Inf} to retrieve all pending records.
#'          Some implementations may recognize other special values. Currently chunk sizes range from 0 to 999,
#'          if entire dataframe is required use \code{n = -1} or \code{n = Inf}.
#' @inheritParams DBI::dbFetch
#' @return \code{dbFetch()} returns a data frame.
#' @seealso \code{\link[DBI]{dbFetch}}
#' @examples
#' \dontrun{
#' # Note:
#' # - Require AWS Account to run below example.
#' # - Different connection methods can be used please see `RAthena::dbConnect` documnentation
#'
#' library(DBI)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' res <- dbSendQuery(con, "show databases")
#' dbFetch(res)
#' dbClearResult(res)
#'
#' # Disconnect from Athena
#' dbDisconnect(con)
#' }
#' @docType methods
NULL

#' @rdname dbFetch
#' @export
setMethod(
  "dbFetch", "AthenaResult",
  function(res, n = -1, ...) {
    con_error_msg(res, msg = "Result already cleared.")

    # check status of query, skip poll if status found
    if (is.null(res@info[["Status"]])) {
      poll(res)
    }

    # if query failed stop
    if (res@info[["Status"]] == "FAILED") {
      stop(res@info[["StateChangeReason"]], call. = FALSE)
    }

    # cache query metadata if caching is enabled
    if (athena_option_env[["cache_size"]] > 0) {
      cache_query(res)
    }

    # return metadata of athena data types
    retry_api_call(result_class <- py_to_r(res@connection@ptr$Athena$get_query_results(
      QueryExecutionId = res@info$QueryExecutionId,
      MaxResults = as.integer(1)
    )[["ResultSet"]][["ResultSetMetadata"]][["ColumnInfo"]]))
    if (n >= 0 && n != Inf) {
      return(.fetch_n(res, result_class, n))
    }

    # Added data scan information when returning data from athena
    info_msg(
      "(Data scanned: ", data_scanned(res@info[["Statistics"]][["DataScannedInBytes"]]), ")"
    )
    if (!is.null(res@info[["UnloadDir"]])) {
      .fetch_unload(res)
    } else {
      .fetch_file(res, result_class)
    }
  }
)

#' Completion status
#'
#' This method returns if the query has completed.
#' @name dbHasCompleted
#' @inheritParams DBI::dbHasCompleted
#' @return \code{dbHasCompleted()} returns a logical scalar. \code{TRUE} if the query has completed, \code{FALSE} otherwise.
#' @seealso \code{\link[DBI]{dbHasCompleted}}
#' @examples
#' \dontrun{
#' # Note:
#' # - Require AWS Account to run below example.
#' # - Different connection methods can be used please see `RAthena::dbConnect` documnentation
#'
#' library(DBI)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' # Check if query has completed
#' res <- dbSendQuery(con, "show databases")
#' dbHasCompleted(res)
#'
#' dbClearResult(res)
#'
#' # Disconnect from Athena
#' dbDisconnect(con)
#' }
#' @docType methods
NULL

#' @rdname dbHasCompleted
#' @export
setMethod(
  "dbHasCompleted", "AthenaResult",
  function(res, ...) {
    con_error_msg(res, msg = "Result already cleared.")

    output <- TRUE
    # if status has already return then output TRUE
    if (!is.null(res@info[["Status"]])) {
      return(output)
    }

    # get status of query
    retry_api_call(query_execution <- py_to_r(res@connection@ptr$Athena$get_query_execution(
      QueryExecutionId = res@info[["QueryExecutionId"]]
    )))
    if (query_execution[["QueryExecution"]][["Status"]][["State"]] %in% c("RUNNING", "QUEUED")) {
      output <- FALSE
    }

    return(output)
  }
)

#' @rdname dbIsValid
#' @export
setMethod(
  "dbIsValid", "AthenaResult",
  function(dbObj, ...) {
    resource_active(dbObj)
  }
)

#' @rdname dbGetInfo
#' @inheritParams DBI::dbGetInfo
#' @export
setMethod(
  "dbGetInfo", "AthenaResult",
  function(dbObj, ...) {
    con_error_msg(dbObj, msg = "Result already cleared.")
    info <- as.list(dbObj@info)
    return(info)
  }
)

#' Information about result types
#'
#' Produces a data.frame that describes the output of a query.
#' @name dbColumnInfo
#' @inheritParams DBI::dbColumnInfo
#' @return \code{dbColumnInfo()} returns a data.frame with as many rows as there are output fields in the result.
#'         The data.frame has two columns (field_name, type).
#' @seealso \code{\link[DBI]{dbHasCompleted}}
#' @examples
#' \dontrun{
#' # Note:
#' # - Require AWS Account to run below example.
#' # - Different connection methods can be used please see `RAthena::dbConnect` documnentation
#'
#' library(DBI)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' # Get Column information from query
#' res <- dbSendQuery(con, "select * from information_schema.tables")
#' dbColumnInfo(res)
#' dbClearResult(res)
#'
#' # Disconnect from Athena
#' dbDisconnect(con)
#' }
#' @docType methods
NULL

#' @rdname dbColumnInfo
#' @export
setMethod(
  "dbColumnInfo", "AthenaResult",
  function(res, ...) {
    con_error_msg(res, msg = "Result already cleared.")

    # check status of query, skip poll if status found
    if (is.null(res@info[["Status"]])) {
      poll(res)
    }

    # if query failed stop
    if (res@info[["Status"]] == "FAILED") {
      stop(res@info[["StateChangeReason"]], call. = FALSE)
    }

    retry_api_call(result <- py_to_r(res@connection@ptr$Athena$get_query_results(
      QueryExecutionId = res@info[["QueryExecutionId"]],
      MaxResults = as.integer(1)
    )))
    Name <- vapply(result[["ResultSet"]][["ResultSetMetadata"]][["ColumnInfo"]],
      function(x) x$Name,
      FUN.VALUE = character(1)
    )
    Type <- vapply(result[["ResultSet"]][["ResultSetMetadata"]][["ColumnInfo"]],
      function(x) x$Type,
      FUN.VALUE = character(1)
    )
    data.frame(
      field_name = Name,
      type = Type, stringsAsFactors = F
    )
  }
)

#' Show AWS Athena Statistics
#'
#' @description Returns AWS Athena Statistics from execute queries \code{\link{dbSendQuery}}
#' @inheritParams DBI::dbColumnInfo
#' @name dbStatistics
#' @return \code{dbStatistics()} returns list containing Athena Statistics return from \code{boto3}.
#' @examples
#' \dontrun{
#' # Note:
#' # - Require AWS Account to run below example.
#' # - Different connection methods can be used please see `RAthena::dbConnect` documnentation
#'
#' library(DBI)
#' library(RAthena)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' res <- dbSendQuery(con, "show databases")
#' dbStatistics(res)
#'
#' # Clean up
#' dbClearResult(res)
#' }
#' @docType methods
NULL

#' @rdname dbStatistics
#' @export
setGeneric("dbStatistics",
  def = function(res, ...) standardGeneric("dbStatistics")
)

#' @rdname dbStatistics
#' @export
setMethod(
  "dbStatistics", "AthenaResult",
  function(res, ...) {
    con_error_msg(res, msg = "Result already cleared.")

    # check status of query, skip poll if status found
    if (is.null(res@info[["Status"]])) {
      poll(res)
    }

    # if query failed stop
    if (res@info[["Status"]] == "FAILED") {
      stop(res@info[["StateChangeReason"]], call. = FALSE)
    }
    return(res@info[["Statistics"]])
  }
)

#' Get the statement associated with a result set
#'
#' Returns the statement that was passed to [dbSendQuery()]
#' or [dbSendStatement()].
#' @name dbGetStatement
#' @inheritParams DBI::dbGetStatement
#' @return \code{dbGetStatement()} returns a character.
#' @seealso \code{\link[DBI]{dbGetStatement}}
#' @examples
#' \dontrun{
#' # Note:
#' # - Require AWS Account to run below example.
#' # - Different connection methods can be used please see `RAthena::dbConnect` documnentation
#'
#' library(DBI)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' rs <- dbSendQuery(con, "SHOW TABLES in default")
#' dbGetStatement(rs)
#' }
#' @docType methods
NULL

#' @rdname dbGetStatement
#' @export
setMethod(
  "dbGetStatement", "AthenaResult",
  function(res, ...) {
    con_error_msg(res, msg = "Result already cleared.")
    return(res@info[["Query"]])
  }
)

Try the RAthena package in your browser

Any scripts or data that you put into this service are public.

RAthena documentation built on Dec. 28, 2022, 1:19 a.m.