R/MsBackendSql-functions.R

Defines functions .drop_na_columns .create_from_spectra_data .db_info_string .combine .id_query .precursor_mz_query .subset_query .has_local_variable createMsBackendSqlDatabase .create_indices .set_backend_insert_data .disable_mysql_keys .db_data_type .insert_data .insert_backend_blob2 .insert_backend_blob .insert_backend .load_data_file .insert_peaks .insert_spectra_variables .initialize_tables_blob .initialize_tables_blob2 .initialize_tables_blob_sql .initialize_tables .initialize_tables_sql .is_maria_db .available_peaks_variables .fetch_spectra_data_sql .fetch_peaks_data_blob2 .fetch_peaks_data_blob .fetch_peaks_data_long .spectra_data_sql .dbcon .valid_dbcon MsBackendSql

Documented in createMsBackendSqlDatabase MsBackendSql

#' @rdname MsBackendSql
#'
#' @export MsBackendSql
MsBackendSql <- function() {
    new("MsBackendSql")
}

#' @importFrom DBI dbListTables
#'
#' @noRd
.valid_dbcon <- function(x) {
    if (length(x)) {
        if (!inherits(x, "DBIConnection"))
            return("'dbcon' is expected to be a connection to a database")
        tables <- dbListTables(x)
        if (!all(c("msms_spectrum", "msms_spectrum_peak") %in% tables |
                 c("msms_spectrum", "msms_spectrum_peak_blob") %in% tables |
                 c("msms_spectrum", "msms_spectrum_peak_blob2") %in% tables))
            return("Database lacks some required tables.")
    }
    NULL
}

.dbcon <- function(x) {
    x@dbcon
}

#' Returns the spectra data, from the database and eventually filling with
#' *core* spectra variables, if they are not available in the database.
#'
#' The data can be either:
#' - in the database.
#' - in the local data (if new variables were added with $name <-).
#' - core spectra variables - if they are not in the database they have to be
#'   initialized with `NA` and the correct data type.
#'
#' @return a `data.frame` - always, even if only with a single column.
#'
#' @importFrom IRanges NumericList CharacterList
#'
#' @importFrom S4Vectors extractCOLS
#'
#' @importFrom S4Vectors make_zero_col_DFrame
#'
#' @importFrom methods as callNextMethod getMethod
#'
#' @author Johannes Rainer
#'
#' @noRd
.spectra_data_sql <- function(x, columns = spectraVariables(x)) {
    res <- getMethod("spectraData", "MsBackendCached")(x, columns = columns)
    if (is.null(res))
        res <- make_zero_col_DFrame(length(x))
    ## Define what needs to be still retrieved.
    db_cols <- intersect(columns, x@spectraVariables)
    db_cols <- db_cols[!db_cols %in% c("mz", "intensity", colnames(res))]
    mz_cols <- intersect(columns, c("mz", "intensity"))

    if (length(db_cols))
        res <- cbind(
            res, as(.fetch_spectra_data_sql(x, columns = db_cols), "DataFrame"))
    ## Get m/z and intensity values
    if (length(mz_cols)) {
        if (any(mz_cols == "mz")) {
            if (nrow(res))
                res$mz <- NumericList(x@peak_fun(x, "mz", TRUE),
                                      compress = FALSE)
            else res$mz <- NumericList(compress = FALSE)
        }
        if (any(mz_cols == "intensity")) {
            if (nrow(res))
                res$intensity <- NumericList(x@peak_fun(x, "intensity", TRUE),
                                             compress = FALSE)
            else res$intensity <- NumericList(compress = FALSE)
        }
    }
    if (any(columns == "centroided") && !is.logical(res$centroided))
        res$centroided <- as.logical(res$centroided)
    if (any(columns == "smoothed") && !is.logical(res$smoothed))
        res$smoothed <- as.logical(res$smoothed)
    extractCOLS(res, columns)
}


#' @param x backend
#'
#' @param columns `character` with the column(s) to retrieve.
#'
#' @param drop `logical(1)` whether dimensions should be dropped if a single
#'     column is requested.
#'
#' @noRd
.fetch_peaks_data_long <- function(x, columns = c("mz", "intensity"),
                                   drop = FALSE) {
    emat <- matrix(NA_real_, ncol = length(columns), nrow = 0,
                   dimnames = list(character(), columns))
    if (length(x@dbcon)) {
        p <- dbGetQuery(
            x@dbcon,
            stri_c("select spectrum_id_,", stri_c(columns, collapse = ","),
                   " from msms_spectrum_peak where spectrum_id_ in (",
                   stri_c(unique(x@spectraIds), collapse = ","), ")"))
        sid <- as.factor(p$spectrum_id)
        if (drop && length(columns == 1L)) {
            p <- split(p[, columns], sid)[as.character(x@spectraIds)]
            el <- which(lengths(p) == 0)
            if (length(el))
                p[el] <- replicate(length(el), numeric())
            unname(p)
        } else {
            p <- split.data.frame(
                as.matrix(p[, columns, drop = drop]), sid)[as.character(
                                     x@spectraIds)]
            el <- which(lengths(p) == 0)
            if (length(el))
                p[el] <- replicate(length(el), emat)
            unname(p)
        }
    } else list()
}

#' Data is stored as blob, so we always have a spectrum entry, even if it
#' has no peaks.
#'
#' @importFrom fastmatch fmatch
#'
#' @noRd
.fetch_peaks_data_blob <- function(x, columns = c("mz", "intensity"),
                                   drop = FALSE) {
    if (length(x@dbcon)) {
        p <- dbGetQuery(
            x@dbcon,
            stri_c("select spectrum_id_,", stri_c(columns, collapse = ","),
                   " from msms_spectrum_peak_blob where spectrum_id_ in (",
                   stri_c(unique(x@spectraIds), collapse = ","), ")"))
        sid <- p$spectrum_id_
        attributes(sid) <- NULL
        if (!length(sid)) return(list())
        if (drop && length(columns == 1L))
            res <- lapply(p[[columns]], unserialize)
        else
            res <- do.call(
                base::mapply,
                args = c(lapply(p[columns], function(z) lapply(z, unserialize)),
                         list(FUN = base::cbind, SIMPLIFY = FALSE,
                              USE.NAMES = FALSE,
                              MoreArgs = list(deparse.level = 0))))
        if (identical(base::unname(x@spectraIds), sid))
            res
        else
            res[fmatch(x@spectraIds, p$spectrum_id_)]
    } else list()
}

#' peaks matrix is stored as a blob. The columns of this peaks matrix are
#' expected to be `"mz"` and `"intensity"`. Extracting peak matrices is
#' thus supposedly fast. But if only one column is requested or columns are
#' in a different order we need to iterate over the result.
#'
#' @noRd
.fetch_peaks_data_blob2 <- function(x, columns = c("mz", "intensity"),
                                    drop = FALSE) {
    if (length(x@dbcon)) {
        p <- dbGetQuery(
            x@dbcon,
            stri_c("select spectrum_id_, peaks",
                   " from msms_spectrum_peak_blob2 where spectrum_id_ in (",
                   stri_c(unique(x@spectraIds), collapse = ","), ")"))
        sid <- p$spectrum_id_
        attributes(sid) <- NULL
        if (!length(sid)) return(list())
        ## Get colnames of first peak matrix
        cn <- colnames(unserialize(p[["peaks"]][[1L]]))
        attributes(cn) <- NULL
        if (identical(base::unname(columns), base::unname(cn)))
            res <- lapply(p[["peaks"]], unserialize)
        else
            res <- lapply(p[["peaks"]], function(z) {
                unserialize(z)[, columns, drop = drop]
            })
        if (identical(base::unname(x@spectraIds), sid))
            res
        else
            res[fmatch(x@spectraIds, p$spectrum_id_)]
    } else list()
}

.fetch_spectra_data_sql <- function(x, columns = c("spectrum_id_")) {
    orig_columns <- columns
    sql_columns <- unique(c("spectrum_id_", columns))
    ## That turns out to be faster than dbBind if we use a field in the
    ## database that is unique (such as spectrum_id).
    res <- dbGetQuery(
        x@dbcon,
        stri_c("select ", stri_c(sql_columns, collapse = ","), " from ",
               "msms_spectrum where spectrum_id_ in (",
               stri_c(unique(x@spectraIds), collapse = ", ") , ")"))
    idx <- fmatch(x@spectraIds, res$spectrum_id_)
    res <- res[idx[!is.na(idx)], , drop = FALSE]
    rownames(res) <- NULL
    res[, orig_columns, drop = FALSE]
}

#' Get columns from the msms_spectrum_peak database table (dropping spectrum_id)
#'
#' @param x `MsBackendSql`
#'
#' @noRd
.available_peaks_variables <- function(x) {
    if (length(x@dbcon)) {
        tbls <- dbListTables(.dbcon(x))
        tbl <- grep("msms_spectrum_peak", tbls, value = TRUE)
        if (tbl == "msms_spectrum_peak_blob2") {
            if (length(x))
                res <- peaksData(x[1L])[[1L]]
            else res <- matrix(
                     NA_real_, ncol = 2, nrow = 0,
                     dimnames = list(character(), c("mz", "intensity")))
        } else
            res <- dbGetQuery(
                .dbcon(x), stri_c("select * from ", tbl, " limit 1"))
        colnames(res)[!colnames(res) %in% c("spectrum_id_", "peak_id")]
    } else character()
}

.is_maria_db <- function(x) {
    inherits(x, "MariaDBConnection")
}

##
## Insertion of data below.
##
.initialize_tables_sql <- function(con, cols, partitionBy = "none",
                                   partitionNumber = 10) {
    sql_a <- stri_c("CREATE TABLE msms_spectrum (",
                    stri_c(names(cols), cols, sep = " ", collapse = ", "),
                    ", spectrum_id_ INTEGER, PRIMARY KEY (spectrum_id_))")
    sql_b <- stri_c("CREATE TABLE msms_spectrum_peak (mz DOUBLE, intensity ",
                    "REAL, spectrum_id_ INTEGER")
    ## MySQL/MariaDB supports partitioning
    if (.is_maria_db(con)) {
        sql_a <- stri_c(sql_a, " ENGINE=ARIA;")
        if (partitionBy == "none")
            sql_b <- stri_c(sql_b, ", INDEX (spectrum_id_)) ENGINE=ARIA;")
        if (partitionBy == "spectrum")
            sql_b <- stri_c(sql_b, ", INDEX (spectrum_id_)) ENGINE=ARIA ",
                            "PARTITION BY HASH (spectrum_id_) PARTITIONS ",
                            partitionNumber, ";")
        if (partitionBy == "chunk")
            sql_b <- stri_c(sql_b, ", partition_ SMALLINT, ",
                            "INDEX (spectrum_id_)) ENGINE=ARIA ",
                            "PARTITION BY HASH (partition_) PARTITIONS ",
                            partitionNumber, ";")
    } else
        sql_b <- stri_c(sql_b, ");")
    list(sql_a, sql_b)
}

.initialize_tables <- function(con, cols, partitionBy = "none",
                               partitionNumber = 10) {
    sql <- .initialize_tables_sql(con, cols, partitionBy, partitionNumber)
    res <- dbExecute(con, sql[[1L]])
    res <- dbExecute(con, sql[[2L]])
}

.initialize_tables_blob_sql <-
    function(con, cols, partitionBy = "none", partitionNumber = 10,
             peaks_sql = "mz MEDIUMBLOB, intensity MEDIUMBLOB",
             peaks_table = "msms_spectrum_peak_blob") {
        sql_a <- stri_c("CREATE TABLE msms_spectrum (",
                        stri_c(names(cols), cols, sep = " ", collapse = ", "),
                        ", spectrum_id_ INTEGER, PRIMARY KEY (spectrum_id_))")
        sql_b <- stri_c("CREATE TABLE ", peaks_table, " (", peaks_sql,
                        ", spectrum_id_ INTEGER")
        ## MySQL/MariaDB supports partitioning
        if (.is_maria_db(con)) {
            sql_a <- stri_c(sql_a, " ENGINE=ARIA;")
            if (partitionBy == "none")
                sql_b <- stri_c(
                    sql_b, ", PRIMARY KEY (spectrum_id_)) ENGINE=ARIA;")
            if (partitionBy == "spectrum")
                sql_b <- stri_c(
                    sql_b, ", PRIMARY KEY (spectrum_id_)) ENGINE=ARIA ",
                    "PARTITION BY HASH (spectrum_id_) PARTITIONS ",
                    partitionNumber, ";")
            if (partitionBy == "chunk")
                sql_b <- stri_c(
                    sql_b, ", partition_ SMALLINT, ",
                    "PRIMARY KEY (spectrum_id_)) ENGINE=ARIA ",
                    "PARTITION BY HASH (partition_) PARTITIONS ",
                    partitionNumber, ";")
        } else
            sql_b <- stri_c(sql_b, ");")
        list(sql_a, sql_b)
    }

.initialize_tables_blob2 <- function(con, cols, partitionBy = "none",
                                     partitionNumber = 10) {
    sql <- .initialize_tables_blob_sql(con, cols, partitionBy, partitionNumber,
                                       peaks_sql = "peaks MEDIUMBLOB",
                                       peaks_table = "msms_spectrum_peak_blob2")
    res <- dbExecute(con, sql[[1L]])
    res <- dbExecute(con, sql[[2L]])
}

.initialize_tables_blob <- function(con, cols, partitionBy = "none",
                                    partitionNumber = 10) {
    sql <- .initialize_tables_blob_sql(con, cols, partitionBy, partitionNumber)
    res <- dbExecute(con, sql[[1L]])
    res <- dbExecute(con, sql[[2L]])
}

#' @importFrom DBI dbWriteTable
#'
#' @noRd
.insert_spectra_variables <- function(con, data) {
    info <- dbGetInfo(con)
    if (length(info$dbname))
        data$dataStorage <- info$dbname
    else data$dataStorage <- "<database>"
    if (.is_maria_db(con) || inherits(con, "MySQLConnection"))
        .load_data_file(con, data, "msms_spectrum")
    else
        dbWriteTable(con, name = "msms_spectrum", value = data, append = TRUE)
    invisible(TRUE)
}

.insert_peaks <- function(con, data) {
    if (.is_maria_db(con) || inherits(con, "MySQLConnection"))
        .load_data_file(con, data, "msms_spectrum_peak")
    else
        dbWriteTable(con, name = "msms_spectrum_peak",
                     value = data, append = TRUE)
    invisible(TRUE)
}

#' For MySQL databases: export data and use LOAD DATA FILE to import.
#'
#' @importFrom data.table fwrite
#'
#' @importFrom DBI dbExecute
#'
#' @noRd
.load_data_file <- function(con, data, name) {
    f <- tempfile()
    logicals <- which(vapply(data, is.logical, TRUE))
    if (length(logicals))
        for (i in logicals)
            data[, i] <- as.integer(data[, i])
    fwrite(data, file = f, row.names = FALSE, col.names = FALSE, sep = "\t",
           na = "\\N", eol = "\n", quote = FALSE, showProgress = FALSE)
    res <- dbExecute(
        con, stri_c("LOAD DATA LOCAL INFILE '", f, "' INTO TABLE ", name,
                    " FIELDS TERMINATED BY 0x09;"))
    file.remove(f)
}

#' Inserts the data of a single backend to a database.
#'
#' @param con database
#'
#' @param x `MsBackend` or `Spectra`. Advantage of using a `Spectra` is that it
#'     performs by default parallel processing also on the `peaksData` call.
#'
#' @param index `integer(1)` defining the last used spectrum_id for peaks.
#'
#' @param partitionBy `character(1)` how and if the table should be partitioned.
#'
#' @param chunk `integer(1)` with the number of the current chunk.
#'
#' @return `integer(1)` last used spectrum_id
#'
#' @noRd
.insert_backend <- function(con, x, index = 0L,
                            partitionBy = "none", chunk = 0L) {
    sv <- spectraVariables(x)
    sv <- sv[!sv %in% c("mz", "intensity")]
    spd <- as.data.frame(spectraData(x, columns = sv))
    spectrum_id <- seq(index + 1L, index + nrow(spd))
    spd$spectrum_id_ <- spectrum_id
    .insert_spectra_variables(con, spd)
    pks <- as(x, "list")
    lns <- lengths(pks) / 2
    pks <- as.data.frame(do.call(rbind, pks))
    pks$spectrum_id_ <- rep(spectrum_id, lns)
    if (partitionBy == "chunk" && .is_maria_db(con)) {
        ## Append an integer for the current processed chunk to be used for
        ## the partitioning
        pks$partition_ <- chunk
    }
    .insert_peaks(con, pks)
    spectrum_id[length(spectrum_id)]
}

#' store m/z and intensity values as BLOB.
#'
#' @noRd
.insert_backend_blob <- function(con, x, index = 0L, xdr = FALSE, ...) {
    sv <- spectraVariables(x)
    sv <- sv[!sv %in% c("mz", "intensity")]
    spd <- as.data.frame(spectraData(x, columns = sv))
    spectrum_id <- seq(index + 1L, index + nrow(spd))
    spd$spectrum_id_ <- spectrum_id
    .insert_spectra_variables(con, spd)
    pks <- peaksData(x, columns = c("mz", "intensity"))
    template <- data.frame(matrix(ncol = 0, nrow = 1))
    pks_table <- do.call(rbind, lapply(pks, function(z) {
        template$mz <- list(serialize(z[, 1], NULL, xdr = xdr))
        template$intensity <- list(serialize(z[, 2], NULL, xdr = xdr))
        template
    }))
    pks_table$spectrum_id_ <- spectrum_id
    dbWriteTable(con, name = "msms_spectrum_peak_blob",
                 value = pks_table, append = TRUE)
    spectrum_id[length(spectrum_id)]
}

#' store the peaks matrix as BLOB.
#'
#' @noRd
.insert_backend_blob2 <- function(con, x, index = 0L, xdr = FALSE, ...) {
    sv <- spectraVariables(x)
    sv <- sv[!sv %in% c("mz", "intensity")]
    spd <- as.data.frame(spectraData(x, columns = sv))
    spectrum_id <- seq(index + 1L, index + nrow(spd))
    spd$spectrum_id_ <- spectrum_id
    .insert_spectra_variables(con, spd)
    pks_table <- data.frame(spectrum_id_ = spectrum_id)
    pks_table$peaks <- lapply(peaksData(x, columns = c("mz", "intensity")),
                              base::serialize, connection = NULL, xdr = xdr)
    dbWriteTable(con, name = "msms_spectrum_peak_blob2",
                 value = pks_table, append = TRUE)
    spectrum_id[length(spectrum_id)]
}

#' Insert data sequentially from files provided by x and adds it to a database
#'
#' @details
#'
#' MySQL databases support data partitioning hence splitting huge tables into
#' multiple *partitions* which can improve data insertion and index generation.
#' For very large datasets it is suggested to enable table partitioning by
#' selecting either `partitionBy = "spectrum"` or `partitionBy = "chunk"`. The
#' first assignes consecutive spectra to different partitions while the latter
#' puts spectra from files part of the same *chunk* into the same partition.
#' Both options have about the same performance but `partitionBy = "spectrum"`
#' requires less disk space.
#'
#' @param con database connection.
#'
#' @param partitionBy `character(1)` defining if and how the peak data table
#'     should be partitioned. `"none"`: no partitioning, `"spectrum"`: peaks
#'     are assigned to the partition based on the spectrum ID (number), i.e.
#'     spectra are evenly (consecutively) assigned across partitions. For
#'     `partitionNumber = 3`, the first spectrum is assigned to the first
#'     partition, the second to the second, the third to the third and the
#'     fourth spectrum again to the first partition. `"chunk"`:
#'     spectra processed as part of the same *chunk* are placed into the same
#'     partition. All spectra from the next processed chunk are assigned to the
#'     next partition. Note that this is only available for MySQL/MariaDB
#'     databases, i.e., if `con` is a `MariaDBConnection`.
#'     See details for more information.
#'
#' @param partitionNumber `integer(1)` defining the number of partitions the
#'     database table will be partitioned into (only supported for MySQL/MariaDB
#'     databases).
#'
#' @param chunksize `integer(1)` number of files that are imported/processed as
#'     one *chunk*. Smaller numbers will require less memory but might result in
#'     slower imports.
#'
#' @param x `character` with the raw data files from which the data should be
#'     imported.
#'
#' @param storage `character(1)` defining the storage mode for the peaks data.
#'     Can be either `"long"` to store each peak individually, `"blob"` to
#'     store the m/z values and the intensity values as data type BLOB into the
#'     database and `"blob2"` to store each peak matrix as a BLOB.
#'
#' @importFrom progress progress_bar
#'
#' @importFrom BiocParallel bpparam
#'
#' @importFrom Spectra Spectra
#'
#' @noRd
.insert_data <- function(con, x, backend = MsBackendMzR(), chunksize = 10,
                         partitionBy = c("none", "spectrum", "chunk"),
                         partitionNumber = 10,
                         storage = c("blob2", "blob", "long")) {
    partitionBy <- match.arg(partitionBy)
    storage <- match.arg(storage)
    ## initialize backend and initialize database.
    be <- backendInitialize(backend, x[1L])
    sv <- spectraVariables(be)
    sv <- sv[!sv %in% c("mz", "intensity")]
    spd <- as.data.frame(spectraData(be, columns = sv))
    cols <- .db_data_type(con, spd)
    switch(storage,
           long = {
               .initialize_tables(con, cols, partitionBy, partitionNumber)
               peak_table <- "msms_spectrum_peak"
           },
           blob = {
               .initialize_tables_blob(con, cols, partitionBy, partitionNumber)
               peak_table <- "msms_spectrum_peak_blob"
           },
           blob2 = {
               .initialize_tables_blob2(con, cols, partitionBy, partitionNumber)
               peak_table <- "msms_spectrum_peak_blob2"
           })
    ## Loop over x to insert data.
    index <- 0
    message("Importing data ... ")
    idxs <- seq_along(x)
    chunks <- split(idxs, ceiling(idxs / chunksize))
    pb <- progress_bar$new(format = stri_c("[:bar] :current/:",
                                           "total (:percent) in ",
                                           ":elapsed"),
                           total = length(chunks), clear = FALSE, force = TRUE)
    pb$tick(0)
    if (.is_maria_db(con)) res <- .disable_mysql_keys(con, peak_table)
    for (i in seq_along(chunks)) {
        s <- Spectra(source = backend, x[chunks[[i]]], BPPARAM = bpparam())
        index <- switch(
            storage,
            long = .insert_backend(con, s, index = index, partitionBy, i),
            blob = .insert_backend_blob(con, s, index = index),
            blob2 = .insert_backend_blob2(con, s, index = index))
        rm(s)
        gc()
        pb$tick(1)
    }
    .create_indices(con, peak_table)
}

#' @importFrom DBI dbDataType
#'
#' @noRd
.db_data_type <- function(con, x) {
    if (inherits(con, "MySQLConnection"))
        vapply(x, function(z) dbDataType(con, z), NA_character_)
    else dbDataType(con, x)
}

.disable_mysql_keys <- function(con, peak_table = "msms_spectrum_peak") {
    res <- dbExecute(con, "SET FOREIGN_KEY_CHECKS = 0;")
    res <- dbExecute(con, "SET UNIQUE_CHECKS = 0;")
    res <- dbExecute(con, "ALTER TABLE msms_spectrum DISABLE KEYS;")
    res <- dbExecute(con,
                     stri_c("ALTER TABLE ", peak_table, " DISABLE KEYS;"))
    res
}

#' Similar to the .insert_data but takes data from the provided `Spectra`
#' object and inserts that (chunk-wise) into the database.
#'
#' @importFrom stringi stri_c
#'
#' @noRd
.set_backend_insert_data <- function(object, f = processingChunkFactor(object),
                                     con, BPPARAM = SerialParam(), blob = TRUE,
                                     peaksStorageMode =
                                         c("blob2", "long", "blob"), ...) {
    if (!blob) peaksStorageMode <- "long"
    peaksStorageMode <- match.arg(peaksStorageMode)
    if (!length(f))
        f <- rep(1L, length(object))
    if (!is.factor(f))
        f <- force(factor(f, levels = unique(f)))
    if (length(f) != length(object))
        stop("length of 'f' has to match length of 'object'")
    sv <- spectraVariables(object)
    sv <- sv[!sv %in% c("mz", "intensity", "spectrum_id_")]
    spd <- as.data.frame(spectraData(object[1], columns = sv))
    cols = .db_data_type(con, spd)
    switch(peaksStorageMode,
           long = {
               .initialize_tables(con, cols, partitionBy = "none", 10)
               peak_table <- "msms_spectrum_peak"
           },
           blob = {
               .initialize_tables_blob(con, cols, partitionBy = "none", 10)
               peak_table <- "msms_spectrum_peak_blob"
           },
           blob2 = {
               .initialize_tables_blob2(con, cols, partitionBy = "none", 10)
               peak_table <- "msms_spectrum_peak_blob2"
           })
    if (.is_maria_db(con)) .disable_mysql_keys(con, peak_table)
    index <- 0
    message("Importing data ... ")
    pb <- progress_bar$new(format = stri_c("[:bar] :current/:",
                                           "total (:percent) in ",
                                           ":elapsed"),
                           total = length(levels(f)), clear = FALSE,
                           force = TRUE)
    pb$tick(0)
    for (l in levels(f)) {
        s <- Spectra(object@backend[f == l])
        index <- switch(peaksStorageMode,
                        long = .insert_backend(con, s, index = index),
                        blob = .insert_backend_blob(con, s, index = index),
                        blob2 = .insert_backend_blob2(con, s, index = index)
                        )
        pb$tick(1)
    }
    .create_indices(con, peak_table)
}

.create_indices <- function(con, peak_table) {
    message("Creating indices ", appendLF = FALSE)
    if (.is_maria_db(con)) {
        res <- dbExecute(con, "SET FOREIGN_KEY_CHECKS = 1;")
        message(".", appendLF = FALSE)
        res <- dbExecute(con, "SET UNIQUE_CHECKS = 1;")
        message(".", appendLF = FALSE)
        res <- dbExecute(con, "ALTER TABLE msms_spectrum ENABLE KEYS;")
        message(".", appendLF = FALSE)
        res <- dbExecute(con, stri_c("ALTER TABLE ",peak_table," ENABLE KEYS;"))
        message(".", appendLF = FALSE)
    } else {
        res <- dbExecute(con, stri_c("CREATE INDEX peak_spectrum_id on ",
                                     peak_table, " (spectrum_id_)"))
        message(".", appendLF = FALSE)
        res <- dbExecute(con, stri_c("CREATE INDEX spectrum_spectrum_id on ",
                                     "msms_spectrum (spectrum_id_)"))
        message(".", appendLF = FALSE)
    }
    ## create remaining indices
    res <- dbExecute(con, stri_c("CREATE INDEX spectrum_rtime on ",
                                  "msms_spectrum (rtime)"))
    message(".", appendLF = FALSE)
    res <- dbExecute(con, stri_c("CREATE INDEX spectrum_precursor_mz on ",
                                  "msms_spectrum (precursorMz)"))
    message(".", appendLF = FALSE)
    res <- dbExecute(con, stri_c("CREATE INDEX spectrum_ms_level on ",
                                  "msms_spectrum (msLevel)"))
    message(" Done")
    TRUE
}

#' @rdname MsBackendSql
#'
#' @importFrom Spectra MsBackendMzR
#'
#' @export
createMsBackendSqlDatabase <-
    function(dbcon, x = character(), backend = MsBackendMzR(),
             chunksize = 10L, blob = TRUE,
             peaksStorageMode = c("blob2", "long", "blob"),
             partitionBy = c("none", "spectrum", "chunk"),
             partitionNumber = 10L) {
        if (!blob) peaksStorageMode <- "long"
        peaksStorageMode = match.arg(peaksStorageMode)
        partitionBy <- match.arg(partitionBy)
        if (!length(x)) return(FALSE)
        if (!inherits(dbcon, "DBIConnection"))
            stop("'dbcon' needs to be a valid connection to a database.")
        .insert_data(dbcon, x, backend, chunksize = chunksize,
                     partitionBy = partitionBy,
                     partitionNumber = partitionNumber[1L],
                     storage = peaksStorageMode)
        TRUE
    }

.has_local_variable <- function(x, variable = character()) {
    all(variable %in% colnames(x@localData))
}

.subset_query <- function(object, qry) {
            ids <- dbGetQuery(.dbcon(object), qry)[, "spectrum_id_"]
            extractByIndex(object, which(object@spectraIds %in% ids))
}

.precursor_mz_query <- function(mz, ppm = 20, tolerance = 0) {
    lmz <- length(mz)
    if (length(ppm) != lmz)
        ppm <- rep(ppm[1L], lmz)
    if (length(tolerance) != lmz)
        tolerance <- rep(tolerance[1L], lmz)
    mzdiff <- ppm(mz, ppm) + tolerance
    mzr <- rep(mz, each = 2) + c(-1, 1) * rep(mzdiff, each = 2)
    qry <- stri_c("precursorMz", c(" >= ", " <= "), mzr, c(" and ", " or "),
                  collapse = "")
    substring(qry, 1, nchar(qry) - 4)
}

#' Helper function to create the SQL query to fetch IDs
#'
#' @noRd
.id_query <- function(x) {
    qry <- stri_c("select spectrum_id_ from msms_spectrum where ")
    if (length(x) < 2000000)
        qry <- stri_c(qry, "spectrum_id_ in (",
                      stri_c(x@spectraIds, collapse = ","), ") and ")
    qry
}

#' @importFrom MsCoreUtils vapply1c rbindFill
.combine <- function(objects) {
    if (length(objects) == 1L)
        return(objects[[1L]])
    if (!all(vapply1c(objects, class) == class(objects[[1]])))
        stop("Can only merge backends of the same type: ", class(objects[[1]]))
    ids <- vapply(objects, .db_info_string, character(1))
    if (length(unique(ids)) != 1L)
        stop("Can only merge backends connected to the same database")
    res <- objects[[1]]
    res@spectraIds <- unname(
        do.call(c, lapply(objects, function(z) z@spectraIds)))
    ## merge slots of MsBackendCached.
    res@localData <- do.call(
        rbindFill, lapply(objects, function(z) z@localData))
    if (!nrow(res@localData))
        res@localData <- data.frame(row.names = seq_along(res@spectraIds))
    res@spectraVariables <- unique(
        unlist(lapply(objects, function(z) z@spectraVariables),
               use.names = FALSE))
    res@nspectra <- length(res@spectraIds)
    res
}

#' @importFrom DBI dbGetInfo
.db_info_string <- function(x) {
    do.call(paste, dbGetInfo(.dbcon(x)))
}

#' Create a new `MsBackendSql` database and insert the (full) data provided
#' with a `spectraData` `DataFrame`. This is supposed to support changing
#' backend using `setBackend` to a `MsBackendSql`.
#'
#' @note
#'
#' At present we are not supporting additional peak variables. These
#' might eventually be provided with an additional parameter
#' `peaksVariables` and these would have to be extracted and
#' merged with the `peaks` data frame.
#'
#' @param data `DataFrame` with the full spectra data as returned by
#'     `spectraData`.
#'
#' @note
#'
#' There is some redundancy and code duplication between this function and
#' the `insert_backend` and `insert_backend_blob` functions. These functions
#' are not (yet) merged, because the latter are designed to be more memory
#' efficient and to enable import of large data set. This function assumes
#' that the full data is already loaded into memory and passed through the
#' `data` parameter to this function (`data` being a `data.frame` such as
#' returned by `spectraData`). The main reason why this function (or
#' `setBackend`) does not support parallel or chunk-wise insertion is the
#' primary keys of the spectra and the index creation.
#'
#' @author Johannes Rainer
#'
#' @importFrom Spectra coreSpectraVariables
#'
#' @noRd
.create_from_spectra_data <- function(dbcon, data, blob = TRUE,
                                      peaksStorageMode=c("blob2","long","blob"),
                                      xdr = FALSE, ...) {
    if (!blob) peaksStorageMode <- "long"
    peaksStorageMode <- match.arg(peaksStorageMode)
    tbls <- dbListTables(dbcon)
    if (any(c("msms_spectrum", "msms_spectrum_peak",
              "msms_spectrum_peak_blob", "msms_spectrum_peak_blob2") %in% tbls))
        stop("'dbcon' contains already tables of a 'MsBackendSql' database. ",
             "If this error occurred during a 'setBackend' call, try ",
             "passing 'f = factor()' to that function.", call. = FALSE)
    if (!all(c("mz", "intensity") %in% colnames(data)))
        stop("'data' lacks required columns \"mz\" and \"intensity\"")
    if (any(colnames(data) == "spectrum_id_")) {
        warning("Replacing original column \"spectrum_id_\"")
        data$spectrum_id_ <- NULL
    }
    message("preparing data ... ", appendLF = FALSE)
    data <- .drop_na_columns(
        data, keep = setdiff(colnames(data), names(coreSpectraVariables())))
    if (!any(colnames(data) == "precursorMz"))
        data$precursorMz <- NA_real_
    if (!any(colnames(data) == "msLevel"))
        data$msLevel <- NA_integer_
    if (!any(colnames(data) == "rtime"))
        data$rtime <- NA_real_
    lns <- lengths(data$mz)
    mzs <- as.list(data$mz)
    ints <- as.list(data$intensity)
    data$mz <- NULL
    data$intensity <- NULL
    data <- as.data.frame(data)
    if (nrow(data))
        data$dataStorage <- "<database>"
    cols <- .db_data_type(dbcon, data)
    switch(peaksStorageMode,
           long = {
               peak_table <- "msms_spectrum_peak"
               .initialize_tables(dbcon, cols)
           },
           blob = {
               peak_table <- "msms_spectrum_peak_blob"
               .initialize_tables_blob(dbcon, cols)
           },
           blob2 = {
               peak_table <- "msms_spectrum_peak_blob2"
               .initialize_tables_blob2(dbcon, cols)
           })
    if (nrow(data)) {
        if (.is_maria_db(dbcon)) .disable_mysql_keys(dbcon, peak_table)
        sid <- seq_len(nrow(data))
        data$spectrum_id_ <- sid
        message("Done")
        message("Inserting data ... ", appendLF = FALSE)
        .insert_spectra_variables(dbcon, data)
        switch(peaksStorageMode,
               long = {
                   pks <- data.frame(mz = unlist(mzs), intensity = unlist(ints))
                   pks$spectrum_id_ <- rep(sid, lns)
                   .insert_peaks(dbcon, pks)
               },
               blob = {
                   pks <- data.frame(matrix(ncol = 0, nrow = nrow(data)))
                   pks$mz <- lapply(as.list(mzs), serialize,
                                    connection = NULL, xdr = xdr)
                   pks$intensity <- lapply(as.list(ints), serialize,
                                           connection = NULL, xdr = xdr)
                   pks$spectrum_id_ <- sid
                   dbWriteTable(dbcon, name = peak_table,
                                value = pks, append = TRUE)
               },
               blob2 = {
                   pks <- data.frame(spectrum_id_ = sid)
                   pks$peaks <- mapply(FUN = function(a, b) {
                       res <- cbind(a, b)
                       colnames(res) <- c("mz", "intensity")
                       serialize(res, connection = NULL, xdr = xdr)
                   }, mzs, ints, SIMPLIFY = FALSE, USE.NAMES = FALSE)
                   dbWriteTable(dbcon, name = peak_table,
                                value = pks, append = TRUE)
               })
        message("Done")
        .create_indices(dbcon, peak_table)
    } else message("Done \nInserting data ... Done")
}

#' @importFrom MsCoreUtils vapply1l
#'
#' @noRd
.drop_na_columns <- function(x, keep = character()) {
    if (!nrow(x))
        return(x)
    nas <- vapply1l(x, function(z) {
        allna <- all(is.na(z))
        if (length(allna) > 1)
            FALSE
        else allna
    })
    nas <- nas & !colnames(x) %in% keep
    if (any(nas))
        x[, !nas, drop = FALSE]
    else x
}
rformassspectrometry/MsqlBackend documentation built on July 3, 2025, 8:46 p.m.