# R/aws_s3.R

#' API to get folder names for the S3 bucket
#' @param bucket Name of the S3 bucket.
#' @param prefix Optional key prefix to limit the listing to a sub folder.
#' @param ... Additional arguments passed to aws.s3::s3HTTP (e.g. region, key, secret).
#' @export
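#' @examples
#' \dontrun{
#' # A minimal usage sketch; the bucket name and prefix below are placeholders, and AWS
#' # credentials are assumed to come from the usual environment variables (or can be
#' # passed through ... as key/secret/region for aws.s3::s3HTTP).
#' folders <- getS3Folders("my-example-bucket", prefix = "exports/")
#' }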
getS3Folders <- function(bucket, prefix = NULL, ...) {
  # To get all folders, pass Inf.
  max <- Inf
  # 1000 is the limit per call.
  limit <- 1000
  query <- list(prefix = prefix, delimiter = "/", "max-keys" = limit, marker = NULL)
  result <- aws.s3::s3HTTP(verb = "GET", bucket = bucket, query = query, parse_response = TRUE, ...)
  # Handle pagination for large result sets.
  while (result[["IsTruncated"]] == "true") { # if IsTruncated is "true", we need to send another request to get the remaining results.
    # Get the last row of the result and pass its Key as the new marker so that pagination works.
    nextMarker <- tail(result, 1)[["Contents"]][["Key"]]
    query <- list(prefix = prefix, delimiter = "/", "max-keys" = limit, marker = nextMarker)
    # Send another query to get the remaining results.
    additionalResult <- aws.s3::s3HTTP(verb = "GET", bucket = bucket, query = query, parse_response = TRUE, ...)
    # Append additional query result
    combinedResult <- c(result, additionalResult)
    combinedResult[["IsTruncated"]] <- additionalResult[["IsTruncated"]]
    result <- combinedResult
  }
  # Folders are stored under CommonPrefixes
  df <- data.frame(result[names(result) == "CommonPrefixes"])

  # The data frame looks like this so gather columns and keep only "folder" column.
  #   CommonPrefixes CommonPrefixes.1 CommonPrefixes.2
  # 1          data/           data2/           data3/
  if (nrow(df) > 0) {
    df %>% tidyr::gather(key="key", value="folder") %>% select("folder")
  } else { # if no folder is found, return empty data frame.
    data.frame()
  }
}


#' API to download a remote data file (Excel, CSV, Parquet) from Amazon S3 and cache it if necessary.
#' It uses tempfile (https://stat.ethz.ch/R-manual/R-devel/library/base/html/tempfile.html),
#' and an R variable named after the MD5 hash of region, bucket, and fileName is assigned the
#' path returned by tempfile so that subsequent calls can reuse the downloaded file.
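#' @examples
#' \dontrun{
#' # A minimal usage sketch; region, bucket, credentials, and file name below are placeholders.
#' # With options(tam.should.cache.datafile = TRUE), a second call with the same region,
#' # bucket, and fileName reuses the cached temporary file instead of downloading again.
#' path <- downloadDataFileFromS3(region = "us-east-1", bucket = "my-example-bucket",
#'                                key = "MY_ACCESS_KEY", secret = "MY_SECRET_KEY",
#'                                fileName = "data/sales.csv", as = "text")
#' }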
downloadDataFileFromS3 <- function(region, bucket, key, secret, fileName, as = "text"){
  shouldCacheFile <- getOption("tam.should.cache.datafile")
  filepath <- NULL
  hash <- digest::digest(stringr::str_c(region, bucket, fileName, sep = ":"), "md5", serialize = FALSE)
  tryCatch({
    filepath <- getDownloadedFilePath(hash)
  }, error = function(e){
    # If the filepath for this hash has not been set as a global variable yet,
    # an "object not found" error is raised, which can be safely ignored.
    filepath <- NULL
  })
  # Check if a cached file already exists for this filepath.
  if (!is.null(shouldCacheFile) && isTRUE(shouldCacheFile) && !is.null(filepath)) {
    filepath
  } else {
    ext <- stringr::str_to_lower(tools::file_ext(fileName))
    tmp <- tempfile(fileext = stringr::str_c(".", ext))

    # When using Rserve on Linux, the temporary directory returned by tempdir(),
    # which is used as part of the temp file path generated by tempfile(), is
    # somehow not created. If you then try to use that temp file path, for
    # example to dump some data into it, it fails because the path does not
    # exist. This function would fail for the same reason at the save_object
    # call below.
    #
    # It works fine from the R command line on Linux, and it works fine all the
    # time on Mac and Windows, regardless of whether Rserve is used or not.
    #
    # The following command is harmless even if the directory already exists.
    # http://stackoverflow.com/questions/4216753/check-existence-of-directory-and-create-if-doesnt-exist
    dir.create(tempdir(), showWarnings = FALSE)

    # download file to temporary location
    aws.s3::save_object(fileName, bucket = bucket, as = as, region = region, key = key, secret = secret, file = tmp)
    # cache file
    if(!is.null(shouldCacheFile) && isTRUE(shouldCacheFile)){
      setDownloadedFilePath(hash, tmp)
    }
    tmp
  }
}


#' API to clear a cached S3 file
#' @param region AWS region of the bucket.
#' @param bucket Name of the S3 bucket.
#' @param fileName Key (file name) of the cached file to clear.
#' @export
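#' @examples
#' \dontrun{
#' # A minimal usage sketch; region, bucket, and file name below are placeholders.
#' clearS3CacheFile(region = "us-east-1", bucket = "my-example-bucket",
#'                  fileName = "data/sales.csv")
#' }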
clearS3CacheFile <- function(region, bucket, fileName){
  options(tam.should.cache.datafile = FALSE)
  hash <- digest::digest(stringr::str_c(region, bucket, fileName, sep = ":"), "md5", serialize = FALSE)
  tryCatch({
    filepath <- eval(as.name(hash))
    do.call(rm, c(as.name(hash)), envir = .GlobalEnv)
    unlink(filepath)
  }, error = function(e){
    # Ignore the error raised when no cache variable exists for this hash.
  })
}

#'Wrapper for readr::guess_encoding to support an AWS S3 CSV file
#'@export
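#' @examples
#' \dontrun{
#' # A minimal usage sketch; all values below are placeholders.
#' guessFileEncodingForS3File(region = "us-east-1", bucket = "my-example-bucket",
#'                            key = "MY_ACCESS_KEY", secret = "MY_SECRET_KEY",
#'                            fileName = "data/sales.csv")
#' }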
guessFileEncodingForS3File <- function(region, bucket, key, secret, fileName, n_max = 1e4, threshold = 0.20){
  loadNamespace("readr")
  filePath <- downloadDataFileFromS3(region = region, bucket = bucket, key = key, secret = secret, fileName = fileName, as = "text")
  readr::guess_encoding(filePath, n_max, threshold)

}


#'API that imports a CSV file from AWS S3.
#'@export
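#' @examples
#' \dontrun{
#' # A minimal usage sketch; all values below are placeholders. Note that the AWS
#' # access key id is passed as username and the secret access key as password.
#' df <- getCSVFileFromS3("data/sales.csv", region = "us-east-1",
#'                        username = "MY_ACCESS_KEY", password = "MY_SECRET_KEY",
#'                        bucket = "my-example-bucket", delim = ",")
#' }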
getCSVFileFromS3 <- function(fileName, region, username, password, bucket, delim, quote = '"',
                             escape_backslash = FALSE, escape_double = TRUE,
                             col_names = TRUE, col_types = NULL,
                             locale = readr::default_locale(),
                             na = c("", "NA"), quoted_na = TRUE,
                             comment = "", trim_ws = FALSE,
                             skip = 0, n_max = Inf, guess_max = min(1000, n_max),
                             progress = interactive()) {
  tryCatch({
    filePath <- downloadDataFileFromS3(region = region, bucket = bucket, key = username, secret = password, fileName = fileName, as = "text")
  }, error = function(e) {
    if (stringr::str_detect(e$message, "(Not Found|Moved Permanently)")) {
      # Looking for error that looks like "Error in parse_aws_s3_response(r, Sig, verbose = verbose) :\n Moved Permanently (HTTP 301).",
      # or "Not Found (HTTP 404).".
      # This seems to be returned when the bucket itself does not exist.
      stop(paste0('EXP-DATASRC-8 :: ', jsonlite::toJSON(c(bucket, fileName)), ' :: There is no such file in the AWS S3 bucket.'))
    }
    else {
      stop(e)
    }
  })
  exploratory::read_delim_file(filePath, delim = delim, quote = quote,
                               escape_backslash = escape_backslash, escape_double = escape_double,
                               col_names = col_names, col_types = col_types,
                               locale = locale,
                               na = na, quoted_na = quoted_na,
                               comment = comment, trim_ws = trim_ws,
                               skip = skip, n_max = n_max, guess_max = guess_max,
                               progress = progress)
}

#'API that imports multiple CSV files with the same structure and merges them into a single data frame
#'
#'For the col_types parameter, it defaults to character so that merging the CSV based data frames does not error out due to column data type mismatches.
#'Once the data frame merging is done, readr::type_convert is called from Exploratory Desktop to restore the column data types.
#'@export
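#' @examples
#' \dontrun{
#' # A minimal usage sketch; keys, bucket, and credentials below are placeholders.
#' # The returned data frame gets an "id" column holding the source file name of each row.
#' df <- getCSVFilesFromS3(c("data/sales_2023.csv", "data/sales_2024.csv"),
#'                         region = "us-east-1", username = "MY_ACCESS_KEY",
#'                         password = "MY_SECRET_KEY", bucket = "my-example-bucket",
#'                         delim = ",")
#' }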
getCSVFilesFromS3 <- function(files, region, username, password, bucket, forPreview = FALSE, delim, quote = '"',
                             escape_backslash = FALSE, escape_double = TRUE,
                             col_names = TRUE, col_types = readr::cols(.default = readr::col_character()),
                             locale = readr::default_locale(),
                             na = c("", "NA"), quoted_na = TRUE,
                             comment = "", trim_ws = FALSE,
                             skip = 0, n_max = Inf, guess_max = min(1000, n_max),
                             progress = interactive()) {
  # for preview mode, just use the first file.
  if (forPreview & length(files) > 0) {
    files <- files[1]
  }
  # Set names on the files so that they can be used for the "id" column created by purrr::map_dfr.
  files <- setNames(as.list(files), files)
  df <- purrr::map_dfr(files, exploratory::getCSVFileFromS3, region = region, username = username, password = password, bucket = bucket, delim = delim, quote = quote,
                       escape_backslash = escape_backslash, escape_double = escape_double,
                       col_names = col_names, col_types = col_types,
                       locale = locale,
                       na = na, quoted_na = quoted_na,
                       comment = comment, trim_ws = trim_ws,
                       skip = skip, n_max = n_max, guess_max = guess_max,
                       progress = progress, .id = "exp.file.id") %>% mutate(exp.file.id = basename(exp.file.id))  # extract file name from full path with basename and create file.id column.
  id_col <- avoid_conflict(colnames(df), "id")
  # copy internal exp.file.id to the id column.
  df[[id_col]] <- df[["exp.file.id"]]
  # drop internal column and move the id column to the very beginning.
  df %>% dplyr::select(!!rlang::sym(id_col), dplyr::everything(), -exp.file.id)
}

#'API that searches files by a search keyword, then imports multiple CSV files with the same structure and merges them into a single data frame
#'
#'For the col_types parameter, it defaults to character so that merging the CSV based data frames does not error out due to column data type mismatches.
#'Once the data frame merging is done, readr::type_convert is called from Exploratory Desktop to restore the column data types.
#'@export
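#' @examples
#' \dontrun{
#' # A minimal usage sketch; the keyword is matched case-insensitively against the object
#' # keys in the bucket. All values below are placeholders.
#' df <- searchAndGetCSVFilesFromS3("sales", region = "us-east-1",
#'                                  username = "MY_ACCESS_KEY", password = "MY_SECRET_KEY",
#'                                  bucket = "my-example-bucket", delim = ",")
#' }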
searchAndGetCSVFilesFromS3 <- function(searchKeyword, region, username, password, bucket, forPreview = FALSE, delim, quote = '"',
                              escape_backslash = FALSE, escape_double = TRUE,
                              col_names = TRUE, col_types = readr::cols(.default = readr::col_character()),
                              locale = readr::default_locale(),
                              na = c("", "NA"), quoted_na = TRUE,
                              comment = "", trim_ws = FALSE,
                              skip = 0, n_max = Inf, guess_max = min(1000, n_max),
                              progress = interactive()) {

  # search condition is case insensitive. (ref: https://www.regular-expressions.info/modifiers.html, https://stackoverflow.com/questions/5671719/case-insensitive-search-of-a-list-in-r)
  tryCatch({
    files <- aws.s3::get_bucket_df(region = region, bucket = bucket, key = username, secret = password, max= Inf) %>%
      filter(str_detect(Key, stringr::str_c("(?i)", searchKeyword)))
  }, error = function(e) {
    if (stringr::str_detect(e$message, "(Not Found|Moved Permanently)")) {
      # Looking for error that looks like "Error in parse_aws_s3_response(r, Sig, verbose = verbose) :\n Moved Permanently (HTTP 301).",
      # or "Not Found (HTTP 404).".
      # This seems to be returned when the bucket itself does not exist.
      stop(paste0('EXP-DATASRC-7 :: ', jsonlite::toJSON(bucket), ' :: The specified AWS S3 bucket does not exist.'))
    }
    else {
      stop(e)
    }
  })
  if (nrow(files) == 0) {
    stop(paste0('EXP-DATASRC-4 :: ', jsonlite::toJSON(bucket), ' :: There is no file in the AWS S3 bucket that matches with the specified condition.')) # TODO: escape bucket name.
  }
  getCSVFilesFromS3(files = files$Key, region = region, username = username, password = password, bucket = bucket, forPreview = forPreview, delim = delim, quote = quote,
                    escape_backslash = escape_backslash, escape_double = escape_double,
                    col_names = col_names, col_types = col_types, locale = locale, na = na, quoted_na = quoted_na, comment = comment, trim_ws = trim_ws,
                    skip = skip, n_max = n_max, guess_max = guess_max, progress = progress)

}

#'API that imports a Parquet file from AWS S3.
#'@export
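#' @examples
#' \dontrun{
#' # A minimal usage sketch; all values below are placeholders.
#' df <- getParquetFileFromS3("data/sales.parquet", region = "us-east-1",
#'                            username = "MY_ACCESS_KEY", password = "MY_SECRET_KEY",
#'                            bucket = "my-example-bucket")
#' }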
getParquetFileFromS3 <- function(fileName, region, username, password, bucket, col_select = NULL) {
  tryCatch({
    filePath <- downloadDataFileFromS3(region = region, bucket = bucket, key = username, secret = password, fileName = fileName, as = "text")
  }, error = function(e) {
    if (stringr::str_detect(e$message, "(Not Found|Moved Permanently)")) {
      # Looking for error that looks like "Error in parse_aws_s3_response(r, Sig, verbose = verbose) :\n Moved Permanently (HTTP 301).",
      # or "Not Found (HTTP 404).".
      # This seems to be returned when the bucket itself does not exist.
      stop(paste0('EXP-DATASRC-8 :: ', jsonlite::toJSON(c(bucket, fileName)), ' :: There is no such file in the AWS S3 bucket.'))
    }
    else {
      stop(e)
    }
  })
  exploratory::read_parquet_file(filePath, col_select = col_select)
}

#'API that imports multiple Parquet files with the same structure and merges them into a single data frame
#'
#'@export
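#' @examples
#' \dontrun{
#' # A minimal usage sketch; keys, bucket, and credentials below are placeholders.
#' # With forPreview = TRUE, only the first file is imported; col_select can optionally
#' # restrict the columns that are read.
#' df <- getParquetFilesFromS3(c("data/sales_2023.parquet", "data/sales_2024.parquet"),
#'                             region = "us-east-1", username = "MY_ACCESS_KEY",
#'                             password = "MY_SECRET_KEY", bucket = "my-example-bucket",
#'                             forPreview = FALSE)
#' }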
getParquetFilesFromS3 <- function(files, region, username, password, bucket, forPreview = FALSE, col_select = NULL) {
  # for preview mode, just use the first file.
  if (forPreview & length(files) > 0) {
    files <- files[1]
  }
  # Set names on the files so that they can be used for the "id" column created by purrr::map_dfr.
  files <- setNames(as.list(files), files)
  df <- purrr::map_dfr(files, exploratory::getParquetFileFromS3, region = region, username = username, password = password, bucket = bucket, col_select = col_select, .id = "exp.file.id") %>% mutate(exp.file.id = basename(exp.file.id))  # extract file name from full path with basename and create file.id column.
  id_col <- avoid_conflict(colnames(df), "id")
  # copy internal exp.file.id to the id column.
  df[[id_col]] <- df[["exp.file.id"]]
  # drop internal column and move the id column to the very beginning.
  df %>% dplyr::select(!!rlang::sym(id_col), dplyr::everything(), -exp.file.id)
}

#'API that searches files by a search keyword, then imports multiple Parquet files with the same structure and merges them into a single data frame
#'
#'@export
searchAndGetParquetFilesFromS3 <- function(searchKeyword, region, username, password, bucket, forPreview = FALSE, col_select = NULL) {

  # search condition is case insensitive. (ref: https://www.regular-expressions.info/modifiers.html, https://stackoverflow.com/questions/5671719/case-insensitive-search-of-a-list-in-r)
  tryCatch({
    files <- aws.s3::get_bucket_df(region = region, bucket = bucket, key = username, secret = password, max= Inf) %>%
      filter(str_detect(Key, stringr::str_c("(?i)", searchKeyword)))
  }, error = function(e) {
    if (stringr::str_detect(e$message, "(Not Found|Moved Permanently)")) {
      # Looking for error that looks like "Error in parse_aws_s3_response(r, Sig, verbose = verbose) :\n Moved Permanently (HTTP 301).",
      # or "Not Found (HTTP 404).".
      # This seems to be returned when the bucket itself does not exist.
      stop(paste0('EXP-DATASRC-7 :: ', jsonlite::toJSON(bucket), ' :: The specified AWS S3 bucket does not exist.'))
    }
    else {
      stop(e)
    }
  })
  if (nrow(files) == 0) {
    stop(paste0('EXP-DATASRC-4 :: ', jsonlite::toJSON(bucket), ' :: There is no file in the AWS S3 bucket that matches with the specified condition.')) # TODO: escape bucket name.
  }
  getParquetFilesFromS3(files = files$Key, region = region, username = username, password = password, bucket = bucket, forPreview = forPreview, col_select = col_select)

}


#'API that imports an Excel file from AWS S3.
#'@export
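#' @examples
#' \dontrun{
#' # A minimal usage sketch; all values below are placeholders. sheet can be a sheet
#' # index or a sheet name.
#' df <- getExcelFileFromS3("data/sales.xlsx", region = "us-east-1",
#'                          username = "MY_ACCESS_KEY", password = "MY_SECRET_KEY",
#'                          bucket = "my-example-bucket", sheet = 1)
#' }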
getExcelFileFromS3 <- function(fileName, region, username, password, bucket, sheet = 1, col_names = TRUE, col_types = NULL, na = "", skip = 0, trim_ws = TRUE, n_max = Inf, use_readxl = NULL, detectDates = FALSE, skipEmptyRows = FALSE, skipEmptyCols = FALSE, check.names = FALSE, tzone = NULL, convertDataTypeToChar = FALSE, ...) {
  tryCatch({
    filePath <- downloadDataFileFromS3(region = region, bucket = bucket, key = username, secret = password, fileName = fileName, as = "raw")
  }, error = function(e) {
    if (stringr::str_detect(e$message, "(Not Found|Moved Permanently)")) {
      # Looking for error that looks like "Error in parse_aws_s3_response(r, Sig, verbose = verbose) :\n Moved Permanently (HTTP 301).",
      # or "Not Found (HTTP 404).".
      # This seems to be returned when the bucket itself does not exist.
      stop(paste0('EXP-DATASRC-8 :: ', jsonlite::toJSON(c(bucket, fileName)), ' :: There is no such file in the AWS S3 bucket.'))
    }
    else {
      stop(e)
    }
  })
  exploratory::read_excel_file(path = filePath, sheet = sheet, col_names = col_names, col_types = col_types, na = na, skip = skip, trim_ws = trim_ws, n_max = n_max, use_readxl = use_readxl, detectDates = detectDates, skipEmptyRows =  skipEmptyRows, skipEmptyCols = skipEmptyCols, check.names = check.names, tzone = tzone, convertDataTypeToChar = convertDataTypeToChar, ...)
}

#'API that searches files by a search keyword, then imports multiple Excel files with the same structure and merges them into a single data frame
#'
#'By default, column data types are converted to character so that merging the Excel based data frames does not error out due to column data type mismatches.
#'Once the data frame merging is done, readr::type_convert is called from Exploratory Desktop to restore the column data types.
#'@export
searchAndGetExcelFilesFromS3 <- function(searchKeyword, region, username, password, bucket, forPreview = FALSE, sheet = 1, col_names = TRUE, col_types = NULL, na = "", skip = 0, trim_ws = TRUE, n_max = Inf, use_readxl = NULL, detectDates = FALSE, skipEmptyRows = FALSE, skipEmptyCols = FALSE, check.names = FALSE, tzone = NULL, convertDataTypeToChar = TRUE, ...){

  # search condition is case insensitive. (ref: https://www.regular-expressions.info/modifiers.html, https://stackoverflow.com/questions/5671719/case-insensitive-search-of-a-list-in-r)
  tryCatch({
    files <- aws.s3::get_bucket_df(region = region, bucket = bucket, key = username, secret = password, max= Inf) %>%
      filter(str_detect(Key, stringr::str_c("(?i)", searchKeyword)))
  }, error = function(e) {
    if (stringr::str_detect(e$message, "(Not Found|Moved Permanently)")) {
      # Looking for error that looks like "Error in parse_aws_s3_response(r, Sig, verbose = verbose) :\n Moved Permanently (HTTP 301).",
      # or "Not Found (HTTP 404).".
      # This seems to be returned when the bucket itself does not exist.
      stop(paste0('EXP-DATASRC-7 :: ', jsonlite::toJSON(bucket), ' :: The specified AWS S3 bucket does not exist.'))
    }
    else {
      stop(e)
    }
  })
  if (nrow(files) == 0) {
    stop(paste0('EXP-DATASRC-4 :: ', jsonlite::toJSON(bucket), ' :: There is no file in the AWS S3 bucket that matches with the specified condition.')) # TODO: escape bucket name.
  }
  exploratory::getExcelFilesFromS3(files = files$Key, region = region, username = username, password = password, bucket = bucket, forPreview = forPreview, sheet = sheet,
                                   col_names = col_names, col_types = col_types, na = na, skip = skip, trim_ws = trim_ws, n_max = n_max,
                                   use_readxl = use_readxl, detectDates = detectDates, skipEmptyRows = skipEmptyRows, skipEmptyCols = skipEmptyCols,
                                   check.names = check.names, tzone = tzone, convertDataTypeToChar = convertDataTypeToChar, ...)
}

#'API that imports multiple Excel files from AWS S3.
#'@export
getExcelFilesFromS3 <- function(files, region, username, password, bucket, forPreview = FALSE, sheet = 1, col_names = TRUE, col_types = NULL, na = "", skip = 0, trim_ws = TRUE, n_max = Inf, use_readxl = NULL, detectDates = FALSE, skipEmptyRows = FALSE, skipEmptyCols = FALSE, check.names = FALSE, tzone = NULL, convertDataTypeToChar = TRUE, ...) {
  # for preview mode, just use the first file.
  if (forPreview & length(files) > 0) {
    files <- files[1]
  }
  # Set names on the files so that they can be used for the "id" column created by purrr::map_dfr.
  files <- setNames(as.list(files), files)
  df <- purrr::map_dfr(files, exploratory::getExcelFileFromS3, region = region, username = username, password = password, bucket = bucket, sheet = sheet,
                       col_names = col_names, col_types = col_types, na = na, skip = skip, trim_ws = trim_ws, n_max = n_max, use_readxl = use_readxl,
                       detectDates = detectDates, skipEmptyRows =  skipEmptyRows, skipEmptyCols = skipEmptyCols, check.names = check.names,
                       tzone = tzone, convertDataTypeToChar = convertDataTypeToChar, .id = "exp.file.id") %>% mutate(exp.file.id = basename(exp.file.id))  # extract file name from full path with basename and create file.id column.
  id_col <- avoid_conflict(colnames(df), "id")
  # copy internal exp.file.id to the id column.
  df[[id_col]] <- df[["exp.file.id"]]
  # drop internal column and move the id column to the very beginning.
  df %>% dplyr::select(!!rlang::sym(id_col), dplyr::everything(), -exp.file.id)
}

#'Wrapper for readxl::excel_sheets to support AWS S3 Excel file
#'@export
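#' @examples
#' \dontrun{
#' # A minimal usage sketch; all values below are placeholders.
#' sheets <- getExcelSheetsFromS3ExcelFile("data/sales.xlsx", region = "us-east-1",
#'                                         username = "MY_ACCESS_KEY", password = "MY_SECRET_KEY",
#'                                         bucket = "my-example-bucket")
#' }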
getExcelSheetsFromS3ExcelFile <- function(fileName, region, username, password, bucket){
  filePath <- downloadDataFileFromS3(region = region, bucket = bucket, key = username, secret = password, fileName = fileName, as = "raw")
  readxl::excel_sheets(filePath)
}