# tools/process-cran.R

# Provides support for finding and indexing CRAN resources; it can run either
# locally or on a Spark cluster.
#
# Local:
#   cran_index <- cran_find_local()
#
# Cluster:
#   library(sparklyr)
#   sc <- spark_connect(master = "yarn", config = cran_find_config(10))
#
#   data <- cran_find_resources(sc, 10^2, 10^1)
#   data <- cran_find_resources(sc, 10^3, 10^2)
#   data <- cran_find_resources(sc, 10^5, 10^4)
#
#   cran_index <- data %>% collect()
#
# Saving:
#   cran_save_dataset(cran_index)
#   cran_clean_dataset()

# Builds the index entry for a single dataset file of an unpacked package.
#
# Arguments:
#   package_path  Path to the unpacked package directory; its basename is used
#                 as the package name.
#   file_path     Path to a file inside the package (normally under data/).
#
# Returns a one-row data frame with columns name ("<package>:<dataset>"),
# description (title taken from man/<dataset>.Rd), rows, cols and class.
# rows/cols are -1L when nrow()/ncol() do not yield a single integer. An empty
# data frame is returned when the file is not .rda/.RData, when there is no
# matching Rd file, or when the Rd file has no \title section.
#
# Raises an error when the data file cannot be loaded or measured.
cran_process_file <- function(package_path, file_path) {
  empty_result <- data.frame(name = c(), description = c(), rows = c(), cols = c(), class = c())

  file_name <- basename(file_path)
  dataset_name <- tools::file_path_sans_ext(file_name)
  if (!(tools::file_ext(file_name) %in% c("rda", "RData"))) {
    return(empty_result)
  }

  doc_file <- file.path(package_path, "man", paste0(dataset_name, ".Rd"))
  if (!file.exists(doc_file)) {
    return(empty_result)
  }

  dataset_doc <- tools::parse_Rd(doc_file)
  title_nodes <- Filter(function(e) identical(attr(e, "Rd_tag"), "\\title"), dataset_doc)
  if (length(title_nodes) == 0) {
    # Without a title there is nothing to index; previously this case fell
    # through to the result construction and failed on undefined variables.
    return(empty_result)
  }

  # Flatten the title to a single line and strip surrounding quotes/blanks.
  dataset_title <- gsub(
    "\n|^ *\\\"?[ \n]*|[ \n]*\\\"? +$",
    "",
    paste(as.character(title_nodes[[1]]), collapse = " "))
  dataset_title <- gsub("  +", " ", dataset_title)

  # Load into a throw-away environment: loading into this function's frame
  # would let objects stored in the file overwrite local variables such as
  # dataset_name or file_name.
  load_env <- new.env(parent = emptyenv())
  dataset_content <- tryCatch({
    loaded_names <- load(file_path, envir = load_env)
    # A file may hold several objects; the first one is indexed.
    get(loaded_names[[1]], envir = load_env, inherits = FALSE)
  }, error = function(e) {
    stop("Failed to load '", file_name, "'")
  })

  dataset_rows <- tryCatch({
    nrow(dataset_content)
  }, error = function(e) {
    stop("Failed to retrieve rows for '", file_name, "' and class '", class(dataset_content)[[1]], "'")
  })
  # nrow() is NULL for objects without dimensions; normalise to -1L.
  if (typeof(dataset_rows) != "integer" || length(dataset_rows) != 1) dataset_rows <- -1L

  dataset_cols <- tryCatch({
    ncol(dataset_content)
  }, error = function(e) {
    stop("Failed to retrieve cols for '", file_name, "' and class '", class(dataset_content)[[1]], "'")
  })
  if (typeof(dataset_cols) != "integer" || length(dataset_cols) != 1) dataset_cols <- -1L

  dataset_class <- class(dataset_content)[[1]]
  if (typeof(dataset_class) != "character" || length(dataset_class) != 1) dataset_class <- typeof(dataset_content)

  data.frame(
    name = paste(basename(package_path), dataset_name, sep = ":"),
    description = dataset_title,
    rows = dataset_rows,
    cols = dataset_cols,
    class = dataset_class)
}

# Indexes every dataset shipped in the data/ directory of one CRAN package.
#
# The package source is downloaded and unpacked under "packages/" (relative to
# the working directory) unless "packages/<package>" already exists.
#
# Arguments:
#   package  Name of the CRAN package.
#
# Returns a data frame with one row per indexed dataset (see
# cran_process_file). A dataset that fails to process contributes a row named
# "error:<package>" whose description holds the error message.
#
# Raises an error when the package cannot be downloaded or unpacked.
cran_process_package <- function(package) {
  empty_results <- data.frame(name = c(), description = c(), rows = c(), cols = c(), class = c())
  package_path <- file.path("packages", package)

  if (!dir.exists(package_path)) {
    downloaded <- download.packages(package, "packages", repos = "https://cran.rstudio.com/")
    if (nrow(downloaded) == 0) {
      stop("Failed to download package '", package, "'")
    }

    # Use the path reported by download.packages() instead of guessing from a
    # directory listing: other tarballs may be present in "packages/", and
    # only the one fetched here should be unpacked and removed.
    tarball <- downloaded[1, 2]
    on.exit(unlink(tarball), add = TRUE)
    untar(tarball, exdir = "packages")
  }

  dataset_paths <- dir(file.path(package_path, "data"), full.names = TRUE)
  dataset_rows <- lapply(dataset_paths, function(dataset_path) {
    tryCatch({
      cran_process_file(package_path, dataset_path)
    }, error = function(e) {
      data.frame(name = paste("error", package, sep = ":"), description = e$message, rows = -1L, cols = -1L, class = "")
    })
  })

  Reduce(rbind, dataset_rows, empty_results)
}

# Indexes the datasets of several CRAN packages.
#
# Creates the "packages" working directory when it is missing, processes each
# package in turn with cran_process_package() and stacks the per-package
# results. A package that fails is represented by a single row named
# "error:<package>" carrying the error message as its description.
#
# Arguments:
#   packages  Vector of CRAN package names.
#
# Returns a data frame with columns name, description, rows, cols and class.
cran_process_packages <- function(packages) {
  if (!dir.exists("packages")) {
    dir.create("packages")
  }

  collected <- vector("list", length(packages))
  position <- 0L
  for (package in packages) {
    position <- position + 1L
    collected[[position]] <- tryCatch(
      cran_process_package(package),
      error = function(e) {
        data.frame(name = paste("error", package, sep = ":"), description = e$message, rows = -1L, cols = -1L, class = "")
      }
    )
  }

  # Fold left to right, starting from the empty frame.
  Reduce(
    rbind,
    collected,
    data.frame(name = c(), description = c(), rows = c(), cols = c(), class = c())
  )
}

# Indexes CRAN datasets on a Spark cluster.
#
# The first `samples` package names reported by available.packages() are
# copied to Spark and processed on the workers with cran_process_packages().
#
# Arguments:
#   sc           A sparklyr connection.
#   samples      Number of packages to process. Values larger than the number
#                of available packages process all of them.
#   repartition  Number of partitions for the package table; defaults to the
#                "sparklyr.shell.num-executors" entry of the connection config
#                and falls back to 0 when that is NULL.
#
# Returns the Spark table "cran_resources" produced by spark_apply().
cran_find_resources <- function(sc,
                                samples = 2,
                                repartition = sc$config[["sparklyr.shell.num-executors"]]) {
  pkgnames <- available.packages()[, 1]

  # head() caps the selection at the number of available packages; indexing
  # with 1:samples would pad the table with NA package names.
  packages <- copy_to(
    sc,
    data.frame(package = head(pkgnames, samples)),
    repartition = if (is.null(repartition)) 0 else repartition,
    overwrite = TRUE)

  # package dependencies: the helpers are shipped to the workers explicitly
  # because they are not part of any installed package.
  context <- list(
    cran_process_packages = cran_process_packages,
    cran_process_package = cran_process_package,
    cran_process_file = cran_process_file
  )

  packages %>% spark_apply(
    function(df, context) {
      # Re-create the helpers in the worker's global environment so they can
      # find each other by name.
      for (name in names(context)) assign(name, context[[name]], envir = .GlobalEnv)
      cran_process_packages(df$package)
    },
    context = context,
    columns = list(name = "character", description = "character", rows = "integer", cols = "integer", class = "character"),
    name = "cran_resources")
}

# Indexes CRAN datasets in the local R session.
#
# Arguments:
#   samples  Number of packages to process, taken from the start of the
#            available.packages() listing. Values larger than the number of
#            available packages process all of them.
#
# Returns the data frame produced by cran_process_packages().
cran_find_local <- function(samples = 2) {
  # Look the package names up here; the function previously relied on a
  # `pkgnames` variable that was never defined in its scope.
  pkgnames <- available.packages()[, 1]
  cran_process_packages(head(pkgnames, samples))
}

# Builds the Spark configuration used for indexing CRAN on a cluster.
#
# Arguments:
#   workers      Number of worker machines.
#   worker_cpus  CPUs per worker; one single-core executor is requested per
#                CPU, i.e. workers * worker_cpus executors in total.
#
# Returns the spark_config() object with memory, executor and speculation
# settings applied.
cran_find_config <- function(workers = 3, worker_cpus = 8) {
  config <- spark_config()

  overrides <- list(
    "sparklyr.shell.driver-memory" = "8g",
    "sparklyr.shell.executor-memory" = "1g",
    "sparklyr.shell.executor-cores" = 1,
    "sparklyr.shell.num-executors" = workers * worker_cpus,
    "spark.speculation" = TRUE,
    "spark.speculation.multiplier" = 4,
    "spark.memory.fraction" = 0.8
  )

  # Apply the overrides one by one, in the order listed above.
  for (setting in names(overrides)) {
    config[setting] <- overrides[[setting]]
  }

  config
}

# Writes the collected index to "data/cranfiles.rda".
#
# The "<package>:<dataset>" identifier in `name` is split into separate
# `package` and `dataset` columns; the remaining columns are carried over
# unchanged. The "data" directory is created when it does not exist.
#
# Arguments:
#   cran_index  Data frame with columns name, description, rows, cols, class.
cran_save_dataset <- function(cran_index) {
  if (!dir.exists("data")) {
    dir.create("data")
  }

  # The object must be called `cranfiles`: save() stores it under that name.
  cranfiles <- dplyr::transmute(
    cran_index,
    package = gsub(":.*", "", name),
    dataset = gsub(".*:", "", name),
    description,
    rows,
    cols,
    class
  )

  save(cranfiles, file = "data/cranfiles.rda")
}

# Cleans the saved index in "data/cranfiles.rda" in place.
#
# Drops error rows and rows without a positive row and column count (including
# rows where either count is NA), then adds a `metadata` column holding a JSON
# string of the form {"rows":<rows>,"cols":<cols>}.
#
# Arguments:
#   cran_index  Unused; kept so existing calls keep working.
cran_clean_dataset <- function(cran_index) {
  # Load into a scratch environment so the file cannot overwrite locals.
  load_env <- new.env(parent = emptyenv())
  loaded_names <- load("data/cranfiles.rda", envir = load_env)
  cranfiles <- get(loaded_names[[1]], envir = load_env, inherits = FALSE)

  # which() discards NA comparisons; plain logical indexing would turn them
  # into all-NA rows.
  keep <- cranfiles$package != "error" & cranfiles$rows > 0 & cranfiles$cols > 0
  cranfiles <- cranfiles[which(keep), ]

  # sprintf() is vectorised and returns character(0) for zero rows, so an
  # index with no valid datasets no longer fails.
  cranfiles$metadata <- sprintf('{"rows":%s,"cols":%s}', cranfiles$rows, cranfiles$cols)

  save(cranfiles, file = "data/cranfiles.rda")
}
# javierluraschi/pins documentation built on July 15, 2019, 1:21 p.m.