R/p.s3.R

Defines functions import_S3_json

Documented in import_S3_json

#' Import AWS CLI S3 list-object-versions json output to a data table.
#'
#' @param x Character. AWS CLI s3api json output.
#'
#' @export import_S3_json
#'
#' @import data.table
#' @import glue
#' @import testthat
#' @import doParallel
#' @importFrom base64enc base64encode
#' @importFrom digest hmac
#' @importFrom foreach foreach
#' @importFrom jsonlite fromJSON
#' @importFrom lubridate as_datetime now
#' @importFrom magrittr %>% %T>% %$% %<>%
#' @importFrom parallel detectCores mclapply
#' @importFrom Rdpack reprompt
#' @importFrom stringr str_extract str_replace str_replace_all
#' @importFrom utils URLencode head

import_S3_json <- function(x) {
  if (readLines(x, n = 10) %>% length() < 1) {
    return(NULL)
  }
  S3_df <- try(jsonlite::fromJSON(txt = x), silent = TRUE)

  if (!S3_df$DeleteMarkers %>% is.null()) {
    DeleteMarkers <- S3_df$DeleteMarkers %>% data.table::as.data.table(.)
    DeleteMarkers[, DeleteMarker := TRUE]
  } else {
    DeleteMarkers <- NULL
  }


  if (!S3_df$Versions %>% is.null()) {
    Versions <- S3_df$Versions %>% data.table::as.data.table(.)
    Versions[, DeleteMarker := FALSE]
  } else {
    Versions <- NULL
  }

  dt <- data.table::rbindlist(list(DeleteMarkers, Versions), fill = TRUE)

  dt$bucket <- x %>%
    basename() %>%
    stringr::str_replace("aws_s3_objects_", "") %>%
    stringr::str_replace("\\.json", "")

  return(dt)
}




#' Index S3 objects from a list of S3 buckets.
#'
#' Quickly index all S3 objects in a bucket list into a data table. Requires AWS CLI.
#'
#' @param include Character. Bucket regular expression include.
#' @param exclude Character. Bucket regular expression exclude
#'
#' @export index_S3_objects
#'

index_S3_objects <- function(include = NULL, exclude = NULL) {
  t <- Sys.time()

  buckets <- glue::glue("zsh -c 'aws s3 ls'") %>%
    system(intern = TRUE) %>%
    stringr::str_extract("[^ ]+$")

  if (!include %>% is.null()) {
    buckets <- buckets %include% include
  }

  if (!exclude %>% is.null()) {
    buckets <- buckets %exclude% include
  }

  on.exit({
    list.files(
      path = ".",
      pattern = "aws_s3_objects.*(\\.json$|\\.RDS)",
      full.names = TRUE
    ) %>%
      file.remove()
  })

  writeLines(buckets, "aws_s3_objects_buckets.txt")

  writeLines('parallel --delay 0.5 -j 16 "aws --cli-read-timeout 0 --cli-connect-timeout 0 s3api list-object-versions --bucket {} > aws_s3_objects_{}.json" :::: aws_s3_objects_buckets.txt', "aws_s3_objects_command.txt")

  system("chmod 700 aws_s3_objects_command.txt; zsh ./aws_s3_objects_command.txt")

  json_import <- list.files(path = ".", pattern = "aws_s3_objects.*.json$", full.names = TRUE)

  parallel::mclapply(1:length(json_import), function(j) {
    base::print(json_import[j])

    dt <- import_S3_json(json_import[j])

    if (dt %>% is.null()) {
      return(NULL)
    }

    fn <- json_import[j] %>%
      stringr::str_replace("\\.json", ".RDS")

    dt %>% saveRDS(fn, compress = FALSE)

    return(NULL)
  }, mc.cores = length(json_import))

  jsonRds <- list.files(
    path = ".",
    pattern = "aws_s3_objects.*.RDS$",
    full.names = TRUE
  )

  dt <- jsonRds %>%
    lapply(readRDS) %>%
    rbindlist(use.names = TRUE, fill = TRUE) %>%
    unique()

  dt[, LastModified := LastModified %>% gsub("\\.[0-9]+Z", "", .) %>%
    as.POSIXct(format = "%Y-%m-%dT%H:%M:%S") %>%
    lubridate::as_datetime(tz = "EST")]

  dt[, LastIndexed := lubridate::now(tz = "EST")]

  saveRDS(dt, paste0(LOG_DIR, "", "S3_dt.RDS"), compress = FALSE)

  names <- dt %>% names()
  if (
    "IsLatest" %chin% names &
      "DeleteMarker" %chin% names &
      include %>% is.null() # only save log if all buckets were processed
  ) {
    names <- dt[
      IsLatest == TRUE &
        DeleteMarker == FALSE,
      paste0(bucket, "/", Key)
    ] %>%
      unique()

    data.table::data.table(x = names) %>%
      data.table::fwrite(
        paste0(LOG_DIR, "s3_index.txt"),
        col.names = FALSE
      )
  }

  return(dt)
}




#' Print statistics about S3 buckets.
#'
#' @param dt Data table output from index_S3_objects.
#' @param delete_markers Logical. Include delete markers?
#' @param log File path for log output.
#'
#' @export print_S3_statistics
#'

print_S3_statistics <- function(dt, log = "./print_S3_statistics.log", delete_markers = TRUE) {
  sink(log)
  on.exit(sink(NULL))

  dt <- data.table::copy(dt)

  if (!delete_markers) dt <- dt[DeleteMarker == FALSE]

  dt[, Key_ext := Key %>% stringr::str_extract("[^\\.]+$")]

  paste0("Bucket number: ", dt$bucket %>% as.factor() %>% levels() %>% length()) %>% print()
  paste0("Total objects, all versions:", dt %>% nrow()) %>% print()
  paste0("Bucket number: ", dt$bucket %>% as.factor() %>% levels() %>% length()) %>% print()
  paste0("Total deletion markers: ", dt[DeleteMarker == TRUE] %>% nrow()) %>% print()
  paste0("Total objects, current versions excluding deletion markers: ", dt[IsLatest == TRUE & DeleteMarker == FALSE] %>% nrow()) %>% print()

  print("Total objects by bucket, all:")
  dt[, .("Number of objects" = .N), by = bucket][order(-`Number of objects`)] %>% print()

  print("Total size by bucket, all:")
  dt[, .("Size (GB)" = (Size %>% sum(na.rm = TRUE) / 1E9) %>% ceiling()), by = bucket][order(-`Size (GB)`)] %>% print()

  print("Total size by bucket, current versions excluding deletion markers:")
  dt[IsLatest == TRUE & DeleteMarker == FALSE, .("Size (GB)" = (Size %>% sum(na.rm = TRUE) / 1E9) %>% ceiling()), by = bucket][order(-`Size (GB)`)] %>% print()

  print("Largest files:")
  dt[IsLatest == TRUE, .("Size (MB)" = Size / 1E6 %>% ceiling(), Key)][order(-`Size (MB)`)] %>%
    utils::head(20) %>%
    print()

  print("Largest files by extension:")
  dt[IsLatest == TRUE & DeleteMarker == FALSE, .("Size (GB)" = (Size %>% sum(na.rm = TRUE) / 1E9) %>% ceiling()), by = Key_ext][order(-`Size (GB)`)] %>%
    utils::head(20) %>%
    print()

  print("Largest files key dirname:")
  dt[IsLatest == TRUE & DeleteMarker == FALSE, .("Size (GB)" = (Size %>% sum(na.rm = TRUE) / 1E9) %>% ceiling()), by = Key %>% dirname()][order(-`Size (GB)`)] %>%
    utils::head(20) %>%
    print()

  print("Largest previous version files:")
  dt[IsLatest == FALSE, .("Size (MB)" = Size / 1E6 %>% ceiling(), Key)][order(-`Size (MB)`)] %>%
    utils::head(20) %>%
    print()

  print("Largest previous version files by extension:")
  dt[IsLatest == FALSE & DeleteMarker == FALSE, .("Size (GB)" = (Size %>% sum(na.rm = TRUE) / 1E9) %>% ceiling()), by = Key_ext][order(-`Size (GB)`)] %>%
    utils::head(20) %>%
    print()

  print("Largest previous version files key dirname:")
  dt[IsLatest == FALSE & DeleteMarker == FALSE, .("Size (GB)" = (Size %>% sum(na.rm = TRUE) / 1E9) %>% ceiling()), by = Key %>% dirname()][order(-`Size (GB)`)] %>%
    utils::head(20) %>%
    print()

  print("Files by extension:")
  dt[IsLatest == FALSE & DeleteMarker == FALSE, .N, by = Key_ext][order(-N)] %>%
    utils::head(20) %>%
    print()

  print("Files key dirname:")
  dt[IsLatest == FALSE & DeleteMarker == FALSE, .N, by = Key %>% dirname()][order(-N)] %>%
    utils::head(20) %>%
    print()

  print("Previous files by extension:")
  dt[IsLatest == FALSE & DeleteMarker == FALSE, .N, by = Key_ext][order(-N)] %>%
    utils::head(20) %>%
    print()

  print("Previous files key dirname:")
  dt[IsLatest == FALSE & DeleteMarker == FALSE, .N, by = Key %>% dirname()][order(-N)] %>%
    utils::head(20) %>%
    print()

  print("Most common keys:")
  dt[, .("Common keys" = .N), by = Key][order(-`Common keys`)] %>%
    utils::head(20) %>%
    print()

  print("Common extensions")
  dt[, .N, by = Key_ext][order(-N)] %>%
    utils::head(20) %>%
    print()

  print("Only delete markers:")
  dt[Key %chin% setdiff(dt[DeleteMarker == TRUE]$Key %>% unique(), dt[DeleteMarker == FALSE]$Key %>% unique())] %>% nrow()

  readLines(log) %>% print()
}




#' Delete S3 object versions.
#'
#' @param dt Data table. Output from index_S3_objects.
#' @param safe Logical. Restrict deletions to old versions?
#' @param dry Logical. Print command instead of running?
#'
#' @export delete_S3_object_version
#'

delete_S3_object_version <- function(dt, safe = TRUE, dry = TRUE) {
  dt %<>% data.table::copy(.)

  # validate input

  if (!(safe %>% class()) == "logical") {
    stop("safe must be TRUE or FALSE.")
  }
  if (!(dry %>% class()) == "logical") {
    stop("safe must be TRUE or FALSE.")
  }

  if (!(dt %>% class() %>% .[1]) == "data.table") {
    stop("dt must be a data.table.")
  }

  if (!"Key" %chin% (dt %>% names())) {
    stop("Key column is missing.")
  }
  if (!"bucket" %chin% (dt %>% names())) {
    stop("bucket column is missing.")
  }
  if (!"VersionId" %chin% (dt %>% names())) {
    stop("VersionId column is missing.")
  }
  if (!"IsLatest" %chin% (dt %>% names())) {
    stop("IsLatest column is missing.")
  }

  if (dt[, (Key %>% is.na()) %>% any()]) {
    stop("Keys are missing.")
  }
  if (dt[, (bucket %>% is.na()) %>% any()]) {
    stop("Buckets are missing.")
  }
  if (dt[, VersionId %>% is.na() %>% any()]) {
    stop("VersionIds are missing.")
  }
  if (dt[, IsLatest %>% is.na() %>% any()]) {
    stop("IsLatest statuses are missing.")
  }


  # do not delete current objects in safe mode
  if (safe) {
    dt <- dt[IsLatest == FALSE]
  }

  print("Files to delete:")
  dt[, .N, by = bucket] %>% print()
  Sys.sleep(2)

  dt %>%
    nrow() %>%
    print()

  # for each bucket
  for (b in dt[, bucket %>% unique()]) {
    base::print(b)

    # get bucket objects
    sub <- dt[bucket == b]

    # construct JSON request
    obj <- list(
      Objects = list(
        list(
          Key = sub$Key[1],
          VersionId = sub$VersionId[1]
        )
      ),
      Quiet = FALSE
    )

    len <- nrow(sub)
    j <- 1

    # add objects to delete
    for (i in 2:len) {
      cat(".")
      j <- j + 1
      k <- sub$Key[i]
      v <- sub$VersionId[i]

      o <- list(
        Key = k,
        VersionId = v
      )

      obj$Objects[[j]] <- o

      # maximum of 1000 objects, therefore save and run after each 1000 or at end
      if (j == 999 | i == len) {
        cat("\n")
        print(paste0("Deleting ", j, " objects"))
        print(paste0("Total deleted:  ", i, " objects"))

        # reset increment counter
        j <- 1

        # save JSON request and run command
        obj %>%
          RJSONIO::toJSON(.) %>%
          write(paste0(b, "p.s3.delete.json"))
        cmd <- c("aws s3api delete-objects --bucket", " ", b, " ", "--delete", " ", "file://", paste0(b, "p.s3.delete.json")) %>%
          paste(collapse = "")

        if (dry == TRUE) {
          print(paste0("Dry run: command to run:"))
          cmd %>% print()
        }
        if (!dry == TRUE) {
          cmd %>% system()
        }
      }
    }
    file.remove(paste0(b, "p.s3.delete.json"))
  }
}




#' Download S3 object versions.
#'
#' @param dt Data table. Output from index_S3_objects.
#' @param version Logical. Prepend verion to filename?
#' @param cores Numeric. Cores to parallelize over.
#'
#' @export get_S3_object_version
#'

get_S3_object_version <- function(dt, version = TRUE, cores = parallel::detectCores() * 4) {
  if (version) {
    commands <- sapply(1:nrow(dt), function(x) {
      paste0("aws --cli-read-timeout 0 --cli-connect-timeout 0 s3api get-object --bucket ", dt$bucket[x] %>% shQuote(), " --key ", dt$Key[x] %>% shQuote(), " --version-id ", dt$VersionId[x] %>% shQuote(), " ", dt$LastModified[x] %>% stringr::str_replace_all("[^0-9]", ""), "_", dt$VersionId[x] %>% stringr::str_replace_all("[^0-9]", ""), "_", dt$Key[x] %>% basename() %>% stringr::str_replace_all("[^0-9A-Za-z_\\.]", ""), ";")
    })
    commands %>%
      data.table::as.data.table(.) %>%
      data.table::fwrite(file = "aws_get_commands.txt", col.names = FALSE, sep = "\t", quote = FALSE)
  }

  if (!version) {
    commands <- sapply(1:nrow(dt), function(x) {
      paste0("aws --cli-read-timeout 0 --cli-connect-timeout 0 s3api get-object --bucket ", dt$bucket[x] %>% shQuote(), " --key ", dt$Key[x] %>% shQuote(), " --version-id ", dt$VersionId[x] %>% shQuote(), " ", dt$Key[x] %>% basename() %>% stringr::str_replace_all("[^0-9A-Za-z_\\.]", ""), ";")
    })
    commands %>%
      data.table::as.data.table(.) %>%
      data.table::fwrite(file = "aws_get_commands.txt", col.names = FALSE, sep = "\t", quote = FALSE)
  }

  system(paste0("/usr/local/bin/parallel -m -j ", cores, " :::: aws_get_commands.txt"))
  file.remove("aws_get_commands.txt")
}




#' Restore S3 object versions from AWS Glacier.
#'
#' @param dt Data table. Output from index_S3_objects.
#' @param cores Numeric. Cores to parallelize over.
#' @param days Numeric. Days to restore objects for.
#'
#' @export restore_S3_object_version
#'

restore_S3_object_version <- function(dt, cores = parallel::detectCores() * 4, days = 7) {
  commands <- sapply(1:nrow(dt), function(x) {
    paste0(
      "aws --cli-read-timeout 0 --cli-connect-timeout 0 s3api restore-object --bucket ",
      dt$bucket[x] %>% shQuote(),
      " --key ",
      dt$Key[x] %>% shQuote(),
      " --version-id ",
      dt$VersionId[x] %>% shQuote(),
      " --restore-request Days=",
      days, ";"
    )
  })

  commands %>%
    data.table::as.data.table(.) %>%
    data.table::fwrite(file = "aws_restore_commands.txt", col.names = FALSE, sep = "\t", quote = FALSE)

  system(paste0("/usr/local/bin/parallel -m -j ", cores, " :::: aws_restore_commands.txt"))

  file.remove("aws_restore_commands.txt")
}
andrewrech/p.s3 documentation built on Sept. 28, 2020, 12:33 p.m.