# tests/testthat/test-presto_unnest.R

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# Label this file's tests with the value of the PRESTO_TYPE environment
# variable (falling back to "Presto") followed by the function under test.
context(paste(Sys.getenv("PRESTO_TYPE", "Presto"), "presto_unnest"))

test_that("unnest works with arrays", {
  conn <- setup_live_connection()
  tbl_name <- "unnest_arr_test"

  # Fill the quoted table identifier into a SQL template and execute it.
  run_sql <- function(template) {
    DBI::dbExecute(
      conn,
      sprintf(template, DBI::dbQuoteIdentifier(conn, tbl_name))
    )
  }

  # Best-effort removal of a table left by an earlier run; errors are ignored.
  tryCatch(run_sql("DROP TABLE IF EXISTS %s"), error = function(e) NULL)
  run_sql("CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))")
  on.exit(DBI::dbRemoveTable(conn, tbl_name), add = TRUE)

  run_sql("INSERT INTO %s VALUES (1, ARRAY[10, 20, 30]), (2, ARRAY[5])")

  unnested <- dplyr::tbl(conn, tbl_name) %>%
    presto_unnest(arr, values_to = "elem")

  # Row count computed remotely: 3 elements for id 1 plus 1 for id 2
  n_rows <- unnested %>%
    dplyr::tally() %>%
    dplyr::pull(n)
  expect_equal(n_rows, 4L)

  out <- dplyr::collect(unnested)

  expect_equal(nrow(out), 4L)
  expect_equal(names(out), c("id", "arr", "elem"))
  # Every array element shows up exactly once in the new column
  expect_equal(sort(out$elem), c(5L, 10L, 20L, 30L))
})

test_that("unnest without values_to uses column name with _elem suffix", {
  conn <- setup_live_connection()
  tbl_name <- "unnest_no_values_to_test"

  # Fill the quoted table identifier into a SQL template and execute it.
  run_sql <- function(template) {
    DBI::dbExecute(
      conn,
      sprintf(template, DBI::dbQuoteIdentifier(conn, tbl_name))
    )
  }

  # Best-effort removal of a table left by an earlier run; errors are ignored.
  tryCatch(run_sql("DROP TABLE IF EXISTS %s"), error = function(e) NULL)
  run_sql("CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))")
  on.exit(DBI::dbRemoveTable(conn, tbl_name), add = TRUE)

  run_sql("INSERT INTO %s VALUES (1, ARRAY[10, 20]), (2, ARRAY[5])")

  # values_to is deliberately omitted; the test expects "arr_elem" as the
  # name of the unnested column.
  out <- dplyr::tbl(conn, tbl_name) %>%
    presto_unnest(arr) %>%
    dplyr::collect()

  # Expected columns: id, the original arr, and the unnested arr_elem
  expect_equal(nrow(out), 3L)
  expect_equal(names(out), c("id", "arr", "arr_elem"))
  expect_equal(sort(out$arr_elem), c(5L, 10L, 20L))
})

test_that("unnest supports WITH ORDINALITY", {
  conn <- setup_live_connection()

  test_table <- "unnest_arr_ord_test"
  # Drop table if it exists from a previous test run, so the CREATE TABLE
  # below does not fail on leftovers; errors from the drop are ignored.
  tryCatch(
    DBI::dbExecute(conn, sprintf(
      "DROP TABLE IF EXISTS %s",
      DBI::dbQuoteIdentifier(conn, test_table)
    )),
    error = function(e) NULL
  )
  DBI::dbExecute(conn, sprintf(
    "CREATE TABLE %s (id BIGINT, arr ARRAY(VARCHAR))",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))
  # Remove the table again when the test finishes
  on.exit(DBI::dbRemoveTable(conn, test_table), add = TRUE)

  DBI::dbExecute(conn, sprintf(
    "INSERT INTO %s VALUES (1, ARRAY['a','b','c'])",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))

  tbl_in <- dplyr::tbl(conn, test_table)
  # Request an ordinality column named "idx" next to the element column "val"
  tbl_out <- tbl_in %>%
    presto_unnest(
      arr,
      values_to = "val",
      with_ordinality = TRUE,
      ordinality_to = "idx"
    )
  out <- dplyr::collect(tbl_out)

  expect_equal(nrow(out), 3L)
  expect_equal(names(out), c("id", "arr", "val", "idx"))
  # One position per element of the three-element array
  expect_equal(sort(out$idx), c(1L, 2L, 3L))
})


test_that("unnest works on subqueries", {
  conn <- setup_live_connection()

  test_table <- "unnest_arr_subq_test"
  # Drop table if it exists from a previous test run, so the CREATE TABLE
  # below does not fail on leftovers; errors from the drop are ignored.
  tryCatch(
    DBI::dbExecute(conn, sprintf(
      "DROP TABLE IF EXISTS %s",
      DBI::dbQuoteIdentifier(conn, test_table)
    )),
    error = function(e) NULL
  )
  DBI::dbExecute(conn, sprintf(
    "CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))
  # Remove the table again when the test finishes
  on.exit(DBI::dbRemoveTable(conn, test_table), add = TRUE)

  DBI::dbExecute(conn, sprintf(
    "INSERT INTO %s VALUES (1, ARRAY[10, 20, 30]), (2, ARRAY[5])",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))

  tbl_in <- dplyr::tbl(conn, test_table)
  # Build a lazy query (filter + select) so the unnest is applied to a
  # derived relation rather than to the bare table
  subq <- tbl_in %>%
    dplyr::filter(id == 1L) %>%
    dplyr::select(id, arr)

  tbl_out <- subq %>%
    presto_unnest(arr, values_to = "elem")
  out <- dplyr::collect(tbl_out)

  # Only the id == 1 row survives the filter, contributing three elements
  expect_equal(nrow(out), 3L)
  expect_equal(names(out), c("id", "arr", "elem"))
  expect_equal(sort(out$elem), c(10L, 20L, 30L))
})

test_that("unnest works when source is a CTE", {
  conn <- setup_live_connection()

  test_table <- "unnest_arr_cte_src"
  # Drop table if it exists from a previous test run, so the CREATE TABLE
  # below does not fail on leftovers; errors from the drop are ignored.
  tryCatch(
    DBI::dbExecute(conn, sprintf(
      "DROP TABLE IF EXISTS %s",
      DBI::dbQuoteIdentifier(conn, test_table)
    )),
    error = function(e) NULL
  )
  DBI::dbExecute(conn, sprintf(
    "CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))
  # Remove the table again when the test finishes
  on.exit(DBI::dbRemoveTable(conn, test_table), add = TRUE)

  DBI::dbExecute(conn, sprintf(
    "INSERT INTO %s VALUES (1, ARRAY[1, 2]), (2, ARRAY[3, 4, 5])",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))

  tbl_in <- dplyr::tbl(conn, test_table)
  # Make a non-trivial change (rename), so compute() has work to do
  cte_rel <- tbl_in %>%
    dplyr::rename(arr2 = arr) %>%
    dplyr::compute(name = "unnest_arr_cte", cte = TRUE)
  # Unregister the CTE from the connection's session when the test finishes,
  # mirroring the cleanup used by the other CTE tests in this file
  on.exit(
    {
      if (conn@session$hasCTE("unnest_arr_cte")) {
        conn@session$removeCTE("unnest_arr_cte")
      }
    },
    add = TRUE
  )

  tbl_out <- cte_rel %>%
    presto_unnest(arr2, values_to = "elem")
  out <- dplyr::collect(tbl_out)

  # The renamed array column is kept and all five elements are expanded
  expect_equal(names(out), c("id", "arr2", "elem"))
  expect_equal(sort(out$elem), c(1L, 2L, 3L, 4L, 5L))
})

test_that("unnest works with group_by", {
  conn <- setup_live_connection()
  tbl_name <- "unnest_grps_test"

  # Fill the quoted table identifier into a SQL template and execute it.
  run_sql <- function(template) {
    DBI::dbExecute(
      conn,
      sprintf(template, DBI::dbQuoteIdentifier(conn, tbl_name))
    )
  }

  # Best-effort removal of a table left by an earlier run; errors are ignored.
  tryCatch(run_sql("DROP TABLE IF EXISTS %s"), error = function(e) NULL)
  run_sql("CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))")
  on.exit(DBI::dbRemoveTable(conn, tbl_name), add = TRUE)

  run_sql("INSERT INTO %s VALUES (1, ARRAY[10, 20]), (2, ARRAY[5])")

  # Grouping is applied before the unnest and the summary afterwards
  per_id <- dplyr::tbl(conn, tbl_name) %>%
    dplyr::group_by(id) %>%
    presto_unnest(arr, values_to = "elem") %>%
    dplyr::summarize(count = dplyr::n())
  out <- per_id %>%
    dplyr::collect()

  # One summary row per id; the counts are the array lengths (2 and 1)
  expect_equal(nrow(out), 2L)
  expect_equal(names(out), c("id", "count"))
  expect_equal(sort(out$count), c(1L, 2L))
})

test_that("unnest works with arrange", {
  conn <- setup_live_connection()
  tbl_name <- "unnest_sort_test"

  # Fill the quoted table identifier into a SQL template and execute it.
  run_sql <- function(template) {
    DBI::dbExecute(
      conn,
      sprintf(template, DBI::dbQuoteIdentifier(conn, tbl_name))
    )
  }

  # Best-effort removal of a table left by an earlier run; errors are ignored.
  tryCatch(run_sql("DROP TABLE IF EXISTS %s"), error = function(e) NULL)
  run_sql("CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))")
  on.exit(DBI::dbRemoveTable(conn, tbl_name), add = TRUE)

  # Rows are inserted with id 2 first so the ordering below is observable
  run_sql("INSERT INTO %s VALUES (2, ARRAY[5]), (1, ARRAY[10, 20])")

  sorted <- dplyr::tbl(conn, tbl_name) %>%
    presto_unnest(arr, values_to = "elem") %>%
    dplyr::arrange(id, elem)
  out <- sorted %>%
    dplyr::collect()

  # Result must come back ordered by id, then by element value
  expect_equal(nrow(out), 3L)
  expect_equal(out$id, c(1L, 1L, 2L))
  expect_equal(out$elem, c(10L, 20L, 5L))
})

test_that("unnest works with window functions", {
  conn <- setup_live_connection()
  tbl_name <- "unnest_frame_test"

  # Fill the quoted table identifier into a SQL template and execute it.
  run_sql <- function(template) {
    DBI::dbExecute(
      conn,
      sprintf(template, DBI::dbQuoteIdentifier(conn, tbl_name))
    )
  }

  # Best-effort removal of a table left by an earlier run; errors are ignored.
  tryCatch(run_sql("DROP TABLE IF EXISTS %s"), error = function(e) NULL)
  run_sql("CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))")
  on.exit(DBI::dbRemoveTable(conn, tbl_name), add = TRUE)

  run_sql("INSERT INTO %s VALUES (1, ARRAY[10, 20]), (2, ARRAY[5])")

  # The window column is added before the unnest step
  numbered <- dplyr::tbl(conn, tbl_name) %>%
    dplyr::mutate(row_num = dplyr::row_number()) %>%
    presto_unnest(arr, values_to = "elem")
  out <- numbered %>%
    dplyr::collect()

  expect_equal(nrow(out), 3L)
  expect_equal(names(out), c("id", "arr", "row_num", "elem"))
  # Row numbers were assigned over the two source rows, so only 1 and 2 occur
  expect_true(all(out$row_num %in% c(1L, 2L)))
})

test_that("unnest followed by group_by and summarize works correctly", {
  conn <- setup_live_connection()
  tbl_name <- "unnest_then_group_test"

  # Fill the quoted table identifier into a SQL template and execute it.
  run_sql <- function(template) {
    DBI::dbExecute(
      conn,
      sprintf(template, DBI::dbQuoteIdentifier(conn, tbl_name))
    )
  }

  # Best-effort removal of a table left by an earlier run; errors are ignored.
  tryCatch(run_sql("DROP TABLE IF EXISTS %s"), error = function(e) NULL)
  run_sql("CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))")
  on.exit(DBI::dbRemoveTable(conn, tbl_name), add = TRUE)

  run_sql("INSERT INTO %s VALUES (1, ARRAY[10, 20]), (2, ARRAY[5])")

  # The sequence under test: unnest first, then group_by, then summarize
  counts <- dplyr::tbl(conn, tbl_name) %>%
    presto_unnest(arr, values_to = "elem") %>%
    dplyr::group_by(id) %>%
    dplyr::summarize(count = dplyr::n())
  out <- counts %>%
    dplyr::collect()

  # One row per id, with counts 2 and 1
  expect_equal(nrow(out), 2L)
  expect_equal(names(out), c("id", "count"))
  expect_equal(sort(out$count), c(1L, 2L))
  # id 1 carries two array elements, id 2 carries one
  expect_equal(out$count[out$id == 1L], 2L)
  expect_equal(out$count[out$id == 2L], 1L)
})

test_that("unnest followed by group_by and summarize works with CTE", {
  conn <- setup_live_connection()

  test_table <- "unnest_then_group_cte_test"
  # Drop table if it exists from a previous test run; errors are ignored
  tryCatch(
    DBI::dbExecute(conn, sprintf(
      "DROP TABLE IF EXISTS %s",
      DBI::dbQuoteIdentifier(conn, test_table)
    )),
    error = function(e) NULL
  )
  DBI::dbExecute(conn, sprintf(
    "CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))
  # Remove the table again when the test finishes
  on.exit(DBI::dbRemoveTable(conn, test_table), add = TRUE)

  DBI::dbExecute(conn, sprintf(
    "INSERT INTO %s VALUES (1, ARRAY[10, 20]), (2, ARRAY[5])",
    DBI::dbQuoteIdentifier(conn, test_table)
  ))

  tbl_in <- dplyr::tbl(conn, test_table)
  # This sequence works: unnest, save to CTE, THEN group_by and summarize
  cte_rel <- tbl_in %>%
    presto_unnest(arr, values_to = "elem") %>%
    dplyr::compute(name = "unnest_cte", cte = TRUE)
  # Unregister the CTE when the test finishes; another test in this file
  # reuses the name "unnest_cte" and cleans up the same way
  on.exit(
    {
      if (conn@session$hasCTE("unnest_cte")) {
        conn@session$removeCTE("unnest_cte")
      }
    },
    add = TRUE
  )

  tbl_out <- cte_rel %>%
    dplyr::group_by(id) %>%
    dplyr::summarize(count = dplyr::n())
  out <- dplyr::collect(tbl_out)

  # Expected: 2 rows (one for each id), with counts 2 and 1
  expect_equal(nrow(out), 2L)
  expect_equal(names(out), c("id", "count"))
  expect_equal(sort(out$count), c(1L, 2L))
  # Verify id=1 has count=2 (two elements in array), id=2 has count=1
  expect_equal(out$count[out$id == 1L], 2L)
  expect_equal(out$count[out$id == 2L], 1L)
})

test_that("operation -> CTE -> presto_unnest -> CTE works correctly", {
  conn <- setup_live_connection()
  tbl_name <- "unnest_op_cte_unnest_cte_test"

  # Fill the quoted table identifier into a SQL template and execute it.
  run_sql <- function(template) {
    DBI::dbExecute(
      conn,
      sprintf(template, DBI::dbQuoteIdentifier(conn, tbl_name))
    )
  }

  # Remove a named CTE from the connection's session if it is registered.
  drop_cte <- function(cte_name) {
    if (conn@session$hasCTE(cte_name)) {
      conn@session$removeCTE(cte_name)
    }
  }

  # Best-effort removal of a table left by an earlier run; errors are ignored.
  tryCatch(run_sql("DROP TABLE IF EXISTS %s"), error = function(e) NULL)
  run_sql("CREATE TABLE %s (id BIGINT, arr ARRAY(BIGINT))")
  on.exit(DBI::dbRemoveTable(conn, tbl_name), add = TRUE)

  run_sql("INSERT INTO %s VALUES (1, ARRAY[10, 20]), (2, ARRAY[5])")

  # Stage 1: filter the base table and store the result as a CTE
  filtered <- dplyr::tbl(conn, tbl_name) %>%
    dplyr::filter(id == 1L) %>%
    dplyr::compute(name = "filtered_cte", cte = TRUE)
  on.exit(drop_cte("filtered_cte"), add = TRUE)

  # Stage 2: unnest on top of the first CTE and store that as a second CTE
  unnested <- filtered %>%
    presto_unnest(arr, values_to = "elem") %>%
    dplyr::compute(name = "unnest_cte", cte = TRUE)
  on.exit(drop_cte("unnest_cte"), add = TRUE)

  # Stage 3: collect from the final CTE and check its contents
  out <- unnested %>%
    dplyr::collect()

  expect_equal(nrow(out), 2L)
  expect_equal(names(out), c("id", "arr", "elem"))
  expect_equal(sort(out$elem), c(10L, 20L))
  expect_equal(unique(out$id), 1L)
})

# Try the RPresto package in your browser
#
# Any scripts or data that you put into this service are public.
#
# RPresto documentation built on Dec. 2, 2025, 9:07 a.m.