Nothing
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
library(dplyr, warn.conflicts = FALSE)
library(stringr)
skip_if_not_available("acero")

# Working copy of the shared fixture, extended with a few extra string columns
tbl <- example_data
# Longer free-text strings
tbl[["verses"]] <- verses[[1]]
# Single letters centred in growing whitespace (" a ", "  b  ", ...); the
# padded widths run 3, 5, 7, ..., 21
tbl[["padded_strings"]] <- stringr::str_pad(
  letters[seq_len(10)],
  width = 2 * seq_len(10) + 1,
  side = "both"
)
# The last ten lowercase letters, "q" through "z"
tbl[["another_chr"]] <- tail(letters, 10)
test_that("basic select/filter/collect", {
  batch <- record_batch(tbl)
  query <- filter(select(batch, int, chr), int > 5)
  # Building the query is lazy: we get a query object, not data
  expect_s3_class(query, "arrow_dplyr_query")

  keep <- !is.na(tbl$int) & tbl$int > 5
  expect_equal(collect(query), tbl[keep, c("int", "chr")])

  # The source RecordBatch must be unaffected by building the query
  expect_identical(collect(batch), tbl)
})
test_that("dim() on query", {
  # dim() of a filtered/selected query must agree with dplyr on a data frame
  compare_dplyr_binding(
    .input %>% filter(int > 5) %>% select(int, chr) %>% dim(),
    tbl
  )
})
# print() on a RecordBatch query shows the schema of the selected columns,
# the accumulated filter condition and a pointer to the source object.
test_that("Print method", {
# Both filter() calls (and the comma-separated conditions inside the first
# one) are combined with `and` in the printed Filter line; the comparison is
# an exact substring match (fixed = TRUE), not a regex.
expect_output(
record_batch(tbl) %>%
filter(dbl > 2, chr == "d" | chr == "f") %>%
select(chr, int, lgl) %>%
filter(int < 5) %>%
select(int, chr) %>%
print(),
'RecordBatch (query)
int: int32
chr: string
* Filter: (((dbl > 2) and ((chr == "d") or (chr == "f"))) and (int < 5))
See $.data for the source Arrow object',
fixed = TRUE
)
})
test_that("pull", {
  # With no argument, by position, and by column name
  compare_dplyr_binding(.input %>% pull() %>% as.vector(), tbl)
  compare_dplyr_binding(.input %>% pull(1) %>% as.vector(), tbl)
  compare_dplyr_binding(.input %>% pull(chr) %>% as.vector(), tbl)

  # A column that was renamed earlier in the same query
  compare_dplyr_binding(
    .input %>%
      filter(int > 4) %>%
      rename(strng = chr) %>%
      pull(strng) %>%
      as.vector(),
    tbl
  )
})
test_that("pull() shows a deprecation warning if the option isn't set", {
  tab <- arrow_table(tbl)
  # as_vector = NULL stands in for "option not set"
  expect_warning(
    vec <- pull(tab, as_vector = NULL),
    "Current behavior of returning an R vector is deprecated"
  )
  # The legacy behaviour still applies for now: the result is an R vector
  expect_identical(vec, pull(tbl))
})
test_that("collect(as_data_frame=FALSE)", {
  batch <- record_batch(tbl)
  expected <- tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")]

  # Collecting the bare RecordBatch keeps its class
  expect_r6_class(collect(batch, as_data_frame = FALSE), "RecordBatch")

  # Once a query is applied, collect(as_data_frame = FALSE) always returns
  # a Table
  selected <- batch %>%
    select(int, chr) %>%
    filter(int > 5) %>%
    collect(as_data_frame = FALSE)
  expect_r6_class(selected, "Table")
  expect_equal_data_frame(selected, expected)

  # ...also when columns are renamed inside select()
  renamed <- batch %>%
    select(int, strng = chr) %>%
    filter(int > 5) %>%
    collect(as_data_frame = FALSE)
  expect_r6_class(renamed, "Table")
  expect_equal_data_frame(renamed, set_names(expected, c("int", "strng")))

  # ...and when the query is grouped (compared against a grouped tibble)
  grouped <- batch %>%
    select(int, strng = chr) %>%
    filter(int > 5) %>%
    group_by(int) %>%
    collect(as_data_frame = FALSE)
  expect_r6_class(grouped, "Table")
  expect_equal_data_frame(
    grouped,
    expected %>%
      rename(strng = chr) %>%
      group_by(int)
  )
})
test_that("compute()", {
  batch <- record_batch(tbl)
  expected <- tbl[tbl$int > 5 & !is.na(tbl$int), c("int", "chr")]

  # compute() directly on the RecordBatch keeps its class
  expect_r6_class(compute(batch), "RecordBatch")

  # Computing a query yields a Table
  selected <- batch %>%
    select(int, chr) %>%
    filter(int > 5) %>%
    compute()
  expect_r6_class(selected, "Table")
  expect_equal_data_frame(selected, expected)

  # ...with renamed columns
  renamed <- batch %>%
    select(int, strng = chr) %>%
    filter(int > 5) %>%
    compute()
  expect_r6_class(renamed, "Table")
  expect_equal_data_frame(renamed, set_names(expected, c("int", "strng")))

  # ...and with grouping (compared against a grouped tibble)
  grouped <- batch %>%
    select(int, strng = chr) %>%
    filter(int > 5) %>%
    group_by(int) %>%
    compute()
  expect_r6_class(grouped, "Table")
  expect_equal_data_frame(
    grouped,
    expected %>%
      rename(strng = chr) %>%
      group_by(int)
  )
})
test_that("head", {
  # head() at the end of a grouped query
  compare_dplyr_binding(
    .input %>%
      select(int, strng = chr) %>%
      filter(int > 5) %>%
      group_by(int) %>%
      head(2) %>%
      collect(),
    tbl
  )

  # head() placed before filter(): the row limit must be applied first.
  # This would fail if we evaluated head() after filter()
  compare_dplyr_binding(
    .input %>%
      select(int, strng = chr) %>%
      arrange(int) %>%
      head(2) %>%
      filter(int > 5) %>%
      mutate(twice = int * 2) %>%
      collect(),
    tbl
  )
})
test_that("arrange then head returns the right data (ARROW-14162)", {
  cars <- tibble::as_tibble(mtcars)
  compare_dplyr_binding(
    .input %>%
      # mpg alone has ties; disp breaks them so the order is deterministic
      arrange(mpg, disp) %>%
      head(4) %>%
      collect(),
    cars
  )
})
test_that("arrange then tail returns the right data", {
  cars <- tibble::as_tibble(mtcars)
  compare_dplyr_binding(
    .input %>%
      # mpg alone has ties; disp breaks them so the order is deterministic
      arrange(mpg, disp) %>%
      tail(4) %>%
      collect(),
    cars
  )
})
test_that("tail", {
  # tail() after an explicit sort
  compare_dplyr_binding(
    .input %>%
      select(int, chr) %>%
      filter(int < 5) %>%
      arrange(int) %>%
      tail(2) %>%
      collect(),
    tbl
  )

  # tail() with no sort: the implicit table order is used. Per the original
  # note, the filtered row count can be computed so Fetch with an offset
  # applies
  compare_dplyr_binding(
    .input %>%
      select(int, chr) %>%
      filter(int < 5) %>%
      tail(2) %>%
      collect(),
    tbl
  )
})
# Creating a Table from the same data frame twice yields repeated column
# names; building a query on it must fail with a message listing them.
test_that("No duplicate field names are allowed in an arrow_dplyr_query", {
# The error is expected from the expression as a whole (Table$create()
# followed by filter()). `regexp` is matched against part of the message, so
# further duplicated names (e.g. another_chr) may follow the listed ones.
expect_error(
Table$create(tbl, tbl) %>%
filter(int > 0),
regexp = paste0(
'The following field names were found more than once in the data: "int", "dbl", ',
'"dbl2", "lgl", "false", "chr", "fct", "verses", "padded_strings"'
)
)
})
test_that("all_sources() finds all data sources in a query", {
  skip_if_not_available("dataset")
  tab <- Table$create(a = 1)
  ds <- InMemoryDataset$create(tab)
  filtered_tab <- tab %>% filter(a > 0)

  # A bare Table is its own only source
  expect_equal(all_sources(tab), list(tab))

  # Single-input verbs do not add sources
  expect_equal(
    filtered_tab %>%
      summarize(a = sum(a)) %>%
      arrange(desc(a)) %>%
      all_sources(),
    list(tab)
  )

  # Every union/join input is reported, in order, including nested ones
  expect_equal(
    filtered_tab %>% union_all(ds) %>% all_sources(),
    list(tab, ds)
  )
  expect_equal(
    filtered_tab %>% union_all(ds) %>% left_join(tab) %>% all_sources(),
    list(tab, ds, tab)
  )
  expect_equal(
    filtered_tab %>%
      union_all(left_join(ds, tab)) %>%
      left_join(tab) %>%
      all_sources(),
    list(tab, ds, tab, tab)
  )
})
test_that("query_on_dataset() looks at all data sources in a query", {
  skip_if_not_available("dataset")
  tab <- Table$create(a = 1)
  ds <- InMemoryDataset$create(tab)
  filtered_tab <- tab %>% filter(a > 0)

  expect_false(query_on_dataset(tab))
  expect_true(query_on_dataset(ds))

  # Only Table sources anywhere in the query
  expect_false(
    filtered_tab %>%
      summarize(a = sum(a)) %>%
      arrange(desc(a)) %>%
      query_on_dataset()
  )
  expect_false(
    filtered_tab %>%
      union_all(left_join(tab, tab)) %>%
      left_join(tab) %>%
      query_on_dataset()
  )

  # A Dataset anywhere, even nested inside a join, counts
  expect_true(filtered_tab %>% union_all(ds) %>% query_on_dataset())
  expect_true(
    filtered_tab %>%
      union_all(left_join(ds, tab)) %>%
      left_join(tab) %>%
      query_on_dataset()
  )
})
test_that("query_can_stream()", {
  skip_if_not_available("dataset")
  tab <- Table$create(a = 1)
  ds <- InMemoryDataset$create(tab)
  filtered_tab <- tab %>% filter(a > 0)

  # Bare sources, NULL, and a simple filter on a Dataset can stream
  expect_true(query_can_stream(tab))
  expect_true(query_can_stream(ds))
  expect_true(query_can_stream(NULL))
  expect_true(ds %>% filter(a > 0) %>% query_can_stream())

  # arrange() and summarize() on a Table query do not stream
  expect_false(filtered_tab %>% arrange(desc(a)) %>% query_can_stream())
  expect_false(filtered_tab %>% summarize(a = sum(a)) %>% query_can_stream())

  # Unions and joins stream when every branch can...
  expect_true(filtered_tab %>% union_all(ds) %>% query_can_stream())
  expect_true(
    filtered_tab %>%
      union_all(left_join(ds, tab)) %>%
      left_join(tab) %>%
      query_can_stream()
  )
  expect_true(
    filtered_tab %>%
      union_all(left_join(tab, tab)) %>%
      left_join(tab) %>%
      query_can_stream()
  )

  # ...and not when any branch cannot
  expect_false(
    filtered_tab %>%
      union_all(summarize(ds, a = sum(a))) %>%
      query_can_stream()
  )
  expect_false(
    filtered_tab %>%
      union_all(left_join(tab, tab)) %>%
      left_join(ds) %>%
      query_can_stream()
  )
  expect_false(
    filtered_tab %>%
      arrange(a) %>%
      union_all(left_join(tab, tab)) %>%
      left_join(tab) %>%
      query_can_stream()
  )
})
# show_exec_plan() prints the Acero ExecPlan built for a query; each check
# below matches node names and key expressions in that printout with a regex.
# Nodes are printed from the sink down to the source (see the numbered
# patterns in the last check), so each pattern lists them in that order.
test_that("show_exec_plan(), show_query() and explain()", {
# show_query() and explain() are wrappers around show_exec_plan() and are not
# tested separately
# minimal test - this fails if we don't coerce the input to `show_exec_plan()`
# to be an `arrow_dplyr_query`
expect_output(
mtcars %>%
arrow_table() %>%
show_exec_plan(),
regexp = paste0(
"ExecPlan with 2 nodes:.*", # boiler plate for ExecPlan
"SinkNode.*", # output
"TableSourceNode" # entry point
)
)
# arrow_table and mutate
# NOTE(review): the parentheses in "(dbl > 2)" form a regex group, so they
# are not matched literally.
expect_output(
tbl %>%
arrow_table() %>%
filter(dbl > 2, chr != "e") %>%
select(chr, int, lgl) %>%
mutate(int_plus_ten = int + 10) %>%
show_exec_plan(),
regexp = paste0(
"ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
"chr, int, lgl, \"int_plus_ten\".*", # selected columns
"FilterNode.*", # filter node
"(dbl > 2).*", # filter expressions
"chr != \"e\".*",
"TableSourceNode" # entry point
)
)
# record_batch and mutate: a RecordBatch input is also reported as a
# TableSourceNode
expect_output(
tbl %>%
record_batch() %>%
filter(dbl > 2, chr != "e") %>%
select(chr, int, lgl) %>%
mutate(int_plus_ten = int + 10) %>%
show_exec_plan(),
regexp = paste0(
"ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
"chr, int, lgl, \"int_plus_ten\".*", # selected columns
"(dbl > 2).*", # the filter expressions
"chr != \"e\".*",
"TableSourceNode" # the entry point
)
)
# test with group_by and summarise
expect_output(
tbl %>%
arrow_table() %>%
group_by(lgl) %>%
summarise(avg = mean(dbl, na.rm = TRUE)) %>%
show_exec_plan(),
regexp = paste0(
"ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
"GroupByNode.*", # the group_by statement
"keys=.*lgl.*", # the key for the aggregations
"aggregates=.*hash_mean.*avg.*", # the aggregations
"ProjectNode.*", # the input columns
"TableSourceNode" # the entry point
)
)
# test with join
expect_output(
tbl %>%
arrow_table() %>%
left_join(
example_data %>%
arrow_table() %>%
mutate(doubled_dbl = dbl * 2) %>%
select(int, doubled_dbl),
by = "int"
) %>%
select(int, verses, doubled_dbl) %>%
show_exec_plan(),
regexp = paste0(
"ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
"ProjectNode.*", # output columns
"HashJoinNode.*", # the join
"ProjectNode.*", # input columns for the second table
"\"doubled_dbl\"\\: multiply_checked\\(dbl, 2\\).*", # mutate
"TableSourceNode.*", # second table
"TableSourceNode" # first table
)
)
# filter + arrange
expect_output(
mtcars %>%
arrow_table() %>%
filter(mpg > 20) %>%
arrange(desc(wt)) %>%
show_exec_plan(),
regexp = paste0(
"ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
"OrderBy.*wt.*DESC.*", # arrange goes via the OrderBy node
"FilterNode.*", # filter node
"TableSourceNode.*" # entry point
)
)
# filter() followed by head() is printed as a single plan: head(3) shows up
# as a FetchNode (offset=0, count=3) between the FilterNode and the SinkNode.
# The `.` after each node name is a regex wildcard for one character; the
# paste0() result is passed positionally as expect_output()'s `regexp`.
expect_output(
mtcars %>%
arrow_table() %>%
filter(mpg > 20) %>%
head(3) %>%
show_exec_plan(),
paste0(
"ExecPlan with 4 nodes:.*",
"3:SinkNode.*",
"2:FetchNode.offset=0 count=3.*",
"1:FilterNode.filter=.mpg > 20.*",
"0:TableSourceNode.*"
)
)
})
test_that("needs_projection unit tests", {
  tab <- Table$create(tbl)
  # Thin wrapper: needs_projection() on a query's selected columns, checked
  # against tab's schema
  query_needs_projection <- function(query) {
    needs_projection(query$selected_columns, tab$schema)
  }

  # No projection: the plain query, and repeated collapse()
  expect_false(query_needs_projection(as_adq(tab)))
  expect_false(query_needs_projection(collapse(collapse(tab))))

  # Projection needed: computed, subset, renamed, or reordered columns
  expect_true(query_needs_projection(mutate(tab, int = int + 2)))
  expect_true(query_needs_projection(select(tab, int, chr)))
  expect_true(query_needs_projection(rename(tab, int2 = int)))
  expect_true(query_needs_projection(relocate(tab, lgl)))
})
test_that("compute() on a grouped query returns a Table with groups in metadata", {
  expected <- group_by(tbl, int)
  tab1 <- compute(group_by(arrow_table(tbl), int))

  expect_r6_class(tab1, "Table")
  # The computed Table compares equal to, and collects back to, the grouped
  # tibble
  expect_equal_data_frame(tab1, expected)
  expect_equal(collect(tab1), expected)
})
test_that("collect() is identical to compute() %>% collect()", {
  tab1 <- arrow_table(tbl)
  adq1 <- group_by(tab1, int)
  # Check both a bare Table and a grouped query
  for (input in list(tab1, adq1)) {
    expect_equal(collect(compute(input)), collect(input))
  }
})
# Scalars compared against a column are cast to the column's type when that
# is lossless (ints, dates, timestamps); otherwise they are passed as-is.
test_that("Scalars in expressions match the type of the field, if possible", {
  tbl_with_datetime <- tbl
  tbl_with_datetime$dates <- as.Date("2022-08-28") + 1:10
  tbl_with_datetime$times <- lubridate::ymd_hms("2018-10-07 19:04:05") + 1:10
  tab <- Table$create(tbl_with_datetime)

  # 5 is double in R but is properly interpreted as int, no cast is added
  expect_output(
    tab %>%
      filter(int == 5) %>%
      show_exec_plan(),
    "int == 5"
  )
  # Because 5.2 can't cast to int32 without truncation, we pass as is
  # and Acero will cast int to float64
  expect_output(
    tab %>%
      filter(int == 5.2) %>%
      show_exec_plan(),
    "filter=(cast(int, {to_type=double",
    fixed = TRUE
  )
  expect_equal(
    tab %>%
      filter(int == 5.2) %>%
      nrow(),
    0
  )

  # int == string, errors starting in dplyr 1.1.0
  # NOTE(review): the expectation is a warning snapshot, not an error;
  # confirm the comment above is still accurate.
  expect_snapshot_warning(
    tab %>% filter(int == "5")
  )

  # Strings automatically parsed to date/timestamp
  expect_output(
    tab %>%
      filter(dates > "2022-09-01") %>%
      show_exec_plan(),
    "dates > 2022-09-01"
  )
  compare_dplyr_binding(
    .input %>%
      filter(dates > "2022-09-01") %>%
      collect(),
    tbl_with_datetime
  )

  # Decimal check. It runs before the timezone skip below because it does not
  # parse any timestamp strings, so it should not be skipped with them.
  tab_with_decimal <- tab %>%
    mutate(dec = cast(dbl, decimal(15, 2))) %>%
    compute()
  # This reproduces the issue on ARROW-17601, found in the TPC-H query 1
  # In ARROW-17462, we chose not to auto-cast to decimal to avoid that issue
  result <- tab_with_decimal %>%
    summarize(
      tpc_h_1 = sum(dec * (1 - dec) * (1 + dec), na.rm = TRUE),
      as_dbl = sum(dbl * (1 - dbl) * (1 + dbl), na.rm = TRUE)
    ) %>%
    collect()
  expect_equal(result$tpc_h_1, result$as_dbl)

  # ARROW-18401: These will error if the system timezone is not valid. A PR was
  # submitted to fix this docker image upstream; this skip can be removed after
  # it merges.
  # https://github.com/r-hub/rhub-linux-builders/pull/65
  # skip_if() ends the test here, so only timestamp checks may follow it.
  skip_if(identical(Sys.timezone(), "/UTC"))
  expect_output(
    tab %>%
      filter(times > "2018-10-07 19:04:05") %>%
      show_exec_plan(),
    "times > 2018-10-0. ..:..:05"
  )
  compare_dplyr_binding(
    .input %>%
      filter(times > "2018-10-07 19:04:05") %>%
      collect(),
    tbl_with_datetime
  )
})
test_that("Can use nested field refs", {
  # df_col is a data-frame column with fields a and b
  nested_data <- tibble(int = 1:5, df_col = tibble(a = 6:10, b = 11:15))

  # `$` into the nested column inside mutate(), then filter on the result
  compare_dplyr_binding(
    .input %>%
      mutate(nested = df_col$a, times2 = df_col$a * 2) %>%
      filter(nested > 7) %>%
      collect(),
    nested_data
  )

  # Same derivation, followed by an aggregation
  compare_dplyr_binding(
    .input %>%
      mutate(nested = df_col$a, times2 = df_col$a * 2) %>%
      filter(nested > 7) %>%
      summarize(sum(times2)) %>%
      collect(),
    nested_data
  )
})
# Nested field references must also work when reading from a file-backed
# Dataset (the original note: column pushdown in ScanNode).
test_that("Can use nested field refs with Dataset", {
  skip_if_not_available("dataset")
  # Now with Dataset: make sure column pushdown in ScanNode works
  nested_data <- tibble(int = 1:5, df_col = tibble(a = 6:10, b = 11:15))

  # Scratch directory for the written dataset; removed when the test ends
  # so repeated runs do not accumulate files in the session temp dir.
  tf <- tempfile()
  dir.create(tf)
  on.exit(unlink(tf, recursive = TRUE), add = TRUE)
  write_dataset(nested_data, tf)
  ds <- open_dataset(tf)

  expect_equal(
    ds %>%
      mutate(
        nested = df_col$a,
        times2 = df_col$a * 2
      ) %>%
      filter(nested > 7) %>%
      collect(),
    nested_data %>%
      mutate(
        nested = df_col$a,
        times2 = df_col$a * 2
      ) %>%
      filter(nested > 7)
  )

  # Issue #34519: error when projecting same name, but only on file dataset
  # (the expected side is an in-memory tibble, so no collect() is needed)
  expect_equal(
    ds %>%
      mutate(int = as.numeric(int)) %>%
      collect(),
    nested_data %>%
      mutate(int = as.numeric(int))
  )
})
test_that("Use struct_field for $ on non-field-ref", {
  # df_col is built by an expression inside the query rather than read from
  # the source, so `$` on it is not a plain field reference
  compare_dplyr_binding(
    .input %>%
      mutate(df_col = tibble(i = int, d = dbl)) %>%
      transmute(int2 = df_col$i, dbl2 = df_col$d) %>%
      collect(),
    example_data
  )
})
test_that("nested field ref error handling", {
  # `$` on the plain int column has no field to resolve and must error
  query <- quote(
    example_data %>%
      arrow_table() %>%
      mutate(x = int$nested) %>%
      compute()
  )
  expect_error(eval(query), "No match")
})
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.