
# Defines functions: update_args, dt_split, write_batch, athena_read_lines.athena_vroom, athena_read_lines.athena_data.table, athena_read_lines, athena_read.athena_vroom, athena_read.athena_data.table, athena_read, format_athena_types

# Build a named character vector of column types from Athena column metadata.
#
# athena_types: list of column descriptors; each element must expose a
#               character `Type` and a character `Name` (read via `$`).
# Returns: character vector of lower-cased types, named by column name.
format_athena_types <- function(athena_types){
  data_type <- tolower(vapply(athena_types, function(x) x$Type, FUN.VALUE = character(1)))
  names(data_type) <- vapply(athena_types, function(x) x$Name, FUN.VALUE = character(1))
  # Return the vector explicitly: the value of the replacement assignment
  # above would be the names, not the typed vector.
  data_type
}

# ==========================================================================
# read in method
# S3 generic: read an Athena result file into R.
# Dispatches on the class of `method`; the implementations in this file are
# athena_read.athena_data.table and athena_read.athena_vroom.
athena_read <- function(method, File, athena_types, con, ...) {
  UseMethod("athena_read", method)
}

# Read a comma separated Athena result file with data.table::fread.
#
# method:       parser object used for S3 dispatch and passed to AthenaToRDataType.
# File:         input handed to fread (the result file to parse).
# athena_types: Athena column metadata, see format_athena_types.
# con:          connection object; con@info$timezone is used for timestamps.
# Returns: the parsed table.
athena_read.athena_data.table <- function(method, File, athena_types, con, ...){
  data_type <- format_athena_types(athena_types)
  Type2 <- Type <- AthenaToRDataType(method, data_type)
  # Type2 is to handle issue with data.table fread 
  Type2[Type2 %in% "POSIXct"] <- "character"
  # nested/complex columns are read with fill enabled
  fill <- any(c("array", "row", "map", "json") %in% data_type)
  # currently parameter data.table is left as default. If users require data.frame to be returned then parameter will be updated
  # https://github.com/Rdatatable/data.table/blob/master/NEWS.md#datatable-v1130--24-jul-2020
  if(packageVersion("data.table") < package_version("1.13.0")){
    # older fread: read timestamps as character and convert afterwards
    output <- data.table::fread(
      File,
      col.names = names(Type2),
      colClasses = unname(Type2),
      sep = ",",
      showProgress = FALSE,
      fill = fill)
    # formatting POSIXct: from string to POSIXct
    for (col in names(Type[Type %in% "POSIXct"])) set(output, j=col, value=as.POSIXct(output[[col]], tz = con@info$timezone))
  } else {
    output <- data.table::fread(
      File,
      col.names = names(Type),
      colClasses = unname(Type),
      tz = con@info$timezone,
      sep = ",",
      showProgress = FALSE,
      fill = fill)
  }
  # AWS Athena returns " values as "". Due to this "" will be reformatted back to "
  for (col in names(Type[Type %in% "character"])) set(output, j=col, value=gsub('""' , '"', output[[col]]))
  # convert raw
  # NOTE(review): raw_parser/json_parser results are not assigned, so they are
  # presumably expected to modify `output` by reference - confirm.
  if(!identical(athena_option_env$binary, "character"))
    raw_parser(output, data_type)
  # convert json
  if(!identical(athena_option_env$json, "character"))
    json_parser(output, data_type)
  return(output)
}

# Read a comma separated Athena result file with vroom::vroom.
#
# method:       parser object used for S3 dispatch and passed to AthenaToRDataType.
# File:         file handed to vroom (the result file to parse).
# athena_types: Athena column metadata, see format_athena_types.
# con:          connection object; con@info$timezone is used for timestamps.
# Returns: the parsed table.
athena_read.athena_vroom <- function(method, File, athena_types, con, ...){
  # vroom is resolved at call time through pkg_method
  vroom <- pkg_method("vroom", "vroom")
  locale <- pkg_method("locale", "vroom")
  data_type <- format_athena_types(athena_types)
  Type <- AthenaToRDataType(method, data_type)
  output <- vroom(
    File,
    delim = ",",
    col_types = Type,
    locale = locale(tz = con@info$timezone),
    progress = FALSE,
    trim_ws = FALSE,
    altrep = TRUE
  )
  # timestamp with time zone returns an unusual timestamp format: 
  # "2021-07-30 14:07:08.008 UTC"
  # To overcome this return as character and convert afterwards
  for (col in names(Type[data_type %in% "timestamp with time zone"])){
    output[[col]] <- as.POSIXct(output[[col]], tz = con@info$timezone)
  }
  # convert raw
  # NOTE(review): raw_parser/json_parser results are not assigned, so they are
  # presumably expected to modify `output` by reference - confirm.
  if(!identical(athena_option_env$binary, "character"))
    raw_parser(output, data_type)
  # convert json
  if(!identical(athena_option_env$json, "character"))
    json_parser(output, data_type)
  return(output)
}

# Read in .txt files line by line and return them as a data.frame
# S3 generic: read a file with one record per line.
# Dispatches on the class of `method`; the implementations in this file are
# athena_read_lines.athena_data.table and athena_read_lines.athena_vroom.
athena_read_lines <- function(method, File, athena_types, con, ...) {
  UseMethod("athena_read_lines", method)
}

# Keep data.table formatting
# Read a file line by line with data.table::fread (sep = "\n", no header,
# white space preserved).
#
# method:       parser object used for S3 dispatch and passed to AthenaToRDataType.
# File:         input handed to fread (the file to parse).
# athena_types: Athena column metadata, see format_athena_types.
# con:          connection object; con@info$timezone is used for timestamps.
# Returns: the parsed table.
athena_read_lines.athena_data.table <- function(method, File, athena_types, con, ...){
  data_type <- format_athena_types(athena_types)
  Type2 <- Type <- AthenaToRDataType(method, data_type)
  # Type2 is to handle issue with data.table fread 
  Type2[Type2 %in% "POSIXct"] <- "character"
  # currently parameter data.table is left as default. If users require data.frame to be returned then parameter will be updated
  # https://github.com/Rdatatable/data.table/blob/master/NEWS.md#datatable-v1130--24-jul-2020
  if(packageVersion("data.table") < package_version("1.13.0")){
    # older fread: read timestamps as character and convert afterwards
    output <- data.table::fread(
      File,
      col.names = names(Type2),
      colClasses = unname(Type2),
      sep = "\n",
      showProgress = FALSE,
      header = FALSE,
      strip.white = FALSE)
    # formatting POSIXct: from string to POSIXct
    for (col in names(Type[Type %in% "POSIXct"])) set(output, j=col, value=as.POSIXct(output[[col]], tz = con@info$timezone))
  } else {
    output <- data.table::fread(
      File,
      col.names = names(Type),
      colClasses = unname(Type),
      tz = con@info$timezone,
      sep = "\n",
      showProgress = FALSE,
      header = FALSE,
      strip.white = FALSE)
  }
  # AWS Athena returns " values as "". Due to this "" will be reformatted back to "
  for (col in names(Type[Type %in% "character"])) set(output, j=col, value=gsub('""' , '"', output[[col]]))
  # convert raw
  # NOTE(review): raw_parser/json_parser results are not assigned, so they are
  # presumably expected to modify `output` by reference - confirm.
  if(!identical(athena_option_env$binary, "character"))
    raw_parser(output, data_type)
  # convert json
  if(!identical(athena_option_env$json, "character"))
    json_parser(output, data_type)
  return(output)
}

# Read a file line by line with vroom::vroom (delim = "\n", explicit column
# names, white space preserved).
#
# method:       parser object used for S3 dispatch and passed to AthenaToRDataType.
# File:         file handed to vroom (the file to parse).
# athena_types: Athena column metadata, see format_athena_types.
# con:          connection object; con@info$timezone is used for timestamps.
# Returns: the parsed table.
athena_read_lines.athena_vroom <- function(method, File, athena_types, con, ...){
  # vroom is resolved at call time through pkg_method
  vroom <- pkg_method("vroom", "vroom")
  locale <- pkg_method("locale", "vroom")
  data_type <- format_athena_types(athena_types)
  Type <- AthenaToRDataType(method, data_type)

  output <- vroom(
    File,
    col_names = names(Type),
    col_types = unname(Type),
    locale = locale(tz = con@info$timezone),
    progress = FALSE,
    altrep = TRUE,
    trim_ws = FALSE,
    delim = "\n"
  )
  # timestamp with time zone returns an unusual timestamp format: 
  # "2021-07-30 14:07:08.008 UTC"
  # To overcome this return as character and convert afterwards
  for (col in names(Type[data_type %in% "timestamp with time zone"])){
    output[[col]] <- as.POSIXct(output[[col]], tz = con@info$timezone)
  }
  # convert raw
  # NOTE(review): raw_parser/json_parser results are not assigned, so they are
  # presumably expected to modify `output` by reference - confirm.
  if(!identical(athena_option_env$binary, "character"))
    raw_parser(output, data_type)
  # convert json
  if(!identical(athena_option_env$json, "character"))
    json_parser(output, data_type)
  return(output)
}

# ==========================================================================
# write method
# Write one chunk of `value` to a uniquely named file under `path`.
#
# value:     table to write from.
# split_vec: first row of the chunk.
# fun:       writer, called as fun(chunk, file_con, ...).
# max.batch: maximum number of rows in the chunk.
# max_row:   last valid row of `value`; caps the chunk end.
# path:      directory the file is created in.
# file.type: file format; passed to Compress to build the file extension.
# compress:  passed to Compress to build the file extension.
# ...:       forwarded to `fun`.
# Returns: full path of the written file.
write_batch <- function(value, split_vec, fun, max.batch, max_row, path, file.type, compress, ...){
  # create temp file
  File <- paste(uuid::UUIDgenerate(), Compress(file.type, compress), sep = ".")
  path <- file.path(path, File)
  # json is written through a connection, every other type through the path
  file_con <- if(file.type != "json") path else file(path)
  # split data.frame into chunks
  chunk <- value[split_vec:min(max_row,(split_vec+max.batch-1)),]
  # write 
  fun(chunk, file_con, ...)
  # The file name is a freshly generated UUID, so the path is handed back to
  # the caller. NOTE(review): return value reconstructed - confirm against callers.
  return(path)
}

# Work out how `value` is split into row batches before writing.
#
# value:     table to be split.
# max.batch: maximum rows per batch; Inf means "use the default".
# file.type: one of "csv", "tsv", "parquet", "json".
# compress:  logical, whether the output files are compressed.
# Returns: list(SplitVec = first row of every batch, MaxBatch = rows per
#          batch, MaxRow = nrow(value)); all three are NULL for empty input.
dt_split <- function(value, max.batch, file.type, compress){
  # nothing to split
  if (nrow(value) == 0) {
    return(list(SplitVec = NULL, MaxBatch = NULL, MaxRow = NULL))
  }
  # parquet/json and uncompressed csv/tsv are written as a single batch by default
  if((file.type %in% c("parquet", "json") && is.infinite(max.batch)) || 
     (file.type %in% c("csv", "tsv") && !compress && is.infinite(max.batch))) 
    max.batch <- nrow(value)
  # set up split vec
  max_row <- nrow(value)
  split_20 <- .05 * max_row # default currently set to 20 split: https://github.com/DyfanJones/RAthena/issues/36
  min.batch <- 1000000 # min.batch sized at 1M
  # if batch is set to default
  if(is.infinite(max.batch)){
    max.batch <- max(split_20, min.batch)
    split_vec <- seq(1, max_row, max.batch)
  }
  # if max.batch is set by user
  # (also runs after the default branch, which has made max.batch finite)
  if(!is.infinite(max.batch)) split_vec <- seq(1, max_row, as.integer(max.batch))
  return(list(SplitVec = split_vec, MaxBatch = max.batch, MaxRow = max_row))
}

# Extend `init_args` with the writer function and its writer-specific
# arguments for the requested file type.
#
# file.type: "parquet", "json", "csv" or "tsv".
# init_args: list of arguments to extend.
# compress:  logical; for parquet selects "snappy" compression, otherwise
#            compression is left NULL.
# Returns: `init_args` with `fun` (the writer) and its options appended.
#          For csv/tsv the writer follows athena_option_env$file_parser;
#          if that is neither parser class, `init_args` is returned unchanged.
update_args <- function(file.type = "tsv", init_args = list(), compress = FALSE){
  if(file.type ==  "parquet"){
    write_parquet <- pkg_method("write_parquet", "arrow")
    cp <- if(compress) "snappy" else NULL
    # compression is wrapped in list() so a NULL value is kept as an element
    init_args <- c(init_args,
                   fun = write_parquet,
                   use_deprecated_int96_timestamps = TRUE, # align POSIXct to athena timestamp: https://docs.aws.amazon.com/athena/latest/ug/data-types.html
                   list(compression = cp))
  } else if(file.type == "json") {
    stream_out <- pkg_method("stream_out", "jsonlite")
    init_args <- c(init_args,
                   fun = stream_out,
                   verbose = FALSE)
  } else if(inherits(athena_option_env$file_parser, "athena_data.table")) {
    init_args <- c(init_args,
                   fun = data.table::fwrite,
                   quote = FALSE,
                   showProgress = FALSE)
    if(file.type == "csv")
      init_args <- c(init_args,
                     sep = ",")
    if(file.type == "tsv")
      init_args <- c(init_args,
                     sep = "\t")
  } else if(inherits(athena_option_env$file_parser, "athena_vroom")) {
    vroom_write <- pkg_method("vroom_write", "vroom")
    init_args <- c(init_args,
                   fun = vroom_write,
                   quote = "none",
                   progress = FALSE,
                   escape = "none")
    if(file.type == "csv")
      init_args <- c(init_args,
                     delim = ",")
    if(file.type == "tsv")
      init_args <- c(init_args,
                     delim = "\t")
  }
  return(init_args)
}

# To enable vroom to compress files in parallel, pigz is required: https://zlib.net/pigz/
# pipe(sprintf("pigz > %s", path))
# DyfanJones/noctua documentation built on April 15, 2024, 3:22 p.m.