Nothing
#' Initialize h5 file on-disk
#'
#' Initialize the on-disk portion on an ondisc_matrix.
#'
#' @param h5_fp file path to the .h5 file to be initialized
#' @param mtx_metadata metadata of the .mtx file
#' @param features_metadata metadata of the features.tsv file
#' @param barcodes_fp file path to the barcodes.tsv file
#' @param features_fp file path to the features.tsv file
#'
#' @return NULL
#' @noRd
initialize_h5_file_on_disk <- function(h5_fp, mtx_metadata, features_metadata, barcodes_fp, features_fp, progress) {
# Create the .h5 file
rhdf5::h5createFile(h5_fp) %>% invisible()
# Write metadata
cell_barcodes <- dplyr::pull(readr::read_tsv(file = barcodes_fp, col_names = FALSE, col_types = "c", progress = progress))
rhdf5::h5write(cell_barcodes, h5_fp, "cell_barcodes")
feature_ids <- read_given_column_of_tsv(col_idx = 1, n_cols = features_metadata$n_cols, tsv_file = features_fp, progress = progress)
rhdf5::h5write(feature_ids, h5_fp, "feature_ids")
if (features_metadata$feature_names) {
feature_names <- read_given_column_of_tsv(col_idx = 2, n_cols = features_metadata$n_cols, tsv_file = features_fp, progress = progress)
rhdf5::h5write(feature_names, h5_fp, "feature_names")
}
rhdf5::h5write(c(mtx_metadata$n_features, mtx_metadata$n_cells), h5_fp, "dimension")
rhdf5::h5write(mtx_metadata$is_logical, h5_fp, "logical_mat")
# Initialize CSC
rhdf5::h5createDataset(file = h5_fp, dataset = "cell_ptr", dims = mtx_metadata$n_cells + 1, storage.mode = "integer", level = 0L, chunk = 10) %>% invisible()
rhdf5::h5createDataset(file = h5_fp, dataset = "feature_idxs", dims = mtx_metadata$n_data_points, storage.mode = "integer", level = 0L, chunk = 50) %>% invisible()
if (!mtx_metadata$is_logical) {
rhdf5::h5createDataset(file = h5_fp, dataset = "data_csc", dims = mtx_metadata$n_data_points, storage.mode = "integer", level = 0L, chunk = 50) %>% invisible()
}
# Initialize CSR
rhdf5::h5createDataset(file = h5_fp, dataset = "feature_ptr", dims = mtx_metadata$n_features + 1, storage.mode = "integer", level = 0L, chunk = 10) %>% invisible()
rhdf5::h5createDataset(file = h5_fp, dataset = "cell_idxs", dims = mtx_metadata$n_data_points, storage.mode = "integer", level = 0L, chunk = 50) %>% invisible()
if (!mtx_metadata$is_logical) {
rhdf5::h5createDataset(file = h5_fp, dataset = "data_csr", dims = mtx_metadata$n_data_points, storage.mode = "integer", level = 0L, chunk = 50) %>% invisible()
}
return(invisible())
}
#' Initialize accumulator
#'
#' @param terminal_symbol the terminal symbol
#' @param bag_of_variables the bag of variables
#'
#' @return An initialized vector of the correct type and length
#' @noRd
initialize_accumulator <- function(terminal_symbol, bag_of_variables) {
info <- get_terminal_acc_and_args(terminal_symbol)
info$acc_constructor(bag_of_variables[[info$acc_length]])
}
#' Get accumulator funct and arg list
#'
#' @param terminal_symbol a terminal symbol
#'
#' @return a list containing (i) the name of and (ii) the arguments to pass to the accumulator function.
#' @noRd
get_accumulator_funct_arg_list <- function(terminal_symbol) {
info <- get_terminal_acc_and_args(terminal_symbol)
list(acc_funct = info$acc_funct, acc_args = info$acc_args)
}
################################################################
# Core algorithm functions start
################################################################
#' Run mtx algo step 1
#'
#' Runs the first step of the .mtx algo.
#'
#' @param mtx_fp file path to .mtx file
#' @param initialize_accumulator initialize accumulator function
#' @param bag_of_variables the bag of variables
#' @param is_logical (boolean) is matrix logical
#' @param n_lines_per_chunk number of lines to process per chunk
#' @param n_rows_to_skip number of rows to skip in .mtx file
#'
#' @return list containing (i) n_features, and (ii) a list containing n_features an n_features vector for each chunk.
#' @noRd
run_mtx_algo_step_1 <- function(mtx_fp, initialize_accumulator, bag_of_variables, is_logical, n_lines_per_chunk, n_rows_to_skip, progress) {
symbols <- symbols_enum()
initializer <- function() initialize_accumulator(terminal_symbol = symbols$n_nonzero_feature, bag_of_variables = bag_of_variables)
# function to be called by read_delim_chunked
closure <- function(x, pos, acc) {
# x <- readr::read_delim(file = mtx_fp, delim = " ", skip = n_rows_to_skip, col_names = arguments$feature_idxs, n_max = 100000, col_types = readr::cols(readr::col_integer(), readr::col_skip(), readr::col_skip()))
decrement_idxs(x$feature_idxs)
n_nonzero_features_chunk <- initializer()
inc_n_entries(n_nonzero_features_chunk, x$feature_idxs)
sum_in_place(acc[[1]], n_nonzero_features_chunk)
acc[[2]][[length(acc[[2]]) + 1]] <- n_nonzero_features_chunk
return(acc)
}
# acc: list of two elements: (i) a vector to store the n_nonzero_features accumulator, and (ii) a list to store the number of nonzero features in each chunk.
acc_init <- list(initializer(), list())
arguments <- arguments_enum()
ret <- readr::read_delim_chunked(file = mtx_fp,
chunk_size = n_lines_per_chunk,
skip = n_rows_to_skip,
callback = readr::AccumulateCallback$new(closure, acc = acc_init),
delim = " ",
col_names = arguments$feature_idxs,
progress = progress,
col_types = if (is_logical) "i_" else "i__")
return(ret)
}
#' Run subtask 2a
#'
#' Runs subtask a of part 2 of mtx algorithm: updates the accumulator of each terminal symbol.
#'
#' @param x a data.table (passed by ref)
#' @param bag_of_variables the bag_of_variables (also passed by ref)
#' @param acc the accumulator list
#' @param terminal_functs_args arguments to pass to the accumulator for a given terminal
#'
#' @return NULL
#' @noRd
run_subtask_2a <- function(x, bag_of_variables, acc, terminal_functs_args) {
arguments <- arguments_enum()
bag_of_variables[[arguments$cell_idxs]] <- x$cell_idxs
bag_of_variables[[arguments$feature_idxs]] <- x$feature_idxs
bag_of_variables[[arguments$umi_counts]] <- x$umi_counts
for (i in 1:length(acc)) {
# Add current acc_vect to bag of variables
bag_of_variables[[arguments$acc_vect]] <- acc[[i]]
# Obtain relevant arguments via mget
curr_args <- mget(x = c(arguments$acc_vect, terminal_functs_args[[i]]$acc_args),
envir = bag_of_variables) %>% setNames(NULL)
# Call relevant function
do.call(what = terminal_functs_args[[i]]$acc_funct, args = curr_args)
}
return(invisible())
}
#' Run subtask 2b
#'
#' Runs subtask 2b of algorithm: Writes chunk of gene_idxs and (if integer matrix) data to CSC matrix on-disk.
#'
#' NOTE: parts of this function should be rewritten in C++; in particular, we may need to handle pos manually, as r integers have limited size.
#'
#' @param x a data.table
#' @param pos the starting row in the data.table; uses 1-based indexing
#' @param h5_fp file path to the on-disk h5 file.
#' @param is_logical is the mtx logical
#'
#' @return NULL
#' @noRd
run_subtask_2b <- function(x, pos, h5_fp, is_logical) {
# Write feature idxs
write_data_h5(h5_fp, "feature_idxs", x$feature_idxs, pos - 1L)
if (!is_logical) {
# If integer matrix, write data too
write_data_h5(h5_fp, "data_csc", x$umi_counts, pos - 1L)
}
return(invisible())
}
#' Run subtask 2c
#'
#' @param x a data.table
#' @param h5_fp file path to on-disk h5 file
#' @param is_logical (boolean) is the matrix logical?
#' @param row_ptr the (accumulated) row pointer
#' @param n_nonzero_features_per_chunk a list of the number of nonzero features in each chunk
#' @param chunk_no the current chunk number
#' @param n_features total number of features in matrix
#' @noRd
run_subtask_2c <- function(x, h5_fp, is_logical, row_ptr, n_nonzero_features_per_chunk, chunk_no, n_features) {
arguments <- arguments_enum()
data.table::setorderv(x, arguments$feature_idxs)
n_nonzero_features_chunk <- n_nonzero_features_per_chunk[[chunk_no]]
in_memory_row_ptr <- c(0L, cumsum(n_nonzero_features_chunk))
if (!is_logical) {
map_memory_to_disk(file_name_in = h5_fp,
m_cell_idxs = x$cell_idxs,
cell_idxs_name = "cell_idxs",
m_umi_counts = x$umi_counts,
umi_counts_name = "data_csr",
n_features = n_features,
m_row_ptr = in_memory_row_ptr,
f_row_ptr = row_ptr)
} else {
map_memory_to_disk_logical_matrix(file_name_in = h5_fp,
m_cell_idxs = x$cell_idxs,
cell_idxs_name = "cell_idxs",
n_features = n_features,
m_row_ptr = in_memory_row_ptr,
f_row_ptr = row_ptr)
}
sum_in_place(row_ptr, n_nonzero_features_chunk)
}
#' Run mtx algo step 2
#'
#' This function runs step 2 of the core mtx algorithm. It (a) computes the terminal symbols, (b) writes to the CSC matrix, and (b) sorts the data by feature_idx, then writes to the CSR matrix.
#'
#' @param h5_fp full path to the h5 file on-disk
#' @param mtx_fp file path to mtx
#' @param is_logical is the mtx file logical
#' @param bag_of_variables the bag of variables containing the variables to pass to the accumulator functions
#' @param n_lines_per_chunk number of elements to load per chunk
#' @param n_rows_to_skip n rows to skip in mtx file
#' @param initial_accumulators list of starting accumulators
#' @param terminal_functs_args list of accumulator function names and arguments
#' @param row_ptr the starting row pointer
#' @param n_nonzero_features_per_chunk initial vector for n_nonzero_features_per_chunk
#'
#' @return a list containing the values of the terminals
#' @noRd
run_mtx_algo_step_2 <- function(h5_fp, mtx_fp, is_logical, bag_of_variables, n_lines_per_chunk, n_rows_to_skip, initial_accumulators, terminal_functs_args, row_ptr, n_nonzero_features_per_chunk, progress) {
chunk_no <- 1L
# Define closure to be called by readr::read_delim_chunked
closure <- function(x, pos, acc) {
# example chunk: x <- read.table(file = mtx_fp, header = FALSE, sep = " ", col.names = c("feature_idxs", "cell_idxs", if (is_logical) NULL else "umi_counts"), skip = n_rows_to_skip, colClasses = rep("integer", if (is_logical) 2 else 3), nrows = n_lines_per_chunk); pos <- 1
data.table::setDT(x)
decrement_idxs(x$feature_idxs)
decrement_idxs(x$cell_idxs)
# run subtask a
run_subtask_2a(x, bag_of_variables, acc[[1]], terminal_functs_args)
# run subtask b
if (progress) cat("\nWriting CSC data.\n")
run_subtask_2b(x, pos, h5_fp, is_logical)
# run subtask c
if (progress) cat("Writing CSR data.\n")
run_subtask_2c(x, h5_fp, is_logical, acc[[2]], n_nonzero_features_per_chunk, chunk_no, bag_of_variables$n_features)
# increment chunk_no in enclosing environment
chunk_no <<- chunk_no + 1L
return(acc)
}
arguments <- arguments_enum()
# first element: terminal accumulator list; second element: row_ptr.
acc_init <- list(initial_accumulators, row_ptr)
terminals <- readr::read_delim_chunked(file = mtx_fp,
chunk_size = n_lines_per_chunk,
skip = n_rows_to_skip,
callback = readr::AccumulateCallback$new(closure, acc = acc_init),
delim = " ",
col_names = c(arguments$feature_idxs, arguments$cell_idxs, if (is_logical) NULL else arguments$umi_counts),
progress = progress,
col_types = if (is_logical) "ii" else "iii")
return(terminals)
}
#' Run core mtx algo
#'
#' Runs to core algorithm for mtx files. There are two steps:
#' (i) compute the row pointer
#' (ii) write CSC matrix, write CSR matrix, compute covariate matrices
#'
#' @param h5_fp file path to h5 file
#' @param mtx_fp file path to .mtx file
#' @param is_logical (logical) is expression matrix logical?
#' @param covariates a list of two elements: the feature covariates and the cell covariates
#' @param bag_of_variables environment containing named variables
#' @param n_lines_per_chunk number of elements to load per chunk
#' @param n_rows_to_skip number of rows to skip in .mtx file
#' @noRd
run_core_mtx_algo <- function(h5_fp, mtx_fp, is_logical, covariates, bag_of_variables, n_lines_per_chunk, n_rows_to_skip, progress) {
grammar <- initialize_grammar()
symbols <- symbols_enum()
# Run step 1 of core algorithm
row_ptrs <- run_mtx_algo_step_1(mtx_fp, initialize_accumulator, bag_of_variables,
is_logical, n_lines_per_chunk, n_rows_to_skip, progress)
# Compute row pointer and write to disk.
row_ptr <- c(0L, cumsum(row_ptrs[[1]]))
write_data_h5(file_name_in = h5_fp, dataset_name_in = "feature_ptr", buffer = row_ptr, start_pos = 0)
# Prepare step 2 of core algo; determine which terminal symbols to compute
terminal_symbols <- lapply(unlist(covariates),
get_terminals_for_covariate, grammar = grammar) %>% unlist() %>% unique()
# Exclude from this vector n_nonzero_features, which we already computed in step 1.
terminal_symbols <- terminal_symbols[!(terminal_symbols == symbols$n_nonzero_feature)]
# Obtain the starting accumulator, as well as the accumulator function name and args, for each terminal
initial_accumulators <- lapply(terminal_symbols, initialize_accumulator, bag_of_variables = bag_of_variables)
terminal_functs_args <- lapply(terminal_symbols, get_accumulator_funct_arg_list)
# Run step 2 of core algorithm
terminal_values_and_row_ptr <- run_mtx_algo_step_2(h5_fp, mtx_fp, is_logical, bag_of_variables,
n_lines_per_chunk, n_rows_to_skip, initial_accumulators,
terminal_functs_args, row_ptr, row_ptrs[[2]], progress)
# Compute and write column pointer to CSC matrix
terminal_values <- terminal_values_and_row_ptr[[1]]
n_nonzero_cell <- terminal_values[[which(terminal_symbols == symbols$n_nonzero_cell)]]
cell_ptr <- c(0, cumsum(n_nonzero_cell))
rhdf5::h5write(cell_ptr, h5_fp, "cell_ptr")
# compute the covariate matrices
for (i in 1:length(terminal_symbols)) grammar[[terminal_symbols[i]]]$value <- terminal_values[[i]]
grammar[[symbols$n_nonzero_feature]]$value <- row_ptrs[[1]]
cov_mats <- lapply(covariates, function(covariate_vect) {
out <- lapply(covariate_vect, evaluate_grammar, grammar) %>% list2DF
colnames(out) <- gsub('_(feature|cell)', "", covariate_vect)
return(out)
})
# return covariate matrices
return(cov_mats)
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.