#' @noRd
#' @title Internal Class definition for \code{LazyArray}
#' @author Zhengjia Wang
#' @description Internal class definition of lazy array objects
ClassLazyArray <- R6::R6Class(
classname = "LazyArray",
portable = TRUE,
private = list(
# TRUE while the on-disk data is usable; remove_data() sets it to FALSE
.valid = TRUE,
# normalized directory holding the meta file and the partition files
.dir = character(0),
# normalized full path of the meta file inside `.dir`
.path = character(0),
# array dimension as read from (and written back to) the meta file
.dim = integer(0),
# optional list of dimension names; NULL when none are stored
.dimnames = NULL,
# file name of the meta file, relative to `.dir`
.meta_name = 'lazyarray.meta',
# format version recorded in the meta file
lazyarray_version = 0,
# partition file format recorded in the meta file
file_format = 'fst',
# one of 'double', 'integer', 'character', 'complex' (validated in initialize)
storage_format = character(0),
# suffix appended to the partition number to form a partition file name
postfix = '.fst',
# when TRUE, set_dim() and `@set_data`() refuse to run
read_only = TRUE,
# compression level handed to the writer when partition files are saved
compress_level = 50,
# when TRUE, finalize() calls remove_data()
auto_clean = FALSE,
part_dimension = integer(0), # dimension in each file. dim[-length(dim)] or c(dim[-length(dim)], 1)
sample_data = function(){
  # Return a length-one prototype value whose type matches the storage
  # format; any other format is an internal error.
  fmt <- private$storage_format
  switch(
    fmt,
    "double" = 0.1,
    "integer" = 0L,
    "complex" = 0i,
    "character" = "",
    stop("Invalid internal storage format")
  )
},
save_meta = function(){
  # Collect the persistent state of the array and write it to the meta
  # file; `list()` is used so that NULL entries (e.g. dimnames) are kept.
  header <- list(
    lazyarray_version = private$lazyarray_version,
    file_format       = private$file_format,
    storage_format    = private$storage_format,
    dim               = private$.dim,
    dimnames          = private$.dimnames,
    part_dimension    = private$part_dimension,
    postfix           = private$postfix,
    compress_level    = private$compress_level
  )
  save_yaml(header, private$.path)
}
),
public = list(
#' @description Print a short summary (storage format, dimension and
#' partition file pattern) of the array
#' @param ... ignored
#' @return the instance itself, invisibly
print = function(...){
  header <- paste0("<LazyArray> (", private$storage_format, ')\n')
  cat(header)

  dim_label <- paste(sprintf('%d ', private$.dim), collapse = 'x ')
  cat('Dimension:\t', dim_label, '\n')

  file_label <- sprintf('[part]%s', private$postfix)
  cat('File format:\t', file_label, '\n')

  invisible(self)
},
#' @description Constructor; loads and validates the meta file of an
#' existing array
#' @param path existing directory that contains the meta file and the
#' partition files
#' @param read_only whether modification is disallowed
#' @param meta_name name of the meta file inside \code{path}
initialize = function(path, read_only = TRUE, meta_name = 'lazyarray.meta'){
  private$.meta_name <- meta_name
  private$read_only <- read_only
  stopifnot(dir.exists(path))
  private$.dir <- normalizePath(path, mustWork = TRUE)
  private$.path <- normalizePath(file.path(path, self$meta_name), mustWork = TRUE)

  # load yaml
  meta <- load_yaml(private$.path)

  # check format version: only compare when the meta file records one,
  # otherwise fall back to the minimal supported version
  if(!is.null(meta$lazyarray_version)){
    if(!isTRUE(meta$lazyarray_version >= self$min_version)){
      stop("Minimal version ", self$min_version,
           " required, but the file version is ", meta$lazyarray_version)
    }
    private$lazyarray_version <- meta$lazyarray_version
  } else {
    private$lazyarray_version <- self$min_version
  }

  # check format: fst? hdf5?
  if(length(meta$file_format) == 1){
    private$file_format <- meta$file_format
  }

  # check data format
  if(length(meta$storage_format) != 1 ||
     ! meta$storage_format %in% c('double', 'integer', 'character', 'complex') ){
    stop(sprintf("Cannot find valid storage format. Supported data formats are %s, %s, %s, and %s",
                 sQuote('double'), sQuote('integer'), sQuote('character'), sQuote('complex')))
  }
  private$storage_format <- meta$storage_format

  # get dimension information
  if(length(meta$dim) < 2){
    stop("Invalid dimension information.")
  }
  private$.dim <- meta$dim

  # get dimension names
  if(!is.null(meta$dimnames) && length(meta$dimnames) != length(meta$dim)){
    stop("Dimension name does not match.")
  }
  private$.dimnames <- meta$dimnames

  # dimension of each partition file
  private$part_dimension <- meta$part_dimension

  if(!length(meta$postfix) || !is.character(meta$postfix)){
    stop("Cannot find file postfix")
  }
  private$postfix <- meta$postfix

  # fall back to the default level when none (or a non-numeric one) is stored
  if(!isTRUE(is.numeric(meta$compress_level))){
    meta$compress_level <- 50
  }
  private$compress_level <- meta$compress_level
},
#' @description Set the auto-clean flag
#' @param auto logical; when \code{TRUE}, \code{finalize} calls
#' \code{remove_data}, so the data on hard disk is cleaned automatically
flag_auto_clean = function(auto){
private$auto_clean <- auto
},
#' @description Override finalize method; removes the data on disk
#' (without warnings) when the auto-clean flag is set
# NOTE(review): recent R6 releases expect `finalize` to be a private
# method -- confirm against the R6 version this package targets.
finalize = function(){
if(private$auto_clean){
self$remove_data(warn = FALSE)
}
},
#' @description Remove data on hard disk. If the storage directory only
#' contains files that belong to this array (plus, possibly, other
#' \code{.meta} files) the whole directory is removed; otherwise only the
#' meta, partition and summary files of this array are removed.
#' @param force whether to remove the data even if the meta file is missing
#' @param warn whether to show warning if not fully cleaned
#' @return \code{TRUE} if the array has been invalidated, \code{FALSE}
#' otherwise (including when it was already invalid)
remove_data = function(force = FALSE, warn = TRUE){
  if(!private$.valid){ return(FALSE) }
  if(dir.exists(private$.dir)){
    if(force || file.exists(private$.path)){
      # files this array is responsible for: meta, partitions, summaries
      fs <-
        c(private$.meta_name,
          self$get_partition_fpath(full_path = FALSE),
          self$get_partition_fpath(full_path = FALSE, summary_file = TRUE))
      # everything that currently lives in the storage directory
      all_fs <- list.files(private$.dir, all.files = TRUE,
                           recursive = FALSE, full.names = FALSE,
                           include.dirs = TRUE)
      rest <- all_fs[!all_fs %in% c(fs, '.', '..')]
      sel_metas <- grepl('\\.meta$', rest)
      if(!length(rest) || all(sel_metas)){
        # nothing foreign is left (at most other meta files):
        # the whole directory can go
        unlink(private$.dir, recursive = TRUE)
      } else {
        # cannot remove the whole directory because some other files exist,
        # not created by me; only remove the files owned by this array
        own <- file.path(private$.dir, fs)
        unlink(own[file.exists(own)])
      }
      private$.valid <- FALSE
    }
  } else {
    private$.valid <- FALSE
  }
  if(warn && dir.exists(private$.dir)){
    warning("LazyArray not fully cleaned at: ", private$.dir,
            '. Some files not created by this array were detected. ',
            'Please manually remove them if they are no longer used.')
  }
  return(!private$.valid)
},
#' @description Make instance writable (clears the read-only flag that
#' \code{set_dim} and \code{@set_data} check)
make_writable = function(){
private$read_only <- FALSE
},
#' @description Make instance read-only (sets the read-only flag that
#' \code{set_dim} and \code{@set_data} check)
make_readonly = function(){
private$read_only <- TRUE
},
#' @description Set \code{\link[base]{dim}} and \code{\link[base]{dimnames}} of the array.
#' Only the last dimension may change; the result is saved to the meta file.
#' @param dim integer vector of the array dimension; see \code{\link[base]{dim}}
#' @param dimnames named list of dimension names; see \code{\link[base]{dimnames}}.
#' If missing, the current dimension names are kept and cut to the new
#' dimension.
set_dim = function(dim, dimnames){
  stopifnot(!private$read_only && private$.valid)
  if(length(dim) != length(private$.dim)){
    # report old -> new dimension
    stop('Cannot set dimension from ', deparse1(private$.dim), ' to ',
         deparse1(dim), '. Different # of dimensions.')
  }
  dif <- private$.dim - dim
  if(prod(dim) == 0){
    stop("zero length array is not supported.")
  }
  # dim is allowed to vary along the last dimension, like c(1,2,4,3) -> c(1,2,4,5)
  if(!all(dif[-length(dif)] == 0)){
    stop('For multi-part arrays, you can only increase/decrease the last dimension, like from c(12,3,1) to c(12,3,X)')
  }

  # check dimnames vs dim (only user-supplied dimnames are validated)
  mis_dimnames <- missing(dimnames)
  if(mis_dimnames){
    dimnames <- private$.dimnames
  }
  if(!mis_dimnames && length(dimnames)){
    if(!is.list(dimnames)){
      stop('dimnames must be a list')
    }
    dnl <- vapply(dimnames, length, 1L)
    if(length(dimnames) != length(dim) || !all(dnl - dim == 0)){
      stop("dim does not matches with dimnames")
    }
  }

  private$.dim <- dim
  # cut (or NA-pad) each set of names to the new dimension
  for(ii in seq_along(dimnames)){
    nms <- dimnames[[ii]]
    # assigning NULL via `[[<-` would drop the list element, so skip NULLs
    if(!is.null(nms)){
      dimnames[[ii]] <- nms[seq_len(dim[[ii]])]
    }
  }
  private$.dimnames <- dimnames
  private$save_meta()
},
#' @description Partition format, currently only \code{'fst'} is supported
#' @return the file format stored for this array
get_file_format = function(){
private$file_format
},
#' @description Data storage format, expected to be one of the
#' following: 'double', 'integer', 'character', or 'complex'
#' @return the storage format stored for this array
get_storage_format = function(){
private$storage_format
},
#' @description Whether partitioned based on the last dimension
#' @return always \code{TRUE} in this implementation
is_multi_part = function(){
TRUE
},
#' @description Returns dimension of each partition
#' @return the partition dimension as loaded from the meta file
partition_dim = function(){
private$part_dimension
},
#' @description Internally used to calculate min, max, range, and sum
#' squares, can only be used when array is numeric
#' @param partitions integer, partition numbers
#' @param map_function mapping function, must be specified; it is called
#' as \code{map_function(x, part)} where \code{x} is the data and
#' \code{part} the partition number
#' @param split whether to further partition the data of each partition
#' into chunks
#' @param ... passed as internal options to the chunk generator when
#' \code{split} is true
#' @return list of mapped results, flattened across partitions
`@partition_map` = function(
  partitions, map_function = NULL, split = TRUE, ...
){
  if(!self$is_multi_part()){
    stop("Cannot call @partition_map if array is not multi-part")
  }
  sformat <- self$get_storage_format()
  is_complex <- sformat == 'complex'
  stopifnot(is.function(map_function))
  more_args <- list(...)
  pd <- self$partition_dim()

  paths <- self$get_partition_fpath(partitions)
  res <- lapply2(seq_along(partitions), function(ii){
    part <- partitions[[ii]]
    path <- paths[[ii]]
    # bind the partition number so workers only need to pass the data
    map_f2 <- function(x){
      map_function(x, part)
    }

    # missing partition file: map over an all-NA partition instead
    if(!file.exists(path)){
      re <- map_f2(array(NA, dim = pd))
      if(!is.null(re)){
        attr(re, 'chunk_length') <- prod(pd)
      }
      return(list(re))
    }

    meta <- fstMeta(path)
    if(inherits(meta, 'fst_error')){
      stop(meta)
    }

    # decide column names; complex data uses a pair of columns
    # (suffix 'R' and 'I') per logical column
    if(is_complex){
      ncols <- floor(meta$nrOfCols / 2)
      colnms <- sapply(c('R', 'I'), function(s){
        sprintf('V%d%s', seq_len(ncols), s)
      })
      colnms <- matrix(colnms, ncol = 2)
    } else {
      ncols <- meta$nrOfCols
      colnms <- as.matrix(sprintf('V%d', seq_len(ncols)))
    }

    if( split ){
      # chunk the (rows x columns) table and map every chunk separately
      more_args[['dim']] <- c(meta$nrOfRows, ncols)
      more_args[['recursive']] <- !isFALSE(more_args[['recursive']])
      chunks <- do.call(make_chunks, more_args)
      res <- lapply(seq_len(chunks$nchunks), function(i){
        idx <- chunks$get_indices(i, as_numeric = TRUE)
        lapply(seq.int(idx[[2]][[1]], idx[[2]][[2]]), function(col){
          lazyMapReduceByPartition(path, colnms[col, ], idx[[1]][[1]], idx[[1]][[2]], map_f2)
        })
      })
      res <- unlist(res, recursive = FALSE)
    } else {
      # one call per column, covering all rows
      res <- lapply(seq_len(ncols), function(ii){
        lazyMapReduceByPartition(path, colnms[ii,], 1L, NULL, map_f2, reshape = pd)
      })
    }
    res
  })
  res <- unlist(res, recursive = FALSE)
  res
},
#' @description Internal method that splits each partition into row
#' chunks and applies a function to every chunk across all partitions
#' @param map_function function taking one, two, or three arguments: the
#' chunk data, the chunk number, and the row index range of the chunk
#' @param max_nchunks maximum number of chunks, passed to the chunk
#' generator
#' @param ... passed to the chunk generator
#' @return list with one mapped result per chunk
`@chunk_map` = function(
  map_function, max_nchunks = 50, ...
){
  if(!self$is_multi_part()){
    stop("Cannot call @chunk_map if array is not multi-part")
  }
  if(!is.function(map_function)){
    stop("map_function must be a function")
  }

  # adapt map_function to a common (data, chunk, idx) signature according
  # to the number of arguments it declares
  n_formals <- length(formals(map_function))
  if(n_formals < 2){
    map_f <- function(data, chunk, idx){
      map_function(data)
    }
  } else if(n_formals < 3){
    map_f <- function(data, chunk, idx){
      map_function(data, chunk)
    }
  } else {
    map_f <- map_function
  }

  # each partition is treated as a single column with `nrows` rows
  nrows <- prod(self$partition_dim())

  # get chunk size
  chunkf <- make_chunks(nrows, max_nchunks = max_nchunks, ...)
  files <- self$get_partition_fpath()
  sdata <- self$`@sample_data`()

  lapply2(seq_len(chunkf$nchunks), function(ii){
    idx_range <- chunkf$get_indices(ii, as_numeric = TRUE)[[1]]
    chunk_data <- lazyLoadOld(files = files, partition_dim = c(nrows, 1),
                              partition_locations = list(seq.int(idx_range[[1]], idx_range[[2]]), 1L),
                              ndim = 2L, value_type = sdata)
    map_f(chunk_data, ii, idx_range)
  })
},
#' @description Get the file name(s) of one or more partitions
#' @param part integer vector of partition numbers; all partitions when
#' missing
#' @param full_path whether to return the full system path rather than
#' the bare file name
#' @param summary_file whether to return the summary file instead of the
#' data file
#' @return Character vector of file names or full paths
get_partition_fpath = function(part, full_path = TRUE, summary_file = FALSE){
  if(!missing(part)){
    part <- as.integer(part)
    invalid <- base::anyNA(part) || any(part <= 0)
    if(invalid){
      stop("partition number must be all positive")
    }
  } else {
    part <- seq_len(self$npart)
  }

  # "<partition number><postfix>", optionally prefixed by the directory
  # and suffixed for the summary file
  fnames <- sprintf('%s%s', part, private$postfix)
  if(full_path){
    fnames <- file.path(private$.dir, fnames)
  }
  if(summary_file){
    fnames <- sprintf('%s.summary', fnames)
  }
  fnames
},
#' @description Internal method to set data. Data is written one
#' partition (one index along the last dimension) at a time, and the
#' summary file of each written partition is regenerated.
#' @param value vector of data to be set; either of length one or with as
#' many elements as the index set selects
#' @param ... index set, one index vector per dimension
#' @return the instance itself, invisibly
`@set_data` = function(value, ...){
# writing requires a writable array whose data has not been removed
stopifnot(!private$read_only && private$.valid)
idx <- list(...)
ndim <- self$ndim
stopifnot(length(idx) == ndim)
idx <- lapply(idx, function(ii){
# ii = ii[!is.na(ii)]
as.integer(ii)
})
# number of selected positions along each dimension
val_dim <- sapply(idx, length)
if(any(val_dim <= 0)){
# no change
return(invisible(self))
}
if(length(value) == 1){
# a single value is recycled over the whole selection
value <- array(value, dim = val_dim)
} else{
stopifnot(length(value) == prod(val_dim))
}
x <- NULL
# make sure `value` carries the dimension of the selection
if(is.null(dim(value)) || length(dim(value)) != length(val_dim) || !all(dim(value) == val_dim) ){
dim(value) <- val_dim
}
part_dimension <- private$part_dimension
# full index set of one partition, used when loading existing partition data
partition_locations <- lapply(part_dimension, seq_len)
# index set within one partition: same as `idx`, last index fixed at 1
part_idx <- idx
part_idx[[ndim]] <- 1
# arguments of the `[<-` call evaluated below; `x` and `value` stay
# symbols so they resolve to the per-partition variables at eval time
args <- c(list(x = quote(x)), part_idx, list(value = quote(value)))
# We save for each file
ii <- 1
last_idx <- idx[[ndim]]
# number of values that belong to each selected partition
partial_len <- prod(val_dim[-ndim])
lapply(seq_len(val_dim[[ndim]]), function(ii){
# slice of `value` that goes into the ii-th selected partition
value <- value[partial_len * (ii-1) + seq_len(partial_len)]
# })
# apply(value, ndim, function(value){
# get partition file
part <- last_idx[[ii]]
if(part > self$dim[ndim]){
# partition number beyond the last dimension: skipped without an error
return(FALSE)
}
fname <- self$get_partition_fpath(part, full_path = TRUE)
# NOTE(review): this compares element counts only, so it presumably
# assumes the indices within a partition have no duplicates -- confirm
if(prod(vapply(part_idx, length, 1L)) == prod(self$partition_dim())){
# the selection has as many elements as a partition: no need to read old data
x <- array(value, dim = self$partition_dim())
} else {
# partial update: load the current partition, then assign into it
x <- lazyLoadOld(fname, partition_locations, part_dimension, self$ndim, private$sample_data())
# x <- do.call('[<-', args)
# make a call instead of do.call
call <- as.call(c(list(quote(`[<-`)), args))
x <- eval(call)
}
# write the partition back and refresh its summary file
cpp_create_lazyarray(x, part_dimension, fname, compression = private$compress_level, uniformEncoding = TRUE)
self$`@generate_parition_summary`(part, x = x)
# updates the outer `ii`, which is not read again afterwards
ii <<- ii+1
return(TRUE)
})
invisible(self)
},
#' @description Internal method that computes summary statistics of one
#' partition and saves them to the partition summary file
#' @param part integer, partition number
#' @param x partition data
`@generate_parition_summary` = function(part, x){
  if(!self$is_multi_part()){ return() }
  if(self$npart < part){
    stop("Wrong partition number: ", part)
  }
  fmt <- private$storage_format
  target <- self$get_partition_fpath(part, summary_file = TRUE)

  # warnings (e.g. min/max of all-NA data) are silenced on purpose
  stats <- suppressWarnings({
    out <- list()
    out$nas <- sum(is.na(x))
    out$length <- length(x)
    if( fmt %in% c('double', 'integer') ){
      out$max <- max(x, na.rm = TRUE)
      out$min <- min(x, na.rm = TRUE)
      out$mean <- mean(x, na.rm = TRUE)
      out$sd <- sd(x, na.rm = TRUE)
      out$range <- c(out$min, out$max)
      out$var <- out$sd^2
      # mean of squares from the sample variance and the mean, using the
      # number of non-missing values
      n_valid <- out$length - out$nas
      out$meansq <- out$var * (1 - 1 / n_valid) + out$mean^2
    } else if( fmt == 'complex' ){
      out$mean <- mean(x, na.rm = TRUE)
    }
    out
  })

  stats$storage_format <- fmt
  saveRDS(stats, target, ascii = TRUE)
},
#' @description Get partition summary statistics. The statistics are read
#' from the partition summary file, which is (re)generated from the
#' partition data when missing or unreadable.
#' @param part integer, partition number
#' @param types type of statistics to get, options are \code{'min'},
#' \code{'max'}, \code{'range'}, \code{'mean'}, \code{'sd'}, \code{'var'},
#' \code{'nas'}, \code{'length'}; can be more than one options
#' @param show_warn whether to show warnings if array type is inconsistent
#' with types of statistics
#' @return named list of the requested statistics, or \code{NULL} if no
#' requested statistic is available for the storage format
partition_summary = function(
  part,
  types = c('min', 'max', 'range', 'mean', 'sd', 'var', 'nas', 'length'),
  show_warn = TRUE
){
  if(!self$is_multi_part()){
    stop("Cannot call partition_summary if array is not multi-part")
  }
  if(part > self$npart){
    stop("partition overflow")
  }

  storage_format <- self$get_storage_format()
  # statistics that only exist for numeric storage formats
  numeric_only <- c('min', 'max', 'range', 'sd', 'var')
  # `types` is usually a vector, hence any(): `&&` needs length-one operands
  if(!storage_format %in% c('double', 'integer') &&
     any(types %in% numeric_only)){
    if( show_warn ){
      warning('Summary [min,max, range, sd, var] cannot be obtained from storage_format: ', storage_format)
    }
    types <- types[!types %in% numeric_only]
  }
  if(!length(types)){
    return(NULL)
  }

  fname <- self$get_partition_fpath(part, summary_file = TRUE)
  fst_name <- self$get_partition_fpath(part, summary_file = FALSE)

  # try the cached summary first; an unreadable file is removed so that
  # it gets regenerated below
  smry <- NULL
  if(file.exists(fname)){
    smry <- tryCatch({
      readRDS(fname)
    }, error = function(e){
      unlink(fname)
      NULL
    })
  }

  if(is.null(smry) && file.exists(fst_name)){
    # generate summary file from the partition data
    self$`@partition_map`(
      partitions = part, split = FALSE,
      map_function = function(x, part){
        self$`@generate_parition_summary`(part, x)
      })
    if(file.exists(fname)){
      smry <- readRDS(fname)
    }
  }

  if(is.null(smry)){
    # Not exist! partition data is missing, i.e. everything is NA.
    # min/max/range follow the convention of an empty numeric vector,
    # the same values the generated summary holds for all-NA data
    plen <- prod(self$partition_dim())
    smry <- list(
      nas = plen,
      length = plen,
      max = -Inf,
      min = Inf,
      mean = NA_real_,
      sd = NA_real_,
      range = c(Inf, -Inf),
      var = NA_real_,
      meansq = NA_real_,
      storage_format = storage_format
    )
  }
  smry[types]
},
#' @description Set compression level and save it to the meta file
#' @param level from 0 to 100. 0 means no compression, 100 means max compression
set_compress_level = function(level){
# reject levels outside [0, 100]
stopifnot(level >= 0 & level <= 100)
private$compress_level <- level
# persist the new level
private$save_meta()
},
#' @description Get compression level
#' @return the compression level currently stored for this array
get_compress_level = function(){
private$compress_level
},
#' @description Internal method to read data
#' @param ... index set: either empty (read everything) or one index
#' vector per dimension
#' @param drop whether to drop dimension after subset, default is true
#' @return the loaded data
`@get_data` = function(..., drop = TRUE){
  stopifnot(private$.valid)
  idx <- list(...)
  n_idx <- length(idx)

  if(n_idx == 0){
    # case 1: x[], get all the data
    idx <- lapply(self$dim, seq_len)
  } else if(n_idx == 1){
    # case 2: x[a:b], get data by length, indices would have to be calculated
    stop("subset with length is not supported right now")
  } else {
    # case 3: one index per dimension; missing values are encoded as -1
    stopifnot(length(idx) == self$ndim)
    idx <- lapply(idx, function(id){
      id <- as.integer(id)
      replace(id, is.na(id), -1L)
    })
  }

  # the last index selects which partition files need to be loaded;
  # within each file the last dimension is indexed by 1
  last <- self$ndim
  files <- self$get_partition_fpath(idx[[last]], full_path = TRUE)
  idx[[last]] <- 1

  re <- lazyLoadOld(files, partition_locations = idx,
                    partition_dim = self$partition_dim(),
                    ndim = self$ndim, value_type = private$sample_data())
  if(drop){
    re <- drop(re)
  }
  re
},
#' @description Internal method to obtain a sample data to be used to determine storage mode
#' @return a length-one value of the array's storage type (public wrapper
#' around the private \code{sample_data})
`@sample_data` = function(){
private$sample_data()
}
),
active = list(
#' @field meta_name file name (within the storage directory) of the file
#' that stores meta information
meta_name = function(){
private$.meta_name
},
#' @field min_version minimal version supported, for backward
#' compatibility concerns; currently fixed at 0
min_version = function(){
0
},
#' @field version format version of this lazy array instance, as set when
#' the meta file was loaded
version = function(){
private$lazyarray_version
},
#' @field dim dimension of the data; use \code{set_dim} to change it
dim = function(){
private$.dim
},
#' @field dimnames dimension names of the data, or \code{NULL} if none
#' are stored
dimnames = function(){
private$.dimnames
},
#' @field ndim number of dimensions, i.e. the length of \code{dim}
ndim = function(){ length(private$.dim) },
#' @field can_write whether the array is writable (\code{TRUE}) or
#' read-only (\code{FALSE})
can_write = function(){
!private$read_only
},
#' @field storage_path directory where the data is stored at
storage_path = function(){
  # the storage directory, not the path of the meta file inside it
  private$.dir
},
#' @field npart number of partitions, i.e. the size of the last dimension
npart = function(){
private$.dim[[length(private$.dim)]]
},
#' @field filesize total disk space used by the existing partition data
#' files, in gigabytes
filesize = function(){
  candidates <- self$get_partition_fpath(
    part = seq_len(self$npart),
    full_path = TRUE,
    summary_file = FALSE
  )
  present <- candidates[file.exists(candidates)]
  if(!length(present)){
    return(0)
  }
  # file sizes are reported in bytes
  sum(file.info(present, extra_cols = FALSE)$size) / 1024^3
},
#' @field expected_filesize expected file size in gigabytes if all
#' partition files exist
expected_filesize = function(){
  fsize <- self$filesize
  if(fsize > 0){
    # extrapolate from the partitions that exist on disk
    files <- self$get_partition_fpath(
      part = seq_len(self$npart),
      full_path = TRUE,
      summary_file = FALSE
    )
    fraction_present <- mean(file.exists(files))
    return(fsize / fraction_present)
  }
  # nothing on disk yet: estimate from the number of elements and a
  # per-element size in bytes (32 is used for any other format)
  unit_size <- switch(
    private$storage_format,
    'integer' = 4,
    'double' = 8,
    'complex' = 16,
    32
  )
  prod(self$dim) * unit_size / 1024^3
},
#' @field storage_formats_avail character vector of the storage formats
#' supported
storage_formats_avail = function(){
c('double', 'integer', 'character', 'complex')
},
#' @field is_valid whether the array is valid; becomes \code{FALSE} once
#' \code{remove_data} has invalidated it
is_valid = function(){
private$.valid
}
)
)
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.