#' @importFrom DBI dbConnect dbDisconnect dbExecute dbGetQuery dbExistsTable
#' @importFrom RSQLite SQLite
#' @importFrom cli cli_progress_along cli_progress_message cli_progress_done
#' @keywords internal
#' @noRd
NULL
#' Connect to the consolidated database
#' @param outdir The output directory
#' @param mode Either "read" or "write". If "read" (default), opens the database as
#' read-only and returns NULL if the database doesn't exist. If "write", creates the
#' database and table if they don't exist and opens with write permissions.
#' @return A database connection, or NULL if mode="read" and database doesn't exist
#' @keywords internal
#' @noRd
db_connect <- function(outdir, mode = c("read", "write")) {
  mode <- match.arg(mode)
  db_path <- file.path(outdir, "consolidated.db")
  # In read mode an absent database is not an error: callers treat NULL as
  # "no consolidated results yet".
  if (mode == "read" && !file.exists(db_path)) {
    return(NULL)
  }
  if (mode == "read") {
    # Read-only connection; never creates the file.
    con <- DBI::dbConnect(RSQLite::SQLite(), db_path, flags = RSQLite::SQLITE_RO)
  } else {
    # Read-write; creates the database file if needed (RSQLite default).
    con <- DBI::dbConnect(RSQLite::SQLite(), db_path)
  }
  # Wait up to one hour on a locked database instead of failing immediately;
  # several processes may touch the database concurrently.
  DBI::dbExecute(con, "PRAGMA busy_timeout = 3600000;")
  # Only a writer may create the schema.
  if (mode == "write" && !DBI::dbExistsTable(con, "results")) {
    DBI::dbExecute(con, "
      CREATE TABLE results (
        filename TEXT PRIMARY KEY,
        data BLOB NOT NULL,
        mtime REAL NOT NULL
      )
    ")
  }
  con
}
#' Store a result in the consolidated database
#' @param con Database connection
#' @param filename The filename (basename) of the result
#' @param rds_data The serialized RDS data as raw bytes
#' @param mtime The modification time (as numeric, seconds since epoch)
#' @keywords internal
#' @noRd
db_store_result <- function(con, filename, rds_data, mtime) {
  sql <- "
    INSERT OR REPLACE INTO results (filename, data, mtime) VALUES (?, ?, ?)
  "
  # Wrap the raw vector in list() so RSQLite binds it as a single BLOB
  # rather than a vector of one-byte parameters.
  DBI::dbExecute(con, sql, params = list(filename, list(rds_data), mtime))
}
#' Retrieve a result from the consolidated database
#' @param con Database connection or outdir string. If a string, opens the database from that directory.
#' @param filename The filename (basename) of the result
#' @return The deserialized result list, or `empty_result` if not found
#' @keywords internal
#' @noRd
db_get_result <- function(con, filename) {
  # Accept an outdir string for convenience: open the database ourselves and
  # make sure it is closed again when we return.
  if (is.character(con)) {
    con <- db_connect(con)
    if (is.null(con)) return(empty_result)
    on.exit(DBI::dbDisconnect(con), add = TRUE)
  }
  result <- DBI::dbGetQuery(con, "
    SELECT data FROM results WHERE filename = ?
  ", params = list(filename))
  if (nrow(result) == 0) return(empty_result)
  # The blob holds the raw bytes of an .rds file, which may be gzip
  # compressed, so wrap the raw connection in gzcon() for decompression.
  # Per ?gzcon the original connection must not be used once wrapped, so we
  # register close() on the gzcon wrapper (which also releases the
  # underlying raw connection), not on raw_con itself.
  gz_con <- gzcon(rawConnection(result$data[[1]]))
  on.exit(close(gz_con), add = TRUE)
  readRDS(gz_con)
}
#' List all filenames in the consolidated database
#' @param con Database connection
#' @return Character vector of filenames
#' @keywords internal
#' @noRd
db_list_filenames <- function(con) {
  # Pull the single column straight out of the query result.
  DBI::dbGetQuery(con, "SELECT filename FROM results")$filename
}
#' Check if a filename exists in the consolidated database
#' @param con The output directory (or a database connection)
#' @param filename The filename (basename) to check
#' @return TRUE if the filename exists in the database, FALSE otherwise (or if database doesn't exist)
#' @keywords internal
#' @noRd
db_has_result <- function(con, filename) {
  # A character `con` is an outdir: open the database ourselves and ensure
  # it is disconnected on exit. A missing database means "not found".
  if (is.character(con)) {
    con <- db_connect(con)
    if (is.null(con)) return(FALSE)
    on.exit(DBI::dbDisconnect(con), add = TRUE)
  }
  # EXISTS stops scanning at the first match, unlike COUNT(*).
  hit <- DBI::dbGetQuery(con, "
    SELECT EXISTS(SELECT 1 FROM results WHERE filename = ? LIMIT 1) as file_exists
  ", params = list(basename(filename)))
  as.logical(hit$file_exists[1])
}
#' Consolidate individual RDS files into the SQLite database
#' @param outdir The output directory
#' @return Number of files consolidated
#' @keywords internal
#' @noRd
consolidate_results <- function(outdir) {
  # A lock file ensures only one process consolidates at a time; timeout = 0
  # makes the lock attempt non-blocking.
  lock_path <- file.path(outdir, ".consolidate.lock")
  dir.create(outdir, recursive = TRUE, showWarnings = FALSE)
  lock <- filelock::lock(lock_path, timeout = 0)
  if (is.null(lock)) {
    message("Another process is consolidating. Skipping.")
    return(0)
  }
  on.exit({
    filelock::unlock(lock)
    unlink(lock_path)
  })
  # Only plain .rds files match; .rds.lock / .rds.tmp are excluded by the
  # "$"-anchored pattern.
  cli_progress_message("Finding unconsolidated runs")
  files_to_consolidate <- list.files(outdir, pattern = "\\.rds$", full.names = TRUE, recursive = TRUE)
  cli_progress_done()
  if (length(files_to_consolidate) == 0) return(0)
  con <- db_connect(outdir, mode = "write")
  on.exit(DBI::dbDisconnect(con), add = TRUE)
  count <- 0
  # Note: Future optimization could batch multiple inserts in a single transaction
  # for improved performance when consolidating many files
  for (i in cli::cli_progress_along(files_to_consolidate, "Consolidating files")) {
    file_path <- files_to_consolidate[i]
    # Only consolidate if it's a successful run
    if (safe_readRDS(file_path)$OK) {
      # Stat the file once so size and mtime come from the same snapshot
      # (the original code called file.info() twice).
      info <- file.info(file_path)
      # Read raw file contents directly (already-serialized RDS bytes).
      rds_data <- readBin(file_path, "raw", n = info$size)
      # Store in database with modification time, then drop the original.
      db_store_result(con, basename(file_path), rds_data, as.numeric(info$mtime))
      unlink(file_path)
      count <- count + 1
      # Remove empty directories and any parents; stop as soon as a
      # non-empty directory (file.remove() returning FALSE) or outdir
      # is encountered.
      dir_path <- dirname(file_path)
      while (dir_path != outdir && suppressWarnings(file.remove(dir_path))) {
        dir_path <- dirname(dir_path)
      }
    }
  }
  count
}
#' Read a result from either individual file or consolidated database
#' @param outdir The output directory
#' @param filename The filename (can be full path or basename)
#' @param con an optional database connection that can be reused
#' @return The result list
#' @keywords internal
#' @noRd
read_result <- function(outdir, filename, con = NULL) {
  if (grepl(".consolidated", filename, fixed = TRUE)) {
    # A .consolidated pseudo-path exists only inside the database.
    db_get_result(con %||% outdir, basename(filename))
  } else if (file.exists(filename)) {
    # A real on-disk file: read it directly.
    safe_readRDS(filename)
  } else {
    # Not on disk: the run may already have been consolidated.
    db_get_result(con %||% outdir, basename(filename))
  }
}
#' Get modification times for files (including consolidated ones)
#' @param outdir The output directory
#' @param files Character vector of file paths (can include .consolidated paths)
#' @return Named numeric vector of mtimes (seconds since epoch), with file paths as names
#' @keywords internal
#' @noRd
get_file_mtimes <- function(outdir, files) {
  # Preallocate the result; entries not resolved below stay 0
  # (NOTE(review): consolidated entries also stay 0 when the database is
  # absent — confirm callers tolerate epoch-0 rather than NA there).
  mtimes <- numeric(length(files))
  names(mtimes) <- files
  # Separate real files from consolidated files
  is_consolidated <- grepl(".consolidated", files, fixed = TRUE)
  # Get mtimes for real files via a single vectorized stat call
  if (any(!is_consolidated)) {
    cli_progress_message("Scanning individual runs")
    real_files <- files[!is_consolidated]
    real_mtimes <- file.info(real_files, extra_cols = FALSE)$mtime
    mtimes[!is_consolidated] <- as.numeric(real_mtimes)
    cli_progress_done()
  }
  # Get mtimes for consolidated files from the database; a NULL connection
  # means the database does not exist yet, so those entries are left as-is
  if (any(is_consolidated)) {
    cli_progress_message("Scanning consolidated runs")
    con <- db_connect(outdir)
    if (!is.null(con)) {
      on.exit(DBI::dbDisconnect(con), add = TRUE)
      consolidated_files <- files[is_consolidated]
      basenames <- basename(consolidated_files)
      # Create temporary table with index and filename for efficient JOIN
      DBI::dbExecute(con, "CREATE TEMP TABLE requested_files (idx INTEGER NOT NULL, filename TEXT NOT NULL)")
      # Insert all requested basenames with their indices in one batched
      # call (RSQLite binds the vectors row-wise)
      DBI::dbExecute(con, "
        INSERT INTO requested_files (idx, filename) VALUES (?, ?)
      ", params = list(seq_along(basenames), basenames))
      # Query mtimes using a LEFT JOIN so missing filenames yield NA rows;
      # ORDER BY idx keeps results aligned with `basenames`
      result <- DBI::dbGetQuery(con, "
        SELECT r.mtime
        FROM requested_files rf
        LEFT JOIN results r ON rf.filename = r.filename
        ORDER BY rf.idx
      ")
      # Assign results directly: row order matches the mask's element order
      mtimes[is_consolidated] <- result$mtime
      # Clean up temporary table (temp tables also vanish on disconnect)
      DBI::dbExecute(con, "DROP TABLE requested_files")
    }
    cli_progress_done()
  }
  # Convert to POSIXct for compatibility with existing code; names survive
  # the conversion
  as.POSIXct(mtimes, origin = "1970-01-01", tz = "UTC")
}