# (stray extraction artifact; not part of the test file)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Dataset-writing tests require the Arrow "dataset" capability.
skip_if_not_available("dataset")
library(dplyr, warn.conflicts = FALSE)
# Shared fixture directories, populated by the "Setup" test below:
# hive_dir gets hive-style partitioned parquet, csv_dir gets plain CSVs
# under bare-value subdirectories.
hive_dir <- make_temp_dir()
csv_dir <- make_temp_dir()
test_that("Setup (putting data in the dirs)", {
  # Hive-style parquet fixture: subdir/group=<g>/other=<o>/fileN.parquet.
  # Only written when parquet support is compiled in; parquet-dependent
  # tests below skip themselves accordingly.
  if (arrow_with_parquet()) {
    dir.create(file.path(hive_dir, "subdir", "group=1", "other=xxx"), recursive = TRUE)
    dir.create(file.path(hive_dir, "subdir", "group=2", "other=yyy"), recursive = TRUE)
    write_parquet(df1, file.path(hive_dir, "subdir", "group=1", "other=xxx", "file1.parquet"))
    write_parquet(df2, file.path(hive_dir, "subdir", "group=2", "other=yyy", "file2.parquet"))
    expect_length(dir(hive_dir, recursive = TRUE), 2)
  }
  # Now, CSV: bare "5"/"6" subdirectories act as (non-hive) partition
  # values for the "part" partition field used when opening csv_dir.
  dir.create(file.path(csv_dir, 5))
  dir.create(file.path(csv_dir, 6))
  write.csv(df1, file.path(csv_dir, 5, "file1.csv"), row.names = FALSE)
  write.csv(df2, file.path(csv_dir, 6, "file2.csv"), row.names = FALSE)
  expect_length(dir(csv_dir, recursive = TRUE), 2)
})
test_that("Writing a dataset: CSV->IPC", {
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  dst_dir <- make_temp_dir()
  write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
  expect_true(dir.exists(dst_dir))
  # One hive-style "int=<value>" directory per distinct int value
  # (presumably 1:10 from df1 and 101:110 from df2 — cf. these bounds
  # reappearing in the filters below).
  expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
  new_ds <- open_dataset(dst_dir, format = "feather")
  # An aggregate over the filtered round-tripped data must equal the same
  # aggregate computed directly on df1.
  expect_equal(
    new_ds %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6 & integer < 11) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
  # Check whether "int" is present in the files or just in the dirs
  first <- read_feather(
    dir(dst_dir, pattern = ".arrow$", recursive = TRUE, full.names = TRUE)[1],
    as_data_frame = FALSE
  )
  # It shouldn't be there: the partition column lives in the directory
  # names, not inside the data files.
  expect_false("int" %in% names(first))
})
test_that("Writing a dataset: Parquet->IPC", {
  skip_if_not_available("parquet")
  ds <- open_dataset(hive_dir)
  dst_dir <- make_temp_dir()
  # Repartition by "int" while converting parquet -> feather.
  write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
  expect_true(dir.exists(dst_dir))
  expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
  new_ds <- open_dataset(dst_dir, format = "feather")
  # group == 1 restricts to the rows originally written from df1
  # (hive_dir's group=1 partition), so the aggregate matches df1 directly.
  expect_equal(
    new_ds %>%
      select(string = chr, integer = int, group) %>%
      filter(integer > 6 & group == 1) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})
test_that("Writing a dataset: CSV->Parquet", {
  skip_if_not_available("parquet")
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  dst_dir <- make_temp_dir()
  write_dataset(ds, dst_dir, format = "parquet", partitioning = "int")
  expect_true(dir.exists(dst_dir))
  # One hive-style "int=<value>" directory per distinct int value.
  expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
  new_ds <- open_dataset(dst_dir)
  # Aggregate over the filtered round-tripped data matches df1 directly.
  expect_equal(
    new_ds %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6 & integer < 11) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})
test_that("Writing a dataset: Parquet->Parquet (default)", {
  skip_if_not_available("parquet")
  ds <- open_dataset(hive_dir)
  dst_dir <- make_temp_dir()
  # No format argument: parquet is the default output format.
  write_dataset(ds, dst_dir, partitioning = "int")
  expect_true(dir.exists(dst_dir))
  expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
  new_ds <- open_dataset(dst_dir)
  # group == 1 selects the rows originally written from df1.
  expect_equal(
    new_ds %>%
      select(string = chr, integer = int, group) %>%
      filter(integer > 6 & group == 1) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})
test_that("Writing a dataset: `basename_template` default behavior", {
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  dst_dir <- make_temp_dir()
  # Default template is "part-{i}.<ext>"; max_rows_per_file = 5 forces
  # multiple output files so the {i} counter is visible (0..3).
  write_dataset(ds, dst_dir, format = "parquet", max_rows_per_file = 5L)
  expect_identical(
    dir(dst_dir, full.names = FALSE, recursive = TRUE),
    paste0("part-", 0:3, ".parquet")
  )
  dst_dir <- make_temp_dir()
  # A custom template replaces the whole basename; the {i} slot still
  # increments per file.
  write_dataset(ds, dst_dir, format = "parquet", basename_template = "{i}.data", max_rows_per_file = 5L)
  expect_identical(
    dir(dst_dir, full.names = FALSE, recursive = TRUE),
    paste0(0:3, ".data")
  )
  dst_dir <- make_temp_dir()
  # Templates without a literal "{i}" are rejected up front.
  expect_error(
    write_dataset(ds, dst_dir, format = "parquet", basename_template = "part-i.parquet"),
    "basename_template did not contain '\\{i\\}'"
  )
  # Both the "feather" and "ipc" format spellings default to ".arrow".
  feather_dir <- make_temp_dir()
  write_dataset(ds, feather_dir, format = "feather", partitioning = "int")
  expect_identical(
    dir(feather_dir, full.names = FALSE, recursive = TRUE),
    sort(paste(paste("int", c(1:10, 101:110), sep = "="), "part-0.arrow", sep = "/"))
  )
  ipc_dir <- make_temp_dir()
  write_dataset(ds, ipc_dir, format = "ipc", partitioning = "int")
  expect_identical(
    dir(ipc_dir, full.names = FALSE, recursive = TRUE),
    sort(paste(paste("int", c(1:10, 101:110), sep = "="), "part-0.arrow", sep = "/"))
  )
})
test_that("Writing a dataset: existing data behavior", {
  # This test does not work on Windows because unlink does not immediately
  # delete the data.
  skip_on_os("windows")
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  dst_dir <- make_temp_dir()
  write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
  expect_true(dir.exists(dst_dir))
  # Re-verify the written data is intact after each write below.
  check_dataset <- function() {
    new_ds <- open_dataset(dst_dir, format = "feather")
    expect_equal(
      new_ds %>%
        select(string = chr, integer = int) %>%
        filter(integer > 6 & integer < 11) %>%
        collect() %>%
        summarize(mean = mean(integer)),
      df1 %>%
        select(string = chr, integer = int) %>%
        filter(integer > 6) %>%
        summarize(mean = mean(integer))
    )
  }
  check_dataset()
  # By default we should overwrite
  write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
  check_dataset()
  # Explicit "overwrite" behaves the same as the default.
  write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "overwrite")
  check_dataset()
  # "error" refuses to write into a non-empty destination...
  expect_error(
    write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error"),
    "directory is not empty"
  )
  # ...but succeeds once the destination has been cleared.
  unlink(dst_dir, recursive = TRUE)
  write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error")
  check_dataset()
})
test_that("Writing a dataset: no format specified", {
  out_dir <- make_temp_dir()
  write_dataset(example_data, out_dir)
  reopened <- open_dataset(out_dir)
  # Parquet is the default: visible both in the file name written and in
  # the format class of the dataset read back.
  expect_equal(
    list.files(out_dir, pattern = "parquet"),
    "part-0.parquet"
  )
  expect_true(
    inherits(reopened$format, "ParquetFileFormat")
  )
  # The data itself round-trips unchanged.
  expect_equal(
    reopened %>% collect(),
    example_data
  )
})
test_that("Dataset writing: dplyr methods", {
  skip_if_not_available("parquet")
  ds <- open_dataset(hive_dir)
  dst_dir <- tempfile()
  # Specify partition vars by group_by: grouping columns become the
  # hive-style partition directories.
  ds %>%
    group_by(int) %>%
    write_dataset(dst_dir, format = "feather")
  expect_true(dir.exists(dst_dir))
  expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
  # select to specify schema (and rename); partition column int is kept
  # implicitly because it is the grouping variable.
  dst_dir2 <- tempfile()
  ds %>%
    group_by(int) %>%
    select(chr, dubs = dbl) %>%
    write_dataset(dst_dir2, format = "feather")
  new_ds <- open_dataset(dst_dir2, format = "feather")
  expect_equal(
    collect(new_ds) %>% arrange(int),
    rbind(df1[c("chr", "dbl", "int")], df2[c("chr", "dbl", "int")]) %>% rename(dubs = dbl)
  )
  # filter to restrict written rows
  dst_dir3 <- tempfile()
  ds %>%
    filter(int == 4) %>%
    write_dataset(dst_dir3, format = "feather")
  new_ds <- open_dataset(dst_dir3, format = "feather")
  expect_equal(
    new_ds %>% select(names(df1)) %>% collect(),
    df1 %>% filter(int == 4)
  )
  # mutate: derived columns are written alongside the originals
  dst_dir3 <- tempfile()
  ds %>%
    filter(int == 4) %>%
    mutate(twice = int * 2) %>%
    write_dataset(dst_dir3, format = "feather")
  new_ds <- open_dataset(dst_dir3, format = "feather")
  expect_equal(
    new_ds %>% select(c(names(df1), "twice")) %>% collect(),
    df1 %>% filter(int == 4) %>% mutate(twice = int * 2)
  )
  # head: arrange + head limits which rows are written
  dst_dir4 <- tempfile()
  ds %>%
    mutate(twice = int * 2) %>%
    arrange(int) %>%
    head(3) %>%
    write_dataset(dst_dir4, format = "feather")
  new_ds <- open_dataset(dst_dir4, format = "feather")
  expect_equal(
    new_ds %>%
      select(c(names(df1), "twice")) %>%
      collect(),
    df1 %>%
      mutate(twice = int * 2) %>%
      head(3)
  )
})
test_that("Dataset writing: non-hive", {
  skip_if_not_available("parquet")
  src <- open_dataset(hive_dir)
  out_dir <- tempfile()
  write_dataset(src, out_dir, format = "feather", partitioning = "int", hive_style = FALSE)
  expect_true(dir.exists(out_dir))
  # Without hive style, partition dirs are bare values, not "int=<value>".
  expect_identical(dir(out_dir), sort(as.character(c(1:10, 101:110))))
})
test_that("Dataset writing: no partitioning", {
  skip_if_not_available("parquet")
  src <- open_dataset(hive_dir)
  out_dir <- tempfile()
  write_dataset(src, out_dir, format = "feather", partitioning = NULL)
  # With partitioning = NULL the destination still gets data files,
  # just no partition subdirectories.
  expect_true(dir.exists(out_dir))
  expect_true(length(dir(out_dir)) > 0)
})
test_that("Dataset writing: partition on null", {
  ds <- open_dataset(hive_dir)
  dst_dir <- tempfile()
  # NULL partition values fall back to Hive's default token.
  partitioning <- hive_partition(lgl = boolean())
  write_dataset(ds, dst_dir, partitioning = partitioning)
  expect_true(dir.exists(dst_dir))
  expect_identical(dir(dst_dir), c("lgl=__HIVE_DEFAULT_PARTITION__", "lgl=false", "lgl=true"))
  dst_dir <- tempfile()
  # null_fallback overrides the token used for NULL partition values.
  partitioning <- hive_partition(lgl = boolean(), null_fallback = "xyz")
  write_dataset(ds, dst_dir, partitioning = partitioning)
  expect_true(dir.exists(dst_dir))
  expect_identical(dir(dst_dir), c("lgl=false", "lgl=true", "lgl=xyz"))
  # Reading back with the same null_fallback round-trips the data,
  # NULL/NA rows included.
  ds_readback <- open_dataset(dst_dir, partitioning = hive_partition(lgl = boolean(), null_fallback = "xyz"))
  expect_identical(
    ds %>%
      select(int, lgl) %>%
      collect() %>%
      arrange(lgl, int),
    ds_readback %>%
      select(int, lgl) %>%
      collect() %>%
      arrange(lgl, int)
  )
})
test_that("Dataset writing: from data.frame", {
  out_dir <- tempfile()
  combined <- rbind(df1, df2)
  # group_by() supplies the partition columns when writing a plain
  # data.frame directly.
  combined %>%
    group_by(int) %>%
    write_dataset(out_dir, format = "feather")
  expect_true(dir.exists(out_dir))
  expect_identical(dir(out_dir), sort(paste0("int=", c(1:10, 101:110))))
  reopened <- open_dataset(out_dir, format = "feather")
  # Filtered aggregate over the round-tripped data matches df1 directly.
  expect_equal(
    reopened %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6 & integer < 11) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})
test_that("Dataset writing: from RecordBatch", {
  out_dir <- tempfile()
  batch <- record_batch(rbind(df1, df2))
  # dplyr verbs compose with a RecordBatch source before writing.
  batch %>%
    mutate(twice = int * 2) %>%
    group_by(int) %>%
    write_dataset(out_dir, format = "feather")
  expect_true(dir.exists(out_dir))
  expect_identical(dir(out_dir), sort(paste0("int=", c(1:10, 101:110))))
  reopened <- open_dataset(out_dir, format = "feather")
  # Filtered aggregate over the round-tripped data matches df1 directly.
  expect_equal(
    reopened %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6 & integer < 11) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})
test_that("Writing a dataset: Ipc format options & compression", {
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  dst_dir <- make_temp_dir()
  # Use zstd when the build supports it; codec = NULL means uncompressed,
  # so the test exercises write/read either way.
  codec <- NULL
  if (codec_is_available("zstd")) {
    codec <- Codec$create("zstd")
  }
  write_dataset(ds, dst_dir, format = "feather", codec = codec)
  expect_true(dir.exists(dst_dir))
  new_ds <- open_dataset(dst_dir, format = "feather")
  # Data must read back identically regardless of compression.
  expect_equal(
    new_ds %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6 & integer < 11) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})
test_that("Writing a dataset: Parquet format options", {
  skip_if_not_available("parquet")
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  dst_dir <- make_temp_dir()
  dst_dir_no_truncated_timestamps <- make_temp_dir()
  # Use trace() to confirm that options are passed in: the tracer fires a
  # warning carrying the value of allow_truncated_timestamps as seen by
  # the C++ writer-properties constructor.
  suppressMessages(trace(
    "parquet___ArrowWriterProperties___create",
    tracer = quote(warning("allow_truncated_timestamps == ", allow_truncated_timestamps)),
    print = FALSE,
    where = write_dataset
  ))
  # Default: allow_truncated_timestamps is FALSE.
  expect_warning(
    write_dataset(ds, dst_dir_no_truncated_timestamps, format = "parquet", partitioning = "int"),
    "allow_truncated_timestamps == FALSE"
  )
  # Explicitly passing TRUE must reach the writer properties.
  expect_warning(
    write_dataset(ds, dst_dir, format = "parquet", partitioning = "int", allow_truncated_timestamps = TRUE),
    "allow_truncated_timestamps == TRUE"
  )
  # Remove the trace so later tests are unaffected.
  suppressMessages(untrace(
    "parquet___ArrowWriterProperties___create",
    where = write_dataset
  ))
  # Now confirm we can read back what we sent
  expect_true(dir.exists(dst_dir))
  expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
  new_ds <- open_dataset(dst_dir)
  expect_equal(
    new_ds %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6 & integer < 11) %>%
      collect() %>%
      summarize(mean = mean(integer)),
    df1 %>%
      select(string = chr, integer = int) %>%
      filter(integer > 6) %>%
      summarize(mean = mean(integer))
  )
})
test_that("Writing a dataset: CSV format options", {
  df <- tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10],
  )
  dst_dir <- make_temp_dir()
  # Default options write a header row, so a plain read round-trips.
  write_dataset(df, dst_dir, format = "csv")
  expect_true(dir.exists(dst_dir))
  new_ds <- open_dataset(dst_dir, format = "csv")
  expect_equal(new_ds %>% collect(), df)
  dst_dir <- make_temp_dir()
  # include_header = FALSE: the reader must supply column names itself.
  write_dataset(df, dst_dir, format = "csv", include_header = FALSE)
  expect_true(dir.exists(dst_dir))
  new_ds <- open_dataset(dst_dir,
    format = "csv",
    column_names = c("int", "dbl", "lgl", "chr")
  )
  expect_equal(new_ds %>% collect(), df)
})
test_that("Dataset writing: unsupported features/input validation", {
  skip_if_not_available("parquet")
  # Non-tabular input is rejected.
  expect_error(write_dataset(4), "You must supply a")
  # Duplicate column names cannot form an Arrow schema.
  expect_error(
    write_dataset(data.frame(x = 1, x = 2, check.names = FALSE)),
    "Field names must be unique"
  )
  ds <- open_dataset(hive_dir)
  # Partition columns must exist in the data.
  expect_error(
    write_dataset(ds, partitioning = c("int", "NOTACOLUMN"), format = "ipc"),
    'Invalid field name: "NOTACOLUMN"'
  )
  # basename_template must be a string containing a literal "{i}".
  expect_error(
    write_dataset(ds, tempfile(), basename_template = "something_without_i")
  )
  expect_error(
    write_dataset(ds, tempfile(), basename_template = NULL)
  )
})
# see https://issues.apache.org/jira/browse/ARROW-12315
test_that("Max partitions fails with non-integer values and less than required partitions values", {
  skip_if_not_available("parquet")
  df <- tibble::tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10],
  )
  dst_dir <- make_temp_dir()
  # Partitioning on int yields 10 distinct partitions, so:
  # max_partitions = 10 => pass
  expect_silent(
    write_dataset(df, dst_dir, partitioning = "int", max_partitions = 10)
  )
  # max_partitions < 10 => error
  expect_error(
    write_dataset(df, dst_dir, partitioning = "int", max_partitions = 5),
    "Fragment would be written into 10 partitions. This exceeds the maximum of 5"
  )
  # negative max_partitions => error
  expect_error(
    write_dataset(df, dst_dir, partitioning = "int", max_partitions = -3),
    "max_partitions must be a positive, non-missing integer"
  )
  # round(max_partitions, 0) != max_partitions => error
  expect_error(
    write_dataset(df, dst_dir, partitioning = "int", max_partitions = 3.5),
    "max_partitions must be a positive, non-missing integer"
  )
  # max_partitions = NULL => fail
  expect_error(
    write_dataset(df, dst_dir, partitioning = "int", max_partitions = NULL),
    "max_partitions must be a positive, non-missing integer"
  )
  # max_partitions = NA => fail
  expect_error(
    write_dataset(df, dst_dir, partitioning = "int", max_partitions = NA_integer_),
    "max_partitions must be a positive, non-missing integer"
  )
  # max_partitions = chr => error
  expect_error(
    write_dataset(df, dst_dir, partitioning = "int", max_partitions = "foobar"),
    "max_partitions must be a positive, non-missing integer"
  )
})
test_that("max_rows_per_group is adjusted if at odds with max_rows_per_file", {
  skip_if_not_available("parquet")
  tbl <- tibble::tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10]
  )
  out_dir <- make_temp_dir()
  # With max_rows_per_group left unset, it is lowered to match
  # max_rows_per_file without any message or warning.
  expect_silent(
    write_dataset(tbl, out_dir, max_rows_per_file = 5)
  )
})
test_that("write_dataset checks for format-specific arguments", {
  df <- tibble::tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10],
  )
  dst_dir <- make_temp_dir()
  # Each format rejects arguments it does not understand; the exact
  # error messages are pinned by snapshots.
  # "compression" is a parquet option, not a feather one:
  expect_snapshot(
    write_dataset(df, dst_dir, format = "feather", compression = "snappy"),
    error = TRUE
  )
  # Unknown arguments error for every supported format spelling:
  expect_snapshot(
    write_dataset(df, dst_dir, format = "feather", nonsensical_arg = "blah-blah"),
    error = TRUE
  )
  expect_snapshot(
    write_dataset(df, dst_dir, format = "arrow", nonsensical_arg = "blah-blah"),
    error = TRUE
  )
  expect_snapshot(
    write_dataset(df, dst_dir, format = "ipc", nonsensical_arg = "blah-blah"),
    error = TRUE
  )
  expect_snapshot(
    write_dataset(df, dst_dir, format = "csv", nonsensical_arg = "blah-blah"),
    error = TRUE
  )
  expect_snapshot(
    write_dataset(df, dst_dir, format = "parquet", nonsensical_arg = "blah-blah"),
    error = TRUE
  )
})
# Count the files under `dir` whose names end in ".<format>".
#
# dir: directory searched recursively.
# format: file extension without the leading dot (e.g. "parquet").
# Returns the number of matching files (integer).
#
# The dot is escaped and the pattern anchored with "$": `list.files()`
# treats `pattern` as a regex, so the previous paste(".", format) form
# let "." match any character and also matched names merely containing
# the extension (e.g. "notparquet").
get_num_of_files <- function(dir, format) {
  pattern <- paste0("\\.", format, "$")
  files <- list.files(dir, pattern = pattern, recursive = TRUE, full.names = TRUE)
  length(files)
}
test_that("Dataset write max open files", {
  skip_if_not_available("parquet")
  # test default partitioning
  dst_dir <- make_temp_dir()
  file_format <- "parquet"
  partitioning <- "c2"
  num_of_unique_c2_groups <- 5
  # Four batches, each containing all five c2 values ("a".."e") plus one
  # repeat, so rows for each partition arrive spread across batches.
  record_batch_1 <- record_batch(
    c1 = c(1, 2, 3, 4, 0, 10),
    c2 = c("a", "b", "c", "d", "e", "a")
  )
  record_batch_2 <- record_batch(
    c1 = c(5, 6, 7, 8, 0, 1),
    c2 = c("a", "b", "c", "d", "e", "c")
  )
  record_batch_3 <- record_batch(
    c1 = c(9, 10, 11, 12, 0, 1),
    c2 = c("a", "b", "c", "d", "e", "d")
  )
  record_batch_4 <- record_batch(
    c1 = c(13, 14, 15, 16, 0, 1),
    c2 = c("a", "b", "c", "d", "e", "b")
  )
  table <- Table$create(
    d1 = record_batch_1, d2 = record_batch_2,
    d3 = record_batch_3, d4 = record_batch_4
  )
  write_dataset(table, path = dst_dir, format = file_format, partitioning = partitioning)
  # Subtract 1 from the directory count because list.dirs() includes
  # dst_dir itself; one subdirectory per unique c2 value.
  expect_equal(length(list.dirs(dst_dir)) - 1, num_of_unique_c2_groups)
  # With max_open_files below the partition count, the writer must close
  # and reopen files, producing more files than open slots.
  max_open_files <- 3
  dst_dir <- make_temp_dir()
  write_dataset(
    table,
    path = dst_dir,
    format = file_format,
    partitioning = partitioning,
    max_open_files = max_open_files
  )
  expect_gt(get_num_of_files(dst_dir, file_format), max_open_files)
})
test_that("Dataset write max rows per files", {
  skip_if_not_available("parquet")
  num_of_records <- 35
  df <- tibble::tibble(
    int = 1:num_of_records,
    dbl = as.numeric(1:num_of_records),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 7),
    chr = rep(letters[1:7], 5)
  )
  table <- Table$create(df)
  max_rows_per_file <- 10
  max_rows_per_group <- 10
  dst_dir <- make_temp_dir()
  file_format <- "parquet"
  write_dataset(
    table,
    path = dst_dir,
    format = file_format,
    max_rows_per_file = max_rows_per_file,
    max_rows_per_group = max_rows_per_group
  )
  # ceiling() rather than `%/% ... + 1`: the latter over-counts by one
  # whenever num_of_records is an exact multiple of max_rows_per_file.
  expected_partitions <- ceiling(num_of_records / max_rows_per_file)
  written_files <- list.files(dst_dir)
  result_partitions <- length(written_files)
  expect_equal(expected_partitions, result_partitions)
  # Every written file respects the row cap, and no rows were dropped.
  total_records <- 0
  for (file in written_files) {
    file_path <- file.path(dst_dir, file)
    ds <- read_parquet(file_path)
    cur_records <- nrow(ds)
    expect_lte(cur_records, max_rows_per_file)
    total_records <- total_records + cur_records
  }
  expect_equal(total_records, num_of_records)
})
test_that("Dataset min_rows_per_group", {
  skip_if_not(CanRunWithCapturedR())
  skip_if_not_available("parquet")
  # Three batches of sizes 4, 5, 2 (11 rows total) so the writer has to
  # combine small batches to satisfy min_rows_per_group.
  rb1 <- record_batch(
    c1 = c(1, 2, 3, 4),
    c2 = c("a", "b", "e", "a")
  )
  rb2 <- record_batch(
    c1 = c(5, 6, 7, 8, 9),
    c2 = c("a", "b", "c", "d", "h")
  )
  rb3 <- record_batch(
    c1 = c(10, 11),
    c2 = c("a", "b")
  )
  dataset <- Table$create(d1 = rb1, d2 = rb2, d3 = rb3)
  dst_dir <- make_temp_dir()
  min_rows_per_group <- 4
  max_rows_per_group <- 5
  write_dataset(
    dataset,
    min_rows_per_group = min_rows_per_group,
    max_rows_per_group = max_rows_per_group,
    path = dst_dir
  )
  ds <- open_dataset(dst_dir)
  # Each scanned batch corresponds to a written row group; collect the
  # per-group row counts.
  row_group_sizes <- ds %>%
    map_batches(~ record_batch(nrows = .$num_rows)) %>%
    pull(nrows) %>%
    as.vector()
  index <- 1
  # We expect there to be 3 row groups since 11/5 = 2.2 and 11/4 = 2.75
  expect_length(row_group_sizes, 3L)
  # We have all the rows
  expect_equal(sum(row_group_sizes), nrow(ds))
  # We expect that 2 of those will be between the two bounds
  in_bounds <- row_group_sizes >= min_rows_per_group & row_group_sizes <= max_rows_per_group
  expect_equal(sum(in_bounds), 2)
  # and the last one that is not is less than the max:
  expect_lte(row_group_sizes[!in_bounds], max_rows_per_group)
})
test_that("Dataset write max rows per group", {
  skip_if_not(CanRunWithCapturedR())
  skip_if_not_available("parquet")
  num_of_records <- 30
  max_rows_per_group <- 18
  df <- tibble::tibble(
    int = 1:num_of_records,
    dbl = as.numeric(1:num_of_records),
  )
  table <- Table$create(df)
  dst_dir <- make_temp_dir()
  file_format <- "parquet"
  write_dataset(table, path = dst_dir, format = file_format, max_rows_per_group = max_rows_per_group)
  written_files <- list.files(dst_dir)
  # (removed unused `record_combination` accumulator)
  # Without max_rows_per_file everything lands in a single file whose
  # row groups are capped at max_rows_per_group.
  file_path <- file.path(dst_dir, written_files[[1]])
  ds <- open_dataset(file_path)
  row_group_sizes <- ds %>%
    map_batches(~ record_batch(nrows = .$num_rows)) %>%
    pull(nrows) %>%
    as.vector() %>%
    sort()
  # 30 rows with an 18-row cap split into groups of 18 and 12.
  expect_equal(row_group_sizes, c(12, 18))
})
test_that("Can delete filesystem dataset after write_dataset", {
  # While this test should pass on all platforms, it primarily guards
  # Windows, which refuses to delete files that are still held open.
  src_dir <- tempfile()
  write_dataset(open_dataset(hive_dir), src_dir)
  out_dir <- tempfile()
  on.exit(unlink(out_dir, recursive = TRUE))
  reopened <- open_dataset(src_dir)
  write_dataset(reopened, out_dir)
  # If writing leaked open handles on src_dir, this unlink would fail.
  unlink(src_dir, recursive = TRUE)
  expect_false(dir.exists(src_dir))
})
test_that("write_dataset() errors on data.frame with NULL names", {
  # A data.frame whose names attribute has been stripped cannot form an
  # Arrow schema, so writing must fail up front with a clear message.
  unnamed <- data.frame(a = 1, b = "two")
  names(unnamed) <- NULL
  expect_error(write_dataset(unnamed, tempfile()), "Input data frame columns must be named")
})
test_that("Writing a dataset to text files with wrapper functions.", {
  tbl <- tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10]
  )
  # Each wrapper writes with its own delimiter; reading back with the
  # matching format must round-trip the data exactly.
  delim_dir <- make_temp_dir()
  write_delim_dataset(tbl, delim_dir)
  expect_true(dir.exists(delim_dir))
  expect_equal(open_dataset(delim_dir, format = "text") %>% collect(), tbl)
  comma_dir <- make_temp_dir()
  write_csv_dataset(tbl, comma_dir)
  expect_true(dir.exists(comma_dir))
  expect_equal(open_dataset(comma_dir, format = "csv") %>% collect(), tbl)
  tab_dir <- make_temp_dir()
  write_tsv_dataset(tbl, tab_dir)
  expect_true(dir.exists(tab_dir))
  expect_equal(open_dataset(tab_dir, format = "tsv") %>% collect(), tbl)
})
test_that("Writing a flat file dataset: `basename_template` default behavior", {
  ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
  # Default basename templates are part-{i}.txt / .csv / .tsv per wrapper;
  # max_rows_per_file = 5 forces several files so {i} increments (0..3).
  out <- make_temp_dir()
  write_delim_dataset(ds, out, max_rows_per_file = 5L)
  expect_identical(
    dir(out, full.names = FALSE, recursive = TRUE),
    paste0("part-", 0:3, ".txt")
  )
  out <- make_temp_dir()
  write_csv_dataset(ds, out, max_rows_per_file = 5L)
  expect_identical(
    dir(out, full.names = FALSE, recursive = TRUE),
    paste0("part-", 0:3, ".csv")
  )
  out <- make_temp_dir()
  write_tsv_dataset(ds, out, max_rows_per_file = 5L)
  expect_identical(
    dir(out, full.names = FALSE, recursive = TRUE),
    paste0("part-", 0:3, ".tsv")
  )
})
test_that("max_rows_per_group is adjusted if at odds with max_rows_per_file in write_delim_dataset()", {
  skip_if_not_available("parquet")
  tbl <- tibble::tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10]
  )
  # With max_rows_per_group unset, each wrapper lowers it to the file cap
  # silently (no message, no warning).
  for (writer in list(write_delim_dataset, write_csv_dataset, write_tsv_dataset)) {
    out <- make_temp_dir()
    expect_silent(writer(tbl, out, max_rows_per_file = 5))
  }
})
test_that("Writing a flat file dataset without a delimiter throws an error.", {
  tbl <- tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10]
  )
  out <- make_temp_dir()
  # Both spellings of the text format require an explicit delimiter.
  for (fmt in c("txt", "text")) {
    expect_error(
      write_dataset(tbl, out, format = fmt),
      "A delimiter must be given for a txt format."
    )
  }
})
test_that("Dataset can write flat files using readr::write_csv() options.", {
  df <- tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10]
  )
  dst_dir <- make_temp_dir()
  # col_names = FALSE suppresses the header, so line 1 is data.
  write_dataset(df, dst_dir, format = "csv", col_names = FALSE)
  expect_true(dir.exists(dst_dir))
  # Pass the path directly: readLines(file(...)) left an unmanaged
  # connection object behind.
  header <- readLines(file.path(dst_dir, "part-0.csv"), n = 1L)
  expect_equal(header, "1,1,true,\"a\"")
  df2 <- tibble(x = "")
  dst_dir <- make_temp_dir()
  # eol = "\r\n" should write CRLF line endings.
  write_dataset(df2, dst_dir, format = "csv", eol = "\r\n")
  expect_true(dir.exists(dst_dir))
  con <- file(file.path(dst_dir, "part-0.csv"), "rb")
  header <- readBin(con, "raw", n = 5)
  close(con)
  # 0d and 0a are the character codes for CRLF (https://www.asciitable.com)
  expect_equal(header[4:5], as.raw(c(0x0d, 0x0a)))
  dst_dir <- make_temp_dir()
  # Mixing Arrow-style and readr-style option names is rejected.
  expect_error(
    write_dataset(df, dst_dir, format = "csv", include_header = FALSE, delim = ";"),
    "Use either Arrow write options or readr write options, not both"
  )
  dst_dir <- make_temp_dir()
  # quoting_style = "AllValid" quotes every non-null field.
  write_dataset(df, dst_dir, format = "csv", quoting_style = "AllValid")
  ds <- open_dataset(dst_dir, format = "csv")
  expect_equal(df, ds %>% collect())
  # readLines() already yields one element per line; the previous
  # paste(..., sep = "\n") wrapper was a no-op.
  lines <- readLines(file.path(dst_dir, "part-0.csv"))
  expect_equal(lines[2], "\"1\",\"1\",\"true\",\"a\"")
  # tsv has a fixed delimiter; overriding it is an error.
  expect_error(
    write_dataset(df, dst_dir, format = "tsv", delimiter = ";"),
    "Can't set a delimiter for the tsv format."
  )
})
test_that("Dataset write wrappers can write flat files using readr::write_csv() options.", {
  df <- tibble(
    int = 1:10,
    dbl = as.numeric(1:10),
    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
    chr = letters[1:10]
  )
  # col_names = FALSE: the first line is data, not a header.
  # Pass the path to readLines() directly: wrapping it in file() left an
  # unmanaged connection object behind.
  dst_dir <- make_temp_dir()
  write_csv_dataset(df, dst_dir, col_names = FALSE)
  header <- readLines(file.path(dst_dir, "part-0.csv"), n = 1L)
  expect_equal(header, "1,1,true,\"a\"")
  dst_dir <- make_temp_dir()
  write_tsv_dataset(df, dst_dir, col_names = FALSE)
  header <- readLines(file.path(dst_dir, "part-0.tsv"), n = 1L)
  expect_equal(header, "1\t1\ttrue\t\"a\"")
  # eol = "\r\n" writes CRLF line endings for both wrappers.
  df2 <- tibble(x = "")
  dst_dir <- make_temp_dir()
  write_csv_dataset(df2, dst_dir, eol = "\r\n")
  con <- file(file.path(dst_dir, "part-0.csv"), "rb")
  header <- readBin(con, "raw", n = 5)
  close(con)
  # 0d and 0a are the character codes for CRLF (https://www.asciitable.com)
  expect_equal(header[4:5], as.raw(c(0x0d, 0x0a)))
  df2 <- tibble(x = "")
  dst_dir <- make_temp_dir()
  write_tsv_dataset(df2, dst_dir, eol = "\r\n")
  con <- file(file.path(dst_dir, "part-0.tsv"), "rb")
  header <- readBin(con, "raw", n = 5)
  close(con)
  expect_equal(header[4:5], as.raw(c(0x0d, 0x0a)))
  # quote = "all" quotes every field; delim/eol pass through to the writer.
  dst_dir <- make_temp_dir()
  write_csv_dataset(df, dst_dir, quote = "all", delim = ";")
  ds <- open_dataset(dst_dir, format = "csv", delim = ";")
  expect_equal(df, ds %>% collect())
  # readLines() already yields one element per line; the previous
  # paste(..., sep = "\n") wrapper was a no-op.
  lines <- readLines(file.path(dst_dir, "part-0.csv"))
  expect_equal(lines[2], "\"1\";\"1\";\"true\";\"a\"")
  dst_dir <- make_temp_dir()
  write_tsv_dataset(df, dst_dir, quote = "all", eol = "\r\n")
  ds <- open_dataset(dst_dir, format = "tsv")
  expect_equal(df, ds %>% collect())
  lines <- readLines(file.path(dst_dir, "part-0.tsv"))
  expect_equal(lines[2], "\"1\"\t\"1\"\t\"true\"\t\"a\"")
  # na = controls the token written for missing values.
  dst_dir <- make_temp_dir()
  write_tsv_dataset(df, dst_dir, na = "NOVALUE")
  ds <- open_dataset(dst_dir, format = "tsv") %>% collect()
  expect_equal(
    ds$lgl,
    c("true", "false", "NOVALUE", "true", "false", "true", "false", "NOVALUE", "true", "false")
  )
})
# (Documentation-site boilerplate from extraction; commented out so the file parses:)
# Any scripts or data that you put into this service are public.
# Add the following code to your website.
# For more information on customizing the embed code, read Embedding Snippets.