# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Every test in this file requires the Parquet feature of the arrow build
skip_if_not_available("parquet")
# A small Parquet file shipped with the package, written by arrow 0.7.1.
# Later tests assert it has 10 rows x 11 columns; the column names
# (carat, cut, color, clarity, ...) suggest a slice of ggplot2's
# diamonds data — TODO confirm.
pq_file <- system.file("v0.7.1.parquet", package = "arrow")
test_that("reading a known Parquet file to tibble", {
  skip_if_not_available("snappy")
  # The bundled file should come back as a tibble of known dimensions
  result <- read_parquet(pq_file)
  expect_true(tibble::is_tibble(result))
  expect_identical(dim(result), c(10L, 11L))
  # TODO: assert more about the contents
})
test_that("simple int column roundtrip", {
  # Round-trip a single integer column through a temp file
  original <- tibble::tibble(x = 1:5)
  tmp <- tempfile() # a .parquet extension is optional here
  write_parquet(original, tmp)
  roundtripped <- read_parquet(tmp, mmap = FALSE)
  expect_equal(original, roundtripped)
  # The reader must not keep the file handle open: removal should succeed
  expect_error(file.remove(tmp), NA)
  expect_false(file.exists(tmp))
})
test_that("read_parquet() supports col_select", {
  skip_if_not_available("snappy")
  # Bare column names work via tidyselect...
  selected <- read_parquet(pq_file, col_select = c(x, y, z))
  expect_equal(names(selected), c("x", "y", "z"))
  # ...and so do tidyselect helpers
  selected <- read_parquet(pq_file, col_select = starts_with("c"))
  expect_equal(names(selected), c("carat", "cut", "color", "clarity"))
})
test_that("read_parquet() with raw data", {
  skip_if_not_available("snappy")
  # read_parquet() also accepts an in-memory raw vector as its source
  raw_bytes <- readBin(pq_file, what = "raw", n = 5000)
  from_raw <- read_parquet(raw_bytes)
  expect_identical(dim(from_raw), c(10L, 11L))
})
test_that("write_parquet() handles various compression= specs", {
  skip_if_not_available("snappy")
  tbl <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)
  # A single codec name applies to every column
  expect_parquet_roundtrip(tbl, compression = "snappy")
  # An unnamed vector supplies one codec per column
  expect_parquet_roundtrip(tbl, compression = rep("snappy", 3L))
  # A named vector targets specific columns only
  expect_parquet_roundtrip(tbl, compression = c(x1 = "snappy", x2 = "snappy"))
})
test_that("write_parquet() handles various compression_level= specs", {
  skip_if_not_available("gzip")
  tbl <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)
  # Scalar level for all columns
  expect_parquet_roundtrip(tbl, compression = "gzip", compression_level = 4)
  # One level per column
  expect_parquet_roundtrip(tbl, compression = "gzip", compression_level = rep(4L, 3L))
  # Named levels for a subset of columns
  expect_parquet_roundtrip(tbl, compression = "gzip", compression_level = c(x1 = 5L, x2 = 3L))
})
test_that("write_parquet() handles various use_dictionary= specs", {
  tbl <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)
  # Scalar, per-column, and named-subset forms are all accepted
  expect_parquet_roundtrip(tbl, use_dictionary = TRUE)
  expect_parquet_roundtrip(tbl, use_dictionary = c(TRUE, FALSE, TRUE))
  expect_parquet_roundtrip(tbl, use_dictionary = c(x1 = TRUE, x2 = TRUE))
  # An unnamed vector whose length doesn't match the column count fails
  expect_error(
    write_parquet(tbl, tempfile(), use_dictionary = c(TRUE, FALSE)),
    "unsupported use_dictionary= specification"
  )
  # Non-logical values are rejected outright
  expect_error(
    write_parquet(tbl, tempfile(), use_dictionary = 12),
    "is.logical(use_dictionary) is not TRUE",
    fixed = TRUE
  )
})
test_that("write_parquet() handles various write_statistics= specs", {
  tbl <- Table$create(x1 = 1:5, x2 = 1:5, y = 1:5)
  # Scalar, per-column, and named-subset forms all roundtrip cleanly
  expect_parquet_roundtrip(tbl, write_statistics = TRUE)
  expect_parquet_roundtrip(tbl, write_statistics = c(TRUE, FALSE, TRUE))
  expect_parquet_roundtrip(tbl, write_statistics = c(x1 = TRUE, x2 = TRUE))
})
test_that("write_parquet() accepts RecordBatch too", {
  rb <- RecordBatch$create(x1 = 1:5, x2 = 1:5, y = 1:5)
  # A RecordBatch is written and then read back as an equivalent Table
  result <- parquet_roundtrip(rb)
  expect_equal(result, Table$create(rb))
})
test_that("write_parquet() handles grouped_df", {
  library(dplyr, warn.conflicts = FALSE)
  grouped <- tibble::tibble(a = 1:4, b = 5) %>% group_by(b)
  # Since the input is a grouped_df, this asserts we get a grouped_df back
  expect_parquet_roundtrip(grouped, as_data_frame = TRUE)
})
test_that("write_parquet() with invalid input type", {
  # An Array (not a Table/RecordBatch/data frame) is not a writable object
  not_writable <- Array$create(1:5)
  expect_snapshot_error(write_parquet(not_writable, tempfile()))
})
test_that("write_parquet() can truncate timestamps", {
  tbl <- Table$create(x1 = as.POSIXct("2020/06/03 18:00:00", tz = "UTC"))
  # POSIXct columns arrive with microsecond precision
  expect_type_equal(tbl$x1, timestamp("us", "UTC"))
  tf <- tempfile()
  on.exit(unlink(tf))
  # Coercing to milliseconds loses precision, which must be explicitly allowed
  write_parquet(tbl, tf, coerce_timestamps = "ms", allow_truncated_timestamps = TRUE)
  roundtripped <- read_parquet(tf, as_data_frame = FALSE)
  expect_type_equal(roundtripped$x1, timestamp("ms", "UTC"))
  # The value itself is unchanged (it had no sub-ms component)
  expect_equal(as.data.frame(tbl), as.data.frame(roundtripped))
})
test_that("make_valid_parquet_version()", {
  # String versions map to the corresponding ParquetVersionType enum value
  expect_equal(
    make_valid_parquet_version("1.0"),
    ParquetVersionType$PARQUET_1_0
  )
  # "2.0" is still accepted but deprecated (callers should use "2.4"/"2.6")
  expect_deprecated(
    expect_equal(
      make_valid_parquet_version("2.0"),
      ParquetVersionType$PARQUET_2_0
    )
  )
  expect_equal(
    make_valid_parquet_version("2.4"),
    ParquetVersionType$PARQUET_2_4
  )
  expect_equal(
    make_valid_parquet_version("2.6"),
    ParquetVersionType$PARQUET_2_6
  )
  # "latest" resolves to the newest supported version (2.6 here)
  expect_equal(
    make_valid_parquet_version("latest"),
    ParquetVersionType$PARQUET_2_6
  )
  # Numeric input is accepted too, with the same deprecation for 2/2.0
  expect_equal(make_valid_parquet_version(1), ParquetVersionType$PARQUET_1_0)
  expect_deprecated(
    expect_equal(make_valid_parquet_version(2), ParquetVersionType$PARQUET_2_0)
  )
  expect_equal(make_valid_parquet_version(1.0), ParquetVersionType$PARQUET_1_0)
  expect_equal(make_valid_parquet_version(2.4), ParquetVersionType$PARQUET_2_4)
})
test_that("make_valid_parquet_version() input validation", {
  # Unknown strings, NULL, and vectors of length > 1 are all rejected
  # with the same informative error
  bad_versions <- list("0.3.14", NULL, c("2", "4"))
  for (bad in bad_versions) {
    expect_error(
      make_valid_parquet_version(bad),
      "`version` must be one of"
    )
  }
})
test_that("write_parquet() defaults to snappy compression", {
  skip_if_not_available("snappy")
  default_file <- tempfile()
  snappy_file <- tempfile()
  write_parquet(mtcars, default_file)
  write_parquet(mtcars, snappy_file, compression = "snappy")
  # Identical file sizes imply the same codec was used in both cases
  expect_equal(file.size(default_file), file.size(snappy_file))
})
test_that("write_parquet() does not detect compression from filename", {
  # TODO(ARROW-17221): should this be supported?
  plain <- tempfile(fileext = ".parquet")
  gz_named <- tempfile(fileext = ".parquet.gz")
  write_parquet(mtcars, plain)
  write_parquet(mtcars, gz_named)
  # Equal sizes show the ".gz" suffix did not trigger gzip compression
  expect_equal(file.size(gz_named), file.size(plain))
})
test_that("read_parquet() handles (ignores) compression in filename", {
  expected <- tibble::tibble(x = 1:5)
  gz_named <- tempfile(fileext = ".parquet.gz")
  write_parquet(expected, gz_named)
  # The ".gz" suffix is ignored on read as well
  expect_equal(read_parquet(gz_named), expected)
})
test_that("Factors are preserved when writing/reading from Parquet", {
  # Unordered and ordered factors (with an unused level) plus a plain
  # character column for contrast
  fct <- factor(c("a", "b"), levels = c("c", "a", "b"))
  ord <- factor(c("a", "b"), levels = c("c", "a", "b"), ordered = TRUE)
  chr <- c("a", "b")
  original <- tibble::tibble(fct = fct, ord = ord, chr = chr)
  tmp <- tempfile()
  on.exit(unlink(tmp))
  write_parquet(original, tmp)
  # Levels, orderedness, and column types must all survive the roundtrip
  roundtripped <- read_parquet(tmp)
  expect_equal(original, roundtripped)
})
test_that("Lists are preserved when writing/reading from Parquet", {
  # Each list column mixes an empty vector, a scalar NA, and a longer vector
  original <- tibble::tibble(
    bool = list(logical(0), NA, c(TRUE, FALSE)),
    int = list(integer(0), NA_integer_, 1:4),
    num = list(numeric(0), NA_real_, c(1, 2)),
    char = list(character(0), NA_character_, c("itsy", "bitsy"))
  )
  tmp <- tempfile()
  on.exit(unlink(tmp))
  write_parquet(original, tmp)
  roundtripped <- read_parquet(tmp)
  # ignore_attr: the list columns may carry extra attributes after the
  # roundtrip; only the values are compared
  expect_equal(original, roundtripped, ignore_attr = TRUE)
})
test_that("Maps are preserved when writing/reading from Parquet", {
  # A map<string, bool> column built from a data.frame of key/value pairs
  string_bool <- Array$create(list(data.frame(key = c("a", "b"), value = c(TRUE, FALSE), stringsAsFactors = FALSE)),
    type = map_of(utf8(), boolean())
  )
  # A map<int64, struct<x: int64, y: string>> column: values are structs
  int_struct <- Array$create(
    list(tibble::tibble(key = c(2, 4), value = data.frame(x = c(1, 2), y = c("a", "b"), stringsAsFactors = FALSE))),
    type = map_of(int64(), struct(x = int64(), y = utf8()))
  )
  df <- arrow_table(string_bool = string_bool, int_struct = int_struct)
  pq_tmp_file <- tempfile()
  on.exit(unlink(pq_tmp_file))
  write_parquet(df, pq_tmp_file)
  # Read back as a Table (not a data frame) so map types compare directly
  df_read <- read_parquet(pq_tmp_file, as_data_frame = FALSE)
  expect_equal(df, df_read, ignore_attr = TRUE)
})
test_that("read_parquet() and write_parquet() accept connection objects", {
  skip_if_not_available("snappy")
  tf <- tempfile()
  on.exit(unlink(tf))
  # Build a table big enough that we might expose concurrency problems,
  # but not so big that it slows down the tests; hash chains make every
  # row distinct
  ids <- 1:1e4
  hashes <- vapply(ids, rlang::hash, character(1), USE.NAMES = FALSE)
  test_tbl <- tibble::tibble(
    x = ids,
    y = hashes,
    z = vapply(hashes, rlang::hash, character(1), USE.NAMES = FALSE)
  )
  # Writing through a connection must produce a readable file...
  write_parquet(test_tbl, file(tf))
  expect_identical(read_parquet(tf), test_tbl)
  # ...and reading through a connection must match reading by path
  expect_identical(read_parquet(file(tf)), read_parquet(tf))
})
test_that("write_parquet() to stream", {
  expected <- tibble::tibble(x = 1:5)
  tf <- tempfile()
  stream <- FileOutputStream$create(tf)
  on.exit(unlink(tf))
  write_parquet(expected, stream)
  # Close the stream before reading the file back
  stream$close()
  expect_equal(read_parquet(tf), expected)
})
test_that("write_parquet() returns its input", {
  input <- tibble::tibble(x = 1:5)
  tf <- tempfile()
  on.exit(unlink(tf))
  # write_parquet() should hand back the data it wrote, unchanged
  returned <- write_parquet(input, tf)
  expect_equal(input, returned)
})
test_that("write_parquet() handles version argument", {
  df <- tibble::tibble(x = 1:5)
  # All supported spellings: strings, doubles, and integers
  versions <- list("1.0", "2.4", "2.6", "latest", 1.0, 2.4, 2.6, 1L)
  purrr::walk(versions, function(x) {
    tf <- tempfile()
    # on.exit() binds to this anonymous function, so each file is
    # cleaned up as soon as its own iteration finishes
    on.exit(unlink(tf))
    write_parquet(df, tf, version = x)
    expect_identical(read_parquet(tf), df)
  })
  # Unknown versions must be rejected
  invalid_versions <- list("3.0", 3.0, 3L, "A")
  purrr::walk(invalid_versions, function(x) {
    tf <- tempfile()
    on.exit(unlink(tf))
    expect_error(write_parquet(df, tf, version = x))
  })
})
test_that("ParquetFileReader raises an error for non-RandomAccessFile source", {
  skip_if_not_available("gzip")
  # A CompressedInputStream only supports sequential reads, but the
  # Parquet reader needs random access (its metadata lives in the file
  # footer), so creation must fail with a clear message
  expect_error(
    ParquetFileReader$create(CompressedInputStream$create(pq_file)),
    'file must be a "RandomAccessFile"'
  )
})
test_that("ParquetFileWriter raises an error for non-OutputStream sink", {
  sch <- schema(a = float32())
  # ARROW-9946: a plain file path string is not accepted as the sink;
  # the caller must supply an OutputStream
  expect_error(
    ParquetFileWriter$create(schema = sch, sink = tempfile()),
    regexp = "OutputStream"
  )
})
test_that("ParquetFileReader $ReadRowGroup(s) methods", {
  # 100 rows with chunk_size = 10 yields 10 row groups of 10 rows each
  tab <- Table$create(x = 1:100)
  tf <- tempfile()
  on.exit(unlink(tf))
  write_parquet(tab, tf, chunk_size = 10)
  reader <- ParquetFileReader$create(tf)
  # Row group indices are 0-based
  expect_true(reader$ReadRowGroup(0) == Table$create(x = 1:10))
  expect_true(reader$ReadRowGroup(9) == Table$create(x = 91:100))
  # Out-of-range indices raise a descriptive error
  expect_error(reader$ReadRowGroup(-1), "Some index in row_group_indices")
  expect_error(reader$ReadRowGroup(111), "Some index in row_group_indices")
  # ReadRowGroup() takes a single numeric index; vectors/strings are invalid
  expect_error(reader$ReadRowGroup(c(1, 2)))
  expect_error(reader$ReadRowGroup("a"))
  # ReadRowGroups() takes several indices and concatenates the groups
  expect_true(reader$ReadRowGroups(c(0, 1)) == Table$create(x = 1:20))
  expect_error(reader$ReadRowGroups(c(0, 1, -2))) # although it gives a weird error
  expect_error(reader$ReadRowGroups(c(0, 1, 31))) # ^^
  expect_error(reader$ReadRowGroups(c("a", "b")))
  ## -- with column_indices
  # Column indices are 0-based too; this table has exactly one column
  expect_true(reader$ReadRowGroup(0, 0) == Table$create(x = 1:10))
  expect_error(reader$ReadRowGroup(0, 1))
  expect_true(reader$ReadRowGroups(c(0, 1), 0) == Table$create(x = 1:20))
  expect_error(reader$ReadRowGroups(c(0, 1), 1))
})
test_that("Error messages are shown when the compression algorithm snappy is not found", {
  # Expected error text (regex-escaped) pointing users at the reinstall
  # instructions for enabling optional features
  msg <- paste0(
    ".*",
    "you will need to reinstall arrow with additional features enabled.\nSet one of these ",
    "environment variables before installing:",
    "\n\n \\* Sys\\.setenv\\(LIBARROW_MINIMAL = \"false\"\\) ",
    "\\(for all optional features, including 'snappy'\\)",
    "\n \\* Sys\\.setenv\\(ARROW_WITH_SNAPPY = \"ON\"\\) \\(for just 'snappy')\n\n",
    "See https://arrow.apache.org/docs/r/articles/install.html for details"
  )
  # This test is meaningful in both build flavors: with snappy the file
  # reads fine; without it we must see the informative error above
  if (codec_is_available("snappy")) {
    d <- read_parquet(pq_file)
    expect_s3_class(d, "data.frame")
  } else {
    expect_error(read_parquet(pq_file), msg)
  }
})
test_that("Error is created when parquet reads a feather file", {
  feather_file <- test_path("golden-files/data-arrow_2.0.0_lz4.feather")
  # A Feather file lacks the Parquet footer magic, so reading must fail
  expect_error(
    read_parquet(feather_file),
    "Parquet magic bytes not found in footer"
  )
})
test_that("ParquetFileWrite chunk_size defaults", {
  tab <- Table$create(x = 1:101)
  tf <- tempfile()
  on.exit(unlink(tf), add = TRUE)
  # we can alter our default cells per group
  withr::with_options(
    list(
      arrow.parquet_cells_per_group = 25
    ),
    {
      # this will be 4 chunks
      write_parquet(tab, tf)
      reader <- ParquetFileReader$create(tf)
      expect_true(reader$ReadRowGroup(0) == Table$create(x = 1:26))
      expect_true(reader$ReadRowGroup(3) == Table$create(x = 79:101))
      expect_error(reader$ReadRowGroup(4), "Some index in row_group_indices")
    }
  )
  # but we always have no more than max_chunks (even if cells_per_group is low!)
  # use a new tempfile so that windows doesn't complain about the file being over-written
  tf2 <- tempfile()
  # BUGFIX: use a distinct variable and add = TRUE. Re-registering
  # on.exit(unlink(tf)) after rebinding `tf` would replace the first
  # handler and leak the first tempfile.
  on.exit(unlink(tf2), add = TRUE)
  withr::with_options(
    list(
      arrow.parquet_cells_per_group = 25,
      arrow.parquet_max_chunks = 2
    ),
    {
      # max_chunks = 2 caps this at 2 chunks of ~50 rows
      write_parquet(tab, tf2)
      reader <- ParquetFileReader$create(tf2)
      expect_true(reader$ReadRowGroup(0) == Table$create(x = 1:51))
      expect_true(reader$ReadRowGroup(1) == Table$create(x = 52:101))
      expect_error(reader$ReadRowGroup(2), "Some index in row_group_indices")
    }
  )
})
test_that("ParquetFileWrite chunk_size calculation doesn't have integer overflow issues (ARROW-14894)", {
  # ~31.9M rows x 108 cols: the rows-times-columns product exceeds
  # .Machine$integer.max, which previously broke the calculation
  expect_equal(calculate_chunk_size(31869547, 108, 2.5e8, 200), 2451504)
  # we can set the target cells per group, and it rounds appropriately
  expect_equal(calculate_chunk_size(100, 1, 25), 25)
  expect_equal(calculate_chunk_size(101, 1, 25), 26)
  # but our max_chunks is respected
  expect_equal(calculate_chunk_size(101, 1, 25, 2), 51)
})
test_that("deprecated int96 timestamp unit can be specified when reading Parquet files", {
  tf <- tempfile()
  on.exit(unlink(tf))
  # `tbl` rather than `table` to avoid shadowing base::table
  tbl <- Table$create(
    some_datetime = as.POSIXct("2001-01-01 12:34:56.789")
  )
  # int96 timestamps are deprecated but can still be written on request
  write_parquet(
    tbl,
    tf,
    use_deprecated_int96_timestamps = TRUE
  )
  # Ask the reader to interpret int96 values at millisecond precision
  props <- ParquetArrowReaderProperties$create()
  props$set_coerce_int96_timestamp_unit(TimeUnit$MILLI)
  expect_identical(props$coerce_int96_timestamp_unit(), TimeUnit$MILLI)
  readback <- read_parquet(
    tf,
    as_data_frame = FALSE,
    props = props
  )
  # The column type reflects the requested unit and the value is preserved
  expect_identical(readback$some_datetime$type$unit(), TimeUnit$MILLI)
  expect_true(readback$some_datetime == tbl$some_datetime)
})
test_that("Can read parquet with nested lists and maps", {
  # Construct the path to the parquet-testing submodule. This will search:
  # * $ARROW_SOURCE_HOME/cpp/submodules/parquet-testing/data
  # * ../cpp/submodules/parquet-testing/data
  # ARROW_SOURCE_HOME is set in many of our CI setups, so that will find the files
  # the .. version should catch some (though not all) ways of running tests locally
  base_path <- Sys.getenv("ARROW_SOURCE_HOME", "..")
  # make this a full path, at the root of the filesystem if we're using ARROW_SOURCE_HOME
  if (base_path != "..") {
    base_path <- file.path("", base_path)
  }
  parquet_test_data <- file.path(base_path, "cpp", "submodules", "parquet-testing", "data")
  # `||` (scalar, short-circuit) rather than the vectorized `|`: this is a
  # single-condition skip, and force_tests() need not run when the dir exists
  skip_if_not(dir.exists(parquet_test_data) || force_tests(), "Parquet test data missing")
  skip_if_not_available("snappy")
  # Deeply nested list<list<list<string>>> column
  pq <- read_parquet(file.path(parquet_test_data, "nested_lists.snappy.parquet"), as_data_frame = FALSE)
  expect_type_equal(pq$a, list_of(field("element", list_of(field("element", list_of(field("element", utf8())))))))
  # map<string, map<int32, bool>> column with a non-nullable value field
  pq <- read_parquet(file.path(parquet_test_data, "nested_maps.snappy.parquet"), as_data_frame = FALSE)
  expect_true(pq$a$type == map_of(utf8(), map_of(int32(), field("value", boolean(), nullable = FALSE))))
})
test_that("Can read Parquet files from a URL", {
  skip_if_offline()
  skip_on_cran()
  skip_if_not_available("snappy")
  parquet_url <- "https://github.com/apache/arrow/blob/64f2cc7986ce672dd1a8cb268d193617a80a1653/r/inst/v0.7.1.parquet?raw=true" # nolint
  from_url <- read_parquet(parquet_url)
  # Same shape as the bundled copy of this file
  expect_true(tibble::is_tibble(from_url))
  expect_identical(dim(from_url), c(10L, 11L))
})
test_that("thrift string and container size can be specified when reading Parquet files", {
  tf <- tempfile()
  on.exit(unlink(tf))
  table <- arrow_table(example_data)
  write_parquet(table, tf)
  file <- make_readable_file(tf)
  # BUGFIX: add = TRUE is required here; a bare on.exit() would *replace*
  # the unlink(tf) handler registered above and leak the tempfile
  on.exit(file$close(), add = TRUE)
  # thrift string size
  reader_props <- ParquetReaderProperties$create()
  reader_props$set_thrift_string_size_limit(1)
  expect_identical(reader_props$thrift_string_size_limit(), 1L)
  # We get an error if we set the Thrift string size limit too small
  expect_error(ParquetFileReader$create(file, reader_props = reader_props), "TProtocolException: Exceeded size limit")
  # Increase the size and we can read successfully
  reader_props$set_thrift_string_size_limit(10000)
  reader <- ParquetFileReader$create(file, reader_props = reader_props)
  data <- reader$ReadTable()
  expect_identical(collect.ArrowTabular(data), example_data)
  # thrift container size
  reader_props_container <- ParquetReaderProperties$create()
  reader_props_container$set_thrift_container_size_limit(1)
  expect_identical(reader_props_container$thrift_container_size_limit(), 1L)
  expect_error(
    ParquetFileReader$create(file, reader_props = reader_props_container),
    "TProtocolException: Exceeded size limit"
  )
  reader_props_container$set_thrift_container_size_limit(100)
  reader_container <- ParquetFileReader$create(file, reader_props = reader_props_container)
  data <- reader_container$ReadTable()
  expect_identical(collect.ArrowTabular(data), example_data)
})