#' @rdname MsBackendSql
#'
#' @export MsBackendSql
MsBackendSql <- function() {
new("MsBackendSql")
}
#' @importFrom DBI dbListTables
#'
#' @noRd
.valid_dbcon <- function(x) {
if (length(x)) {
if (!inherits(x, "DBIConnection"))
return("'dbcon' is expected to be a connection to a database")
tables <- dbListTables(x)
if (!all(c("msms_spectrum", "msms_spectrum_peak") %in% tables |
c("msms_spectrum", "msms_spectrum_peak_blob") %in% tables |
c("msms_spectrum", "msms_spectrum_peak_blob2") %in% tables))
return("Database lacks some required tables.")
}
NULL
}
.dbcon <- function(x) {
x@dbcon
}
#' Returns the spectra data from the database, if needed filling with
#' *core* spectra variables that are not available in the database.
#'
#' The data can be either:
#' - in the database.
#' - in the local data (if new variables were added with `$name <-`).
#' - core spectra variables - if they are not in the database they have to be
#'   initialized with `NA` and the correct data type.
#'
#' @return a `DataFrame` - always, even if only a single column is requested.
#'
#' @importFrom IRanges NumericList CharacterList
#'
#' @importFrom S4Vectors extractCOLS
#'
#' @importFrom S4Vectors make_zero_col_DFrame
#'
#' @importFrom methods as callNextMethod getMethod
#'
#' @author Johannes Rainer
#'
#' @noRd
.spectra_data_sql <- function(x, columns = spectraVariables(x)) {
res <- getMethod("spectraData", "MsBackendCached")(x, columns = columns)
if (is.null(res))
res <- make_zero_col_DFrame(length(x))
## Define what needs to be still retrieved.
db_cols <- intersect(columns, x@spectraVariables)
db_cols <- db_cols[!db_cols %in% c("mz", "intensity", colnames(res))]
mz_cols <- intersect(columns, c("mz", "intensity"))
if (length(db_cols))
res <- cbind(
res, as(.fetch_spectra_data_sql(x, columns = db_cols), "DataFrame"))
## Get m/z and intensity values
if (length(mz_cols)) {
if (any(mz_cols == "mz")) {
if (nrow(res))
res$mz <- NumericList(x@peak_fun(x, "mz", TRUE),
compress = FALSE)
else res$mz <- NumericList(compress = FALSE)
}
if (any(mz_cols == "intensity")) {
if (nrow(res))
res$intensity <- NumericList(x@peak_fun(x, "intensity", TRUE),
compress = FALSE)
else res$intensity <- NumericList(compress = FALSE)
}
}
if (any(columns == "centroided") && !is.logical(res$centroided))
res$centroided <- as.logical(res$centroided)
if (any(columns == "smoothed") && !is.logical(res$smoothed))
res$smoothed <- as.logical(res$smoothed)
extractCOLS(res, columns)
}
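## A minimal usage sketch (hypothetical object names; `be` is assumed to be
## an initialized `MsBackendSql`). Cached local data, database columns and
## peak data are combined into a single `DataFrame`:
if (FALSE) {
    spd <- .spectra_data_sql(be, columns = c("msLevel", "rtime", "mz"))
    spd$mz # uncompressed NumericList; one element per spectrum
    ## Core variables missing from the database come back as `NA` of the
    ## correct data type via the `MsBackendCached` spectraData method.
}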
#' @param x backend
#'
#' @param columns `character` with the column(s) to retrieve.
#'
#' @param drop `logical(1)` whether dimensions should be dropped if a single
#' column is requested.
#'
#' @noRd
.fetch_peaks_data_long <- function(x, columns = c("mz", "intensity"),
drop = FALSE) {
emat <- matrix(NA_real_, ncol = length(columns), nrow = 0,
dimnames = list(character(), columns))
if (length(x@dbcon)) {
p <- dbGetQuery(
x@dbcon,
stri_c("select spectrum_id_,", stri_c(columns, collapse = ","),
" from msms_spectrum_peak where spectrum_id_ in (",
stri_c(unique(x@spectraIds), collapse = ","), ")"))
        sid <- as.factor(p$spectrum_id_)
        if (drop && length(columns) == 1L) {
p <- split(p[, columns], sid)[as.character(x@spectraIds)]
el <- which(lengths(p) == 0)
if (length(el))
                p[el] <- replicate(length(el), numeric(), simplify = FALSE)
unname(p)
} else {
p <- split.data.frame(
as.matrix(p[, columns, drop = drop]), sid)[as.character(
x@spectraIds)]
el <- which(lengths(p) == 0)
if (length(el))
                p[el] <- replicate(length(el), emat, simplify = FALSE)
unname(p)
}
} else list()
}
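## Sketch of the expected result (hypothetical backend `be`): one list
## element per spectrum, ordered by `be@spectraIds`, with empty elements for
## spectra without peaks.
if (FALSE) {
    pks <- .fetch_peaks_data_long(be, columns = c("mz", "intensity"))
    length(pks) == length(be@spectraIds) # TRUE
    pks[[1L]] # numeric matrix with columns "mz" and "intensity"
    ## With a single column and `drop = TRUE` numeric vectors are returned.
    .fetch_peaks_data_long(be, columns = "mz", drop = TRUE)[[1L]]
}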
#' Data is stored as BLOBs, so there is always one entry per spectrum, even
#' if it has no peaks.
#'
#' @importFrom fastmatch fmatch
#'
#' @noRd
.fetch_peaks_data_blob <- function(x, columns = c("mz", "intensity"),
drop = FALSE) {
if (length(x@dbcon)) {
p <- dbGetQuery(
x@dbcon,
stri_c("select spectrum_id_,", stri_c(columns, collapse = ","),
" from msms_spectrum_peak_blob where spectrum_id_ in (",
stri_c(unique(x@spectraIds), collapse = ","), ")"))
sid <- p$spectrum_id_
attributes(sid) <- NULL
if (!length(sid)) return(list())
        if (drop && length(columns) == 1L)
res <- lapply(p[[columns]], unserialize)
else
res <- do.call(
base::mapply,
args = c(lapply(p[columns], function(z) lapply(z, unserialize)),
list(FUN = base::cbind, SIMPLIFY = FALSE,
USE.NAMES = FALSE,
MoreArgs = list(deparse.level = 0))))
if (identical(base::unname(x@spectraIds), sid))
res
else
res[fmatch(x@spectraIds, p$spectrum_id_)]
} else list()
}
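## The BLOB columns contain R vectors serialized with `serialize()`. A small
## self-contained illustration of the round trip performed above:
if (FALSE) {
    bl <- serialize(c(100.1, 200.2), connection = NULL, xdr = FALSE)
    unserialize(bl) # c(100.1, 200.2)
    ## `cbind()`-ing the unserialized "mz" and "intensity" vectors (as done
    ## with `mapply` above) re-creates the two-column peaks matrix.
}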
#' The full peaks matrix is stored as a BLOB. The columns of this peaks
#' matrix are expected to be `"mz"` and `"intensity"`, hence extracting
#' complete peak matrices is fast. If only a single column is requested, or
#' the columns are in a different order, we need to iterate over the result.
#'
#' @noRd
.fetch_peaks_data_blob2 <- function(x, columns = c("mz", "intensity"),
drop = FALSE) {
if (length(x@dbcon)) {
p <- dbGetQuery(
x@dbcon,
stri_c("select spectrum_id_, peaks",
" from msms_spectrum_peak_blob2 where spectrum_id_ in (",
stri_c(unique(x@spectraIds), collapse = ","), ")"))
sid <- p$spectrum_id_
attributes(sid) <- NULL
if (!length(sid)) return(list())
## Get colnames of first peak matrix
cn <- colnames(unserialize(p[["peaks"]][[1L]]))
attributes(cn) <- NULL
if (identical(base::unname(columns), base::unname(cn)))
res <- lapply(p[["peaks"]], unserialize)
else
res <- lapply(p[["peaks"]], function(z) {
unserialize(z)[, columns, drop = drop]
})
if (identical(base::unname(x@spectraIds), sid))
res
else
res[fmatch(x@spectraIds, p$spectrum_id_)]
} else list()
}
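## In the *blob2* mode the complete peaks matrix is serialized as a single
## value. A self-contained sketch of the round trip:
if (FALSE) {
    pk <- cbind(mz = c(100.1, 200.2), intensity = c(10, 20))
    bl <- serialize(pk, connection = NULL, xdr = FALSE)
    identical(unserialize(bl), pk) # TRUE; no per-column cbind needed
    unserialize(bl)[, "mz", drop = FALSE] # subsetting requires iteration
}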
.fetch_spectra_data_sql <- function(x, columns = c("spectrum_id_")) {
orig_columns <- columns
sql_columns <- unique(c("spectrum_id_", columns))
    ## This turns out to be faster than dbBind if we filter on a field in
    ## the database that is unique (such as spectrum_id_).
res <- dbGetQuery(
x@dbcon,
stri_c("select ", stri_c(sql_columns, collapse = ","), " from ",
"msms_spectrum where spectrum_id_ in (",
stri_c(unique(x@spectraIds), collapse = ", ") , ")"))
idx <- fmatch(x@spectraIds, res$spectrum_id_)
res <- res[idx[!is.na(idx)], , drop = FALSE]
rownames(res) <- NULL
res[, orig_columns, drop = FALSE]
}
#' Get the peaks variables (columns) available in the peak data table,
#' dropping the "spectrum_id_" and "peak_id" columns.
#'
#' @param x `MsBackendSql`
#'
#' @noRd
.available_peaks_variables <- function(x) {
if (length(x@dbcon)) {
tbls <- dbListTables(.dbcon(x))
tbl <- grep("msms_spectrum_peak", tbls, value = TRUE)
if (tbl == "msms_spectrum_peak_blob2") {
if (length(x))
res <- peaksData(x[1L])[[1L]]
else res <- matrix(
NA_real_, ncol = 2, nrow = 0,
dimnames = list(character(), c("mz", "intensity")))
} else
res <- dbGetQuery(
.dbcon(x), stri_c("select * from ", tbl, " limit 1"))
colnames(res)[!colnames(res) %in% c("spectrum_id_", "peak_id")]
} else character()
}
.is_maria_db <- function(x) {
inherits(x, "MariaDBConnection")
}
##
## Insertion of data below.
##
.initialize_tables_sql <- function(con, cols, partitionBy = "none",
partitionNumber = 10) {
sql_a <- stri_c("CREATE TABLE msms_spectrum (",
stri_c(names(cols), cols, sep = " ", collapse = ", "),
", spectrum_id_ INTEGER, PRIMARY KEY (spectrum_id_))")
sql_b <- stri_c("CREATE TABLE msms_spectrum_peak (mz DOUBLE, intensity ",
"REAL, spectrum_id_ INTEGER")
## MySQL/MariaDB supports partitioning
if (.is_maria_db(con)) {
sql_a <- stri_c(sql_a, " ENGINE=ARIA;")
if (partitionBy == "none")
sql_b <- stri_c(sql_b, ", INDEX (spectrum_id_)) ENGINE=ARIA;")
if (partitionBy == "spectrum")
sql_b <- stri_c(sql_b, ", INDEX (spectrum_id_)) ENGINE=ARIA ",
"PARTITION BY HASH (spectrum_id_) PARTITIONS ",
partitionNumber, ";")
if (partitionBy == "chunk")
sql_b <- stri_c(sql_b, ", partition_ SMALLINT, ",
"INDEX (spectrum_id_)) ENGINE=ARIA ",
"PARTITION BY HASH (partition_) PARTITIONS ",
partitionNumber, ";")
} else
sql_b <- stri_c(sql_b, ");")
list(sql_a, sql_b)
}
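## Example of the SQL returned for a generic (non-MariaDB) connection; `con`
## is a hypothetical `DBIConnection` and the named character vector mimics
## the output of `.db_data_type()`:
if (FALSE) {
    sql <- .initialize_tables_sql(con, c(msLevel = "INTEGER", rtime = "REAL"))
    ## sql[[1]]: "CREATE TABLE msms_spectrum (msLevel INTEGER, rtime REAL,
    ##            spectrum_id_ INTEGER, PRIMARY KEY (spectrum_id_))"
    ## sql[[2]]: "CREATE TABLE msms_spectrum_peak (mz DOUBLE, intensity REAL,
    ##            spectrum_id_ INTEGER);"
}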
.initialize_tables <- function(con, cols, partitionBy = "none",
partitionNumber = 10) {
sql <- .initialize_tables_sql(con, cols, partitionBy, partitionNumber)
res <- dbExecute(con, sql[[1L]])
res <- dbExecute(con, sql[[2L]])
}
.initialize_tables_blob_sql <-
function(con, cols, partitionBy = "none", partitionNumber = 10,
peaks_sql = "mz MEDIUMBLOB, intensity MEDIUMBLOB",
peaks_table = "msms_spectrum_peak_blob") {
sql_a <- stri_c("CREATE TABLE msms_spectrum (",
stri_c(names(cols), cols, sep = " ", collapse = ", "),
", spectrum_id_ INTEGER, PRIMARY KEY (spectrum_id_))")
sql_b <- stri_c("CREATE TABLE ", peaks_table, " (", peaks_sql,
", spectrum_id_ INTEGER")
## MySQL/MariaDB supports partitioning
if (.is_maria_db(con)) {
sql_a <- stri_c(sql_a, " ENGINE=ARIA;")
if (partitionBy == "none")
sql_b <- stri_c(
sql_b, ", PRIMARY KEY (spectrum_id_)) ENGINE=ARIA;")
if (partitionBy == "spectrum")
sql_b <- stri_c(
sql_b, ", PRIMARY KEY (spectrum_id_)) ENGINE=ARIA ",
"PARTITION BY HASH (spectrum_id_) PARTITIONS ",
partitionNumber, ";")
if (partitionBy == "chunk")
sql_b <- stri_c(
sql_b, ", partition_ SMALLINT, ",
"PRIMARY KEY (spectrum_id_)) ENGINE=ARIA ",
"PARTITION BY HASH (partition_) PARTITIONS ",
partitionNumber, ";")
} else
sql_b <- stri_c(sql_b, ");")
list(sql_a, sql_b)
}
.initialize_tables_blob2 <- function(con, cols, partitionBy = "none",
partitionNumber = 10) {
sql <- .initialize_tables_blob_sql(con, cols, partitionBy, partitionNumber,
peaks_sql = "peaks MEDIUMBLOB",
peaks_table = "msms_spectrum_peak_blob2")
res <- dbExecute(con, sql[[1L]])
res <- dbExecute(con, sql[[2L]])
}
.initialize_tables_blob <- function(con, cols, partitionBy = "none",
partitionNumber = 10) {
sql <- .initialize_tables_blob_sql(con, cols, partitionBy, partitionNumber)
res <- dbExecute(con, sql[[1L]])
res <- dbExecute(con, sql[[2L]])
}
#' @importFrom DBI dbWriteTable
#'
#' @noRd
.insert_spectra_variables <- function(con, data) {
info <- dbGetInfo(con)
if (length(info$dbname))
data$dataStorage <- info$dbname
else data$dataStorage <- "<database>"
if (.is_maria_db(con) || inherits(con, "MySQLConnection"))
.load_data_file(con, data, "msms_spectrum")
else
dbWriteTable(con, name = "msms_spectrum", value = data, append = TRUE)
invisible(TRUE)
}
.insert_peaks <- function(con, data) {
if (.is_maria_db(con) || inherits(con, "MySQLConnection"))
.load_data_file(con, data, "msms_spectrum_peak")
else
dbWriteTable(con, name = "msms_spectrum_peak",
value = data, append = TRUE)
invisible(TRUE)
}
#' For MySQL/MariaDB databases: export the data to a temporary file and use
#' LOAD DATA LOCAL INFILE to import it.
#'
#' @importFrom data.table fwrite
#'
#' @importFrom DBI dbExecute
#'
#' @noRd
.load_data_file <- function(con, data, name) {
f <- tempfile()
logicals <- which(vapply(data, is.logical, TRUE))
if (length(logicals))
for (i in logicals)
data[, i] <- as.integer(data[, i])
fwrite(data, file = f, row.names = FALSE, col.names = FALSE, sep = "\t",
na = "\\N", eol = "\n", quote = FALSE, showProgress = FALSE)
res <- dbExecute(
con, stri_c("LOAD DATA LOCAL INFILE '", f, "' INTO TABLE ", name,
" FIELDS TERMINATED BY 0x09;"))
file.remove(f)
}
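## Usage sketch (MariaDB/MySQL only; `con` is assumed to be a connection
## opened with LOAD DATA LOCAL INFILE support enabled):
if (FALSE) {
    df <- data.frame(msLevel = 1L, rtime = 12.3, centroided = TRUE,
                     spectrum_id_ = 1L)
    .load_data_file(con, df, "msms_spectrum")
    ## The logical `centroided` column is written as integer 0/1 and missing
    ## values are exported as \N, MySQL's missing data marker.
}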
#' Inserts the data of a single backend to a database.
#'
#' @param con database
#'
#' @param x `MsBackend` or `Spectra`. The advantage of using a `Spectra` is
#'     that, by default, it also performs the `peaksData` call in parallel.
#'
#' @param index `integer(1)` with the last used spectrum ID, i.e. the offset
#'     for the IDs of the spectra to be inserted.
#'
#' @param partitionBy `character(1)` how and if the table should be partitioned.
#'
#' @param chunk `integer(1)` with the number of the current chunk.
#'
#' @return `integer(1)` last used spectrum_id
#'
#' @noRd
.insert_backend <- function(con, x, index = 0L,
partitionBy = "none", chunk = 0L) {
sv <- spectraVariables(x)
sv <- sv[!sv %in% c("mz", "intensity")]
spd <- as.data.frame(spectraData(x, columns = sv))
spectrum_id <- seq(index + 1L, index + nrow(spd))
spd$spectrum_id_ <- spectrum_id
.insert_spectra_variables(con, spd)
    pks <- as(x, "list")
    ## Each element is a two-column peaks matrix, thus its `length` is twice
    ## the number of peaks of that spectrum.
    lns <- lengths(pks) / 2
pks <- as.data.frame(do.call(rbind, pks))
pks$spectrum_id_ <- rep(spectrum_id, lns)
if (partitionBy == "chunk" && .is_maria_db(con)) {
## Append an integer for the current processed chunk to be used for
## the partitioning
pks$partition_ <- chunk
}
.insert_peaks(con, pks)
spectrum_id[length(spectrum_id)]
}
#' Store m/z and intensity values as separate BLOBs.
#'
#' @noRd
.insert_backend_blob <- function(con, x, index = 0L, xdr = FALSE, ...) {
sv <- spectraVariables(x)
sv <- sv[!sv %in% c("mz", "intensity")]
spd <- as.data.frame(spectraData(x, columns = sv))
spectrum_id <- seq(index + 1L, index + nrow(spd))
spd$spectrum_id_ <- spectrum_id
.insert_spectra_variables(con, spd)
pks <- peaksData(x, columns = c("mz", "intensity"))
template <- data.frame(matrix(ncol = 0, nrow = 1))
pks_table <- do.call(rbind, lapply(pks, function(z) {
template$mz <- list(serialize(z[, 1], NULL, xdr = xdr))
template$intensity <- list(serialize(z[, 2], NULL, xdr = xdr))
template
}))
pks_table$spectrum_id_ <- spectrum_id
dbWriteTable(con, name = "msms_spectrum_peak_blob",
value = pks_table, append = TRUE)
spectrum_id[length(spectrum_id)]
}
#' Store the full peaks matrix as a single BLOB.
#'
#' @noRd
.insert_backend_blob2 <- function(con, x, index = 0L, xdr = FALSE, ...) {
sv <- spectraVariables(x)
sv <- sv[!sv %in% c("mz", "intensity")]
spd <- as.data.frame(spectraData(x, columns = sv))
spectrum_id <- seq(index + 1L, index + nrow(spd))
spd$spectrum_id_ <- spectrum_id
.insert_spectra_variables(con, spd)
pks_table <- data.frame(spectrum_id_ = spectrum_id)
pks_table$peaks <- lapply(peaksData(x, columns = c("mz", "intensity")),
base::serialize, connection = NULL, xdr = xdr)
dbWriteTable(con, name = "msms_spectrum_peak_blob2",
value = pks_table, append = TRUE)
spectrum_id[length(spectrum_id)]
}
#' Inserts data sequentially from the files provided with `x` into a database.
#'
#' @details
#'
#' MySQL databases support data partitioning, i.e. splitting huge tables into
#' multiple *partitions*, which can improve data insertion and index
#' generation. For very large datasets it is suggested to enable table
#' partitioning by selecting either `partitionBy = "spectrum"` or
#' `partitionBy = "chunk"`. The former assigns consecutive spectra to
#' different partitions while the latter puts spectra from files that are
#' part of the same *chunk* into the same partition. Both options have about
#' the same performance but `partitionBy = "spectrum"` requires less disk
#' space.
#'
#' @param con database connection.
#'
#' @param partitionBy `character(1)` defining if and how the peak data table
#' should be partitioned. `"none"`: no partitioning, `"spectrum"`: peaks
#' are assigned to the partition based on the spectrum ID (number), i.e.
#' spectra are evenly (consecutively) assigned across partitions. For
#' `partitionNumber = 3`, the first spectrum is assigned to the first
#' partition, the second to the second, the third to the third and the
#' fourth spectrum again to the first partition. `"chunk"`:
#' spectra processed as part of the same *chunk* are placed into the same
#' partition. All spectra from the next processed chunk are assigned to the
#' next partition. Note that this is only available for MySQL/MariaDB
#' databases, i.e., if `con` is a `MariaDBConnection`.
#' See details for more information.
#'
#' @param partitionNumber `integer(1)` defining the number of partitions the
#' database table will be partitioned into (only supported for MySQL/MariaDB
#' databases).
#'
#' @param chunksize `integer(1)` number of files that are imported/processed as
#' one *chunk*. Smaller numbers will require less memory but might result in
#' slower imports.
#'
#' @param x `character` with the raw data files from which the data should be
#' imported.
#'
#' @param storage `character(1)` defining the storage mode for the peaks data.
#' Can be either `"long"` to store each peak individually, `"blob"` to
#' store the m/z values and the intensity values as data type BLOB into the
#' database and `"blob2"` to store each peak matrix as a BLOB.
#'
#' @importFrom progress progress_bar
#'
#' @importFrom BiocParallel bpparam
#'
#' @importFrom Spectra Spectra
#'
#' @noRd
.insert_data <- function(con, x, backend = MsBackendMzR(), chunksize = 10,
partitionBy = c("none", "spectrum", "chunk"),
partitionNumber = 10,
storage = c("blob2", "blob", "long")) {
partitionBy <- match.arg(partitionBy)
storage <- match.arg(storage)
## initialize backend and initialize database.
be <- backendInitialize(backend, x[1L])
sv <- spectraVariables(be)
sv <- sv[!sv %in% c("mz", "intensity")]
spd <- as.data.frame(spectraData(be, columns = sv))
cols <- .db_data_type(con, spd)
switch(storage,
long = {
.initialize_tables(con, cols, partitionBy, partitionNumber)
peak_table <- "msms_spectrum_peak"
},
blob = {
.initialize_tables_blob(con, cols, partitionBy, partitionNumber)
peak_table <- "msms_spectrum_peak_blob"
},
blob2 = {
.initialize_tables_blob2(con, cols, partitionBy, partitionNumber)
peak_table <- "msms_spectrum_peak_blob2"
})
## Loop over x to insert data.
index <- 0
message("Importing data ... ")
idxs <- seq_along(x)
chunks <- split(idxs, ceiling(idxs / chunksize))
pb <- progress_bar$new(format = stri_c("[:bar] :current/:",
"total (:percent) in ",
":elapsed"),
total = length(chunks), clear = FALSE, force = TRUE)
pb$tick(0)
if (.is_maria_db(con)) res <- .disable_mysql_keys(con, peak_table)
for (i in seq_along(chunks)) {
s <- Spectra(source = backend, x[chunks[[i]]], BPPARAM = bpparam())
index <- switch(
storage,
long = .insert_backend(con, s, index = index, partitionBy, i),
blob = .insert_backend_blob(con, s, index = index),
blob2 = .insert_backend_blob2(con, s, index = index))
rm(s)
gc()
pb$tick(1)
}
.create_indices(con, peak_table)
}
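## End-to-end sketch with an SQLite database (hypothetical file names;
## requires the RSQLite package):
if (FALSE) {
    library(RSQLite)
    con <- dbConnect(SQLite(), tempfile())
    .insert_data(con, c("sample1.mzML", "sample2.mzML"), chunksize = 1,
                 storage = "blob2")
    ## `partitionBy` and `partitionNumber` only take effect for MariaDB
    ## connections; for SQLite the tables are created without partitions.
}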
#' @importFrom DBI dbDataType
#'
#' @noRd
.db_data_type <- function(con, x) {
if (inherits(con, "MySQLConnection"))
vapply(x, function(z) dbDataType(con, z), NA_character_)
else dbDataType(con, x)
}
.disable_mysql_keys <- function(con, peak_table = "msms_spectrum_peak") {
res <- dbExecute(con, "SET FOREIGN_KEY_CHECKS = 0;")
res <- dbExecute(con, "SET UNIQUE_CHECKS = 0;")
res <- dbExecute(con, "ALTER TABLE msms_spectrum DISABLE KEYS;")
res <- dbExecute(con,
stri_c("ALTER TABLE ", peak_table, " DISABLE KEYS;"))
res
}
#' Similar to `.insert_data`, but takes the data from the provided `Spectra`
#' object and inserts it (chunk-wise) into the database.
#'
#' @importFrom stringi stri_c
#'
#' @noRd
.set_backend_insert_data <- function(object, f = processingChunkFactor(object),
con, BPPARAM = SerialParam(), blob = TRUE,
peaksStorageMode =
c("blob2", "long", "blob"), ...) {
if (!blob) peaksStorageMode <- "long"
peaksStorageMode <- match.arg(peaksStorageMode)
if (!length(f))
f <- rep(1L, length(object))
if (!is.factor(f))
        f <- factor(f, levels = unique(f))
if (length(f) != length(object))
stop("length of 'f' has to match length of 'object'")
sv <- spectraVariables(object)
sv <- sv[!sv %in% c("mz", "intensity", "spectrum_id_")]
spd <- as.data.frame(spectraData(object[1], columns = sv))
    cols <- .db_data_type(con, spd)
switch(peaksStorageMode,
long = {
.initialize_tables(con, cols, partitionBy = "none", 10)
peak_table <- "msms_spectrum_peak"
},
blob = {
.initialize_tables_blob(con, cols, partitionBy = "none", 10)
peak_table <- "msms_spectrum_peak_blob"
},
blob2 = {
.initialize_tables_blob2(con, cols, partitionBy = "none", 10)
peak_table <- "msms_spectrum_peak_blob2"
})
if (.is_maria_db(con)) .disable_mysql_keys(con, peak_table)
index <- 0
message("Importing data ... ")
pb <- progress_bar$new(format = stri_c("[:bar] :current/:",
"total (:percent) in ",
":elapsed"),
total = length(levels(f)), clear = FALSE,
force = TRUE)
pb$tick(0)
for (l in levels(f)) {
s <- Spectra(object@backend[f == l])
index <- switch(peaksStorageMode,
long = .insert_backend(con, s, index = index),
blob = .insert_backend_blob(con, s, index = index),
blob2 = .insert_backend_blob2(con, s, index = index)
)
pb$tick(1)
}
.create_indices(con, peak_table)
}
.create_indices <- function(con, peak_table) {
message("Creating indices ", appendLF = FALSE)
if (.is_maria_db(con)) {
res <- dbExecute(con, "SET FOREIGN_KEY_CHECKS = 1;")
message(".", appendLF = FALSE)
res <- dbExecute(con, "SET UNIQUE_CHECKS = 1;")
message(".", appendLF = FALSE)
res <- dbExecute(con, "ALTER TABLE msms_spectrum ENABLE KEYS;")
message(".", appendLF = FALSE)
        res <- dbExecute(
            con, stri_c("ALTER TABLE ", peak_table, " ENABLE KEYS;"))
message(".", appendLF = FALSE)
} else {
res <- dbExecute(con, stri_c("CREATE INDEX peak_spectrum_id on ",
peak_table, " (spectrum_id_)"))
message(".", appendLF = FALSE)
res <- dbExecute(con, stri_c("CREATE INDEX spectrum_spectrum_id on ",
"msms_spectrum (spectrum_id_)"))
message(".", appendLF = FALSE)
}
## create remaining indices
res <- dbExecute(con, stri_c("CREATE INDEX spectrum_rtime on ",
"msms_spectrum (rtime)"))
message(".", appendLF = FALSE)
res <- dbExecute(con, stri_c("CREATE INDEX spectrum_precursor_mz on ",
"msms_spectrum (precursorMz)"))
message(".", appendLF = FALSE)
res <- dbExecute(con, stri_c("CREATE INDEX spectrum_ms_level on ",
"msms_spectrum (msLevel)"))
message(" Done")
TRUE
}
#' @rdname MsBackendSql
#'
#' @importFrom Spectra MsBackendMzR
#'
#' @export
createMsBackendSqlDatabase <-
function(dbcon, x = character(), backend = MsBackendMzR(),
chunksize = 10L, blob = TRUE,
peaksStorageMode = c("blob2", "long", "blob"),
partitionBy = c("none", "spectrum", "chunk"),
partitionNumber = 10L) {
if (!blob) peaksStorageMode <- "long"
        peaksStorageMode <- match.arg(peaksStorageMode)
partitionBy <- match.arg(partitionBy)
if (!length(x)) return(FALSE)
if (!inherits(dbcon, "DBIConnection"))
stop("'dbcon' needs to be a valid connection to a database.")
.insert_data(dbcon, x, backend, chunksize = chunksize,
partitionBy = partitionBy,
partitionNumber = partitionNumber[1L],
storage = peaksStorageMode)
TRUE
}
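## Example usage (hypothetical file names; requires the RSQLite package).
## Using the created database with `Spectra(con, source = MsBackendSql())`
## follows the package vignette:
if (FALSE) {
    library(RSQLite)
    con <- dbConnect(SQLite(), "spectra.sqlite")
    createMsBackendSqlDatabase(con, c("a.mzML", "b.mzML"), chunksize = 5L)
    sps <- Spectra(con, source = MsBackendSql())
}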
.has_local_variable <- function(x, variable = character()) {
all(variable %in% colnames(x@localData))
}
.subset_query <- function(object, qry) {
ids <- dbGetQuery(.dbcon(object), qry)[, "spectrum_id_"]
extractByIndex(object, which(object@spectraIds %in% ids))
}
#' @importFrom MsCoreUtils ppm
#'
#' @noRd
.precursor_mz_query <- function(mz, ppm = 20, tolerance = 0) {
lmz <- length(mz)
if (length(ppm) != lmz)
ppm <- rep(ppm[1L], lmz)
if (length(tolerance) != lmz)
tolerance <- rep(tolerance[1L], lmz)
mzdiff <- ppm(mz, ppm) + tolerance
mzr <- rep(mz, each = 2) + c(-1, 1) * rep(mzdiff, each = 2)
qry <- stri_c("precursorMz", c(" >= ", " <= "), mzr, c(" and ", " or "),
collapse = "")
substring(qry, 1, nchar(qry) - 4)
}
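## Worked example: for mz = c(100, 200) with the default ppm = 20 and
## tolerance = 0 the acceptable difference is 100 * 20 / 1e6 = 0.002 and
## 200 * 20 / 1e6 = 0.004, respectively, yielding the condition:
if (FALSE) {
    .precursor_mz_query(c(100, 200))
    ## "precursorMz >= 99.998 and precursorMz <= 100.002 or
    ##  precursorMz >= 199.996 and precursorMz <= 200.004"
}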
#' Helper function to create the SQL query to fetch IDs
#'
#' @noRd
.id_query <- function(x) {
qry <- stri_c("select spectrum_id_ from msms_spectrum where ")
if (length(x) < 2000000)
qry <- stri_c(qry, "spectrum_id_ in (",
stri_c(x@spectraIds, collapse = ","), ") and ")
qry
}
#' @importFrom MsCoreUtils vapply1c rbindFill
.combine <- function(objects) {
if (length(objects) == 1L)
return(objects[[1L]])
if (!all(vapply1c(objects, class) == class(objects[[1]])))
stop("Can only merge backends of the same type: ", class(objects[[1]]))
ids <- vapply(objects, .db_info_string, character(1))
if (length(unique(ids)) != 1L)
stop("Can only merge backends connected to the same database")
res <- objects[[1]]
res@spectraIds <- unname(
do.call(c, lapply(objects, function(z) z@spectraIds)))
## merge slots of MsBackendCached.
res@localData <- do.call(
rbindFill, lapply(objects, function(z) z@localData))
if (!nrow(res@localData))
res@localData <- data.frame(row.names = seq_along(res@spectraIds))
res@spectraVariables <- unique(
unlist(lapply(objects, function(z) z@spectraVariables),
use.names = FALSE))
res@nspectra <- length(res@spectraIds)
res
}
#' @importFrom DBI dbGetInfo
.db_info_string <- function(x) {
do.call(paste, dbGetInfo(.dbcon(x)))
}
#' Create a new `MsBackendSql` database and insert the (full) data provided
#' with a `spectraData` `DataFrame`. This supports changing a `Spectra`
#' object's backend to a `MsBackendSql` using `setBackend`.
#'
#' @note
#'
#' At present we do not support additional peak variables. These might
#' eventually be provided with an additional parameter `peaksVariables`;
#' they would have to be extracted and merged with the `peaks` data frame.
#'
#' @param data `DataFrame` with the full spectra data as returned by
#' `spectraData`.
#'
#' @note
#'
#' There is some redundancy and code duplication between this function and
#' the `.insert_backend` and `.insert_backend_blob` functions. These
#' functions are not (yet) merged because the latter are designed to be more
#' memory efficient and to enable the import of large data sets. This
#' function assumes that the full data is already loaded into memory and
#' passed through the `data` parameter (`data` being a `DataFrame` such as
#' returned by `spectraData`). The main reason why this function (or
#' `setBackend`) does not support parallel or chunk-wise insertion is the
#' handling of the spectra's primary keys and the index creation.
#'
#' @author Johannes Rainer
#'
#' @importFrom Spectra coreSpectraVariables
#'
#' @noRd
.create_from_spectra_data <- function(dbcon, data, blob = TRUE,
                                      peaksStorageMode = c("blob2", "long",
                                                           "blob"),
xdr = FALSE, ...) {
if (!blob) peaksStorageMode <- "long"
peaksStorageMode <- match.arg(peaksStorageMode)
tbls <- dbListTables(dbcon)
if (any(c("msms_spectrum", "msms_spectrum_peak",
"msms_spectrum_peak_blob", "msms_spectrum_peak_blob2") %in% tbls))
stop("'dbcon' contains already tables of a 'MsBackendSql' database. ",
"If this error occurred during a 'setBackend' call, try ",
"passing 'f = factor()' to that function.", call. = FALSE)
if (!all(c("mz", "intensity") %in% colnames(data)))
stop("'data' lacks required columns \"mz\" and \"intensity\"")
if (any(colnames(data) == "spectrum_id_")) {
warning("Replacing original column \"spectrum_id_\"")
data$spectrum_id_ <- NULL
}
message("preparing data ... ", appendLF = FALSE)
data <- .drop_na_columns(
data, keep = setdiff(colnames(data), names(coreSpectraVariables())))
if (!any(colnames(data) == "precursorMz"))
data$precursorMz <- NA_real_
if (!any(colnames(data) == "msLevel"))
data$msLevel <- NA_integer_
if (!any(colnames(data) == "rtime"))
data$rtime <- NA_real_
lns <- lengths(data$mz)
mzs <- as.list(data$mz)
ints <- as.list(data$intensity)
data$mz <- NULL
data$intensity <- NULL
data <- as.data.frame(data)
if (nrow(data))
data$dataStorage <- "<database>"
cols <- .db_data_type(dbcon, data)
switch(peaksStorageMode,
long = {
peak_table <- "msms_spectrum_peak"
.initialize_tables(dbcon, cols)
},
blob = {
peak_table <- "msms_spectrum_peak_blob"
.initialize_tables_blob(dbcon, cols)
},
blob2 = {
peak_table <- "msms_spectrum_peak_blob2"
.initialize_tables_blob2(dbcon, cols)
})
if (nrow(data)) {
if (.is_maria_db(dbcon)) .disable_mysql_keys(dbcon, peak_table)
sid <- seq_len(nrow(data))
data$spectrum_id_ <- sid
message("Done")
message("Inserting data ... ", appendLF = FALSE)
.insert_spectra_variables(dbcon, data)
switch(peaksStorageMode,
long = {
pks <- data.frame(mz = unlist(mzs), intensity = unlist(ints))
pks$spectrum_id_ <- rep(sid, lns)
.insert_peaks(dbcon, pks)
},
blob = {
pks <- data.frame(matrix(ncol = 0, nrow = nrow(data)))
pks$mz <- lapply(as.list(mzs), serialize,
connection = NULL, xdr = xdr)
pks$intensity <- lapply(as.list(ints), serialize,
connection = NULL, xdr = xdr)
pks$spectrum_id_ <- sid
dbWriteTable(dbcon, name = peak_table,
value = pks, append = TRUE)
},
blob2 = {
pks <- data.frame(spectrum_id_ = sid)
pks$peaks <- mapply(FUN = function(a, b) {
res <- cbind(a, b)
colnames(res) <- c("mz", "intensity")
serialize(res, connection = NULL, xdr = xdr)
}, mzs, ints, SIMPLIFY = FALSE, USE.NAMES = FALSE)
dbWriteTable(dbcon, name = peak_table,
value = pks, append = TRUE)
})
message("Done")
.create_indices(dbcon, peak_table)
} else message("Done \nInserting data ... Done")
}
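## Sketch of the `setBackend` path supported by this function (hypothetical
## in-memory data; requires the RSQLite package):
if (FALSE) {
    library(RSQLite)
    data <- S4Vectors::DataFrame(msLevel = c(1L, 2L), rtime = c(1.2, 3.4))
    data$mz <- IRanges::NumericList(c(100.1, 103.3), c(45.1, 78.2),
                                    compress = FALSE)
    data$intensity <- IRanges::NumericList(c(200, 400), c(4, 8),
                                           compress = FALSE)
    con <- dbConnect(SQLite(), tempfile())
    .create_from_spectra_data(con, data)
}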
#' @importFrom MsCoreUtils vapply1l
#'
#' @noRd
.drop_na_columns <- function(x, keep = character()) {
if (!nrow(x))
return(x)
nas <- vapply1l(x, function(z) {
allna <- all(is.na(z))
if (length(allna) > 1)
FALSE
else allna
})
nas <- nas & !colnames(x) %in% keep
if (any(nas))
x[, !nas, drop = FALSE]
else x
}