#' Import AWS CLI S3 list-object-versions json output to a data table.
#'
#' Reads one file produced by `aws s3api list-object-versions` and stacks its
#' `DeleteMarkers` and `Versions` records into a single table, adding a
#' logical `DeleteMarker` column and a `bucket` column. The bucket name is
#' derived from the file name, which is expected to look like
#' `aws_s3_objects_<bucket>.json` (the naming used by index_S3_objects).
#'
#' @param x Character. Path to an AWS CLI s3api json output file.
#'
#' @return A data.table with one row per object version or delete marker, or
#'   `NULL` when the file is empty, cannot be parsed as JSON (a warning is
#'   issued), or contains neither delete markers nor versions.
#'
#' @export import_S3_json
#'
#' @import data.table
#' @import glue
#' @import testthat
#' @import doParallel
#' @importFrom base64enc base64encode
#' @importFrom digest hmac
#' @importFrom foreach foreach
#' @importFrom jsonlite fromJSON
#' @importFrom lubridate as_datetime now
#' @importFrom magrittr %>% %T>% %$% %<>%
#' @importFrom parallel detectCores mclapply
#' @importFrom Rdpack reprompt
#' @importFrom stringr str_extract str_replace str_replace_all
#' @importFrom utils URLencode head
import_S3_json <- function(x) {
  # An empty file means the CLI produced no listing for this bucket.
  # warn = FALSE: a short file without a trailing newline is still valid JSON.
  if (length(readLines(x, n = 10, warn = FALSE)) < 1) {
    return(NULL)
  }
  # Report unparseable output instead of failing later on `$` of a try-error.
  S3_df <- tryCatch(
    jsonlite::fromJSON(txt = x),
    error = function(e) {
      warning("Could not parse ", x, ": ", conditionMessage(e), call. = FALSE)
      NULL
    }
  )
  if (!is.list(S3_df)) {
    return(NULL)
  }
  # Convert one record set to a data.table tagged with its DeleteMarker flag;
  # absent or empty record sets contribute nothing.
  to_table <- function(records, is_marker) {
    if (is.null(records) || NROW(records) == 0L) {
      return(NULL)
    }
    part <- data.table::as.data.table(records)
    part[, DeleteMarker := is_marker]
    part
  }
  # `[[` avoids `$` partial matching on the parsed list.
  dt <- data.table::rbindlist(
    list(
      to_table(S3_df[["DeleteMarkers"]], TRUE),
      to_table(S3_df[["Versions"]], FALSE)
    ),
    fill = TRUE
  )
  if (nrow(dt) == 0L) {
    return(NULL)
  }
  # Anchored patterns so bucket names containing ".json" stay intact.
  bucket_name <- x %>%
    basename() %>%
    stringr::str_replace("^aws_s3_objects_", "") %>%
    stringr::str_replace("\\.json$", "")
  dt[, bucket := bucket_name]
  dt
}
#' Index S3 objects from a list of S3 buckets.
#'
#' Quickly index all S3 objects in a bucket list into a data table. Requires
#' the AWS CLI, GNU parallel and zsh.
#'
#' Bucket listings are written to `aws_s3_objects_*` work files in the working
#' directory (all removed on exit), imported with `import_S3_json`, and
#' combined. The result is saved to `paste0(LOG_DIR, "S3_dt.RDS")`. When no
#' `include`/`exclude` filter is given, i.e. every bucket was indexed, the
#' `bucket/Key` paths of all current, non-deleted objects are also written to
#' `paste0(LOG_DIR, "s3_index.txt")`.
#'
#' @param include Character. Bucket regular expression include.
#' @param exclude Character. Bucket regular expression exclude.
#'
#' @return A data.table of object versions and delete markers, with
#'   `LastModified` converted to EST and a `LastIndexed` timestamp. An empty
#'   data.table is returned (with a warning) when nothing could be indexed.
#'
#' @export index_S3_objects
#'
index_S3_objects <- function(include = NULL, exclude = NULL) {
  # The last whitespace-separated field of each `aws s3 ls` line is the bucket.
  buckets <- system("zsh -c 'aws s3 ls'", intern = TRUE) %>%
    stringr::str_extract("[^ ]+$")
  if (!is.null(include)) {
    buckets <- buckets %include% include
  }
  if (!is.null(exclude)) {
    buckets <- buckets %exclude% exclude
  }
  # Remove every work file (listings, RDS, bucket list, shell script) however
  # this function exits.
  on.exit({
    list.files(
      path = ".",
      pattern = "^aws_s3_objects_.*\\.(json|RDS|txt)$",
      full.names = TRUE
    ) %>%
      file.remove()
  }, add = TRUE)
  writeLines(buckets, "aws_s3_objects_buckets.txt")
  writeLines('parallel --delay 0.5 -j 16 "aws --cli-read-timeout 0 --cli-connect-timeout 0 s3api list-object-versions --bucket {} > aws_s3_objects_{}.json" :::: aws_s3_objects_buckets.txt', "aws_s3_objects_command.txt")
  system("chmod 700 aws_s3_objects_command.txt; zsh ./aws_s3_objects_command.txt")

  json_files <- list.files(
    path = ".",
    pattern = "^aws_s3_objects_.*\\.json$",
    full.names = TRUE
  )
  if (length(json_files) > 0L) {
    # Parse each listing in its own fork and cache it as an RDS file.
    results <- parallel::mclapply(json_files, function(path) {
      base::print(path)
      listing <- import_S3_json(path)
      if (!is.null(listing)) {
        saveRDS(listing, sub("\\.json$", ".RDS", path), compress = FALSE)
      }
      NULL
    }, mc.cores = length(json_files))
    # mclapply returns errors from children as try-error values; surface them.
    failed <- vapply(results, inherits, logical(1), what = "try-error")
    if (any(failed)) {
      warning(
        "Failed to import: ", paste(json_files[failed], collapse = ", "),
        call. = FALSE
      )
    }
  }

  rds_files <- list.files(
    path = ".",
    pattern = "^aws_s3_objects_.*\\.RDS$",
    full.names = TRUE
  )
  dt <- rds_files %>%
    lapply(readRDS) %>%
    data.table::rbindlist(use.names = TRUE, fill = TRUE) %>%
    unique()
  if (nrow(dt) == 0L) {
    warning("No S3 objects were indexed.", call. = FALSE)
    return(dt)
  }
  # S3 reports LastModified in UTC; parse it as UTC before converting to EST
  # (without `tz`, as.POSIXct would assume the session's local time zone).
  dt[, LastModified := LastModified %>%
    gsub("\\.[0-9]+Z", "", .) %>%
    as.POSIXct(format = "%Y-%m-%dT%H:%M:%S", tz = "UTC") %>%
    lubridate::as_datetime(tz = "EST")]
  dt[, LastIndexed := lubridate::now(tz = "EST")]
  saveRDS(dt, paste0(LOG_DIR, "S3_dt.RDS"), compress = FALSE)

  # Only save the index log if all buckets were processed.
  all_buckets_indexed <- is.null(include) && is.null(exclude)
  if (all_buckets_indexed && all(c("IsLatest", "DeleteMarker") %chin% names(dt))) {
    current_paths <- dt[
      IsLatest == TRUE & DeleteMarker == FALSE,
      paste0(bucket, "/", Key)
    ] %>%
      unique()
    data.table::data.table(x = current_paths) %>%
      data.table::fwrite(
        paste0(LOG_DIR, "s3_index.txt"),
        col.names = FALSE
      )
  }
  dt
}
#' Print statistics about S3 buckets.
#'
#' Writes a report to `log` and then prints the log's lines to the console.
#' The report covers object counts, sizes, largest files, and common keys and
#' extensions, broken down by bucket, extension and key directory for current
#' and previous versions.
#'
#' @param dt Data table output from index_S3_objects.
#' @param log File path for log output. Overwritten on each call.
#' @param delete_markers Logical. Include delete markers?
#'
#' @return The lines of the log file, invisibly.
#'
#' @export print_S3_statistics
#'
print_S3_statistics <- function(dt, log = "./print_S3_statistics.log", delete_markers = TRUE) {
  dt <- data.table::copy(dt)
  if (!delete_markers) dt <- dt[DeleteMarker == FALSE]
  dt[, Key_ext := Key %>% stringr::str_extract("[^\\.]+$")]
  # capture.output() owns the sink and always releases it. The whole report is
  # therefore flushed to `log` before it is read back and echoed to the console.
  utils::capture.output(.print_S3_statistics_report(dt), file = log)
  readLines(log) %>% print()
}

# Print every section of the print_S3_statistics() report to the current output.
.print_S3_statistics_report <- function(dt) {
  top20 <- function(x) print(utils::head(x, 20))
  size_gb <- function(size) ceiling(sum(size, na.rm = TRUE) / 1E9)
  current <- dt[IsLatest == TRUE & DeleteMarker == FALSE]
  previous <- dt[IsLatest == FALSE & DeleteMarker == FALSE]

  print(paste0("Bucket number: ", data.table::uniqueN(dt$bucket, na.rm = TRUE)))
  print(paste0("Total objects, all versions:", nrow(dt)))
  print(paste0("Total deletion markers: ", nrow(dt[DeleteMarker == TRUE])))
  print(paste0("Total objects, current versions excluding deletion markers: ", nrow(current)))

  print("Total objects by bucket, all:")
  print(dt[, .("Number of objects" = .N), by = bucket][order(-`Number of objects`)])
  print("Total size by bucket, all:")
  print(dt[, .("Size (GB)" = size_gb(Size)), by = bucket][order(-`Size (GB)`)])
  print("Total size by bucket, current versions excluding deletion markers:")
  print(current[, .("Size (GB)" = size_gb(Size)), by = bucket][order(-`Size (GB)`)])

  print("Largest files:")
  top20(dt[IsLatest == TRUE, .("Size (MB)" = ceiling(Size / 1E6), Key)][order(-`Size (MB)`)])
  print("Largest files by extension:")
  top20(current[, .("Size (GB)" = size_gb(Size)), by = Key_ext][order(-`Size (GB)`)])
  print("Largest files key dirname:")
  top20(current[, .("Size (GB)" = size_gb(Size)), by = Key %>% dirname()][order(-`Size (GB)`)])

  print("Largest previous version files:")
  top20(dt[IsLatest == FALSE, .("Size (MB)" = ceiling(Size / 1E6), Key)][order(-`Size (MB)`)])
  print("Largest previous version files by extension:")
  top20(previous[, .("Size (GB)" = size_gb(Size)), by = Key_ext][order(-`Size (GB)`)])
  print("Largest previous version files key dirname:")
  top20(previous[, .("Size (GB)" = size_gb(Size)), by = Key %>% dirname()][order(-`Size (GB)`)])

  print("Files by extension:")
  top20(current[, .N, by = Key_ext][order(-N)])
  print("Files key dirname:")
  top20(current[, .N, by = Key %>% dirname()][order(-N)])
  print("Previous files by extension:")
  top20(previous[, .N, by = Key_ext][order(-N)])
  print("Previous files key dirname:")
  top20(previous[, .N, by = Key %>% dirname()][order(-N)])

  print("Most common keys:")
  top20(dt[, .("Common keys" = .N), by = Key][order(-`Common keys`)])
  print("Common extensions")
  top20(dt[, .N, by = Key_ext][order(-N)])

  # Rows whose key has delete markers but no real version at all.
  print("Only delete markers:")
  marker_only_keys <- setdiff(
    unique(dt[DeleteMarker == TRUE]$Key),
    unique(dt[DeleteMarker == FALSE]$Key)
  )
  print(nrow(dt[Key %chin% marker_only_keys]))
  invisible(NULL)
}
#' Delete S3 object versions.
#'
#' Deletes the listed object versions bucket by bucket with
#' `aws s3api delete-objects`, sending at most 1000 versions per request (the
#' API limit). Each request body is written to `<bucket>p.s3.delete.json` in
#' the working directory, and that file is removed once the bucket is done.
#'
#' @param dt Data table. Output from index_S3_objects; must have `Key`,
#'   `bucket`, `VersionId` and `IsLatest` columns without missing values.
#' @param safe Logical. Restrict deletions to old versions (`IsLatest == FALSE`)?
#' @param dry Logical. Print command instead of running?
#'
#' @return `NULL`, invisibly. Called for its side effects.
#'
#' @export delete_S3_object_version
#'
delete_S3_object_version <- function(dt, safe = TRUE, dry = TRUE) {
  # validate input
  is_flag <- function(value) is.logical(value) && length(value) == 1L && !is.na(value)
  if (!is_flag(safe)) {
    stop("safe must be TRUE or FALSE.")
  }
  if (!is_flag(dry)) {
    stop("dry must be TRUE or FALSE.")
  }
  if (!data.table::is.data.table(dt)) {
    stop("dt must be a data.table.")
  }
  # Required columns, mapped to the error raised when they contain NA.
  required <- c(
    Key = "Keys are missing.",
    bucket = "Buckets are missing.",
    VersionId = "VersionIds are missing.",
    IsLatest = "IsLatest statuses are missing."
  )
  for (col in names(required)) {
    if (!col %chin% names(dt)) stop(col, " column is missing.")
  }
  for (col in names(required)) {
    if (anyNA(dt[[col]])) stop(required[[col]])
  }
  # do not delete current objects in safe mode
  if (safe) {
    dt <- dt[IsLatest == FALSE]
  }
  print("Files to delete:")
  dt[, .N, by = bucket] %>% print()
  # Short pause so the summary can be reviewed (and the call interrupted).
  Sys.sleep(2)
  dt %>%
    nrow() %>%
    print()

  max_batch <- 1000L
  for (b in unique(dt$bucket)) {
    base::print(b)
    sub <- dt[bucket == b]
    request_file <- paste0(b, "p.s3.delete.json")
    n <- nrow(sub)
    # Send the bucket's versions in consecutive, non-overlapping batches.
    for (first in seq.int(1L, n, by = max_batch)) {
      last <- min(first + max_batch - 1L, n)
      request <- list(
        Objects = lapply(seq.int(first, last), function(i) {
          list(Key = sub$Key[i], VersionId = sub$VersionId[i])
        }),
        Quiet = FALSE
      )
      print(paste0("Deleting ", last - first + 1L, " objects"))
      print(paste0("Total deleted: ", last, " objects"))
      writeLines(jsonlite::toJSON(request, auto_unbox = TRUE), request_file)
      cmd <- paste0("aws s3api delete-objects --bucket ", b, " --delete file://", request_file)
      if (dry) {
        print("Dry run: command to run:")
        print(cmd)
      } else {
        system(cmd)
      }
    }
    file.remove(request_file)
  }
  invisible(NULL)
}
#' Download S3 object versions.
#'
#' Builds one `aws s3api get-object` command per row of `dt` and runs them with
#' GNU parallel (`/usr/local/bin/parallel`). The commands are staged in
#' `aws_get_commands.txt` in the working directory, which is removed afterwards.
#' Each object is saved in the working directory under its key's basename,
#' stripped of characters other than letters, digits, `_` and `.`.
#'
#' @param dt Data table. Output from index_S3_objects.
#' @param version Logical. Prepend version to filename? If TRUE, the name is
#'   prefixed with the digits of `LastModified` and of `VersionId`.
#' @param cores Numeric. Cores to parallelize over.
#'
#' @return The result of removing the command file (`TRUE` on success), or
#'   `NULL` invisibly when `dt` has no rows.
#'
#' @export get_S3_object_version
#'
get_S3_object_version <- function(dt, version = TRUE, cores = parallel::detectCores() * 4) {
  n <- nrow(dt)
  if (n == 0L) {
    return(invisible(NULL))
  }
  # One shell command per row. Values are quoted per row, and the target file
  # name is reduced to a shell-safe character set.
  build_command <- function(i) {
    target <- dt$Key[i] %>%
      basename() %>%
      stringr::str_replace_all("[^0-9A-Za-z_\\.]", "")
    if (version) {
      target <- paste0(
        dt$LastModified[i] %>% stringr::str_replace_all("[^0-9]", ""), "_",
        dt$VersionId[i] %>% stringr::str_replace_all("[^0-9]", ""), "_",
        target
      )
    }
    paste0(
      "aws --cli-read-timeout 0 --cli-connect-timeout 0 s3api get-object --bucket ",
      shQuote(dt$bucket[i]),
      " --key ", shQuote(dt$Key[i]),
      " --version-id ", shQuote(dt$VersionId[i]),
      " ", target, ";"
    )
  }
  commands <- vapply(seq_len(n), build_command, character(1))
  command_file <- "aws_get_commands.txt"
  on.exit(if (file.exists(command_file)) file.remove(command_file), add = TRUE)
  writeLines(commands, command_file)
  system(paste0("/usr/local/bin/parallel -m -j ", cores, " :::: ", command_file))
  file.remove(command_file)
}
#' Restore S3 object versions from AWS Glacier.
#'
#' Builds one `aws s3api restore-object` command per row of `dt` and runs them
#' with GNU parallel (`/usr/local/bin/parallel`). The commands are staged in
#' `aws_restore_commands.txt` in the working directory, which is removed
#' afterwards.
#'
#' @param dt Data table. Output from index_S3_objects.
#' @param cores Numeric. Cores to parallelize over.
#' @param days Numeric. Days to restore objects for.
#'
#' @return The result of removing the command file (`TRUE` on success), or
#'   `NULL` invisibly when `dt` has no rows.
#'
#' @export restore_S3_object_version
#'
restore_S3_object_version <- function(dt, cores = parallel::detectCores() * 4, days = 7) {
  n <- nrow(dt)
  if (n == 0L) {
    return(invisible(NULL))
  }
  # One shell command per row, with every value quoted for the shell.
  build_command <- function(i) {
    paste0(
      "aws --cli-read-timeout 0 --cli-connect-timeout 0 s3api restore-object --bucket ",
      shQuote(dt$bucket[i]),
      " --key ",
      shQuote(dt$Key[i]),
      " --version-id ",
      shQuote(dt$VersionId[i]),
      " --restore-request Days=",
      days, ";"
    )
  }
  commands <- vapply(seq_len(n), build_command, character(1))
  command_file <- "aws_restore_commands.txt"
  on.exit(if (file.exists(command_file)) file.remove(command_file), add = TRUE)
  writeLines(commands, command_file)
  system(paste0("/usr/local/bin/parallel -m -j ", cores, " :::: ", command_file))
  file.remove(command_file)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.