marrow <- function(.x, .f, ..., .path, .partitioning = c(),
.format = "parquet", output) {
arrow_temp_dirs <- purrr::map_chr(1:length(.x), ~tempfile())
purrr::walk2(.x = .x, .y = arrow_temp_dirs, .f = function(x, y, ...){
if(missing(...)) {
ft <- .f(x)
} else {
ft <- .f(x, ...)
}
if(nrow(ft) == 0) stop("Cannot handle zero-row results.")
arrow::write_dataset(ft, path = y, format = "parquet")
}, ...)
arrow_datasets <- purrr::map(arrow_temp_dirs, arrow::open_dataset, format = "parquet")
arrow_combined <- arrow::open_dataset(arrow_datasets)
arrow::write_dataset(arrow_combined, path = .path,
partitioning = .partitioning, format = .format)
unlink(arrow_temp_dirs, recursive = T)
otpt <- switch(output,
dir = .path,
ds = arrow::open_dataset(.path),
files = list.files(.path, recursive = T, full.names = T))
return(otpt)
}
#' Iteratively collate output of function into an Arrow dataset out of memory
#'
#'`r lifecycle::badge('experimental')`
#' map + arrow: iterate over a function and collate the results into
#' an Arrow dataset. This happens without the whole dataset being in memory,
#' so is suitable for large data objects. The function must return a data.frame or
#' tibble. The returned value is a path to the directory containing the
#' Arrow dataset.
#'
#' @param .x vector or list of values for .f to iterate over
#' @param .f function; must return a data.frame/tibble
#' @param ... other arguments to .f
#' @param .path path to directory where collated Arrow dataset will be stored.
#' will be created if it does not exist
#' @param .partitioning character vector of columns to use for partitioning.
#' Columns must exist in output of .f.
#' @param .format "parquet" (the default) or "arrow".
#' @describeIn marrow Return path to directory containing dataset
#'
#' @return path to new dataset directory; character string of length one.
#' @examples
#' months <- unique(airquality$Month)
#' td <- tempdir()
#' part_of_aq <- function(month) {
#' airquality[airquality$Month==month,]
#' }
#'
#' aq_arrow <- purrrow:::marrow_dir(months, part_of_aq,
#' .path = td)
#' @export
marrow_dir <- function(.x, .f, ..., .path, .partitioning = c(),
.format = "parquet") {
marrow(.x, .f, ..., .path = .path, .partitioning = .partitioning,
.format = .format, output = "dir")
}
#' @describeIn marrow Return Arrow Dataset
#' @return an Arrow Dataset
#' @export
marrow_ds <- function(.x, .f, ..., .path, .partitioning = c(),
.format = "parquet") {
marrow(.x, .f, ..., .path = .path, .partitioning = .partitioning,
.format = .format, output = "ds")
}
#' @describeIn marrow Return paths to all files in dataset dir
#' @return character vector containing paths to all files in dataset dir
#' @export
marrow_files <- function(.x, .f, ..., .path, .partitioning = c(),
.format = "parquet") {
marrow(.x, .f, ..., .path = .path, .partitioning = .partitioning,
.format = .format, output = "files")
}
marrow2_dir <- function(.x, .y, .f, ..., .path, .partitioning = c(), .format = "parquet") {
arrow_temp_dirs <- file.path(tempdir(),
purrr::map_chr(1:.x, ~tempfile()))
purrr::pwalk(.l = list(.x, .y, arrow_temp_dirs), ~{
ft <- .f(..1, ..2, ...)
stopifnot(is.data.frame(ft))
arrow::write_dataset(ft, path = ..3, format = "parquet")
})
arrow_datasets <- purrr::map(arrow_temp_dirs, arrow::open_dataset, format = "parquet")
arrow_combined <- arrow::open_dataset(arrow_datasets)
arrow::write_dataset(arrow_combined, path = .path,
partitioning = .partitioning, format = .format)
unlink(arrow_temp_dirs, recursive = T)
return(.path)
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.