R/amazon_common.R

Defines functions .resolve_type .write_recordio read_records_io write_spmatrix_to_sparse_tensor write_matrix_to_dense_tensor .write_shape .write_keys_tensor .write_label_tensor .write_feature_tensor

Documented in read_records_io write_matrix_to_dense_tensor write_spmatrix_to_sparse_tensor

# NOTE: This code has been modified from AWS Sagemaker Python:
# https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/amazon/common.py

# Note: this file has been split into two components: sagemaker.mlcore and this package

#' @include amazon_record_pb2.R

#' @importFrom methods is as

# Store `vector` as the `values` of the first feature entry of `record`, in the
# tensor field selected by `resolved_type` ("Int32", "Float64" or "Float32").
# Any other `resolved_type` leaves `record` untouched.
# NOTE(review): the caller only sees the write if `record` has reference
# semantics -- presumably it does; confirm against Record_pb.
.write_feature_tensor <- function(resolved_type, record, vector) {
  if (resolved_type == "Float64") {
    record$features[[1]]$value$float64_tensor$values <- vector
  }
  if (resolved_type == "Int32") {
    record$features[[1]]$value$int32_tensor$values <- vector
  }
  # Evaluated last so the function's (invisible) result is unchanged.
  if (resolved_type == "Float32") {
    record$features[[1]]$value$float32_tensor$values <- vector
  }
}

# Store `scalar` as the `values` of the first label entry of `record`, in the
# tensor field selected by `resolved_type` ("Int32", "Float64" or "Float32").
# Any other `resolved_type` leaves `record` untouched.
# NOTE(review): the caller only sees the write if `record` has reference
# semantics -- presumably it does; confirm against Record_pb.
.write_label_tensor <- function(resolved_type, record, scalar) {
  if (resolved_type == "Float64") {
    record$label[[1]]$value$float64_tensor$values <- c(scalar)
  }
  if (resolved_type == "Int32") {
    record$label[[1]]$value$int32_tensor$values <- c(scalar)
  }
  # Evaluated last so the function's (invisible) result is unchanged.
  if (resolved_type == "Float32") {
    record$label[[1]]$value$float32_tensor$values <- c(scalar)
  }
}

# Store `vector` as the `keys` of the first feature entry of `record`, in the
# tensor field selected by `resolved_type` ("Int32", "Float64" or "Float32").
# Any other `resolved_type` leaves `record` untouched.
# NOTE(review): the caller only sees the write if `record` has reference
# semantics -- presumably it does; confirm against Record_pb.
.write_keys_tensor <- function(resolved_type, record, vector) {
  if (resolved_type == "Int32") {
    record$features[[1]]$value$int32_tensor$keys <- vector
  }
  if (resolved_type == "Float64") {
    record$features[[1]]$value$float64_tensor$keys <- vector
  }
  if (resolved_type == "Float32") {
    # Must be `record$features`: `record.features` would name a different,
    # undefined object and make this branch fail.
    record$features[[1]]$value$float32_tensor$keys <- vector
  }
}

# Store `scalar` as the `shape` of the first feature entry of `record`, in the
# tensor field selected by `resolved_type` ("Int32", "Float64" or "Float32").
# Any other `resolved_type` leaves `record` untouched.
# NOTE(review): the caller only sees the write if `record` has reference
# semantics -- presumably it does; confirm against Record_pb.
.write_shape <- function(resolved_type, record, scalar) {
  if (resolved_type == "Int32") {
    record$features[[1]]$value$int32_tensor$shape <- c(scalar)
  }
  if (resolved_type == "Float64") {
    # The field is `value` (singular), as in every other branch and in the
    # sibling tensor writers.
    record$features[[1]]$value$float64_tensor$shape <- c(scalar)
  }
  if (resolved_type == "Float32") {
    record$features[[1]]$value$float32_tensor$shape <- c(scalar)
  }
}

#' @title write_matrix_to_dense_tensor
#' @description Write matrix to dense tensor file. Every row of \code{array}
#'              is serialized as one record, together with the matching
#'              element of \code{labels} when labels are supplied.
#' @param file Destination each record is written to with \code{writeBin}.
#'              NOTE(review): documented upstream as "(str): file location",
#'              but \code{writeBin} is called several times per record, so this
#'              presumably has to be an open binary connection -- confirm
#'              against callers.
#' @param array (matrix): two-dimensional matrix; its first element must be
#'              of a type supported by the internal type resolver.
#' @param labels (vector): optional vector (or one-dimensional array) of
#'              labels whose length equals one of the dimensions of
#'              \code{array}.
#' @return Called for its side effect of writing to \code{file}.
#' @export
write_matrix_to_dense_tensor <- function(file, array, labels = NULL) {
  # Validate shape of array and labels, resolve array and label types
  if (length(dim(array)) != 2) {
    stop("Array must be a Matrix", call. = FALSE)
  }

  has_labels <- !is.null(labels)
  if (has_labels) {
    # A plain R vector has no `dim` attribute, so accept it as well as a
    # one-dimensional array.
    if (!(is.vector(labels) || length(dim(labels)) == 1)) {
      stop("Labels must be a Vector", call. = FALSE)
    }
    if (!(length(labels) %in% dim(array))) {
      stop(sprintf("Label shape (%s) not compatible with array shape (%s)",
                   paste(length(labels), collapse = ", "),
                   paste(dim(array), collapse = ", ")),
           call. = FALSE)
    }
    # All elements share one type, so inspecting the first element is enough.
    resolved_label_type <- .resolve_type(labels[1])
  }
  resolved_type <- .resolve_type(array[1])

  # NOTE(review): the sparse writer in this file calls initProtoBuf() before
  # building records; confirm whether Record_pb() needs that here as well.
  record <- Record_pb()

  # Write each row of the matrix into a Record in the file object.
  # seq_len() keeps a zero-row matrix from iterating over c(1, 0).
  for (index in seq_len(nrow(array))) {
    vector <- array[index, ]
    record$clear()
    record$update(features = .RECORD_FEATURESENTRY(), label = .RECORD_LABELENTRY())
    .write_feature_tensor(resolved_type, record, vector)
    if (has_labels) {
      .write_label_tensor(resolved_label_type, record, labels[index])
    }

    .write_recordio(file, record$serialize(NULL))
  }
}

#' @title write_spmatrix_to_sparse_tensor
#' @description Writes a Matrix sparse matrix to a sparse tensor. The matrix is
#'              converted to row-compressed form and every row is serialized
#'              as one record holding its stored values, their keys, the
#'              number of columns as shape and, when supplied, the label.
#' @param file Destination each record is written to with \code{writeBin}.
#'              NOTE(review): documented upstream as "(str): file location",
#'              but \code{writeBin} is called several times per record, so this
#'              presumably has to be an open binary connection -- confirm
#'              against callers.
#' @param array (sparseMatrix): two-dimensional sparse matrix from the
#'              \code{Matrix} package.
#' @param labels (vector): optional vector of labels whose length equals one
#'              of the dimensions of \code{array}.
#' @return Called for its side effect of writing to \code{file}.
#' @export
write_spmatrix_to_sparse_tensor <- function(file, array, labels = NULL) {
  initProtoBuf()
  if (!requireNamespace("Matrix", quietly = TRUE)) {
    stop("Package 'Matrix' is required to write sparse tensors", call. = FALSE)
  }
  if (!is(array, "sparseMatrix")) {
    stop("Array must be sparse", call. = FALSE)
  }

  # Validate shape of array and labels, resolve array and label types
  if (length(dim(array)) != 2) {
    stop("Array must be a Matrix", call. = FALSE)
  }
  has_labels <- !is.null(labels)
  if (has_labels) {
    if (!is.vector(labels)) {
      stop("Labels must be a Vector", call. = FALSE)
    }
    if (!(length(labels) %in% dim(array))) {
      # Vectors have no `dim`, so report their length instead.
      stop(sprintf("Label shape (%s) not compatible with array shape (%s)",
                   paste(length(labels), collapse = ", "),
                   paste(dim(array), collapse = ", ")),
           call. = FALSE)
    }
    resolved_label_type <- .resolve_type(labels[1])
  }

  # convert sparse Matrix to Sparse Row matrix
  csr_array <- as(array, "RsparseMatrix")
  dim_array <- dim(csr_array)

  # The feature type comes from the matrix values (the `x` slot written
  # below), not from the labels, which may be NULL or of another type.
  resolved_type <- .resolve_type(csr_array@x[1])

  for (row_idx in seq_len(dim_array[1])) {
    # NOTE(review): unlike the dense writer, no features/label entries are
    # attached via record$update() before writing -- confirm a fresh
    # Record_pb() already provides them.
    record <- Record_pb()
    row <- csr_array[row_idx, , drop = FALSE] # keep row in RsparseMatrix format

    # Write values
    .write_feature_tensor(resolved_type, record, row@x)

    # Write keys (the `j` slot of the row)
    .write_keys_tensor(resolved_type, record, row@j)

    # Write labels
    if (has_labels) {
      .write_label_tensor(resolved_label_type, record, labels[row_idx])
    }

    # Write shape (number of columns)
    .write_shape(resolved_type, record, dim_array[2])

    .write_recordio(file, record$serialize(NULL))
  }
}

#' @title read_records_io
#' @description Eagerly read a collection of amazon Record protobuf objects from raw object.
#'              Reading stops at the end of the input or at the first header
#'              that does not match the expected magic number.
#' @param obj (raw): raw object
#' @return A list of parsed Record messages, empty if none were read.
#' @export
read_records_io <- function(obj) {
  # create raw connection
  con <- rawConnection(obj, "rb")
  on.exit(close(con), add = TRUE)

  records <- list()

  repeat {
    # NOTE(review): the magic number is read as an 8-byte double, which mirrors
    # what the writer in this file emits; the RecordIO format presumably uses
    # a 4-byte unsigned integer -- confirm before reading externally produced
    # files.
    magic <- readBin(con, "numeric", n = 1)

    # Check the length first: at end of input readBin() returns a zero-length
    # vector. isTRUE() also treats an NA comparison as "no match".
    if (length(magic) == 0 || !isTRUE(magic == .kmagic)) {
      break
    }

    len_record <- readBin(con, "int", n = 1)
    if (length(len_record) == 0 || is.na(len_record) || len_record < 0) {
      stop("Truncated or corrupt record header", call. = FALSE)
    }

    data <- readBin(con, "raw", n = len_record)

    # Number of bytes needed to round the payload up to a multiple of 4.
    pad <- bitwShiftL(bitwShiftR((len_record + 3), 2), 2) - len_record

    records[[length(records) + 1L]] <-
      RProtoBuf::read(get("aialgs.data.Record"), data)

    # Skip the padding so the next read starts at the following header.
    if (pad > 0) {
      readBin(con, "raw", n = pad)
    }
  }

  records
}

# MXNet requires recordio records have length in bytes that's a multiple of 4
# This sets up padding bytes to append to the end of the record, for different
# amounts of padding required: entry k holds k - 1 zero bytes.
# `raw(n)` yields exactly n zero bytes. Serializing `rep(0x00, n)` through
# `writeBin()` would instead emit 8 bytes per element, because `0x00` is a
# double in R.
padding <- lapply(0:3, raw)

# Marker written before, and expected at the start of, every record.
.kmagic <- 0xCED7230A

# Append one record to `f`: the magic number, the payload length, the payload
# bytes and finally the zero padding that rounds the payload up to a multiple
# of 4 bytes.
# NOTE(review): `.kmagic` is a double, so writeBin() emits it as 8 bytes; the
# reader in this file expects the same. Confirm this is the intended on-disk
# layout before exchanging files with other RecordIO implementations.
.write_recordio <- function(f, data) {
  n_bytes <- length(data)
  # `padding` is 1-indexed: entry k holds the bytes for k - 1 bytes of padding.
  pad_index <- 1 + bitwShiftL(bitwShiftR((n_bytes + 3), 2), 2) - n_bytes

  writeBin(.kmagic, f)
  writeBin(n_bytes, f)
  writeBin(data, f)
  writeBin(padding[[pad_index]], f)
}

# Map the storage type of `dtype` to the tensor type name used by the record
# writers: integer -> "Int32", double -> "Float64". Any other type is an error.
# Only labels that `typeof()` can actually return are listed.
.resolve_type <- function(dtype) {
  storage <- typeof(dtype)
  switch(storage,
    integer = "Int32",
    double = "Float64",
    # Report the offending type rather than the value: formatting the value
    # yields a misleading message, and an empty one for NULL input.
    stop(sprintf("Unsupported class %s on array", storage), call. = FALSE)
  )
}
DyfanJones/sagemaker-r-local documentation built on June 14, 2022, 10:32 p.m.