R/arrow.R

#' Query metadata of a Parquet file
#'
#' Query column names and column types of an Apache Parquet data file.
#'
#' @param file Path to a Parquet file
#'
#' @return
#' 
#' A data frame with two columns:
#'
#' \describe{
#'   \item{`name`}{Column names}
#'   \item{`type`}{Column types, as returned by the
#'   [`ParquetFileReader$GetSchema()`][arrow::ParquetFileReader]
#'   method. This does not directly correspond to native R data types
#'   but should be informative nevertheless.}
#' }
#'
#' @author AndrĂ¡s Svraka
#'
#' @seealso [arrow::write_parquet()], [arrow::read_parquet()]
#'
#' @examples
#' tmp <- tempfile()
#' arrow::write_parquet(iris, tmp)
#' metadata_parquet(tmp)
#'
#' @export
metadata_parquet <- function(file) {
  pq <- arrow::ParquetFileReader$create(file)

  pq_schema <- pq$GetSchema()
  pq_names <- pq_schema$names
  pq_types <- sapply(pq_schema$fields, function(x) x$type$ToString())

  df <- tibble::tibble(name = pq_names, type = pq_types)

  df
}

#' Read a delimited file by chunks and write into Hive-style Parquet files
#'
#' Read a single delimited file in chunks using
#' [readr::read_delim_chunked()] and save chunks in Parquet files
#' under a simple Hive-style partitioned directory (i.e.
#' `dataset_base_name/chunk=XX/data.parquet`) to be used as the source
#' of a multi-file Apache Arrow dataset.
#'
#' @inheritParams readr::read_delim_chunked
#' @param dataset_base_name Path of the directory to write the Hive
#'   partitioned Parquet files to.
#' @param file_nrow Number of data rows in `file`. As there is no
#'   reliable and cross-platform way to get the exact number of lines
#'   in a compressed file, this has to be set manually to calculate
#'   the number of chunks and the names of partitions. Use `wc` on a
#'   Unix-like system to determine row count (`zcat file.gz | wc -l`,
#'   or similar). Only count rows that contain data, otherwise the
#'   dataset's partitioning scheme will have empty directories. This
#'   does not result in errors, but it is undesirable for human
#'   readability. Subtract any header row(s) from the row count, as
#'   well as the number of lines skipped with the `skip` argument
#'   (again, `zcat file.gz | head`, or similar, can be useful).
#' @param processing_function A function that takes each chunk and
#'   does arbitrary data processing on it before writing the resulting
#'   data frame into its Parquet partition.
#' @param chunk_col_name Name of the column indicating partition
#'   numbers in the Hive-style partition structure.
#' @param chunk_file_name Name of the individual Parquet files in the
#'   Hive-style partition structure.
#' @param ... Passed on to [readr::read_delim_chunked()].
#'
#' @details
#'
#' The main goal of this function is to read a single, large,
#' unpartitioned delimited file into a partitioned Arrow dataset on a
#' RAM limited machine. Therefore these Arrow partitions have no
#' inherent meaning. Although `processing_function` allows flexible
#' changes during reading in, this function was intended to be used in
#' workflows where only minimal data processing is done and the
#' original structure of the delimited files is kept unchanged. Thus
#' `read_delim_chunked_to_dataset` will create a partitioning that
#' keeps the original row order from the delimited file. However,
#' the ordering within a partition can be changed through
#' `processing_function`.
#'
#' @return
#'
#' Invisibly returns a tibble with parsing problems caught by
#' \pkg{readr} (see [readr::problems()]), or `NULL` if no parsing
#' problems occurred.
#'
#' @seealso
#'
#' `vignette(topic = "dataset", package = "arrow")` on how to use
#' multi-file Apache Arrow datasets.
#'
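#' @examples
#' \dontrun{
#' # A minimal usage sketch. The file name, its row count, and the
#' # output directory are hypothetical placeholders.
#' problems <- read_delim_chunked_to_dataset(
#'   file = "large_file.csv.gz",
#'   dataset_base_name = "my_dataset",
#'   file_nrow = 1e6,
#'   chunk_size = 1e5,
#'   delim = ","
#' )
#'
#' # The partitions can then be read back as a single Arrow dataset.
#' ds <- arrow::open_dataset("my_dataset")
#' }
#'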
#' @export
read_delim_chunked_to_dataset <- function(file,
                                          dataset_base_name,
                                          file_nrow, chunk_size,
                                          processing_function = NULL,
                                          chunk_col_name = "chunk",
                                          chunk_file_name = "data.parquet",
                                          ...) {
  stopifnot(requireNamespace("arrow", quietly = TRUE))

  chunk_paths <- get_chunk_paths(dataset_base_name, file_nrow,
                                 chunk_size, chunk_col_name,
                                 chunk_file_name)

  prepare_dataset_base(dataset_base_name, chunk_paths)

  out <- readr::read_delim_chunked(
    file,
    callback = readr::DataFrameCallback$new(
      callback_write_parquet(chunk_paths, chunk_size, processing_function)
    ),
    chunk_size = chunk_size,
    ...
  )

  warn_problems(out)
}

#' @describeIn read_delim_chunked_to_dataset Write a data frame into a
#'   dataset with a single partition.
#'
#' @param df A data frame
#'
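#' @examples
#' \dontrun{
#' # Minimal sketch: store an in-memory data frame as a dataset with a
#' # single partition ("iris_dataset" is a hypothetical directory).
#' write_single_partition_dataset(iris, "iris_dataset")
#' arrow::open_dataset("iris_dataset")
#' }
#'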
#' @export
write_single_partition_dataset <- function(df, dataset_base_name,
                                           chunk_col_name = "chunk",
                                           chunk_file_name = "data.parquet") {
  stopifnot(requireNamespace("arrow", quietly = TRUE))

  chunk_paths <- get_chunk_paths(dataset_base_name, nrow(df),
                                 nrow(df), chunk_col_name,
                                 chunk_file_name)

  prepare_dataset_base(dataset_base_name, chunk_paths)

  arrow::write_parquet(df, chunk_paths)
}

#' Create Hive-style partition paths
#'
#' To be used with [read_delim_chunked_to_dataset()].
#'
#' @inheritParams read_delim_chunked_to_dataset
#'
#' @return A character vector with the paths to the partitions.
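#'
#' @examples
#' \dontrun{
#' # Illustration with hypothetical sizes: 1000 rows split into chunks
#' # of 100 yield ten zero-padded partitions,
#' # "base/chunk=01/data.parquet" ... "base/chunk=10/data.parquet".
#' get_chunk_paths("base", file_nrow = 1000, chunk_size = 100)
#' }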
#' @keywords internal
get_chunk_paths <- function(dataset_base_name, file_nrow, chunk_size,
                            chunk_col_name = "chunk",
                            chunk_file_name = "data.parquet") {
  chunk_numbers <- seq_len(ceiling(file_nrow / chunk_size))

  # Pad partition numbers with zeros to keep the dataset's ordering
  # after reading back with `open_dataset()`
  max_nchar <- nchar(as.character(max(chunk_numbers)))
  fmt <- sprintf("%%0.%si", max_nchar)
  chunk_numbers <- sprintf(fmt, chunk_numbers)

  hive_name <- paste0(chunk_col_name, "=", chunk_numbers)

  file.path(dataset_base_name, hive_name, chunk_file_name)
}

#' Prepare directory structure for an Arrow dataset
#'
#' To be used with [read_delim_chunked_to_dataset()].
#'
#' @inheritParams read_delim_chunked_to_dataset
#' @param chunk_paths Character vector of partition file paths, as
#'   returned by [get_chunk_paths()].
#' @keywords internal
prepare_dataset_base <- function(dataset_base_name, chunk_paths) {
  # To avoid conflicting chunks left over from a previous run, first
  # clean up everything.
  if (dir.exists(dataset_base_name)) unlink(dataset_base_name, recursive = TRUE)
  dir.create(dataset_base_name)

  # Partition directories need to be created inside the dataset base.
  x <- dirname(chunk_paths)
  lapply(x, dir.create)
  invisible(x)
}

#' Callback function to write Parquet partition
#'
#' A function factory that creates a callback function for
#' [readr::read_delim_chunked()].
#'
#' @inheritParams read_delim_chunked_to_dataset
#' @param chunk_paths Character vector of partition file paths, as
#'   returned by [get_chunk_paths()].
#'
#' @return A function to be used in [read_delim_chunked_to_dataset()].
#'
#' @seealso <https://stackoverflow.com/a/49241426>
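#'
#' @examples
#' \dontrun{
#' # Sketch of how the factory is wired into a chunked read; the input
#' # file name and sizes are hypothetical.
#' paths <- get_chunk_paths("base", file_nrow = 1000, chunk_size = 100)
#' prepare_dataset_base("base", paths)
#' readr::read_csv_chunked(
#'   "file.csv",
#'   callback = readr::DataFrameCallback$new(
#'     callback_write_parquet(paths, chunk_size = 100)
#'   ),
#'   chunk_size = 100
#' )
#' }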
#' @keywords internal
callback_write_parquet <- function(chunk_paths, chunk_size,
                                   processing_function = NULL) {
  function(x, pos) {
    chunk_number <- (pos %/% chunk_size) + 1

    problems <- readr::problems(x)

    if (!is.null(processing_function)) {
      x <- processing_function(x)
    }

    arrow::write_parquet(x, sink = chunk_paths[chunk_number])

    return(problems)
  }
}

#' Report parsing failures
#'
#' Helper function to return parsing failures caught by \pkg{readr} in
#' [read_delim_chunked_to_dataset()]. Idea taken from an unexported
#' function in \pkg{readr} (`warn_problems`), but the implementation is
#' much simplified here.
#'
#' @param x A data frame
#' @keywords internal
warn_problems <- function(x) {
  n_problems <- nrow(x)

  if (n_problems != 0) {
    warning(
      n_problems, " parsing failure", if (n_problems > 1) "s", ".\n",
      paste(format(x), collapse = "\n"),
      call. = FALSE, immediate. = TRUE, noBreaks. = TRUE
    )

    invisible(x)
  } else {
    invisible(NULL)
  }
}