R/read-parquet.R

Defines functions read_parquet_column_chunk read_parquet_row_group dump_connection post_process_read_result read_parquet

Documented in read_parquet

#' Read a Parquet file into a data frame
#'
#' Converts the contents of the named Parquet file to a R data frame.
#'
#' @param file Path to a Parquet file. It may also be an R connection,
#'   in which case it first reads all data from the connection, writes
#'   it into a temporary file, then reads the temporary file, and
#'   deletes it. The connection might be open, it which case it must be
#'   a binary connection. If it is not open, then `read_parquet()` will
#'   open it and also close it in the end.
#' @param col_select Columns to read. It can be a numeric vector of column
#'   indices, or a character vector of column names. It is an error to
#'   select the same column multiple times. The order of the columns in
#'   the result is the same as the order in `col_select`.
#' @param options Nanoparquet options, see [parquet_options()].
#' @return A `data.frame` with the file's contents.
#' @export
#' @seealso See [write_parquet()] to write Parquet files,
#'   [nanoparquet-types] for the R <-> Parquet type mapping.
#'   See [read_parquet_info()], for general information,
#'   [read_parquet_schema()] for information about the
#'   columns, and [read_parquet_metadata()] for the complete metadata.
#' @examples
#' file_name <- system.file("extdata/userdata1.parquet", package = "nanoparquet")
#' parquet_df <- nanoparquet::read_parquet(file_name)
#' print(str(parquet_df))

read_parquet <- function(file, col_select = NULL,
												 options = parquet_options()) {
	if (inherits(file, "connection")) {
		tmp <- tempfile(fileext = ".parquet")
		on.exit(unlink(tmp), add = TRUE)
		dump_connection(file, tmp)
		file <- tmp
	}
  file <- path.expand(file)

	if (!is.null(col_select)) {
		stopifnot(is.numeric(col_select) || is.character(col_select))
		col_select <- col_select[!is.na(col_select)]
		col_select_mtpl <- col_select

		col_select <- unique(col_select)
		if (length(col_select) != length(col_select_mtpl)) {
			dpl <- sort(unique(col_select_mtpl[duplicated(col_select_mtpl)]))
			stop(
				"Column", if (length(dpl) > 1) "s", " ",
				paste(dpl, collapse = ", "),
				" selected multiple times in `read_parquet()`."
			)
		}

		if (is.numeric(col_select)) {
			col_select <- as.integer(col_select)
		} else {
			pq_col_names <- .Call(nanoparquet_read_col_names, file)
			col_match <- match(col_select, pq_col_names)
			if (anyNA(col_match)) {
				bad <- unique(col_select[is.na(col_match)])
				stop(
					"Column", if (length(bad) > 1) "s", " ",
					paste(bad, collapse = ", "),
					if (length(bad) == 1) " does " else " do ",
					"not exist in Parquet file"
				)
			}
			col_select <- col_match
		}
		stopifnot(all(col_select >= 1L))
	}

  res <- .Call(nanoparquet_read2, file, options, col_select, sys.call())
	post_process_read_result(res, file, options, col_select)
}

post_process_read_result <- function(res, file, options, col_select) {
	dicts <- res[[2]]
	types <- res[[3]]
	arrow_schema <- res[[4]]
	res <- res[[1]]
	if (options[["use_arrow_metadata"]] && !is.na(arrow_schema)) {
		res <- apply_arrow_schema(res, file, arrow_schema, dicts, types, col_select)
	}

	# convert hms from milliseconds to seconds, also integer -> double
	hmss <- which(vapply(res, "inherits", "hms", FUN.VALUE = logical(1)))
	for (idx in hmss) {
		res[[idx]] <- structure(
			unclass(res[[idx]]) / 1000,
			class = class(res[[idx]])
		)
	}

	# convert POSIXct from milliseconds to seconds
	posixcts <- which(vapply(res, "inherits", "POSIXct", FUN.VALUE = logical(1)))
	for (idx in posixcts) {
		res[[idx]][] <- structure(
			unclass(res[[idx]]) / 1000,
			class = class(res[[idx]])
		)
	}

	if (length(options[["class"]]) > 0) {
		class(res) <- unique(c(options[["class"]], class(res)))
	}

  res
}

# dump the contents of a connection to path
dump_connection <- function(con, path) {
	if (!isOpen(con)) {
		on.exit(close(con), add = TRUE)
		open(con, "rb")
	}
	ocon <- file(path, open = "wb")
	# 10 MB buffer by default
	bs <- getOption("nanoparquet.con_buffer_size", 1024L * 1024L * 10)
	while (TRUE) {
		buf <- readBin(con, what = "raw", n = bs)
		if (length(buf) == 0) {
			break
		}
		writeBin(buf, ocon)
	}
	close(ocon)
}

read_parquet_row_group <- function(file, row_group,
																	 options = parquet_options()) {
	if (inherits(file, "connection")) {
		tmp <- tempfile(fileext = ".parquet")
		on.exit(unlink(tmp), add = TRUE)
		dump_connection(file, tmp)
		file <- tmp
	}
  file <- path.expand(file)
  res <- .Call(nanoparquet_read_row_group, file, row_group,
							 options, sys.call())
	post_process_read_result(res, file, options, col_select = NULL)
}

# TODO: this does not work currently
read_parquet_column_chunk <- function(file, row_group = 0L, column = 0L,
																			options = parquet_options()) {
	if (inherits(file, "connection")) {
		tmp <- tempfile(fileext = ".parquet")
		on.exit(unlink(tmp), add = TRUE)
		dump_connection(file, tmp)
		file <- tmp
	}
  file <- path.expand(file)
  res <- .Call(nanoparquet_read_column_chunk, file, row_group, column,
							 options, sys.call())
	post_process_read_result(res, file, options)
}

Try the nanoparquet package in your browser

Any scripts or data that you put into this service are public.

nanoparquet documentation built on April 3, 2025, 11:26 p.m.