R/sql.R

Defines functions .sql_filter_metadata .sql_meta_list .sql_meta .sql_meta_remove .sql_meta_gets .get_nonrelative_ids .get_all_colnames .get_tbl_rid .get_rid_filenotfound .sql_set_fpath .sql_set_expires .sql_get_expires .sql_set_etag .sql_get_etag .sql_set_last_modified .sql_get_last_modified .get_all_web_rids .get_all_rids .sql_clean_cache .sql_set_rtype .sql_set_rname .sql_set_time .sql_set_rpath .sql_get_rpath .sql_get_fpath .sql_get_rtype .sql_get_rname .sql_get_field .sql_get_nrows .sql_get_resource_table .sql_remove_resource .sql_add_resource .sql_select_query .sql_create_db .sql_db_get_query .sql_db_execute .sql_schema_version .sql_disconnect .sql_connect_RW .sql_connect_RO .sql_lock_path .sql_cmd .sql_dbfile .sql_file .unlock2 .lock2

#' @import RSQLite
#' @importFrom DBI dbExecute dbSendStatement
#' @import dbplyr
#' @importFrom dplyr %>% tbl select collect summarize filter n left_join
#' @importFrom curl curl_escape
#' @importFrom filelock lock unlock

.formatID <- . %>% collect(Inf) %>% `[[`("rid")

lock.env <- new.env()
lock.env$status <- NA

.lock2 <- function(dbfile, exclusive) {
    if (is.na(lock.env$status)) {
        lock.env$status <- exclusive
        lock(.sql_lock_path(dbfile), exclusive = exclusive)
    } else if (lock.env$status || !exclusive) {
        # Exclusive lock held by a caller is compatible
        # with a subsequent request for a shared lock;
        # we're not escalating privileges here.
        NULL
    } else {
        stop("requested an exclusive lock when caller only holds a shared lock")
    }
}

.unlock2 <- function(loc) {
    if (!is.null(loc)) {
        lock.env$status <- NA
        unlock(loc)
    }
}

.sql_file <-
    function(bfc, file)
{
    file.path(bfccache(bfc), file)
}

.sql_dbfile <-
    function(bfc)
{
    .sql_file(bfc, .CACHE_FILE)
}

.sql_cmd <-
    function(cmd_name, add=FALSE, ...)
{
    sql_cmd_file <-
        system.file(package="BiocFileCache", "schema", "BiocFileCache.sql")
    sql_cmds <- readLines(sql_cmd_file)
    grps <- cumsum(grepl("^--", sql_cmds))
    cmds <- split(sql_cmds, grps)
    names <- vapply(cmds, "[[", character(1), 1)
    cmds <- paste(cmds[[which(names == cmd_name)]], collapse="\n")
    if (add)
        cmds <- sprintf(cmds, ...)
    cmds
}

.sql_lock_path <- 
    function(dbfile)
{
    paste0(dbfile, '.LOCK')
}

.sql_connect_RO <-
    function(dbfile)
{
    ## See notes in AnnotationDbi::dbFileConnect
    ## did not want to import AnnotationDbi in BFC because it is large
    ## overkill for this function
    if (!file.exists(dbfile))
        stop("DB file '", dbfile, "' not found")

    loc <- .lock2(dbfile, exclusive=FALSE)

    if (.Platform$OS.type == "unix") {
        con <- dbConnect(SQLite(), dbname=dbfile, cache_size=64000L,
                         synchronous="off", flags=SQLITE_RO, vfs="unix-none")
    } else {
        ## Use default 'vfs' on Windows.
        con <- dbConnect(SQLite(), dbname=dbfile, cache_size=64000L,
                         synchronous="off", flags=SQLITE_RO)
    }

    list(lock=loc, con=con)
}

.sql_connect_RW <-
    function(dbfile)
{
    ## We also need a RW function to allow writing to the cache

    loc <- .lock2(dbfile, exclusive=TRUE)

    if (.Platform$OS.type == "unix") {
        con <- dbConnect(SQLite(), dbname=dbfile, cache_size=64000L,
                         synchronous="off", vfs="unix-none")
    } else {
        ## Use default 'vfs' on Windows.
        con <- dbConnect(SQLite(), dbname=dbfile, cache_size=64000L,
                         synchronous="off")
    }

    list(lock=loc, con=con)
}

.sql_disconnect <- 
    function(info)
{
    dbDisconnect(info$con)
    .unlock2(info$lock)
}

.sql_schema_version <-
    function(bfc)
{
    tryCatch({
        info <- .sql_connect_RO(.sql_dbfile(bfc))
        con <- info$con
        src <- src_dbi(con)
        tbl <- tbl(src, "metadata") %>% collect(n = Inf)
        }, finally={.sql_disconnect(info)})
    tbl$value[tbl$key=="schema_version"]
}

## R / RSQLite, DBI interface

.sql_db_execute <-
    function(bfc, sql, ..., con)
{
    param <- data.frame(..., stringsAsFactors = FALSE)
    if (nrow(param) == 0L)
        param <- NULL

    if (missing(con)) {
        info <- .sql_connect_RW(.sql_dbfile(bfc))
        con <- info$con
        on.exit(.sql_disconnect(info))
    }
    dbExecute(con, sql, params = param)
}

.sql_db_get_query <-
    function(bfc, sql, ..., con)
{
    param <- data.frame(..., stringsAsFactors = FALSE)
    if (nrow(param) == 0L)
        param <- NULL

    if (missing(con)) {
        info <- .sql_connect_RO(.sql_dbfile(bfc))
        con <- info$con      
        on.exit(.sql_disconnect(info))
    }
    dbGetQuery(con, sql, param)
}

## BiocFileCache / RSQLite interface

.sql_create_db <-
    function(bfc)
{
    fl <- .sql_dbfile(bfc)
    if (!file.exists(fl)) {
        sql <- strsplit(.sql_cmd("-- CREATE_DB"), ";")[[1]]
        tryCatch({
            info <- .sql_connect_RW(.sql_dbfile(bfc))
            con <- info$con
            dbExecute(con, sql[[1]])
            ## update metadata table
            .sql_db_execute(bfc, sql[[2]], con=con)
            package_version <- as.character(packageVersion("BiocFileCache"))
            .sql_db_execute(
                bfc, sql[[3]],
                key = c('schema_version', 'package_version'),
                value = c(.CURRENT_SCHEMA_VERSION, package_version),
                con=con)
            ## create new resource table
            .sql_db_execute(bfc, sql[[4]], con=con)
            dbExecute(con, sql[[5]])
        }, finally={.sql_disconnect(info)})
    }
    .sql_validate_version(bfc)
    fl
}

.sql_select_query <-
    function(bfc, where, ...)
{
    sql <- .sql_cmd("-- SELECT_QUERY")
    cmd <- sprintf(sql, where)
    res <- .sql_db_get_query(bfc, cmd, ...)
    class(res) <- c("tbl_bfc", class(res))
    res
}

.sql_add_resource <-
    function(bfc, rname, rtype, fpath, ext = NA_character_, fname = "unique")
{
    # The connection attempt handles locking if another process is adding a
    # resource at the same time; by trying to connect first, we don't have to
    # worry about whether the choice of temporary file name is thread-safe.
    info <- .sql_connect_RW(.sql_dbfile(bfc))
    on.exit(if (!is.null(info)) { .sql_disconnect(info) })

    rpath <- rep(path.expand(tempfile("", bfccache(bfc))), length(fpath))
    rtype <- unname(rtype)
    dx <- rtype == "relative" | rtype == "web"
    rpath[dx] <- basename(rpath[dx])

    fpath[is.na(fpath)] <- rpath[is.na(fpath)]
    ext[is.na(ext)] <- ""
    bfname <- basename(fpath)
    bfname <- curl_escape(bfname)
    if (fname=="unique") {
        rpath <- sprintf("%s_%s%s", rpath, bfname, ext)
    } else {
        rpath <- sprintf("%s%s",bfname, ext)
    }
    sql <- strsplit(.sql_cmd("-- INSERT"), ";")[[1]]
    con <- info$con
    dbExecute(con, sql[[1]])
    original_rid <- .sql_db_get_query(bfc, sql[[2]], con=con)[["rid"]]
    .sql_db_execute(
        bfc, sql[[3]],
        rname = rname, rtype = rtype, fpath = fpath, rpath = rpath,
        last_modified_time = as.Date(NA_character_), etag = NA_character_,
        expires = NA_character_, con=con
        )
    .sql_db_execute(bfc, sql[[4]], con=con)
    rid <- .sql_db_get_query(bfc, sql[[2]], con=con)[["rid"]]
    dbExecute(con, sql[[5]])

    # Free the file, as .sql_get_rpath() reacquires the lock internally.
    .sql_disconnect(info)
    info <- NULL

    .sql_get_rpath(bfc, setdiff(rid, original_rid))
}

.sql_remove_resource <-
    function(bfc, rid)
{
    sql <- .sql_cmd("-- REMOVE")
    cmd <- sprintf(sql, paste0("'", rid, "'", collapse = ","))
    .sql_db_execute(bfc, cmd)
}

.sql_get_resource_table <-
    function(bfc, rids)
{
    tryCatch({
        info <- .sql_connect_RO(.sql_dbfile(bfc))
        con <- info$con
        src <- src_dbi(con)
        tbl <- tbl(src, "resource")

        if (missing(rids)) {
        } else if (length(rids) == 0) {
            tbl <- tbl %>% dplyr::filter(rid == NA_character_)
        } else if (length(rids) == 1) {
            tbl <- tbl %>% dplyr::filter(rid == rids)
        } else {
            tbl <- tbl %>% dplyr::filter(rid %in% rids)
        }

        ## join metadata
        meta <- setdiff(dbListTables(con), .RESERVED$TABLES)
        for (m in meta)
            tbl <- left_join(tbl, tbl(src, m), by="rid")

        tbl <- tbl %>% collect
    }, finally={.sql_disconnect(info)})
    class(tbl) <- c("tbl_bfc", class(tbl))
    tbl %>% dplyr::select(-id)
}

.sql_get_nrows <-
    function(bfc)
{
    summarize(bfc, n=n()) %>% collect %>% `[[`("n")
}

.sql_get_field <-
    function(bfc, id, field)
{
    stopifnot(all(id %in% .get_all_rids(bfc)))
    sql <- .sql_cmd("-- SELECT_COLUMN")
    cmd <- sprintf(sql, field, paste0("'", id, "'", collapse = ","))
    res <- .sql_db_get_query(bfc, cmd)
    setNames(res[[field]], res[["rid"]])
}

.sql_get_rname <-
    function(bfc, rid)
{
    .sql_get_field(bfc, rid, "rname")
}

.sql_get_rtype <-
    function(bfc, rid)
{
    .sql_get_field(bfc, rid, "rtype")
}

.sql_get_fpath <-
    function(bfc, rid)
{
    .sql_get_field(bfc, rid, "fpath")
}

.sql_get_rpath <-
    function(bfc, rid)
{
    rtype <- .sql_get_rtype(bfc, rid)
    rpath <- .sql_get_field(bfc, rid, "rpath")
    idx <- rtype %in% c("relative", "web")
    rpath[idx] <- file.path(bfccache(bfc), rpath)[idx]
    rpath
}

.sql_set_rpath <-
    function(bfc, rid, rpath)
{
    sql <- .sql_cmd("-- UPDATE_PATH")
    .sql_db_execute(bfc, sql, rid = rid, rpath = rpath)
}

.sql_set_time <-
    function(bfc, rid)
{
    sql <- .sql_cmd("-- UPDATE_TIME")
    cmd <- sprintf(sql, paste0("'", rid, "'", collapse = ","))
    .sql_db_execute(bfc, cmd)
}

.sql_set_rname <-
    function(bfc, rid, rname)

{
    sql <- .sql_cmd("-- UPDATE_RNAME")
    .sql_db_execute(bfc, sql, rid = rid, rname = rname)
}

.sql_set_rtype <-
    function(bfc, rid, rtype)
{
    sql <- .sql_cmd("-- UPDATE_RTYPE")
    .sql_db_execute(bfc, sql, rid = rid, rtype = rtype)
}

.sql_clean_cache <-
    function(bfc, days)
{
    mytbl <- .sql_get_resource_table(bfc) %>%
        dplyr::select(rid, access_time) %>% collect(Inf)
    accessDate <- as.Date(as.character(mytbl$access_time))
    diffTime <- Sys.Date() - accessDate
    mytbl[diffTime > days, 1] %>% .formatID
}

.get_all_rids <-
    function(bfc)
{
    sql <- .sql_cmd("-- SELECT_IDS")
    .sql_db_get_query(bfc, sql)[,1]
}

.get_all_web_rids <-
    function(bfc)
{
    sql <- .sql_cmd("-- SELECT_WEB")
    .sql_db_get_query(bfc, sql)[,1]
}

.sql_get_last_modified <-
    function(bfc, rid)
{
    .sql_get_field(bfc, rid, "last_modified_time")
}

.sql_set_last_modified <-
    function(bfc, rid, last_modified_time)
{
    sql <- .sql_cmd("-- UPDATE_MODIFIED")
    .sql_db_execute(
        bfc, sql, rid = rid, last_modified_time = last_modified_time
    )
}

.sql_get_etag <-
    function(bfc, rid)
{
    .sql_get_field(bfc, rid, "etag")
}

.sql_set_etag <-
    function(bfc, rid, etag)
{
    sql <- .sql_cmd("-- UPDATE_ETAG")
    .sql_db_execute(bfc, sql, rid = rid, etag = etag)
}
.sql_get_expires <-
    function(bfc, rid)
{
    .sql_get_field(bfc, rid, "expires")
}

.sql_set_expires <-
    function(bfc, rid, expires)
{
    sql <- .sql_cmd("-- UPDATE_EXPIRES")
    .sql_db_execute(bfc, sql, rid = rid, expires = expires)
}

.sql_set_fpath <-
    function(bfc, rid, fpath)
{
    sql <- .sql_cmd("-- UPDATE_FPATH")
    .sql_db_execute(bfc, sql, rid = rid, fpath = fpath)
}

.get_rid_filenotfound <-
    function(bfc)
{
    allpaths <- bfcrpath(bfc)
    names(allpaths)[!file.exists(allpaths)]
}

.get_tbl_rid <-
    function(tbl)
{
    tbl %>% .formatID
}

.get_all_colnames <-
    function(bfc)
{
    colnames(.sql_get_resource_table(bfc))
}

.get_nonrelative_ids <-
    function(bfc)
{
    rpaths <- .sql_get_rpath(bfc, bfcrid(bfc))
    res <- startsWith(rpaths, bfccache(bfc))
    names(rpaths)[!res]
}

##
## .sql_meta_*
##

.sql_meta_gets <-
    function(bfc, name, value, ...)
{
    tryCatch({
        info <- .sql_connect_RW(.sql_dbfile(bfc))
        con <- info$con
        dbWriteTable(con, name, value, ...)
    }, finally={.sql_disconnect(info)})
}

.sql_meta_remove <-
    function(bfc, name, ...)
{
    tryCatch({
        info <- .sql_connect_RW(.sql_dbfile(bfc))
        con <- info$con
        if (dbExistsTable(con, name))
            dbRemoveTable(con, name, ...)
    }, finally={.sql_disconnect(info)})
}

.sql_meta <-
    function(bfc, name, ...)
{
    tryCatch({
        info <- .sql_connect_RO(.sql_dbfile(bfc))
        con <- info$con
        if (!dbExistsTable(con, name))
            stop("'", name, "' not found in database")
        dbReadTable(con, name, ...)
    }, finally={.sql_disconnect(info)})
}

.sql_meta_list <-
    function(bfc)
{
    tryCatch({
        info <- .sql_connect_RO(.sql_dbfile(bfc))
        con <- info$con
        res <- dbListTables(con)
        setdiff(res, .RESERVED$TABLES)
    }, finally={.sql_disconnect(info)})
}

.sql_filter_metadata <-
    function(bfc, name, verbose)
{
    df <- bfcmeta(bfc, name)
    rids <- bfcrid(bfc)
    check <- as.character(df$rid) %in% rids
    if (all(!check)) {
        bfcmetaremove(bfc, name)
        vl <- FALSE
    } else if (any(!check)) {
        df <- df[check,]
        bfcmeta(bfc, name, overwrite=TRUE) <- df
        vl <- FALSE
    } else {
        vl <- TRUE
    }
    vl
}
Bioconductor/BiocFileCache documentation built on May 2, 2024, 4:19 p.m.