# R/cran-data-archive.R

#' Trawl a local CRAN archive and extract statistics from all packages
#'
#' @param path Path to local archive of R packages, either as source
#' directories, or '.tar.gz' files such as in a CRAN mirror.
#' @param archive If `TRUE`, extract statistics for all packages in the
#' `/Archive` sub-directory, otherwise only statistics for main directory (that
#' is, current packages only).
#' @param prev_results Result of previous call to this function, if available.
#' Submitting previous results will ensure that only newer packages not present
#' in previous result will be analysed, with new results simply appended to
#' previous results. This parameter can be either the `data.frame` itself, or
#' the path to a file to be read with `readRDS()`.
#' @param results_file Can be used to specify the name or full path of a `.Rds`
#' file to which results should be saved once they have been generated. The
#' '.Rds' extension will be automatically appended, and any other extensions
#' will be ignored. The directory of this file must already exist; this is
#' checked before the trawl starts.
#' @param chunk_size Divide large archive trawl into chunks of this size, and
#' save intermediate results to local files. These intermediate files can be
#' combined to generate a single `prev_results` file, to enable jobs to be
#' stopped and re-started without having to recalculate all results. These files
#' will be named `pkgstats-results-N.Rds`, where "N" incrementally numbers each
#' file.
#' @param num_cores Number of machine cores to use in parallel, defaulting to
#' single-core processing.
#' @param save_full If `TRUE`, full \link{pkgstats} results are saved for each
#' package to files in `results_path`.
#' @param save_ex_calls If `TRUE`, the results of the `external_calls` component
#' are saved for each package to files in `results_path` (only if `save_full =
#' FALSE`).
#' @param results_path Path to save intermediate files generated by the
#' `chunk_size` parameter described above, along with any files generated by
#' the `save_full` or `save_ex_calls` parameters. The directory is created if
#' it does not exist.
#'
#' @return A `data.frame` object with one row for each package containing
#' summary statistics generated from the \link{pkgstats_summary} function
#' (returned invisibly).
#'
#' @family archive
#' @export
#' @examples
#' # Create fake archive directory with single tarball:
#' f <- system.file ("extdata", "pkgstats_9.9.tar.gz", package = "pkgstats")
#' tarball <- basename (f)
#'
#' archive_path <- file.path (tempdir (), "archive")
#' if (!dir.exists (archive_path)) {
#'     dir.create (archive_path)
#' }
#' path <- file.path (archive_path, tarball)
#' file.copy (f, path)
#' tarball_path <- file.path (archive_path, "tarballs")
#' dir.create (tarball_path, recursive = TRUE)
#' file.copy (path, file.path (tarball_path, tarball))
#' \dontrun{
#' out <- pkgstats_from_archive (tarball_path)
#' }
pkgstats_from_archive <- function (path,
                                   archive = TRUE,
                                   prev_results = NULL,
                                   results_file = NULL,
                                   chunk_size = 1000L,
                                   num_cores = 1L,
                                   save_full = FALSE,
                                   save_ex_calls = FALSE,
                                   results_path = fs::path_temp ()) {

    # Fail immediately if a required package is missing, rather than part-way
    # through a trawl.
    for (pkg in c ("hms", "parallel", "callr")) {
        if (!requireNamespace (pkg, quietly = TRUE)) {
            stop (
                "Package '", pkg, "' must be installed to use ",
                "'pkgstats_from_archive()'",
                call. = FALSE
            )
        }
    }

    checkmate::assert_string (path)
    checkmate::assert_directory_exists (path)
    checkmate::assert_int (chunk_size, lower = 1L)
    checkmate::assert_int (num_cores, lower = 1L)
    checkmate::assert_logical (archive)
    checkmate::assert_scalar (archive)
    checkmate::assert_logical (save_full)
    checkmate::assert_scalar (save_full)
    checkmate::assert_logical (save_ex_calls)
    checkmate::assert_scalar (save_ex_calls)
    checkmate::assert_string (results_path)

    if (!is.null (results_file)) {
        checkmate::assert_string (results_file)
        # Resolve the output name now, so that a non-existent target directory
        # is reported before any packages are analysed.
        results_file <- archive_results_file_name (results_file)
    }

    # 'prev_results' may be either a file name or the previous results
    # themselves; only the former is subject to string/file checks.
    if (is.character (prev_results)) {
        checkmate::assert_string (prev_results)
        checkmate::assert_file_exists (prev_results)
        prev_results <- readRDS (prev_results)
    }

    if (!is.null (prev_results)) {
        if (!identical (names (prev_results), names (pkgstats_summary ()))) {
            stop (
                "'prev_results' must contain a ",
                "'data.frame' of 'pkgstats' summaries"
            )
        }
    }

    res <- NULL
    results_files <- NULL
    out <- prev_results

    flist <- list_archive_files (path, archive)
    # Previous results can only be matched against tarball names
    if (all (grepl ("\\.tar\\.gz$", flist))) {
        flist <- rm_prev_files (flist, prev_results)
    }
    flist <- exclude_these_tarballs (flist)
    flist <- rm_tars_with_different_desc (flist)
    npkgs <- length (flist)

    if (npkgs > 0) {

        # Consecutive runs of 'chunk_size' files, with a shorter final chunk
        chunk_id <- ceiling (seq_len (npkgs) / chunk_size)
        flist <- split (flist, f = chunk_id)

        message (
            "Starting trawl of ", npkgs,
            " files in ", length (flist), " chunks"
        )

        results_path <- expand_path (results_path)
        if (!fs::dir_exists (results_path)) {
            fs::dir_create (results_path, recurse = TRUE)
        }

        summarise_one <- function (i) {
            one_summary_from_archive (
                i,
                save_full,
                save_ex_calls,
                results_path
            )
        }

        index <- 1 # name of temporary files
        pt0 <- proc.time ()

        for (f in flist) {

            if (num_cores > 1L) {
                res <- parallel::mclapply (
                    f,
                    summarise_one,
                    mc.cores = num_cores
                )
            } else {
                res <- lapply (f, summarise_one)
            }

            # 'mclapply' returns "try-error" objects for jobs which fail; those
            # must not be passed on to 'rbind'.
            failed <- vapply (res, inherits, logical (1L), what = "try-error")
            if (any (failed)) {
                warning (
                    sum (failed), " package(s) in chunk ", index,
                    " failed and were omitted from results",
                    call. = FALSE
                )
                res <- res [!failed]
            }

            fname <- fs::path (
                results_path,
                paste0 ("pkgstats-results-", index, ".Rds")
            )
            saveRDS (do.call (rbind, res), fname)
            results_files <- c (results_files, fname)

            archive_trawl_progress_message (index, chunk_size, npkgs, pt0)
            index <- index + 1
        }

        res <- do.call (rbind, lapply (results_files, readRDS))
    }

    out <- rbind (out, res)
    # 'out' remains NULL if there were neither previous nor new results
    if (!is.null (out)) {
        out <- out [which (!is.na (out$package)), ]
        rownames (out) <- NULL
    }

    # Chunk files are only removed once all chunks have been combined
    if (!is.null (results_files)) {
        fs::file_delete (results_files)
    }

    if (!is.null (res) && !is.null (results_file)) {
        saveRDS (out, results_file)
    }

    invisible (out)
}

#' List all packages of a local archive which are to be analysed
#'
#' If `path` contains the string "tarballs", or has a "tarballs"
#' sub-directory, the '.tar.gz' files of that directory are listed. Otherwise
#' the immediate sub-directories of `path` which contain a 'DESCRIPTION' file
#' are listed.
#'
#' @param path Path to local archive.
#' @param recursive If `TRUE`, tarballs are also listed from all
#' sub-directories. Has no effect when listing source directories.
#' @return Paths to all tarballs or source directories, passed through
#' `expand_path()`.
#' @noRd
list_archive_files <- function (path, recursive = FALSE) {

    tarballs <- grepl ("tarballs", path)
    if (!tarballs) {
        # Only an actual "tarballs" sub-directory switches to tarball mode;
        # other entries which merely contain that string are ignored.
        tarball_dir <- fs::path (path, "tarballs")
        tarballs <- any (fs::dir_exists (tarball_dir))
        if (tarballs) {
            path <- tarball_dir
        }
    }

    if (!fs::dir_exists (path)) {
        stop ("[", path, "] directory does not exist")
    }

    if (tarballs) {
        flist <- fs::dir_ls (
            path,
            recurse = recursive,
            regexp = "\\.tar\\.gz$"
        )
    } else {
        flist <- fs::dir_ls (
            path,
            recurse = FALSE
        )
        flist <- flist [which (fs::dir_exists (flist))]
        # Reduce to directories with "DESCRIPTION" files
        desc_paths <- fs::path (flist, "DESCRIPTION")
        flist <- flist [which (fs::file_exists (desc_paths))]
    }

    return (expand_path (flist))
}

#' Drop tarballs which have already been analysed
#'
#' Tarballs are matched by file name against the names implied by the
#' 'package' and 'version' columns of `prev_results`, as
#' '<package>_<version>.tar.gz'.
#'
#' @param flist Full paths to all tarball files to be analysed
#' @param prev_results Either `NULL`, a `data.frame` of previous results, or
#' the name of a single file holding such results, to be read with
#' `readRDS()`.
#' @return Modified version of `flist`, after removing any entries present in
#' `prev_results`.
#' @noRd
rm_prev_files <- function (flist, prev_results) {

    # Without previous results, every file remains to be analysed
    if (is.null (prev_results)) {
        return (flist)
    }

    if (is.character (prev_results)) {

        if (length (prev_results) > 1) {
            stop ("prev_results must be a single-length character")
        }
        if (!fs::file_exists (prev_results)) {
            stop ("file [", prev_results, "] does not exist")
        }

        prev_results <- tryCatch (
            readRDS (prev_results),
            error = function (e) e
        )
        if (inherits (prev_results, "error")) {
            stop ("Unable to read prev_results: ", prev_results$message)
        }

        # Rows without package names can not be matched to any tarball
        has_pkg <- !is.na (prev_results$package)
        prev_results <- prev_results [which (has_pkg), ]
    }

    done <- paste0 (
        prev_results$package,
        "_",
        prev_results$version,
        ".tar.gz"
    )
    is_new <- !(basename (flist) %in% done)

    return (flist [which (is_new)])
}

#' Remove tarballs with version numbers which differ from 'DESCRIPTION' files
#'
#' Some archive tarballs have version numbers in 'DESCRIPTION' files that differ
#' from the tarball numbers. These tarballs are removed from the list of files
#' to be analysed. All entries matching one of the listed names are removed,
#' including where the same tarball is listed more than once.
#'
#' @param flist Full paths to all tarball files to be analysed
#' @return Modified version of `flist`, after removing the tarballs listed
#' here.
#' @noRd
rm_tars_with_different_desc <- function (flist) {

    # first is a grep pattern matching the tarball; second is the tarball name
    # according to the internal version (for reference only; not used here).
    dat <- list (
        c ("acepack\\_1\\.1\\.tar\\.gz$", "acepack_1.0.4.tar.gz"),
        c ("bats\\_0\\.1\\-3\\.tar\\.gz$", "bats-0.1-2.tar.gz"),
        c ("HTML\\_0\\.4\\-1\\.tar\\.gz$", "HTML-0.4.tar.gz"),
        c ("survival5\\_1\\.0\\.tar\\.gz$", "survival5-1.0-0.tar.gz"),
        c ("timeslab\\_1\\.0\\-1\\.tar\\.gz$", "timeslab_1.0.tar.gz")
    )

    drop <- rep (FALSE, length (flist))
    for (d in dat) {
        drop <- drop | grepl (d [1], flist)
    }

    if (any (drop)) {
        flist <- flist [!drop]
    }

    return (flist)
}

#' Remove tarballs which can not be processed by the main 'pkgstats' call
#'
#' The 'dse' and 'VR' tarballs listed here hold multiple packages in
#' sub-directories, each with its own 'DESCRIPTION' file. They can therefore not
#' be processed as single packages, and do not return any package names or
#' versions.
#'
#' @param flist Full paths to all tarball files to be analysed
#' @return Modified version of `flist`, after removing all entries which
#' contain any of the tarball names listed here.
#' @noRd
exclude_these_tarballs <- function (flist) {

    exclude <- c (
        "dse_R2000.4-1.tar.gz",
        "dse_R2000.6-1.tar.gz",
        "VR_5.3pl037-1.tar.gz",
        "VR_5.3pl037-2.tar.gz",
        "VR_6.1-4.tar.gz"
    )

    # Positions in 'flist' which contain any of the excluded names, matched as
    # literal strings rather than as regular expressions.
    hits <- unlist (lapply (exclude, grep, x = flist, fixed = TRUE))

    if (length (hits) == 0L) {
        return (flist)
    }

    return (flist [-hits])
}

#' Apply 'pkgstats' to one path and return the summary
#'
#' The archive trawl often stops for reasons which are not reproducible when
#' running individual calls. This therefore uses `callr` to run call processes
#' in the background, and terminate after 5 minutes with an error.
#'
#' @param path Path to one tarball or source directory.
#' @param save_full If `TRUE`, save the full 'pkgstats' result to a file in
#' `results_path`.
#' @param save_ex_calls If `TRUE` (and `save_full = FALSE`), save only the
#' `external_calls` component of the result to a file in `results_path`.
#' @param results_path Directory in which to save those files.
#' @return Result of `pkgstats_summary()`. If the 'pkgstats' call fails or
#' times out, this is an empty summary with only the 'package' and 'version'
#' columns filled in from the name of `path`.
#' @noRd
one_summary_from_archive <- function (path, save_full,
                                      save_ex_calls, results_path) {

    # Log files must be unique to each call, because parallel workers share
    # one temporary directory and would otherwise delete and overwrite the log
    # files of one another.
    logfiles <- list (
        stdout = tempfile (pattern = "pkgstats-stdout-"),
        stderr = tempfile (pattern = "pkgstats-stderr-")
    )

    ps <- callr::r_bg (
        func = pkgstats::pkgstats,
        args = list (path = path),
        stdout = logfiles$stdout,
        stderr = logfiles$stderr,
        package = TRUE
    )

    # Always clean up, including when this call is interrupted
    on.exit (
        {
            if (ps$is_alive ()) {
                ps$kill ()
            }
            unlink (c (logfiles$stdout, logfiles$stderr))
        },
        add = TRUE
    )

    timeout <- 300 # seconds
    p0 <- proc.time () [3]
    while (ps$is_alive () && (proc.time () [3] - p0) < timeout) {
        ps$wait (timeout = 10) # poll interval; 'processx' uses milliseconds
    }

    # Decide on the state of the process rather than on elapsed time, so that
    # results which arrive just before the timeout are not discarded.
    if (ps$is_alive ()) {
        ps$kill ()
        s <- NULL
    } else {
        s <- tryCatch (ps$get_result (), error = function (e) NULL)
    }

    if (save_full || save_ex_calls) {
        pkg <- utils::tail (decompose_path (path) [[1]], 1L)
        pkg <- gsub ("\\.tar\\.gz$", ".Rds", pkg)
        if (save_full) {
            saveRDS (s, fs::path (results_path, pkg))
        } else if (save_ex_calls) {
            saveRDS (
                s$external_calls,
                fs::path (results_path, pkg)
            )
        }
    }

    summ <- tryCatch (
        pkgstats::pkgstats_summary (s),
        error = function (e) NULL
    )

    if (is.null (summ) || is.na (summ$package)) { # pkgstats failed
        summ <- pkgstats_summary () # null summary
        pkg_vers <- get_pkg_version (path)
        summ ["package"] <- pkg_vers [1]
        summ ["version"] <- pkg_vers [2]
    }

    return (summ)
}

#' Emit a progress message after each chunk of the archive trawl
#'
#' @param index Number of the chunk which has just been completed, starting
#' from 1.
#' @param chunk_size Number of packages in each full chunk.
#' @param npkgs Total number of packages to be analysed.
#' @param pt0 Value of `proc.time()` at the start of the trawl.
#' @return Nothing; called for the side effect of the message only.
#' @noRd
archive_trawl_progress_message <- function (index, chunk_size, npkgs, pt0) {

    # The final chunk generally holds fewer than 'chunk_size' packages, so the
    # number done must be capped. Otherwise progress exceeds 100%, and the
    # estimate of time remaining becomes negative.
    ndone <- min (c (npkgs, index * chunk_size))

    prog <- ndone / npkgs
    prog_fmt <- format (100 * prog, digits = 2)
    pt1 <- as.integer ((proc.time () - pt0) [3])
    t_per_file <- pt1 / ndone
    t_total <- t_per_file * npkgs
    t_rem <- hms::hms (t_total - pt1)

    message (
        "[", ndone, " / ", npkgs,
        "]  = ", prog_fmt, "%; (elapsed, remaining) = (",
        pt1, ", ", t_rem, ")"
    )
}

#' Check and convert 'results_file' to '.Rds' in an existing directory
#'
#' @param results_file Name or path of file to which results are to be saved.
#' Names without any directory component are taken to be in the current
#' working directory.
#' @return Path to the file in its directory, with any extension replaced by
#' '.Rds'. An error is issued if that directory does not exist.
#' @noRd
archive_results_file_name <- function (results_file) {

    if (!grepl (.Platform$file.sep, results_file)) {
        results_file <- fs::path (".", results_file)
    }
    results_file <- expand_path (results_file)

    # The directory must be extracted with 'dirname', and not by deleting the
    # file name via 'gsub'. That treats the name as a regular expression, and
    # removes it from everywhere in the path, so fails for paths such as
    # "/results/results.Rds".
    results_path <- expand_path (dirname (results_file))
    if (!fs::dir_exists (results_path)) {
        stop ("Directory [", results_path, "] does not exist")
    }

    results_file <- basename (results_file)
    results_file <- tools::file_path_sans_ext (results_file)
    results_file <- fs::path (
        results_path,
        paste0 (results_file, ".Rds")
    )

    return (results_file)
}

#' Separate package name and version from a tarball name
#'
#' This is only used to fill in 'package' and 'version' columns of packages
#' which fail main 'pkgstats' call, to fill those columns regardless.
#'
#' @param path Full path to local tarball, or to a source directory.
#' @return Character vector of length two holding the package name and
#' version. For paths which do not end in '_<version>.tar.gz', such as source
#' directories, the name is the final path component (without any '.tar.gz'),
#' and the version is `NA`.
#' @noRd
get_pkg_version <- function (path) {

    tarball <- basename (path)
    v_i <- regexpr ("\\_([0-9]|[[:punct:]]).*\\.tar\\.gz$", tarball)

    if (v_i < 0L) {
        # No version suffix. Without this, the name would be an empty string,
        # and the result would only have one element.
        pkg <- gsub ("\\.tar\\.gz$", "", tarball)
        return (c (pkg, NA_character_))
    }

    v <- gsub ("^\\_|\\.tar\\.gz$", "", regmatches (tarball, v_i))
    pkg <- substr (tarball, 1, v_i - 1)

    c (pkg, v)
}
# mpadge/pkgstats documentation built on Feb. 28, 2025, 5:40 a.m.